You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ce...@apache.org on 2013/02/15 06:22:08 UTC

svn commit: r1446447 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/advisory/ activemq-broker/src/main/java/org/apache/activemq/network/ activemq-unit-tests/src/test/java/org/apache/activemq/usecases/

Author: ceposta
Date: Fri Feb 15 05:22:08 2013
New Revision: 1446447

URL: http://svn.apache.org/r1446447
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-4000 Durable subscription not getting unregistered on networked broker, thanks torsten for the unit test!

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1446447&r1=1446446&r2=1446447&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Fri Feb 15 05:22:08 2013
@@ -29,25 +29,13 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.*;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.activemq.util.SubscriptionKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +53,7 @@ public class AdvisoryBroker extends Brok
     protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
     protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
     protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
+    protected final ConcurrentHashMap<SubscriptionKey, ActiveMQTopic> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, ActiveMQTopic>();
     protected final ProducerId advisoryProducerId = new ProducerId();
 
     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
@@ -92,6 +81,12 @@ public class AdvisoryBroker extends Brok
 
         // Don't advise advisory topics.
         if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
+            if (info.getDestination().isTopic() && info.isDurable()) {
+                SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
+                if (!this.durableSubscriptions.contains(key)) {
+                    this.durableSubscriptions.put(key, (ActiveMQTopic)info.getDestination());
+                }
+            }
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
             consumers.put(info.getConsumerId(), info);
             fireConsumerAdvisory(context, info.getDestination(), topic, info);
@@ -264,6 +259,26 @@ public class AdvisoryBroker extends Brok
     }
 
     @Override
+    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
+        // Delegate the actual removal first; the advisory below is only sent if that did not throw.
+        super.removeSubscription(context, info);
+
+        SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
+        ActiveMQTopic dest = durableSubscriptions.get(key);
+        if (dest == null) {
+            // Unknown durable sub: no destination to advise on, so stop instead of passing null on below.
+            LOG.warn("We cannot send an advisory message for a durable sub removal when we don't know about the durable sub");
+            return;
+        }
+        // Don't advise advisory topics.
+        if (!AdvisorySupport.isAdvisoryTopic(dest)) {
+            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
+            durableSubscriptions.remove(key);
+            fireConsumerAdvisory(context,dest, topic, info);
+        }
+    }
+
+    @Override
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         super.removeProducer(context, info);
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1446447&r1=1446446&r2=1446447&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Feb 15 05:22:08 2013
@@ -50,32 +50,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTempDestination;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionError;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DataStructure;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.KeepAliveInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.NetworkBridgeFilter;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.command.*;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.security.SecurityContext;
@@ -816,6 +791,11 @@ public abstract class DemandForwardingBr
         } else if (data.getClass() == RemoveInfo.class) {
             ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
             removeDemandSubscription(id);
+        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
+            RemoveSubscriptionInfo durableSub = (RemoveSubscriptionInfo)data;
+            LOG.debug("Removing durable subscription: clientId: "  + durableSub.getClientId()
+                    + ", durableName: " + durableSub.getSubcriptionName());
+            localBroker.oneway(data);
         }
     }
 

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java?rev=1446447&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java Fri Feb 15 05:22:08 2013
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import javax.management.ObjectName;
+
+import junit.framework.Assert;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.network.NetworkTestSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests durable topic subscriptions inside a network of brokers.
+ * 
+ * @author tmielke
+ *
+ */
+public class DurableSubInBrokerNetworkTest extends NetworkTestSupport {
+
+    // Log under this test's own category (previously NetworkConnector's).
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubInBrokerNetworkTest.class);
+    private final String subName = "Subscriber1";
+    private final String topicName = "TEST.FOO";
+
+    /**
+     * Enables JMX, runs the inherited set-up, then starts a duplex network connector on
+     * the remote broker that points at the URI of the inherited {@code connector}.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        useJmx=true;
+        super.setUp();
+
+        URI ncUri = new URI("static:(" + connector.getConnectUri().toString() + ")");
+        NetworkConnector nc = new DiscoveryNetworkConnector(ncUri);
+        nc.setDuplex(true);
+        remoteBroker.addNetworkConnector(nc);
+        nc.start();
+    }
+
+    /** Stops whichever of the two brokers is still running, then runs the inherited tear-down. */
+    @Override
+    protected void tearDown() throws Exception {
+        if (remoteBroker.isStarted()) {
+            remoteBroker.stop();
+            remoteBroker.waitUntilStopped();
+        }
+        if (broker.isStarted()) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Creates a durable topic subscription, checks that it is propagated
+     * in the broker network, removes the subscription and checks that
+     * the subscription is removed from remote broker as well.
+     * @throws Exception if a broker, JMS or JMX call fails
+     */
+    public void testDurableSubNetwork() throws Exception {
+        LOG.info("testDurableSubNetwork started.");
+        String remoteMarker = "destinationName=" + topicName;
+
+        ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory(connector.getConnectUri().toString());
+        Connection conn = fact.createConnection();
+        try {
+            // create durable sub
+            conn.setClientID("clientID1");
+            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination dest = session.createTopic(topicName);
+            TopicSubscriber sub = session.createDurableSubscriber((Topic)dest, subName);
+            LOG.info("Durable subscription of name " + subName + " created.");
+            Thread.sleep(100);
+
+            // the durable sub must now be visible on the local and the remote broker
+            Assert.assertTrue("Durable subscription not registered on local broker",
+                    hasSubscriber(broker.getAdminView().getDurableTopicSubscribers(), subName));
+            Assert.assertTrue("Durable subscription not propagated to remote broker",
+                    hasSubscriber(remoteBroker.getAdminView().getDurableTopicSubscribers(), remoteMarker));
+
+            // unsubscribe from durable sub
+            sub.close();
+            session.unsubscribe(subName);
+            LOG.info("Unsubscribed from durable subscription.");
+            Thread.sleep(100);
+
+            // the durable sub must now be gone from both brokers
+            Assert.assertFalse("Durable subscription not unregistered on local broker",
+                    hasSubscriber(broker.getAdminView().getDurableTopicSubscribers(), subName));
+            Assert.assertFalse("Durable subscription not unregistered on remote broker",
+                    hasSubscriber(remoteBroker.getAdminView().getDurableTopicSubscribers(), remoteMarker));
+        } finally {
+            // release the client connection even when an assertion fails
+            conn.close();
+        }
+    }
+
+    /**
+     * Tells whether any of the given MBean names contains the given text.
+     *
+     * @param subs     object names to scan
+     * @param fragment text to look for in each name's string form
+     * @return {@code true} if at least one name contains {@code fragment}
+     */
+    private static boolean hasSubscriber(ObjectName[] subs, String fragment) {
+        for (ObjectName name : subs) {
+            if (name.toString().contains(fragment)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}