You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2013/05/17 16:27:48 UTC

svn commit: r1483827 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/network/ activemq-client/src/main/java/org/apache/activemq/ activemq-client/src/main/java/org/apache/activemq/command/ activemq-unit-tests/src/test/java/org/ap...

Author: dejanb
Date: Fri May 17 14:27:48 2013
New Revision: 1483827

URL: http://svn.apache.org/r1483827
Log:
https://issues.apache.org/jira/browse/AMQ-4000 - initial implementation of keeping track of durable subscribers over network and unregister them appropriately

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=1483827&r1=1483826&r2=1483827&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java Fri May 17 14:27:48 2013
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.transport.Transport;
 import org.slf4j.Logger;
@@ -84,7 +85,11 @@ public class ConduitBridge extends Deman
                 }
                 // add the interest in the subscription
                 if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) {
-                    ds.add(info.getConsumerId());
+                    if (!info.isDurable()) {
+                        ds.add(info.getConsumerId());
+                    } else {
+                       ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
+                    }
                 }
                 matched = true;
                 // continue - we want interest to any existing DemandSubscriptions

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1483827&r1=1483826&r2=1483827&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri May 17 14:27:48 2013
@@ -19,10 +19,7 @@ package org.apache.activemq.network;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.security.cert.X509Certificate;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -50,32 +47,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTempDestination;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionError;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DataStructure;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.KeepAliveInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.NetworkBridgeFilter;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.command.*;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.security.SecurityContext;
@@ -147,7 +119,7 @@ public abstract class DemandForwardingBr
 
     private final AtomicBoolean started = new AtomicBoolean();
     private TransportConnection duplexInitiatingConnection;
-    private BrokerService brokerService = null;
+    protected BrokerService brokerService = null;
     private ObjectName mbeanObjectName;
     private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
     private Transport duplexInboundLocalBroker = null;
@@ -818,6 +790,28 @@ public abstract class DemandForwardingBr
         } else if (data.getClass() == RemoveInfo.class) {
             ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
             removeDemandSubscription(id);
+        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
+            RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
+            SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
+            for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
+                DemandSubscription ds = (DemandSubscription) i.next();
+                boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
+                if (removed) {
+                    if (ds.getDurableRemoteSubs().isEmpty()) {
+
+                        // deactivate subscriber
+                        RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
+                        localBroker.oneway(removeInfo);
+
+                        // remove subscriber
+                        RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
+                        sending.setClientId(localClientId);
+                        sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
+                        sending.setConnectionId(this.localConnectionInfo.getConnectionId());
+                        localBroker.oneway(sending);
+                    }
+                }
+            }
         }
     }
 
@@ -1180,6 +1174,9 @@ public abstract class DemandForwardingBr
             if (duplicateSuppressionIsRequired(sub)) {
                 undoMapRegistration(sub);
             } else {
+                if (consumerInfo.isDurable()) {
+                    sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
+                }
                 addSubscription(sub);
                 consumerAdded = true;
             }
@@ -1274,7 +1271,7 @@ public abstract class DemandForwardingBr
         return found;
     }
 
-    private final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
+    protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
         RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
         Region region;
         Collection<Subscription> subs;
