You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/08/14 04:02:56 UTC

activemq-artemis git commit: ARTEMIS-2030 only use interrupt during shutdown on RA

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x b71827aa2 -> 4104986d0


ARTEMIS-2030 only use interrupt during shutdown on RA

(cherry picked from commit 99d091a0eaa07f042ce5d689bff9a3bd5880f1b3)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4104986d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4104986d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4104986d

Branch: refs/heads/2.6.x
Commit: 4104986d04d50cd402f7986e785ce0d44b68a6fe
Parents: b71827a
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Aug 13 20:30:54 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Aug 14 00:02:50 2018 -0400

----------------------------------------------------------------------
 .../core/client/impl/ClientConsumerImpl.java    |   9 ++
 .../client/impl/ClientConsumerInternal.java     |   2 +
 .../artemis/ra/inflow/ActiveMQActivation.java   | 113 +++++++++++++------
 .../ra/inflow/ActiveMQMessageHandler.java       |  16 ++-
 .../client/impl/LargeMessageBufferTest.java     |   5 +
 5 files changed, 105 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4104986d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index 19987ff..5e3d95e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -409,6 +409,15 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
       return handler;
    }
 
+   @Override
+   public Thread getCurrentThread() {
+      if (onMessageThread != null) {
+         return onMessageThread;
+      }
+      return receiverThread;
+   }
+
+
    // Must be synchronized since messages may be arriving while handler is being set and might otherwise end
    // up not queueing enough executors - so messages get stranded
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4104986d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
index 177732e..986f397 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
@@ -41,6 +41,8 @@ public interface ClientConsumerInternal extends ClientConsumer {
 
    void clear(boolean waitForOnMessage) throws ActiveMQException;
 
+   Thread getCurrentThread();
+
    /**
     * To be called by things like MDBs during shutdown of the server
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4104986d/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
index 204d5d0..9d4b096 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
@@ -26,9 +26,12 @@ import javax.naming.InitialContext;
 import javax.resource.ResourceException;
 import javax.resource.spi.endpoint.MessageEndpointFactory;
 import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkException;
 import javax.resource.spi.work.WorkManager;
 import javax.transaction.xa.XAResource;
 import java.lang.reflect.Method;
+import java.security.AccessController;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -56,6 +59,7 @@ import org.apache.activemq.artemis.ra.ActiveMQRALogger;
 import org.apache.activemq.artemis.ra.ActiveMQRaUtils;
 import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
 import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.FutureLatch;
 import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
 import org.jboss.logging.Logger;
@@ -242,7 +246,7 @@ public class ActiveMQActivation {
          logger.trace("start()");
       }
       deliveryActive.set(true);
-      ra.getWorkManager().scheduleWork(new SetupActivation());
+      scheduleWork(new SetupActivation());
    }
 
    /**
@@ -282,7 +286,7 @@ public class ActiveMQActivation {
       }
 
       deliveryActive.set(false);
-      teardown();
+      teardown(true);
    }
 
    /**
@@ -348,7 +352,7 @@ public class ActiveMQActivation {
    /**
     * Teardown the activation
     */
-   protected synchronized void teardown() {
+   protected synchronized void teardown(boolean useInterrupt) {
       logger.debug("Tearing down " + spec);
 
       long timeout = factory == null ? ActiveMQClient.DEFAULT_CALL_TIMEOUT : factory.getCallTimeout();
@@ -369,28 +373,27 @@ public class ActiveMQActivation {
       handlers.clear();
 
       FutureLatch future = new FutureLatch(handlersCopy.length);
-      List<Thread> interruptThreads = new ArrayList<>();
       for (ActiveMQMessageHandler handler : handlersCopy) {
-         Thread thread = handler.interruptConsumer(future);
-         if (thread != null) {
-            interruptThreads.add(thread);
-         }
+         handler.interruptConsumer(future);
       }
 
       //wait for all the consumers to complete any onmessage calls
       boolean stuckThreads = !future.await(timeout);
       //if any are stuck then we need to interrupt them
-      if (stuckThreads) {
-         for (Thread interruptThread : interruptThreads) {
-            try {
-               interruptThread.interrupt();
-            } catch (Exception e) {
-               //ok
+      if (stuckThreads && useInterrupt) {
+         for (ActiveMQMessageHandler handler : handlersCopy) {
+            Thread interruptThread = handler.getCurrentThread();
+            if (interruptThread != null) {
+               try {
+                  interruptThread.interrupt();
+               } catch (Throwable e) {
+                  //ok
+               }
             }
          }
       }
 
-      Thread threadTearDown = new Thread("TearDown/ActiveMQActivation") {
+      Runnable runTearDown = new Runnable() {
          @Override
          public void run() {
             for (ActiveMQMessageHandler handler : handlersCopy) {
@@ -399,10 +402,7 @@ public class ActiveMQActivation {
          }
       };
 
-      // We will first start a new thread that will call tearDown on all the instances, trying to graciously shutdown everything.
-      // We will then use the call-timeout to determine a timeout.
-      // if that failed we will then close the connection factory, and interrupt the thread
-      threadTearDown.start();
+      Thread threadTearDown = startThread("TearDown/HornetQActivation", runTearDown);
 
       try {
          threadTearDown.join(timeout);
@@ -550,9 +550,7 @@ public class ActiveMQActivation {
                   calculatedDestinationName = spec.getQueuePrefix() + calculatedDestinationName;
                }
 
-               logger.debug("Unable to retrieve " + destinationName +
-                                                " from JNDI. Creating a new " + destinationType.getName() +
-                                                " named " + calculatedDestinationName + " to be used by the MDB.");
+               logger.debug("Unable to retrieve " + destinationName + " from JNDI. Creating a new " + destinationType.getName() + " named " + calculatedDestinationName + " to be used by the MDB.");
 
                // If there is no binding on naming, we will just create a new instance
                if (isTopic) {
@@ -602,18 +600,41 @@ public class ActiveMQActivation {
       return buffer.toString();
    }
 
-   public void startReconnectThread(final String threadName) {
+   public void startReconnectThread(final String cause) {
       if (logger.isTraceEnabled()) {
-         logger.trace("Starting reconnect Thread " + threadName + " on MDB activation " + this);
+         logger.trace("Starting reconnect Thread " + cause + " on MDB activation " + this);
       }
-      Runnable runnable = new Runnable() {
-         @Override
-         public void run() {
-            reconnect(null);
-         }
-      };
-      Thread t = new Thread(runnable, threadName);
+      try {
+         // We have to use the worker otherwise we may get the wrong classLoader
+         scheduleWork(new ReconnectWork(cause));
+      } catch (Exception e) {
+         logger.warn("Could not reconnect because worker is down", e);
+      }
+   }
+
+   private static Thread startThread(String name, Runnable run) {
+      ClassLoader tccl;
+
+      try {
+         tccl = AccessController.doPrivileged(new PrivilegedExceptionAction<ClassLoader>() {
+            @Override
+            public ClassLoader run() {
+               return ActiveMQActivation.class.getClassLoader();
+            }
+         });
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+         tccl = null;
+      }
+
+      ActiveMQThreadFactory factory = new ActiveMQThreadFactory(name, true, tccl);
+      Thread t = factory.newThread(run);
       t.start();
+      return t;
+   }
+
+   private void scheduleWork(Work run) throws WorkException {
+      ra.getWorkManager().scheduleWork(run);
    }
 
    /**
@@ -621,7 +642,7 @@ public class ActiveMQActivation {
     *
     * @param failure if reconnecting in the event of a failure
     */
-   public void reconnect(Throwable failure) {
+   public void reconnect(Throwable failure, boolean useInterrupt) {
       if (logger.isTraceEnabled()) {
          logger.trace("reconnecting activation " + this);
       }
@@ -644,7 +665,7 @@ public class ActiveMQActivation {
       try {
          Throwable lastException = failure;
          while (deliveryActive.get() && (setupAttempts == -1 || reconnectCount < setupAttempts)) {
-            teardown();
+            teardown(useInterrupt);
 
             try {
                Thread.sleep(setupInterval);
@@ -697,7 +718,7 @@ public class ActiveMQActivation {
          try {
             setup();
          } catch (Throwable t) {
-            reconnect(t);
+            reconnect(t, false);
          }
       }
 
@@ -706,6 +727,30 @@ public class ActiveMQActivation {
       }
    }
 
+   /**
+    * Handles reconnecting
+    */
+   private class ReconnectWork implements Work {
+
+      final String cause;
+
+      ReconnectWork(String cause) {
+         this.cause = cause;
+      }
+
+      @Override
+      public void release() {
+
+      }
+
+      @Override
+      public void run() {
+         logger.tracef("Starting reconnect for %s", cause);
+         reconnect(null, false);
+      }
+
+   }
+
    private class RebalancingListener implements ClusterTopologyListener {
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4104986d/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
index f343ec9..0183896 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
@@ -124,17 +124,13 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
                if (!spec.isShareSubscriptions()) {
                   throw ActiveMQRALogger.LOGGER.canNotCreatedNonSharedSubscriber();
                } else if (ActiveMQRALogger.LOGGER.isDebugEnabled()) {
-                  logger.debug("the mdb on destination " + queueName + " already had " +
-                                                   subResponse.getConsumerCount() +
-                                                   " consumers but the MDB is configured to share subscriptions, so no exceptions are thrown");
+                  logger.debug("the mdb on destination " + queueName + " already had " + subResponse.getConsumerCount() + " consumers but the MDB is configured to share subscriptions, so no exceptions are thrown");
                }
             }
 
             SimpleString oldFilterString = subResponse.getFilterString();
 
-            boolean selectorChanged = selector == null && oldFilterString != null ||
-               oldFilterString == null && selector != null ||
-               (oldFilterString != null && selector != null && !oldFilterString.toString().equals(selector));
+            boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || (oldFilterString != null && selector != null && !oldFilterString.toString().equals(selector));
 
             SimpleString oldTopicName = subResponse.getAddress();
 
@@ -198,6 +194,14 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
       return useXA ? session : null;
    }
 
+   public Thread getCurrentThread() {
+      if (consumer == null) {
+         return null;
+      }
+
+      return consumer.getCurrentThread();
+   }
+
    public Thread interruptConsumer(FutureLatch future) {
       try {
          if (consumer != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4104986d/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
index d78f070..c1790d0 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
@@ -676,6 +676,11 @@ public class LargeMessageBufferTest extends ActiveMQTestBase {
       }
 
       @Override
+      public Thread getCurrentThread() {
+         return null;
+      }
+
+      @Override
       public ClientMessage receive(final long timeout) throws ActiveMQException {
          return null;
       }