You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/12/13 17:00:33 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6538
Repository: activemq
Updated Branches:
refs/heads/master 27238b2dd -> 25703fbd1
https://issues.apache.org/jira/browse/AMQ-6538
Fixing an issue with syncDurableSubs that causes a bridge failure when
adding multiple bridges between the same brokers
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/25703fbd
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/25703fbd
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/25703fbd
Branch: refs/heads/master
Commit: 25703fbd1f27b65a7410acd7df0bfaf7c16845d8
Parents: 27238b2
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Dec 13 11:58:57 2016 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Dec 13 11:58:57 2016 -0500
----------------------------------------------------------------------
.../activemq/network/DurableConduitBridge.java | 20 ++++---
.../network/DurableSyncNetworkBridgeTest.java | 55 +++++++++++++++++++-
2 files changed, 67 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/25703fbd/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index 50c9855..fb2b6c9 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -18,6 +18,7 @@ package org.apache.activemq.network;
import java.io.IOException;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
@@ -95,14 +96,19 @@ public class DurableConduitBridge extends ConduitBridge {
String candidateSubName = getSubscriberName(dest);
for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
String subName = subscription.getConsumerInfo().getSubscriptionName();
- if (subName != null && subName.equals(candidateSubName)) {
+ if (subName != null && subName.equals(candidateSubName) &&
+ subscription instanceof DurableTopicSubscription) {
try {
- // remove the NC subscription as it is no longer for a permissable dest
- RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
- sending.setClientId(localClientId);
- sending.setSubscriptionName(subName);
- sending.setConnectionId(this.localConnectionInfo.getConnectionId());
- localBroker.oneway(sending);
+ DurableTopicSubscription durableSub = (DurableTopicSubscription) subscription;
+ //check the clientId so we only remove subs for the matching bridge
+ if (durableSub.getSubscriptionKey().getClientId().equals(localClientId)) {
+ // remove the NC subscription as it is no longer for a permissible dest
+ RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
+ sending.setClientId(localClientId);
+ sending.setSubscriptionName(subName);
+ sending.setConnectionId(this.localConnectionInfo.getConnectionId());
+ localBroker.oneway(sending);
+ }
} catch (IOException e) {
LOG.debug("Exception removing NC durable subscription: {}", subName, e);
serviceRemoteException(e);
http://git-wip-us.apache.org/repos/asf/activemq/blob/25703fbd/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 4a705f3..4e115a4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -76,7 +76,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
private boolean forceDurable = false;
private boolean useVirtualDestSubs = false;
private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
- public static enum FLOW {FORWARD, REVERSE};
+ public static enum FLOW {FORWARD, REVERSE}
private BrokerService broker1;
private BrokerService broker2;
@@ -535,6 +535,59 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
}
+ //Test that durable sync works with more than one bridge
+ @Test
+ public void testAddOnlineSubscriptionsTwoBridges() throws Exception {
+
+ final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+ final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName);
+ final ActiveMQTopic topic2 = new ActiveMQTopic("include.new.topic");
+
+ assertSubscriptionsCount(broker1, topic, 0);
+ assertNCDurableSubsCount(broker2, topic, 0);
+
+ //create durable that shouldn't be propagated
+ session1.createDurableSubscriber(excludeTopic, "sub-exclude");
+
+ //Add 3 online subs
+ session1.createDurableSubscriber(topic, subName);
+ session1.createDurableSubscriber(topic, "sub2");
+ session1.createDurableSubscriber(topic, "sub3");
+ //Add sub on second topic/bridge
+ session1.createDurableSubscriber(topic2, "secondTopicSubName");
+ assertSubscriptionsCount(broker1, topic, 3);
+ assertSubscriptionsCount(broker1, topic2, 1);
+
+ //Add the second network connector
+ NetworkConnector secondConnector = configureLocalNetworkConnector();
+ secondConnector.setName("networkConnector2");
+ secondConnector.setDynamicallyIncludedDestinations(
+ Lists.<ActiveMQDestination>newArrayList(
+ new ActiveMQTopic("include.new.topic?forceDurable=" + forceDurable)));
+ localBroker.addNetworkConnector(secondConnector);
+ secondConnector.start();
+
+ //Make sure both bridges are connected
+ assertTrue(Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 &&
+ localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1;
+ }
+ }, 10000, 500));
+
+ //Make sure NC durables exist for both bridges
+ assertNCDurableSubsCount(broker2, topic2, 1);
+ assertNCDurableSubsCount(broker2, topic, 1);
+ assertNCDurableSubsCount(broker2, excludeTopic, 0);
+
+ //Make sure message can reach remote broker
+ MessageProducer producer = session2.createProducer(topic2);
+ producer.send(session2.createTextMessage("test"));
+ waitForDispatchFromLocalBroker(broker2.getDestination(topic2).getDestinationStatistics(), 1);
+ assertLocalBrokerStatistics(broker2.getDestination(topic2).getDestinationStatistics(), 1);
+ }
+
@Test(timeout = 60 * 1000)
public void testVirtualDestSubForceDurableSync() throws Exception {
Assume.assumeTrue(flow == FLOW.FORWARD);