@@ -1370,8 +1367,7 @@ public abstract class DemandForwardingBr
             // may need to change if we ever subscribe to a remote broker.
             sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
         } else {
-            // need to ack this message if it is ignored as it is durable so
-            // we check before we send. see: suppressMessageDispatch()
+            sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
         }
     }
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=1483827&r1=1483826&r2=1483827&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java Fri May 17 14:27:48 2013
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.NetworkBridgeFilter;
+import org.apache.activemq.command.SubscriptionInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +40,8 @@ public class DemandSubscription {
     private final Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
     private final AtomicInteger dispatched = new AtomicInteger(0);
     private final AtomicBoolean activeWaiter = new AtomicBoolean();
+    private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>();
+    private SubscriptionInfo localDurableSubscriber;
 
     private NetworkBridgeFilter networkBridgeFilter;
 
@@ -69,6 +72,10 @@ public class DemandSubscription {
         return remoteSubsIds.remove(id);
     }
 
+    public Set<SubscriptionInfo> getDurableRemoteSubs() {
+        return durableRemoteSubs;
+    }
+
     /**
      * @return true if there are no interested consumers
      */
@@ -138,4 +145,12 @@ public class DemandSubscription {
     public void setNetworkBridgeFilter(NetworkBridgeFilter networkBridgeFilter) {
         this.networkBridgeFilter = networkBridgeFilter;
     }
+
+    public SubscriptionInfo getLocalDurableSubscriber() {
+        return localDurableSubscriber;
+    }
+
+    public void setLocalDurableSubscriber(SubscriptionInfo localDurableSubscriber) {
+        this.localDurableSubscriber = localDurableSubscriber;
+    }
 }

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1483827&r1=1483826&r2=1483827&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri May 17 14:27:48 2013
@@ -214,6 +214,7 @@ public class ActiveMQMessageConsumer imp
 
         this.info = new ConsumerInfo(consumerId);
         this.info.setExclusive(this.session.connection.isExclusiveConsumer());
+        this.info.setClientId(this.session.connection.getClientID());
         this.info.setSubscriptionName(name);
         this.info.setPrefetchSize(prefetch);
         this.info.setCurrentPrefetchSize(prefetch);

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java?rev=1483827&r1=1483826&r2=1483827&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java Fri May 17 14:27:48 2013
@@ -42,6 +42,7 @@ public class ConsumerInfo extends BaseCo
     protected boolean browser;
     protected boolean dispatchAsync;
     protected String selector;
+    protected String clientId;
     protected String subscriptionName;
     protected boolean noLocal;
     protected boolean exclusive;
@@ -93,6 +94,7 @@ public class ConsumerInfo extends BaseCo
         info.browser = browser;
         info.dispatchAsync = dispatchAsync;
         info.selector = selector;
+        info.clientId = clientId;
         info.subscriptionName = subscriptionName;
         info.noLocal = noLocal;
         info.exclusive = exclusive;
@@ -217,6 +219,19 @@ public class ConsumerInfo extends BaseCo
     }
 
     /**
+     * Used to identify the id of a client connection.
+     *
+     * @openwire:property version=1
+     */
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    /**
      * Used to identify the name of a durable subscription.
      *
      * @openwire:property version=1

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/SubscriptionInfo.java?rev=1483827&r1=1483826&r2=1483827&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/SubscriptionInfo.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/SubscriptionInfo.java Fri May 17 14:27:48 2013
@@ -34,6 +34,13 @@ public class SubscriptionInfo implements
     protected String subscriptionName;
     protected String selector;
 
+    public SubscriptionInfo() {}
+
+    public SubscriptionInfo(String clientId, String subscriptionName) {
+        this.clientId = clientId;
+        this.subscriptionName = subscriptionName;
+    }
+
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java?rev=1483827&r1=1483826&r2=1483827&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java Fri May 17 14:27:48 2013
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
  */
 public class DurableSubInBrokerNetworkTest extends NetworkTestSupport {
 
-    private static final Logger LOG = LoggerFactory.getLogger(NetworkConnector.class);
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubInBrokerNetworkTest.class);
     // protected BrokerService localBroker;
     private final String subName = "Subscriber1";
     private final String subName2 = "Subscriber2";
@@ -152,6 +152,16 @@ public class DurableSubInBrokerNetworkTe
         assertTrue("Durable subscription should still be on remote broker",
                 foundSubInRemoteBrokerByTopicName(topicName));
 
+        sub2.close();
+        session.unsubscribe(subName2);
+
+        Thread.sleep(100);
+
+        assertFalse(foundSubInLocalBroker(subName2));
+
+        assertFalse("Durable subscription not unregistered on remote broker",
+                foundSubInRemoteBrokerByTopicName(topicName));
+
     }
 
     private boolean foundSubInRemoteBrokerByTopicName(String topicName) throws Exception {