You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ce...@apache.org on 2013/02/15 06:22:08 UTC
svn commit: r1446447 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/advisory/
activemq-broker/src/main/java/org/apache/activemq/network/
activemq-unit-tests/src/test/java/org/apache/activemq/usecases/
Author: ceposta
Date: Fri Feb 15 05:22:08 2013
New Revision: 1446447
URL: http://svn.apache.org/r1446447
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-4000 Durable subscription not getting unregistered on networked broker, thanks torsten for the unit test!
Added:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1446447&r1=1446446&r2=1446447&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Fri Feb 15 05:22:08 2013
@@ -29,25 +29,13 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.*;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,6 +53,7 @@ public class AdvisoryBroker extends Brok
protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
+ protected final ConcurrentHashMap<SubscriptionKey, ActiveMQTopic> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, ActiveMQTopic>();
protected final ProducerId advisoryProducerId = new ProducerId();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
@@ -92,6 +81,12 @@ public class AdvisoryBroker extends Brok
// Don't advise advisory topics.
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
+ if (info.getDestination().isTopic() && info.isDurable()) {
+ SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
+ if (!this.durableSubscriptions.contains(key)) {
+ this.durableSubscriptions.put(key, (ActiveMQTopic)info.getDestination());
+ }
+ }
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
consumers.put(info.getConsumerId(), info);
fireConsumerAdvisory(context, info.getDestination(), topic, info);
@@ -264,6 +259,26 @@ public class AdvisoryBroker extends Brok
}
@Override
+ public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
+ super.removeSubscription(context, info);
+
+ SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
+
+ ActiveMQTopic dest = durableSubscriptions.get(key);
+ if (dest == null) {
+ LOG.warn("We cannot send an advisory message for a durable sub removal when we don't know about the durable sub");
+ }
+
+ // Don't advise advisory topics.
+ if (!AdvisorySupport.isAdvisoryTopic(dest)) {
+ ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
+ durableSubscriptions.remove(key);
+ fireConsumerAdvisory(context,dest, topic, info);
+ }
+
+ }
+
+ @Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.removeProducer(context, info);
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1446447&r1=1446446&r2=1446447&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Feb 15 05:22:08 2013
@@ -50,32 +50,7 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTempDestination;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionError;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DataStructure;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.KeepAliveInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.NetworkBridgeFilter;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.command.*;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.security.SecurityContext;
@@ -816,6 +791,11 @@ public abstract class DemandForwardingBr
} else if (data.getClass() == RemoveInfo.class) {
ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
removeDemandSubscription(id);
+ } else if (data.getClass() == RemoveSubscriptionInfo.class) {
+ RemoveSubscriptionInfo durableSub = (RemoveSubscriptionInfo)data;
+ LOG.debug("Removing durable subscription: clientId: " + durableSub.getClientId()
+ + ", durableName: " + durableSub.getSubcriptionName());
+ localBroker.oneway(data);
}
}
Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java?rev=1446447&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java Fri Feb 15 05:22:08 2013
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import javax.management.ObjectName;
+
+import junit.framework.Assert;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.network.NetworkTestSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests durable topic subscriptions inside a network of brokers.
+ *
+ * @author tmielke
+ *
+ */
+public class DurableSubInBrokerNetworkTest extends NetworkTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NetworkConnector.class);
+ // protected BrokerService localBroker;
+ private final String subName = "Subscriber1";
+ private final String topicName = "TEST.FOO";
+
+ protected void setUp() throws Exception {
+ useJmx=true;
+ super.setUp();
+
+ URI ncUri = new URI("static:(" + connector.getConnectUri().toString() + ")");
+ NetworkConnector nc = new DiscoveryNetworkConnector(ncUri);
+ nc.setDuplex(true);
+ remoteBroker.addNetworkConnector(nc);
+ nc.start();
+ }
+
+ protected void tearDown() throws Exception {
+ if (remoteBroker.isStarted()) {
+ remoteBroker.stop();
+ remoteBroker.waitUntilStopped();
+ }
+ if (broker.isStarted()) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ super.tearDown();
+ }
+
+
+ /**
+ * Creates a durable topic subscription, checks that it is propagated
+ * in the broker network, removes the subscription and checks that
+ * the subscription is removed from remote broker as well.
+ *
+ * @throws Exception
+ */
+ public void testDurableSubNetwork() throws Exception {
+ LOG.info("testDurableSubNetwork started.");
+
+ // create durable sub
+ ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory(connector.getConnectUri().toString());
+ Connection conn = fact.createConnection();
+ conn.setClientID("clientID1");
+ Session session = conn.createSession(false, 1);
+ Destination dest = session.createTopic(topicName);
+ TopicSubscriber sub = session.createDurableSubscriber((Topic)dest, subName);
+ LOG.info("Durable subscription of name " + subName + "created.");
+ Thread.sleep(100);
+
+ // query durable sub on local and remote broker
+ // raise an error if not found
+ boolean foundSub = false;
+ ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers();
+
+ for (int i=0 ; i<subs.length; i++) {
+ if (subs[i].toString().contains(subName))
+ foundSub = true;
+ }
+ Assert.assertTrue(foundSub);
+
+ foundSub = false;
+ subs = remoteBroker.getAdminView().getDurableTopicSubscribers();
+ for (int i=0 ; i<subs.length; i++) {
+ if (subs[i].toString().contains("destinationName=" + topicName))
+ foundSub = true;
+ }
+ Assert.assertTrue(foundSub);
+
+ // unsubscribe from durable sub
+ sub.close();
+ session.unsubscribe(subName);
+ LOG.info("Unsubscribed from durable subscription.");
+ Thread.sleep(100);
+
+ // query durable sub on local and remote broker
+ // raise an error if its not removed from both brokers
+ foundSub = false;
+ subs = broker.getAdminView().getDurableTopicSubscribers();
+ for (int i=0 ; i<subs.length; i++) {
+ if (subs[i].toString().contains(subName))
+ foundSub = true;
+ }
+ Assert.assertFalse(foundSub);
+
+ foundSub = false;
+ subs = remoteBroker.getAdminView().getDurableTopicSubscribers();
+ for (int i=0 ; i<subs.length; i++) {
+ if (subs[i].toString().contains("destinationName=" + topicName))
+ foundSub = true;
+ }
+ Assert.assertFalse("Durable subscription not unregistered on remote broker", foundSub);
+
+
+ }
+}