You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/10/24 11:52:20 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6476

Repository: activemq
Updated Branches:
  refs/heads/master 1a811b72d -> b9cb02ae5


https://issues.apache.org/jira/browse/AMQ-6476

Moving BrokerSubscriptionInfo processing into a new thread to prevent a
deadlock of the network bridge on startup


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b9cb02ae
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b9cb02ae
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b9cb02ae

Branch: refs/heads/master
Commit: b9cb02ae54a27e6c2f3ff934a1f01b88b7af2f11
Parents: 1a811b7
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Mon Oct 24 07:51:34 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Mon Oct 24 07:52:09 2016 -0400

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java  | 76 +++++++++++++-------
 1 file changed, 52 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b9cb02ae/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index a8c45b0..a849f62 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -163,6 +163,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
     protected BrokerService brokerService = null;
     private ObjectName mbeanObjectName;
     private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
+    //Use a new executor for processing BrokerSubscriptionInfo so we don't block other threads
+    private final ExecutorService syncExecutor = Executors.newSingleThreadExecutor();
     private Transport duplexInboundLocalBroker = null;
     private ProducerInfo duplexInboundLocalProducerInfo;
 
@@ -295,6 +297,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                                     List<Runnable> pendingTasks = serialExecutor.shutdownNow();
                                     LOG.info("pending tasks on stop {}", pendingTasks);
                                 }
+                                //Shut down the syncExecutor; count down the latch first so that a
+                                //task blocked waiting on it can terminate
+                                staticDestinationsLatch.countDown();
+                                syncExecutor.shutdown();
+                                if (!syncExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+                                    List<Runnable> pendingTasks = syncExecutor.shutdownNow();
+                                    LOG.info("pending tasks on stop {}", pendingTasks);
+                                }
                                 localBroker.oneway(new ShutdownInfo());
                                 remoteBroker.oneway(new ShutdownInfo());
                             } catch (Throwable e) {
@@ -648,34 +658,52 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 } else if (command.isBrokerInfo()) {
                     futureRemoteBrokerInfo.set((BrokerInfo) command);
                 } else if (command instanceof BrokerSubscriptionInfo) {
-                    staticDestinationsLatch.await();
-                    BrokerSubscriptionInfo subInfo = (BrokerSubscriptionInfo) command;
-                    LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
-                            this.brokerService.getBrokerName(), subInfo.getBrokerName());
-
-                    if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
-                            && !configuration.isDynamicOnly()) {
-                        if (started.get()) {
-                            if (subInfo.getSubscriptionInfos() != null) {
-                                for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
-                                    //re-add any process any non-NC consumers that match the
-                                    //dynamicallyIncludedDestinations list
-                                    if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
-                                            NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
-                                        serviceRemoteConsumerAdvisory(info);
-                                    }
-                                }
-                            }
+                    final BrokerSubscriptionInfo brokerSubscriptionInfo = (BrokerSubscriptionInfo) command;
+
+                    //Start in a new thread so we don't block the transport waiting for staticDestinations
+                    syncExecutor.execute(new Runnable() {
+
+                        @Override
+                        public void run() {
+                            try {
+                                staticDestinationsLatch.await();
+                                //Make sure after the countDown of staticDestinationsLatch we aren't stopping
+                                if (!disposed.get()) {
+                                    BrokerSubscriptionInfo subInfo = brokerSubscriptionInfo;
+                                    LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
+                                            brokerService.getBrokerName(), subInfo.getBrokerName());
+
+                                    if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
+                                            && !configuration.isDynamicOnly()) {
+                                        if (started.get()) {
+                                            if (subInfo.getSubscriptionInfos() != null) {
+                                                for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
+                                                    //re-add and process any non-NC consumers that match the
+                                                    //dynamicallyIncludedDestinations list
+                                                    if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
+                                                            NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
+                                                        serviceRemoteConsumerAdvisory(info);
+                                                    }
+                                                }
+                                            }
 
-                            //After re-added, clean up any empty durables
-                            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
-                                DemandSubscription ds = i.next();
-                                if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
-                                    cleanupDurableSub(ds, i);
+                                            //After re-added, clean up any empty durables
+                                            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
+                                                DemandSubscription ds = i.next();
+                                                if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
+                                                    cleanupDurableSub(ds, i);
+                                                }
+                                            }
+                                        }
+                                    }
                                 }
+                            } catch (Exception e) {
+                                LOG.warn("Error processing BrokerSubscriptionInfo: {}", e.getMessage(), e);
+                                LOG.debug(e.getMessage(), e);
                             }
                         }
-                    }
+                    });
+
                 } else if (command.getClass() == ConnectionError.class) {
                     ConnectionError ce = (ConnectionError) command;
                     serviceRemoteException(ce.getException());