You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/12/01 19:17:42 UTC

[qpid-broker-j] branch master updated (59b1b8a -> 7193e26)

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git.


    from 59b1b8a  NO-JIRA: Add extra debug to understand sporadic test failures in KerberosAuthenticationManagerTest
     new a597b90  QPID-8387: [Broker-J] Handle exceptions thrown on asynchronous message removal in JDBC-based message stores
     new 7193e26  QPID-8387: [Broker-J] Await for asynchronous message removal tasks to complete on message store close

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../store/jdbc/AbstractJDBCMessageStore.java       | 96 +++++++++++++++++++---
 1 file changed, 83 insertions(+), 13 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 01/02: QPID-8387: [Broker-J] Handle exceptions thrown on asynchronous message removal in JDBC-based message stores

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit a597b9051f67287ab64cd7c1a966bfeab239088d
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Sun Dec 1 18:00:00 2019 +0000

    QPID-8387: [Broker-J] Handle exceptions thrown on asynchronous message removal in JDBC-based message stores
---
 .../store/jdbc/AbstractJDBCMessageStore.java       | 75 ++++++++++++++++++----
 1 file changed, 62 insertions(+), 13 deletions(-)

diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index df02e55..e1181b2 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -38,6 +38,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -108,6 +109,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     private final Set<StoredJDBCMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private final Set<MessageDeleteListener> _messageDeleteListeners = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private final Set<Action<Connection>> _deleteActions = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    private final Thread.UncaughtExceptionHandler _uncaughtExceptionHandler;
 
     protected abstract boolean isMessageStoreOpen();
 
@@ -117,6 +119,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
     public AbstractJDBCMessageStore()
     {
+        _uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
     }
 
     protected void setMaximumMessageId()
@@ -453,21 +456,67 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     {
         if(_messageRemovalScheduled.compareAndSet(false, true))
         {
-            _executor.submit(() -> {
-                List<Long> messageIds;
-                do
-                {
-                    messageIds = _messagesToDelete.getAndSet(EMPTY_LIST);
-                    removeMessages(messageIds);
-                } while(!messageIds.isEmpty());
-
+            try
+            {
+                _executor.submit(this::removeScheduledMessages);
+            }
+            catch (RejectedExecutionException e)
+            {
                 _messageRemovalScheduled.set(false);
-                if(!_messagesToDelete.get().isEmpty())
-                {
-                    scheduleMessageRemoval();
-                }
+                throw new IllegalStateException("Cannot schedule removal of messages", e);
+            }
+        }
+    }
 
-            });
+    private void removeScheduledMessages()
+    {
+        try
+        {
+            removeScheduledMessagesAndRescheduleIfRequired();
+        }
+        catch (RuntimeException e)
+        {
+            handleExceptionOnScheduledMessageRemoval(e);
+        }
+    }
+
+    private void removeScheduledMessagesAndRescheduleIfRequired()
+    {
+        List<Long> messageIds;
+        try
+        {
+            do
+            {
+                messageIds = _messagesToDelete.getAndSet(EMPTY_LIST);
+                removeMessages(messageIds);
+            } while (!messageIds.isEmpty());
+        }
+        finally
+        {
+            _messageRemovalScheduled.set(false);
+        }
+        if (!_messagesToDelete.get().isEmpty() && isMessageStoreOpen())
+        {
+            scheduleMessageRemoval();
+        }
+    }
+
+    private void handleExceptionOnScheduledMessageRemoval(final RuntimeException e)
+    {
+        if (isMessageStoreOpen())
+        {
+            if (_uncaughtExceptionHandler == null)
+            {
+                getLogger().error("Unexpected exception on asynchronous message removal", e);
+            }
+            else
+            {
+                _uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
+            }
+        }
+        else
+        {
+            getLogger().warn("Ignoring unexpected exception on asynchronous message removal as store is not open", e);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 02/02: QPID-8387: [Broker-J] Await for asynchronous message removal tasks to complete on message store close

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 7193e26f6fad83f5ce81fd9ef855edb0fd5594ea
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Sun Dec 1 18:41:22 2019 +0000

    QPID-8387: [Broker-J] Await for asynchronous message removal tasks to complete on message store close
---
 .../server/store/jdbc/AbstractJDBCMessageStore.java | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index e1181b2..21c7d87 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -41,6 +41,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -91,6 +92,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
     private static final int EXECUTOR_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors();
     private static final String EXECUTOR_THREADS = "qpid.jdbcstore.executorThreads";
+    private static final String EXECUTOR_SHUTDOWN_TIMEOUT = "qpid.jdbcstore.executorShutdownTimeoutInSeconds";
+    private static final int EXECUTOR_SHUTDOWN_TIMEOUT_DEFAULT = 5;
 
     private static final int DB_VERSION = 8;
 
@@ -116,6 +119,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     protected abstract void checkMessageStoreOpen();
     private ScheduledThreadPoolExecutor _executor;
     private volatile int _inClauseMaxSize;
+    private volatile int _executorShutdownTimeOut;
 
     public AbstractJDBCMessageStore()
     {
@@ -230,6 +234,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
         int corePoolSize = getContextValue(Integer.class, EXECUTOR_THREADS, EXECUTOR_THREADS_DEFAULT);
 
+        _executorShutdownTimeOut = getContextValue(Integer.class, EXECUTOR_SHUTDOWN_TIMEOUT, EXECUTOR_SHUTDOWN_TIMEOUT_DEFAULT);
+
         _executor = new ScheduledThreadPoolExecutor(corePoolSize, new ThreadFactory()
         {
             private final AtomicInteger _count = new AtomicInteger();
@@ -259,6 +265,21 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         if(_executor != null)
         {
             _executor.shutdown();
+            if (_executorShutdownTimeOut > 0)
+            {
+                try
+                {
+                    if (!_executor.awaitTermination(_executorShutdownTimeOut, TimeUnit.SECONDS))
+                    {
+                        _executor.shutdownNow();
+                    }
+                }
+                catch (InterruptedException e)
+                {
+                    getLogger().warn("Interrupted during store executor shutdown:", e);
+                    Thread.currentThread().interrupt();
+                }
+            }
         }
 
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org