You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/10/24 11:52:20 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6476
Repository: activemq
Updated Branches:
refs/heads/master 1a811b72d -> b9cb02ae5
https://issues.apache.org/jira/browse/AMQ-6476
Moving BrokerSubscriptionInfo processing into a new thread to prevent a
deadlock of the network bridge on startup
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b9cb02ae
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b9cb02ae
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b9cb02ae
Branch: refs/heads/master
Commit: b9cb02ae54a27e6c2f3ff934a1f01b88b7af2f11
Parents: 1a811b7
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Mon Oct 24 07:51:34 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Mon Oct 24 07:52:09 2016 -0400
----------------------------------------------------------------------
.../network/DemandForwardingBridgeSupport.java | 76 +++++++++++++-------
1 file changed, 52 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/b9cb02ae/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index a8c45b0..a849f62 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -163,6 +163,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected BrokerService brokerService = null;
private ObjectName mbeanObjectName;
private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
+ //Use a new executor for processing BrokerSubscriptionInfo so we don't block other threads
+ private final ExecutorService syncExecutor = Executors.newSingleThreadExecutor();
private Transport duplexInboundLocalBroker = null;
private ProducerInfo duplexInboundLocalProducerInfo;
@@ -295,6 +297,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
List<Runnable> pendingTasks = serialExecutor.shutdownNow();
LOG.info("pending tasks on stop {}", pendingTasks);
}
+ //Shut down the syncExecutor; count down the latch first so that a task
+ //blocked waiting on staticDestinationsLatch can terminate
+ staticDestinationsLatch.countDown();
+ syncExecutor.shutdown();
+ if (!syncExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ List<Runnable> pendingTasks = syncExecutor.shutdownNow();
+ LOG.info("pending tasks on stop {}", pendingTasks);
+ }
localBroker.oneway(new ShutdownInfo());
remoteBroker.oneway(new ShutdownInfo());
} catch (Throwable e) {
@@ -648,34 +658,52 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else if (command.isBrokerInfo()) {
futureRemoteBrokerInfo.set((BrokerInfo) command);
} else if (command instanceof BrokerSubscriptionInfo) {
- staticDestinationsLatch.await();
- BrokerSubscriptionInfo subInfo = (BrokerSubscriptionInfo) command;
- LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
- this.brokerService.getBrokerName(), subInfo.getBrokerName());
-
- if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
- && !configuration.isDynamicOnly()) {
- if (started.get()) {
- if (subInfo.getSubscriptionInfos() != null) {
- for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
- //re-add any process any non-NC consumers that match the
- //dynamicallyIncludedDestinations list
- if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
- NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
- serviceRemoteConsumerAdvisory(info);
- }
- }
- }
+ final BrokerSubscriptionInfo brokerSubscriptionInfo = (BrokerSubscriptionInfo) command;
+
+ //Start in a new thread so we don't block the transport waiting for staticDestinations
+ syncExecutor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ staticDestinationsLatch.await();
+ //The latch is also counted down during shutdown, so make sure we aren't stopping before processing
+ if (!disposed.get()) {
+ BrokerSubscriptionInfo subInfo = brokerSubscriptionInfo;
+ LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
+ brokerService.getBrokerName(), subInfo.getBrokerName());
+
+ if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
+ && !configuration.isDynamicOnly()) {
+ if (started.get()) {
+ if (subInfo.getSubscriptionInfos() != null) {
+ for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
+ //re-add and process any non-NC consumers that match the
+ //dynamicallyIncludedDestinations list
+ if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
+ NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
+ serviceRemoteConsumerAdvisory(info);
+ }
+ }
+ }
- //After re-added, clean up any empty durables
- for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
- DemandSubscription ds = i.next();
- if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
- cleanupDurableSub(ds, i);
+ //After re-added, clean up any empty durables
+ for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
+ DemandSubscription ds = i.next();
+ if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
+ cleanupDurableSub(ds, i);
+ }
+ }
+ }
+ }
}
+ } catch (Exception e) {
+ LOG.warn("Error processing BrokerSubscriptionInfo: {}", e.getMessage(), e);
+ LOG.debug(e.getMessage(), e);
}
}
- }
+ });
+
} else if (command.getClass() == ConnectionError.class) {
ConnectionError ce = (ConnectionError) command;
serviceRemoteException(ce.getException());