You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/11/29 23:05:46 UTC

svn commit: r1415406 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java

Author: gtully
Date: Thu Nov 29 22:05:45 2012
New Revision: 1415406

URL: http://svn.apache.org/viewvc?rev=1415406&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4196 - fix the order of remove-destination and remove-subscription commands in the network bridge's async advisory processing. https://issues.apache.org/jira/browse/AMQ-3038 revisited - a temp destination should not be deleted client side before close, because open consumers will not get advisories once the destination is gone; just clearing the map is sufficient, as the broker will deal with removal of the temp destination as part of the removeInfo.

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1415406&r1=1415405&r2=1415406&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Thu Nov 29 22:05:45 2012
@@ -25,6 +25,9 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -137,6 +140,7 @@ public abstract class DemandForwardingBr
     private TransportConnection duplexInitiatingConnection;
     private BrokerService brokerService = null;
     private ObjectName mbeanObjectName;
+    private ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
 
     public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
         this.configuration = configuration;
@@ -355,6 +359,11 @@ public abstract class DemandForwardingBr
                     brokerService.getTaskRunnerFactory().execute(new Runnable() {
                         public void run() {
                             try {
+                                serialExecutor.shutdown();
+                                if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+                                    List<Runnable> pendingTasks = serialExecutor.shutdownNow();
+                                    LOG.info("pending tasks on stop" + pendingTasks);
+                                }
                                 localBroker.oneway(new ShutdownInfo());
                                 remoteBroker.oneway(new ShutdownInfo());
                             } catch (Throwable e) {
@@ -594,7 +603,7 @@ public abstract class DemandForwardingBr
             }
         } else if (data.getClass() == DestinationInfo.class) {
             // It's a destination info - we want to pass up information about temporary destinations
-            DestinationInfo destInfo = (DestinationInfo) data;
+            final DestinationInfo destInfo = (DestinationInfo) data;
             BrokerId[] path = destInfo.getBrokerPath();
             if (path != null && path.length >= networkTTL) {
                 if (LOG.isDebugEnabled()) {
@@ -619,7 +628,20 @@ public abstract class DemandForwardingBr
             if (LOG.isTraceEnabled()) {
                 LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ", destination: " + destInfo);
             }
-            localBroker.oneway(destInfo);
+            if (destInfo.isRemoveOperation()) {
+                // serialise with removeSub operations such that all removeSub advisories are generated
+                serialExecutor.execute(new Runnable() {
+                    public void run() {
+                        try {
+                            localBroker.oneway(destInfo);
+                        } catch (IOException e) {
+                            LOG.warn("failed to deliver remove command for destination:" + destInfo.getDestination(), e);
+                        }
+                    }
+                });
+            } else {
+                localBroker.oneway(destInfo);
+            }
         } else if (data.getClass() == RemoveInfo.class) {
             ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
             removeDemandSubscription(id);
@@ -658,7 +680,9 @@ public abstract class DemandForwardingBr
             subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
 
             // continue removal in separate thread to free up this thread for outstanding responses
-            brokerService.getTaskRunnerFactory().execute(new Runnable() {
+            // run on the same single-threaded executor as removeDestination operations so that both are
+            // processed in submission order, such that all removeSub advisories are generated
+            serialExecutor.execute(new Runnable() {
                 public void run() {
                     sub.waitForCompletion();
                     try {

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1415406&r1=1415405&r2=1415406&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Nov 29 22:05:45 2012
@@ -662,10 +662,7 @@ public class ActiveMQConnection implemen
                         c.dispose();
                     }
 
-                    // As TemporaryQueue and TemporaryTopic instances are bound
-                    // to a connection we should just delete them after the connection
-                    // is closed to free up memory
-                    cleanUpTempDestinations();
+                    this.activeTempDestinations.clear();
 
                     if (isConnectionInfoSentToBroker) {
                         // If we announced ourselves to the broker.. Try to let the broker
@@ -2527,6 +2524,7 @@ public class ActiveMQConnection implemen
      * Removes any TempDestinations that this connection has cached, ignoring
      * any exceptions generated because the destination is in use as they should
      * not be removed.
+     * Used from a pooled connection, because it will not be explicitly closed.
      */
     public void cleanUpTempDestinations() {
 
@@ -2612,7 +2610,7 @@ public class ActiveMQConnection implemen
      * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
      * have been configured with optimizeAcknowledge enabled.
      *
-     * @param scheduledOptimizedAckInterval the scheduledOptimizedAckInterval to set
+     * @param optimizedAckScheduledAckInterval the amount of time between scheduled sends of outstanding Message Acks
      */
     public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
         this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;