You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2019/07/08 12:40:05 UTC

[activemq] branch activemq-5.15.x updated: AMQ-7238 - Ensure remoteId subscription map is also cleared when local map is cleared inside DemandForwardingBridgeSupport

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
     new 8a11ab4  AMQ-7238 - Ensure remoteId subscription map is also cleared when local map is cleared inside DemandForwardingBridgeSupport
8a11ab4 is described below

commit 8a11ab4a1b59693e498f16e629f7e21191445e6f
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
AuthorDate: Mon Jul 8 08:38:34 2019 -0400

    AMQ-7238 - Ensure remoteId subscription map is also cleared when local
    map is cleared inside DemandForwardingBridgeSupport
    
    (cherry picked from commit c7eff840588be718da9a985d5c2b8db89b26796d)
---
 .../network/DemandForwardingBridgeSupport.java     |  5 ++++-
 .../network/DynamicNetworkTestSupport.java         | 24 ++++++++++++++++++++++
 .../network/NetworkDurableRecreationTest.java      | 21 ++++++++++++++++++-
 3 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 7ce0339..ddf5709 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -1073,8 +1073,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             sending.setConnectionId(this.localConnectionInfo.getConnectionId());
             localBroker.oneway(sending);
 
-            //remove subscriber from map
+            //remove subscriber from local map
             i.remove();
+
+            //need to remove the mapping from the remote map as well
+            subscriptionMapByRemoteId.remove(ds.getRemoteInfo().getConsumerId());
         }
     }
 
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
index 4b8942b..aade6d3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -16,10 +16,12 @@
  */
 package org.apache.activemq.network;
 
+import static junit.framework.TestCase.assertNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -30,6 +32,8 @@ import javax.jms.Session;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.broker.region.DestinationStatistics;
@@ -227,5 +231,25 @@ public abstract class DynamicNetworkTestSupport {
         }, 10000, 500));
     }
 
+    protected void assertSubscriptionMapCounts(NetworkBridge networkBridge, final int count) {
+        assertNotNull(networkBridge);
+        DemandForwardingBridgeSupport bridge = (DemandForwardingBridgeSupport) networkBridge;
+        assertEquals(count, bridge.subscriptionMapByLocalId.size());
+        assertEquals(count, bridge.subscriptionMapByRemoteId.size());
+    }
+
+    protected DemandForwardingBridge findDuplexBridge(final TransportConnector connector) throws Exception {
+        assertNotNull(connector);
+
+        for (TransportConnection tc : connector.getConnections()) {
+            if (tc.getConnectionId().startsWith("networkConnector_")) {
+                final Field bridgeField = TransportConnection.class.getDeclaredField("duplexBridge");
+                bridgeField.setAccessible(true);
+                return (DemandForwardingBridge) bridgeField.get(tc);
+            }
+        }
+
+        return null;
+    }
 
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java
index c5899a0..800c103 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.activemq.network;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+import java.lang.reflect.Field;
 import java.net.URI;
 
 import javax.jms.JMSException;
@@ -28,15 +30,18 @@ import javax.jms.Session;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.TransportConnection;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.util.Wait;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
+import scala.annotation.bridge;
 
 /**
  * This test is to show that if a durable subscription over a network bridge is deleted and
@@ -106,7 +111,7 @@ public class NetworkDurableRecreationTest extends DynamicNetworkTestSupport {
         });
     }
 
-    public void testReceive(BrokerService receiveBroker, Session receiveSession,
+    protected void testReceive(BrokerService receiveBroker, Session receiveSession,
             BrokerService publishBroker, Session publishSession, ConsumerCreator secondConsumerCreator) throws Exception {
 
         final DestinationStatistics destinationStatistics =
@@ -118,6 +123,17 @@ public class NetworkDurableRecreationTest extends DynamicNetworkTestSupport {
 
         waitForConsumerCount(destinationStatistics, 1);
 
+        final NetworkBridge bridge;
+        if (publishBroker.getNetworkConnectors().size() > 0) {
+            Wait.waitFor(() -> publishBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, 10000, 500);
+            bridge = publishBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+        } else {
+            bridge = findDuplexBridge(publishBroker.getTransportConnectorByScheme("tcp"));
+        }
+
+        //Should be 2 - one for the durable destination and one for the advisory destinations
+        assertSubscriptionMapCounts(bridge, 2);
+
         //remove the durable
         final ConnectionContext context = new ConnectionContext();
         RemoveSubscriptionInfo info = getRemoveSubscriptionInfo(context, receiveBroker);
@@ -126,6 +142,9 @@ public class NetworkDurableRecreationTest extends DynamicNetworkTestSupport {
         receiveBroker.getBroker().removeSubscription(context, info);
         waitForConsumerCount(destinationStatistics, 0);
 
+        //Should be 1 - 0 for the durable destination and one for the advisory destinations
+        assertSubscriptionMapCounts(bridge, 1);
+
         //re-create consumer
         MessageConsumer bridgeConsumer2 = secondConsumerCreator.createConsumer();
         waitForConsumerCount(destinationStatistics, 1);