You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/12/13 17:00:33 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6538

Repository: activemq
Updated Branches:
  refs/heads/master 27238b2dd -> 25703fbd1


https://issues.apache.org/jira/browse/AMQ-6538

Fixing an issue with syncDurableSubs that cause a bridge failure when
adding multiple bridges between the same brokers


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/25703fbd
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/25703fbd
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/25703fbd

Branch: refs/heads/master
Commit: 25703fbd1f27b65a7410acd7df0bfaf7c16845d8
Parents: 27238b2
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Dec 13 11:58:57 2016 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Dec 13 11:58:57 2016 -0500

----------------------------------------------------------------------
 .../activemq/network/DurableConduitBridge.java  | 20 ++++---
 .../network/DurableSyncNetworkBridgeTest.java   | 55 +++++++++++++++++++-
 2 files changed, 67 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/25703fbd/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index 50c9855..fb2b6c9 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -18,6 +18,7 @@ package org.apache.activemq.network;
 
 import java.io.IOException;
 
+import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
@@ -95,14 +96,19 @@ public class DurableConduitBridge extends ConduitBridge {
                         String candidateSubName = getSubscriberName(dest);
                         for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
                             String subName = subscription.getConsumerInfo().getSubscriptionName();
-                            if (subName != null && subName.equals(candidateSubName)) {
+                            if (subName != null && subName.equals(candidateSubName) &&
+                                    subscription instanceof DurableTopicSubscription) {
                                try {
-                                    // remove the NC subscription as it is no longer for a permissable dest
-                                    RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
-                                    sending.setClientId(localClientId);
-                                    sending.setSubscriptionName(subName);
-                                    sending.setConnectionId(this.localConnectionInfo.getConnectionId());
-                                    localBroker.oneway(sending);
+                                    DurableTopicSubscription durableSub = (DurableTopicSubscription) subscription;
+                                    //check the clientId so we only remove subs for the matching bridge
+                                    if (durableSub.getSubscriptionKey().getClientId().equals(localClientId)) {
+                                        // remove the NC subscription as it is no longer for a permissible dest
+                                        RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
+                                        sending.setClientId(localClientId);
+                                        sending.setSubscriptionName(subName);
+                                        sending.setConnectionId(this.localConnectionInfo.getConnectionId());
+                                        localBroker.oneway(sending);
+                                    }
                                 } catch (IOException e) {
                                     LOG.debug("Exception removing NC durable subscription: {}", subName, e);
                                     serviceRemoteException(e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/25703fbd/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 4a705f3..4e115a4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -76,7 +76,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
     private boolean forceDurable = false;
     private boolean useVirtualDestSubs = false;
     private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
-    public static enum FLOW {FORWARD, REVERSE};
+    public static enum FLOW {FORWARD, REVERSE}
 
     private BrokerService broker1;
     private BrokerService broker2;
@@ -535,6 +535,59 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
 
     }
 
+    //Test that durable sync works with more than one bridge
+    @Test
+    public void testAddOnlineSubscriptionsTwoBridges() throws Exception {
+
+        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+        final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName);
+        final ActiveMQTopic topic2 = new ActiveMQTopic("include.new.topic");
+
+        assertSubscriptionsCount(broker1, topic, 0);
+        assertNCDurableSubsCount(broker2, topic, 0);
+
+        //create durable that shouldn't be propagated
+        session1.createDurableSubscriber(excludeTopic, "sub-exclude");
+
+        //Add 3 online subs
+        session1.createDurableSubscriber(topic, subName);
+        session1.createDurableSubscriber(topic, "sub2");
+        session1.createDurableSubscriber(topic, "sub3");
+        //Add sub on second topic/bridge
+        session1.createDurableSubscriber(topic2, "secondTopicSubName");
+        assertSubscriptionsCount(broker1, topic, 3);
+        assertSubscriptionsCount(broker1, topic2, 1);
+
+        //Add the second network connector
+        NetworkConnector secondConnector = configureLocalNetworkConnector();
+        secondConnector.setName("networkConnector2");
+        secondConnector.setDynamicallyIncludedDestinations(
+                Lists.<ActiveMQDestination>newArrayList(
+                        new ActiveMQTopic("include.new.topic?forceDurable=" + forceDurable)));
+        localBroker.addNetworkConnector(secondConnector);
+        secondConnector.start();
+
+        //Make sure both bridges are connected
+        assertTrue(Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 &&
+                        localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1;
+            }
+        }, 10000, 500));
+
+        //Make sure NC durables exist for both bridges
+        assertNCDurableSubsCount(broker2, topic2, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
+        assertNCDurableSubsCount(broker2, excludeTopic, 0);
+
+        //Make sure message can reach remote broker
+        MessageProducer producer = session2.createProducer(topic2);
+        producer.send(session2.createTextMessage("test"));
+        waitForDispatchFromLocalBroker(broker2.getDestination(topic2).getDestinationStatistics(), 1);
+        assertLocalBrokerStatistics(broker2.getDestination(topic2).getDestinationStatistics(), 1);
+    }
+
     @Test(timeout = 60 * 1000)
     public void testVirtualDestSubForceDurableSync() throws Exception {
         Assume.assumeTrue(flow == FLOW.FORWARD);