You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2017/11/16 12:51:25 UTC
[4/6] activemq git commit: AMQ-6858 - handle resync of network proxy
durables after restart
AMQ-6858 - handle resync of network proxy durables after restart
We need to properly handle the re-addition of network proxy durables
after the brokers are restarted, so that their removal is done properly.
(cherry picked from commit 6013441a9a7c4a13f7412d6d72638de0f420e6a3)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/247243c9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/247243c9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/247243c9
Branch: refs/heads/activemq-5.15.x
Commit: 247243c9c120f5d1c526330813dcc2b8079f52c3
Parents: 39d6321
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Nov 15 08:22:47 2017 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Nov 16 07:50:40 2017 -0500
----------------------------------------------------------------------
.../apache/activemq/network/ConduitBridge.java | 14 ++-
.../network/DemandForwardingBridgeSupport.java | 89 +++++++++-----
.../activemq/network/DemandSubscription.java | 6 -
.../DurableFiveBrokerNetworkBridgeTest.java | 117 ++++++++++++++++++-
4 files changed, 185 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index bc9d004..70f45f7 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -81,10 +81,16 @@ public class ConduitBridge extends DemandForwardingBridge {
ds.addForcedDurableConsumer(info.getConsumerId());
}
} else {
- if (isProxyNSConsumer(info)) {
- final BrokerId[] path = info.getBrokerPath();
- addProxyNetworkSubscription(ds, path, info.getSubscriptionName());
- } else {
+ //Handle the demand generated by proxy network subscriptions
+ //The broker path case is the normal case
+ if (isProxyNSConsumerBrokerPath(info)) {
+ final BrokerId[] path = info.getBrokerPath();
+ addProxyNetworkSubscriptionBrokerPath(ds, path, info.getSubscriptionName());
+ //This is the durable sync case on broker restart
+ } else if (isProxyNSConsumerClientId(info.getClientId()) &&
+ isProxyBridgeSubscription(info.getClientId(), info.getSubscriptionName())) {
+ addProxyNetworkSubscriptionClientId(ds, info.getClientId(), info.getSubscriptionName());
+ } else {
ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 75084d1..df493c3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -36,7 +36,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import javax.management.ObjectName;
@@ -672,31 +671,53 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
(info.getClientId() == null || info.getClientId().startsWith(configuration.getName()));
}
- private boolean isProxyBridgeSubscription(SubscriptionInfo info) {
- if (info.getSubcriptionName() != null && info.getClientId() != null) {
- if (info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)
- && !info.getClientId().startsWith(configuration.getName())) {
+ protected boolean isProxyBridgeSubscription(String clientId, String subName) {
+ if (subName != null && clientId != null) {
+ if (subName.startsWith(DURABLE_SUB_PREFIX) && !clientId.startsWith(configuration.getName())) {
return true;
}
}
return false;
}
- protected void addProxyNetworkSubscription(final DemandSubscription sub, final BrokerId[] path, String subName) {
+ /**
+ * This scenario is primarily used for durable sync on broker restarts
+ *
+ * @param sub
+ * @param clientId
+ * @param subName
+ */
+ protected void addProxyNetworkSubscriptionClientId(final DemandSubscription sub, final String clientId, String subName) {
+ if (clientId != null && sub != null && subName != null) {
+ String newClientId = getProxyBridgeClientId(clientId);
+ final SubscriptionInfo newSubInfo = new SubscriptionInfo(newClientId, subName);
+ sub.getDurableRemoteSubs().add(newSubInfo);
+ LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo);
+
+ } else {
+ LOG.debug("Skipping addProxyNetworkSubscription");
+ }
+ }
+
+ /**
+ * Add a durable remote proxy subscription when we can generate via the BrokerId path
+ * This is the most common scenario
+ *
+ * @param sub
+ * @param path
+ * @param subName
+ */
+ protected void addProxyNetworkSubscriptionBrokerPath(final DemandSubscription sub, final BrokerId[] path, String subName) {
if (sub != null && path.length > 1 && subName != null) {
String b1 = path[path.length-1].toString();
String b2 = path[path.length-2].toString();
final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + b1, subName);
sub.getDurableRemoteSubs().add(newSubInfo);
- sub.getNetworkDemandConsumerMap().computeIfAbsent(newSubInfo, v -> new AtomicInteger()).incrementAndGet();
- LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo);
- } else {
- LOG.debug("Skipping addProxyNetworkSubscription");
}
}
- private String getProxyBridgeClientId(SubscriptionInfo info) {
- String newClientId = info.getClientId();
+ private String getProxyBridgeClientId(String clientId) {
+ String newClientId = clientId;
String[] clientIdTokens = newClientId != null ? newClientId.split(Pattern.quote(configuration.getClientIdToken())) : null;
if (clientIdTokens != null && clientIdTokens.length > 2) {
newClientId = clientIdTokens[clientIdTokens.length - 3] + configuration.getClientIdToken() + "inbound"
@@ -705,10 +726,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return newClientId;
}
- protected boolean isProxyNSConsumer(ConsumerInfo info) {
+ protected boolean isProxyNSConsumerBrokerPath(ConsumerInfo info) {
return info.getBrokerPath() != null && info.getBrokerPath().length > 1;
}
+ protected boolean isProxyNSConsumerClientId(String clientId) {
+ return clientId != null && clientId.split(Pattern.quote(configuration.getClientIdToken())).length > 3;
+ }
+
protected void serviceRemoteCommand(Command command) {
if (!disposed.get()) {
try {
@@ -1008,28 +1033,26 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
} else if (data.getClass() == RemoveSubscriptionInfo.class) {
- RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
- SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
+ final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
+ final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
+ final boolean proxyBridgeSub = isProxyBridgeSubscription(subscriptionInfo.getClientId(),
+ subscriptionInfo.getSubscriptionName());
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next();
boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
- if (removed) {
- cleanupDurableSub(ds, i);
//If this is a proxy bridge subscription we need to try changing the clientId
- } else if (!removed && isProxyBridgeSubscription(subscriptionInfo)){
- subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo));
+ if (!removed && proxyBridgeSub){
+ subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo.getClientId()));
if (ds.getDurableRemoteSubs().contains(subscriptionInfo)) {
- AtomicInteger count = ds.getNetworkDemandConsumerMap().computeIfAbsent(subscriptionInfo, v -> new AtomicInteger());
- count.decrementAndGet();
- //Only remove the durable remote sub if the count <= 0
- if (count.get() <= 0) {
- ds.getDurableRemoteSubs().remove(subscriptionInfo);
- ds.getNetworkDemandConsumerMap().remove(subscriptionInfo);
- cleanupDurableSub(ds, i);
- }
+ ds.getDurableRemoteSubs().remove(subscriptionInfo);
+ removed = true;
}
}
+
+ if (removed) {
+ cleanupDurableSub(ds, i);
+ }
}
}
}
@@ -1407,9 +1430,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
undoMapRegistration(sub);
} else {
if (consumerInfo.isDurable()) {
- if (isProxyNSConsumer(sub.getRemoteInfo())) {
- BrokerId[] path = sub.getRemoteInfo().getBrokerPath();
- addProxyNetworkSubscription(sub, path, consumerInfo.getSubscriptionName());
+ //Handle the demand generated by proxy network subscriptions
+ //The broker path case is the normal case
+ if (isProxyNSConsumerBrokerPath(sub.getRemoteInfo())) {
+ final BrokerId[] path = info.getBrokerPath();
+ addProxyNetworkSubscriptionBrokerPath(sub, path, consumerInfo.getSubscriptionName());
+ //This is the durable sync case on broker restart
+ } else if (isProxyNSConsumerClientId(sub.getRemoteInfo().getClientId()) &&
+ isProxyBridgeSubscription(info.getClientId(), info.getSubscriptionName())) {
+ addProxyNetworkSubscriptionClientId(sub, sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName());
} else {
sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
index 96a9baf..843a6d1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
@@ -44,8 +44,6 @@ public class DemandSubscription {
private final AtomicBoolean activeWaiter = new AtomicBoolean();
private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>();
private final Set<ConsumerId> forcedDurableConsumers = new CopyOnWriteArraySet<ConsumerId>();
- //Used for proxy network consumers
- private final Map<SubscriptionInfo, AtomicInteger> networkDemandConsumerMap = new ConcurrentHashMap<>();
private SubscriptionInfo localDurableSubscriber;
private NetworkBridgeFilter networkBridgeFilter;
@@ -87,10 +85,6 @@ public class DemandSubscription {
return durableRemoteSubs;
}
- public Map<SubscriptionInfo, AtomicInteger> getNetworkDemandConsumerMap() {
- return networkDemandConsumerMap;
- }
-
/**
* @return true if there are no interested consumers
*/
http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index 94d7393..2d8dc03 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.network;
+import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@@ -46,6 +47,7 @@ import junit.framework.Test;
public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport {
private boolean duplex = true;
+ private boolean deletePersistentMessagesOnStartup = true;
@Override
protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
@@ -61,6 +63,117 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
return connector;
}
+ public void testDurablePropagationBrokerRestartDuplex() throws Exception {
+ duplex = true;
+ testDurablePropagationBrokerRestart();
+ }
+
+ public void testDurablePropagationBrokerRestartOneWay() throws Exception {
+ duplex = false;
+ testDurablePropagationBrokerRestart();
+ }
+
+ protected void testDurablePropagationBrokerRestart() throws Exception {
+ deletePersistentMessagesOnStartup = true;
+
+ // Setup broker networks
+ bridgeBrokers("Broker_A_A", "Broker_B_B");
+ bridgeBrokers("Broker_B_B", "Broker_C_C");
+ bridgeBrokers("Broker_C_C", "Broker_D_D");
+ bridgeBrokers("Broker_D_D", "Broker_E_E");
+
+ if (!duplex) {
+ bridgeBrokers("Broker_B_B", "Broker_A_A");
+ bridgeBrokers("Broker_C_C", "Broker_B_B");
+ bridgeBrokers("Broker_D_D", "Broker_C_C");
+ bridgeBrokers("Broker_E_E", "Broker_D_D");
+ }
+
+ startAllBrokers();
+
+ // Setup destination
+ ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+ // Setup consumers
+ Connection conn = brokers.get("Broker_A_A").factory.createConnection();
+ conn.setClientID("clientId1");
+ conn.start();
+ Session ses = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+ MessageConsumer clientA2 = ses.createDurableSubscriber(dest, "subA2");
+
+ // let consumers propagate around the network
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+
+ //bring online a consumer on the other side
+ Connection conn2 = brokers.get("Broker_E_E").factory.createConnection();
+ conn2.setClientID("clientId2");
+ conn2.start();
+ Session ses2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
+ MessageConsumer clientE2 = ses2.createDurableSubscriber(dest, "subE2");
+
+ // let consumers propagate around the network
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
+ assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
+
+ clientA.close();
+ clientA2.close();
+ clientE.close();
+ clientE2.close();
+
+ this.destroyAllBrokers();
+ deletePersistentMessagesOnStartup = false;
+ String options = new String("?persistent=true&useJmx=false");
+ createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options));
+ createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options));
+ createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options));
+ createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" + options));
+ createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" + options));
+ bridgeBrokers("Broker_A_A", "Broker_B_B");
+ bridgeBrokers("Broker_B_B", "Broker_C_C");
+ bridgeBrokers("Broker_C_C", "Broker_D_D");
+ bridgeBrokers("Broker_D_D", "Broker_E_E");
+ if (!duplex) {
+ bridgeBrokers("Broker_B_B", "Broker_A_A");
+ bridgeBrokers("Broker_C_C", "Broker_B_B");
+ bridgeBrokers("Broker_D_D", "Broker_C_C");
+ bridgeBrokers("Broker_E_E", "Broker_D_D");
+ }
+
+ startAllBrokers();
+
+ conn = brokers.get("Broker_A_A").factory.createConnection();
+ conn.setClientID("clientId1");
+ conn.start();
+ ses = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn2 = brokers.get("Broker_E_E").factory.createConnection();
+ conn2.setClientID("clientId2");
+ conn2.start();
+ ses2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ //bring one online and leave others offline to test mixed
+ clientE = ses2.createDurableSubscriber(dest, "subE");
+ clientE.close();
+
+ ses.unsubscribe("subA");
+ ses.unsubscribe("subA2");
+ ses2.unsubscribe("subE");
+ ses2.unsubscribe("subE2");
+
+ assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0);
+ assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0);
+ }
+
public void testDurablePropagationDuplex() throws Exception {
duplex = true;
testDurablePropagation();
@@ -552,7 +665,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
- String options = new String("?persistent=false&useJmx=false");
+ String options = new String("?persistent=true&useJmx=false");
createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options));
createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options));
createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options));
@@ -563,6 +676,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
@Override
protected void configureBroker(BrokerService broker) {
broker.setBrokerId(broker.getBrokerName());
+ broker.setDeleteAllMessagesOnStartup(deletePersistentMessagesOnStartup);
+ broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "DurableFiveBrokerNetworkBridgeTest");
}
protected Session createSession(String broker) throws Exception {