You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2017/11/16 12:51:25 UTC

[4/6] activemq git commit: AMQ-6858 - handle resync of network proxy durables after restart

AMQ-6858 - handle resync of network proxy durables after restart

We need to properly handle the re-addition of network proxy durables
after the brokers are restarted so removal is done properly

(cherry picked from commit 6013441a9a7c4a13f7412d6d72638de0f420e6a3)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/247243c9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/247243c9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/247243c9

Branch: refs/heads/activemq-5.15.x
Commit: 247243c9c120f5d1c526330813dcc2b8079f52c3
Parents: 39d6321
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Nov 15 08:22:47 2017 -0500
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Nov 16 07:50:40 2017 -0500

----------------------------------------------------------------------
 .../apache/activemq/network/ConduitBridge.java  |  14 ++-
 .../network/DemandForwardingBridgeSupport.java  |  89 +++++++++-----
 .../activemq/network/DemandSubscription.java    |   6 -
 .../DurableFiveBrokerNetworkBridgeTest.java     | 117 ++++++++++++++++++-
 4 files changed, 185 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index bc9d004..70f45f7 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -81,10 +81,16 @@ public class ConduitBridge extends DemandForwardingBridge {
                         ds.addForcedDurableConsumer(info.getConsumerId());
                     }
                 } else {
-                	if (isProxyNSConsumer(info)) {
-                	    final BrokerId[] path = info.getBrokerPath();
-                	    addProxyNetworkSubscription(ds, path, info.getSubscriptionName());
-                	} else {
+                    //Handle the demand generated by proxy network subscriptions
+                    //The broker path is case is normal
+                    if (isProxyNSConsumerBrokerPath(info)) {
+                        final BrokerId[] path = info.getBrokerPath();
+                        addProxyNetworkSubscriptionBrokerPath(ds, path, info.getSubscriptionName());
+                    //This is the durable sync case on broker restart
+                    } else if (isProxyNSConsumerClientId(info.getClientId()) &&
+                            isProxyBridgeSubscription(info.getClientId(), info.getSubscriptionName())) {
+                        addProxyNetworkSubscriptionClientId(ds, info.getClientId(), info.getSubscriptionName());
+                    } else {
                 		ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
                 	}
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 75084d1..df493c3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -36,7 +36,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import javax.management.ObjectName;
@@ -672,31 +671,53 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 (info.getClientId() == null || info.getClientId().startsWith(configuration.getName()));
     }
 
-    private boolean isProxyBridgeSubscription(SubscriptionInfo info) {
-        if (info.getSubcriptionName() != null && info.getClientId() != null) {
-            if (info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)
-                    && !info.getClientId().startsWith(configuration.getName())) {
+    protected boolean isProxyBridgeSubscription(String clientId, String subName) {
+        if (subName != null && clientId != null) {
+            if (subName.startsWith(DURABLE_SUB_PREFIX) && !clientId.startsWith(configuration.getName())) {
                 return true;
             }
         }
         return false;
     }
 
-    protected void addProxyNetworkSubscription(final DemandSubscription sub, final BrokerId[] path, String subName) {
+    /**
+     * This scenaior is primarily used for durable sync on broker restarts
+     *
+     * @param sub
+     * @param clientId
+     * @param subName
+     */
+    protected void addProxyNetworkSubscriptionClientId(final DemandSubscription sub, final String clientId, String subName) {
+        if (clientId != null && sub != null && subName != null) {
+                String newClientId = getProxyBridgeClientId(clientId);
+                final SubscriptionInfo newSubInfo = new SubscriptionInfo(newClientId, subName);
+                sub.getDurableRemoteSubs().add(newSubInfo);
+                LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo);
+
+        } else {
+            LOG.debug("Skipping addProxyNetworkSubscription");
+        }
+    }
+
+    /**
+     * Add a durable remote proxy subscription when we can generate via the BrokerId path
+     * This is the most common scenario
+     *
+     * @param sub
+     * @param path
+     * @param subName
+     */
+    protected void addProxyNetworkSubscriptionBrokerPath(final DemandSubscription sub, final BrokerId[] path, String subName) {
         if (sub != null && path.length > 1 && subName != null) {
             String b1 = path[path.length-1].toString();
             String b2 = path[path.length-2].toString();
             final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + b1, subName);
             sub.getDurableRemoteSubs().add(newSubInfo);
-            sub.getNetworkDemandConsumerMap().computeIfAbsent(newSubInfo, v -> new AtomicInteger()).incrementAndGet();
-            LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo);
-        } else {
-            LOG.debug("Skipping addProxyNetworkSubscription");
         }
     }
 
-    private String getProxyBridgeClientId(SubscriptionInfo info) {
-        String newClientId = info.getClientId();
+    private String getProxyBridgeClientId(String clientId) {
+        String newClientId = clientId;
         String[] clientIdTokens = newClientId != null ? newClientId.split(Pattern.quote(configuration.getClientIdToken())) : null;
         if (clientIdTokens != null && clientIdTokens.length > 2) {
             newClientId = clientIdTokens[clientIdTokens.length - 3] +  configuration.getClientIdToken() + "inbound"
@@ -705,10 +726,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         return newClientId;
     }
 
-    protected boolean isProxyNSConsumer(ConsumerInfo info) {
+    protected boolean isProxyNSConsumerBrokerPath(ConsumerInfo info) {
         return info.getBrokerPath() != null && info.getBrokerPath().length > 1;
     }
 
+    protected boolean isProxyNSConsumerClientId(String clientId) {
+        return clientId != null && clientId.split(Pattern.quote(configuration.getClientIdToken())).length > 3;
+    }
+
     protected void serviceRemoteCommand(Command command) {
         if (!disposed.get()) {
             try {
@@ -1008,28 +1033,26 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
            }
 
         } else if (data.getClass() == RemoveSubscriptionInfo.class) {
-            RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
-            SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
+            final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
+            final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
+            final boolean proxyBridgeSub = isProxyBridgeSubscription(subscriptionInfo.getClientId(),
+                    subscriptionInfo.getSubscriptionName());
             for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                 DemandSubscription ds = i.next();
                 boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
 
-                if (removed) {
-                    cleanupDurableSub(ds, i);
                 //If this is a proxy bridge subscription we need to try changing the clientId
-                } else if (!removed && isProxyBridgeSubscription(subscriptionInfo)){
-                    subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo));
+                if (!removed && proxyBridgeSub){
+                    subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo.getClientId()));
                     if (ds.getDurableRemoteSubs().contains(subscriptionInfo)) {
-                        AtomicInteger count = ds.getNetworkDemandConsumerMap().computeIfAbsent(subscriptionInfo, v -> new AtomicInteger());
-                        count.decrementAndGet();
-                        //Only remove the durable remote sub if the count <= 0
-                        if (count.get() <= 0) {
-                            ds.getDurableRemoteSubs().remove(subscriptionInfo);
-                            ds.getNetworkDemandConsumerMap().remove(subscriptionInfo);
-                            cleanupDurableSub(ds, i);
-                        }
+                        ds.getDurableRemoteSubs().remove(subscriptionInfo);
+                        removed = true;
                     }
                 }
+
+                if (removed) {
+                    cleanupDurableSub(ds, i);
+                }
             }
         }
     }
