You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/10 22:53:44 UTC

[2/2] activemq-artemis git commit: [ARTEMIS-2105] Discovery group connectors can delay broker shutdown

[ARTEMIS-2105] Discovery group connectors can delay broker shutdown

Issue: https://issues.apache.org/jira/browse/ARTEMIS-2105


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2450f6a3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2450f6a3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2450f6a3

Branch: refs/heads/master
Commit: 2450f6a3769e83b204ec72354ed5fabb5053d3a1
Parents: d441e75
Author: Ingo Weiss <in...@redhat.com>
Authored: Tue Oct 2 15:43:52 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 10 18:53:38 2018 -0400

----------------------------------------------------------------------
 .../artemis/ra/inflow/ActiveMQActivation.java   | 139 ++++++++++---------
 1 file changed, 72 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2450f6a3/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
index b0f0aff..57bf5c4 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
@@ -123,6 +123,8 @@ public class ActiveMQActivation {
 
    private boolean lastReceived = false;
 
+   private final Object teardownLock = new Object();
+
    // Whether we are in the failure recovery loop
    private final AtomicBoolean inReconnect = new AtomicBoolean(false);
    private XARecoveryConfig resourceRecovery;
@@ -352,98 +354,102 @@ public class ActiveMQActivation {
    /**
     * Teardown the activation
     */
-   protected synchronized void teardown(boolean useInterrupt) {
-      logger.debug("Tearing down " + spec);
+   protected void teardown(boolean useInterrupt) {
 
-      long timeout = factory == null ? ActiveMQClient.DEFAULT_CALL_TIMEOUT : factory.getCallTimeout();
+      synchronized (teardownLock) {
 
-      if (resourceRecovery != null) {
-         ra.getRecoveryManager().unRegister(resourceRecovery);
-      }
+         logger.debug("Tearing down " + spec);
 
-      final ActiveMQMessageHandler[] handlersCopy = new ActiveMQMessageHandler[handlers.size()];
+         long timeout = factory == null ? ActiveMQClient.DEFAULT_CALL_TIMEOUT : factory.getCallTimeout();
 
-      // We need to do from last to first as any temporary queue will have been created on the first handler
-      // So we invert the handlers here
-      for (int i = 0; i < handlers.size(); i++) {
-         // The index here is the complimentary so it's inverting the array
-         handlersCopy[i] = handlers.get(handlers.size() - i - 1);
-      }
+         if (resourceRecovery != null) {
+            ra.getRecoveryManager().unRegister(resourceRecovery);
+         }
 
-      handlers.clear();
+         final ActiveMQMessageHandler[] handlersCopy = new ActiveMQMessageHandler[handlers.size()];
 
-      FutureLatch future = new FutureLatch(handlersCopy.length);
-      for (ActiveMQMessageHandler handler : handlersCopy) {
-         handler.interruptConsumer(future);
-      }
+         // We need to do from last to first as any temporary queue will have been created on the first handler
+         // So we invert the handlers here
+         for (int i = 0; i < handlers.size(); i++) {
+            // The index here is the complimentary so it's inverting the array
+            handlersCopy[i] = handlers.get(handlers.size() - i - 1);
+         }
+
+         handlers.clear();
 
-      //wait for all the consumers to complete any onmessage calls
-      boolean stuckThreads = !future.await(timeout);
-      //if any are stuck then we need to interrupt them
-      if (stuckThreads && useInterrupt) {
+         FutureLatch future = new FutureLatch(handlersCopy.length);
          for (ActiveMQMessageHandler handler : handlersCopy) {
-            Thread interruptThread = handler.getCurrentThread();
-            if (interruptThread != null) {
-               try {
-                  logger.tracef("Interrupting thread %s", interruptThread.getName());
-               } catch (Throwable justLog) {
-                  logger.warn(justLog);
-               }
-               try {
-                  interruptThread.interrupt();
-               } catch (Throwable e) {
-                  //ok
-               }
-            }
+            handler.interruptConsumer(future);
          }
-      }
 
-      Runnable runTearDown = new Runnable() {
-         @Override
-         public void run() {
+         //wait for all the consumers to complete any onmessage calls
+         boolean stuckThreads = !future.await(timeout);
+         //if any are stuck then we need to interrupt them
+         if (stuckThreads && useInterrupt) {
             for (ActiveMQMessageHandler handler : handlersCopy) {
-               handler.teardown();
+               Thread interruptThread = handler.getCurrentThread();
+               if (interruptThread != null) {
+                  try {
+                     logger.tracef("Interrupting thread %s", interruptThread.getName());
+                  } catch (Throwable justLog) {
+                     logger.warn(justLog);
+                  }
+                  try {
+                     interruptThread.interrupt();
+                  } catch (Throwable e) {
+                     //ok
+                  }
+               }
             }
          }
-      };
 
-      Thread threadTearDown = startThread("TearDown/HornetQActivation", runTearDown);
+         Runnable runTearDown = new Runnable() {
+            @Override
+            public void run() {
+               for (ActiveMQMessageHandler handler : handlersCopy) {
+                  handler.teardown();
+               }
+            }
+         };
 
-      try {
-         threadTearDown.join(timeout);
-      } catch (InterruptedException e) {
-         // nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up
-      }
+         Thread threadTearDown = startThread("TearDown/HornetQActivation", runTearDown);
 
-      if (factory != null) {
          try {
-            // closing the factory will help making sure pending threads are closed
-            factory.close();
-         } catch (Throwable e) {
-            ActiveMQRALogger.LOGGER.unableToCloseFactory(e);
+            threadTearDown.join(timeout);
+         } catch (InterruptedException e) {
+            // nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up
          }
 
-         factory = null;
-      }
-
-      if (threadTearDown.isAlive()) {
-         threadTearDown.interrupt();
+         if (factory != null) {
+            try {
+               // closing the factory will help making sure pending threads are closed
+               factory.close();
+            } catch (Throwable e) {
+               ActiveMQRALogger.LOGGER.unableToCloseFactory(e);
+            }
 
-         try {
-            threadTearDown.join(5000);
-         } catch (InterruptedException e) {
-            // nothing to be done here.. we are going down anyways
+            factory = null;
          }
 
          if (threadTearDown.isAlive()) {
-            ActiveMQRALogger.LOGGER.threadCouldNotFinish(threadTearDown.toString());
+            threadTearDown.interrupt();
+
+            try {
+               threadTearDown.join(5000);
+            } catch (InterruptedException e) {
+               // nothing to be done here.. we are going down anyways
+            }
+
+            if (threadTearDown.isAlive()) {
+               ActiveMQRALogger.LOGGER.threadCouldNotFinish(threadTearDown.toString());
+            }
          }
-      }
 
-      nodes.clear();
-      lastReceived = false;
+         nodes.clear();
+         lastReceived = false;
 
-      logger.debug("Tearing down complete " + this);
+         logger.debug("Tearing down complete " + this);
+      }
    }
 
    protected void setupCF() throws Exception {
@@ -467,7 +473,6 @@ public class ActiveMQActivation {
       } else {
          factory = ra.newConnectionFactory(spec);
       }
-
    }
 
    /**