You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/10 22:53:44 UTC
[2/2] activemq-artemis git commit: [ARTEMIS-2105] Discovery group connectors can delay broker shutdown
[ARTEMIS-2105] Discovery group connectors can delay broker shutdown
Issue: https://issues.apache.org/jira/browse/ARTEMIS-2105
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2450f6a3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2450f6a3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2450f6a3
Branch: refs/heads/master
Commit: 2450f6a3769e83b204ec72354ed5fabb5053d3a1
Parents: d441e75
Author: Ingo Weiss <in...@redhat.com>
Authored: Tue Oct 2 15:43:52 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 10 18:53:38 2018 -0400
----------------------------------------------------------------------
.../artemis/ra/inflow/ActiveMQActivation.java | 139 ++++++++++---------
1 file changed, 72 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2450f6a3/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
index b0f0aff..57bf5c4 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java
@@ -123,6 +123,8 @@ public class ActiveMQActivation {
private boolean lastReceived = false;
+ private final Object teardownLock = new Object();
+
// Whether we are in the failure recovery loop
private final AtomicBoolean inReconnect = new AtomicBoolean(false);
private XARecoveryConfig resourceRecovery;
@@ -352,98 +354,102 @@ public class ActiveMQActivation {
/**
* Teardown the activation
*/
- protected synchronized void teardown(boolean useInterrupt) {
- logger.debug("Tearing down " + spec);
+ protected void teardown(boolean useInterrupt) {
- long timeout = factory == null ? ActiveMQClient.DEFAULT_CALL_TIMEOUT : factory.getCallTimeout();
+ synchronized (teardownLock) {
- if (resourceRecovery != null) {
- ra.getRecoveryManager().unRegister(resourceRecovery);
- }
+ logger.debug("Tearing down " + spec);
- final ActiveMQMessageHandler[] handlersCopy = new ActiveMQMessageHandler[handlers.size()];
+ long timeout = factory == null ? ActiveMQClient.DEFAULT_CALL_TIMEOUT : factory.getCallTimeout();
- // We need to do from last to first as any temporary queue will have been created on the first handler
- // So we invert the handlers here
- for (int i = 0; i < handlers.size(); i++) {
- // The index here is the complimentary so it's inverting the array
- handlersCopy[i] = handlers.get(handlers.size() - i - 1);
- }
+ if (resourceRecovery != null) {
+ ra.getRecoveryManager().unRegister(resourceRecovery);
+ }
- handlers.clear();
+ final ActiveMQMessageHandler[] handlersCopy = new ActiveMQMessageHandler[handlers.size()];
- FutureLatch future = new FutureLatch(handlersCopy.length);
- for (ActiveMQMessageHandler handler : handlersCopy) {
- handler.interruptConsumer(future);
- }
+ // We need to do from last to first as any temporary queue will have been created on the first handler
+ // So we invert the handlers here
+ for (int i = 0; i < handlers.size(); i++) {
+ // The index here is the complimentary so it's inverting the array
+ handlersCopy[i] = handlers.get(handlers.size() - i - 1);
+ }
+
+ handlers.clear();
- //wait for all the consumers to complete any onmessage calls
- boolean stuckThreads = !future.await(timeout);
- //if any are stuck then we need to interrupt them
- if (stuckThreads && useInterrupt) {
+ FutureLatch future = new FutureLatch(handlersCopy.length);
for (ActiveMQMessageHandler handler : handlersCopy) {
- Thread interruptThread = handler.getCurrentThread();
- if (interruptThread != null) {
- try {
- logger.tracef("Interrupting thread %s", interruptThread.getName());
- } catch (Throwable justLog) {
- logger.warn(justLog);
- }
- try {
- interruptThread.interrupt();
- } catch (Throwable e) {
- //ok
- }
- }
+ handler.interruptConsumer(future);
}
- }
- Runnable runTearDown = new Runnable() {
- @Override
- public void run() {
+ //wait for all the consumers to complete any onmessage calls
+ boolean stuckThreads = !future.await(timeout);
+ //if any are stuck then we need to interrupt them
+ if (stuckThreads && useInterrupt) {
for (ActiveMQMessageHandler handler : handlersCopy) {
- handler.teardown();
+ Thread interruptThread = handler.getCurrentThread();
+ if (interruptThread != null) {
+ try {
+ logger.tracef("Interrupting thread %s", interruptThread.getName());
+ } catch (Throwable justLog) {
+ logger.warn(justLog);
+ }
+ try {
+ interruptThread.interrupt();
+ } catch (Throwable e) {
+ //ok
+ }
+ }
}
}
- };
- Thread threadTearDown = startThread("TearDown/HornetQActivation", runTearDown);
+ Runnable runTearDown = new Runnable() {
+ @Override
+ public void run() {
+ for (ActiveMQMessageHandler handler : handlersCopy) {
+ handler.teardown();
+ }
+ }
+ };
- try {
- threadTearDown.join(timeout);
- } catch (InterruptedException e) {
- // nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up
- }
+ Thread threadTearDown = startThread("TearDown/HornetQActivation", runTearDown);
- if (factory != null) {
try {
- // closing the factory will help making sure pending threads are closed
- factory.close();
- } catch (Throwable e) {
- ActiveMQRALogger.LOGGER.unableToCloseFactory(e);
+ threadTearDown.join(timeout);
+ } catch (InterruptedException e) {
+ // nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up
}
- factory = null;
- }
-
- if (threadTearDown.isAlive()) {
- threadTearDown.interrupt();
+ if (factory != null) {
+ try {
+ // closing the factory will help making sure pending threads are closed
+ factory.close();
+ } catch (Throwable e) {
+ ActiveMQRALogger.LOGGER.unableToCloseFactory(e);
+ }
- try {
- threadTearDown.join(5000);
- } catch (InterruptedException e) {
- // nothing to be done here.. we are going down anyways
+ factory = null;
}
if (threadTearDown.isAlive()) {
- ActiveMQRALogger.LOGGER.threadCouldNotFinish(threadTearDown.toString());
+ threadTearDown.interrupt();
+
+ try {
+ threadTearDown.join(5000);
+ } catch (InterruptedException e) {
+ // nothing to be done here.. we are going down anyways
+ }
+
+ if (threadTearDown.isAlive()) {
+ ActiveMQRALogger.LOGGER.threadCouldNotFinish(threadTearDown.toString());
+ }
}
- }
- nodes.clear();
- lastReceived = false;
+ nodes.clear();
+ lastReceived = false;
- logger.debug("Tearing down complete " + this);
+ logger.debug("Tearing down complete " + this);
+ }
}
protected void setupCF() throws Exception {
@@ -467,7 +473,6 @@ public class ActiveMQActivation {
} else {
factory = ra.newConnectionFactory(spec);
}
-
}
/**