@@ -1407,9 +1430,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 undoMapRegistration(sub);
             } else {
                 if (consumerInfo.isDurable()) {
-                	if (isProxyNSConsumer(sub.getRemoteInfo())) {
-                		BrokerId[] path = sub.getRemoteInfo().getBrokerPath();
-                		addProxyNetworkSubscription(sub, path, consumerInfo.getSubscriptionName());
+                    //Handle the demand generated by proxy network subscriptions
+                    //The broker path is case is normal
+                    if (isProxyNSConsumerBrokerPath(sub.getRemoteInfo())) {
+                        final BrokerId[] path = info.getBrokerPath();
+                        addProxyNetworkSubscriptionBrokerPath(sub, path, consumerInfo.getSubscriptionName());
+                    //This is the durable sync case on broker restart
+                    } else if (isProxyNSConsumerClientId(sub.getRemoteInfo().getClientId()) &&
+                            isProxyBridgeSubscription(info.getClientId(), info.getSubscriptionName())) {
+                		addProxyNetworkSubscriptionClientId(sub, sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName());
                 	} else {
             			sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
             		}

http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
index 96a9baf..843a6d1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
@@ -44,8 +44,6 @@ public class DemandSubscription {
     private final AtomicBoolean activeWaiter = new AtomicBoolean();
     private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>();
     private final Set<ConsumerId> forcedDurableConsumers = new CopyOnWriteArraySet<ConsumerId>();
-    //Used for proxy network consumers
-    private final Map<SubscriptionInfo, AtomicInteger> networkDemandConsumerMap = new ConcurrentHashMap<>();
     private SubscriptionInfo localDurableSubscriber;
 
     private NetworkBridgeFilter networkBridgeFilter;
@@ -87,10 +85,6 @@ public class DemandSubscription {
         return durableRemoteSubs;
     }
 
-    public Map<SubscriptionInfo, AtomicInteger> getNetworkDemandConsumerMap() {
-        return networkDemandConsumerMap;
-    }
-
     /**
      * @return true if there are no interested consumers
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index 94d7393..2d8dc03 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.network;
 
+import java.io.File;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
@@ -46,6 +47,7 @@ import junit.framework.Test;
 public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport {
 
     private boolean duplex = true;
+    private boolean deletePersistentMessagesOnStartup = true;
 
     @Override
     protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
@@ -61,6 +63,117 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
         return connector;
     }
 
+    public void testDurablePropagationBrokerRestartDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagationBrokerRestart();
+    }
+
+    public void testDurablePropagationBrokerRestartOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagationBrokerRestart();
+    }
+
+    protected void testDurablePropagationBrokerRestart() throws Exception {
+        deletePersistentMessagesOnStartup = true;
+
+        // Setup broker networks
+        bridgeBrokers("Broker_A_A", "Broker_B_B");
+        bridgeBrokers("Broker_B_B", "Broker_C_C");
+        bridgeBrokers("Broker_C_C", "Broker_D_D");
+        bridgeBrokers("Broker_D_D", "Broker_E_E");
+
+        if (!duplex) {
+            bridgeBrokers("Broker_B_B", "Broker_A_A");
+            bridgeBrokers("Broker_C_C", "Broker_B_B");
+            bridgeBrokers("Broker_D_D", "Broker_C_C");
+            bridgeBrokers("Broker_E_E", "Broker_D_D");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        Connection conn = brokers.get("Broker_A_A").factory.createConnection();
+        conn.setClientID("clientId1");
+        conn.start();
+        Session ses = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientA2 = ses.createDurableSubscriber(dest, "subA2");
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+
+        //bring online a consumer on the other side
+        Connection conn2 = brokers.get("Broker_E_E").factory.createConnection();
+        conn2.setClientID("clientId2");
+        conn2.start();
+        Session ses2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
+        MessageConsumer clientE2 = ses2.createDurableSubscriber(dest, "subE2");
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
+
+        clientA.close();
+        clientA2.close();
+        clientE.close();
+        clientE2.close();
+
+        this.destroyAllBrokers();
+        deletePersistentMessagesOnStartup = false;
+        String options = new String("?persistent=true&useJmx=false");
+        createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options));
+        createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options));
+        createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options));
+        createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" + options));
+        createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" + options));
+        bridgeBrokers("Broker_A_A", "Broker_B_B");
+        bridgeBrokers("Broker_B_B", "Broker_C_C");
+        bridgeBrokers("Broker_C_C", "Broker_D_D");
+        bridgeBrokers("Broker_D_D", "Broker_E_E");
+        if (!duplex) {
+            bridgeBrokers("Broker_B_B", "Broker_A_A");
+            bridgeBrokers("Broker_C_C", "Broker_B_B");
+            bridgeBrokers("Broker_D_D", "Broker_C_C");
+            bridgeBrokers("Broker_E_E", "Broker_D_D");
+        }
+
+        startAllBrokers();
+
+        conn = brokers.get("Broker_A_A").factory.createConnection();
+        conn.setClientID("clientId1");
+        conn.start();
+        ses = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        conn2 = brokers.get("Broker_E_E").factory.createConnection();
+        conn2.setClientID("clientId2");
+        conn2.start();
+        ses2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        //bring one online and leave others offline to test mixed
+        clientE = ses2.createDurableSubscriber(dest, "subE");
+        clientE.close();
+
+        ses.unsubscribe("subA");
+        ses.unsubscribe("subA2");
+        ses2.unsubscribe("subE");
+        ses2.unsubscribe("subE2");
+
+        assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0);
+    }
+
     public void testDurablePropagationDuplex() throws Exception {
         duplex = true;
         testDurablePropagation();
@@ -552,7 +665,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
     public void setUp() throws Exception {
         super.setAutoFail(true);
         super.setUp();
-        String options = new String("?persistent=false&useJmx=false");
+        String options = new String("?persistent=true&useJmx=false");
         createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options));
         createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options));
         createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options));
@@ -563,6 +676,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
     @Override
     protected void configureBroker(BrokerService broker) {
         broker.setBrokerId(broker.getBrokerName());
+        broker.setDeleteAllMessagesOnStartup(deletePersistentMessagesOnStartup);
+        broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "DurableFiveBrokerNetworkBridgeTest");
     }
 
     protected Session createSession(String broker) throws Exception {