You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/02/14 17:37:30 UTC

svn commit: r1244116 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ test/java/org/apache/activemq/test/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Tue Feb 14 16:37:29 2012
New Revision: 1244116

URL: http://svn.apache.org/viewvc?rev=1244116&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3716: A NetworkBridge with conduitSubscriptions=true leaks consumer info in the org.apache.activemq.network.DemandForwardingBridgeSupport#subscriptionMapByRemoteId map. Fixed by also removing the remote-id entry when the local subscription is removed; test added.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueSendReceiveTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1244116&r1=1244115&r2=1244116&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Tue Feb 14 16:37:29 2012
@@ -611,6 +611,7 @@ public abstract class DemandForwardingBr
                 LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
             }
             subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
+            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
 
             // continue removal in separate thread to free up this thread for outstanding responses
             asyncTaskRunner.execute(new Runnable() {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java?rev=1244116&r1=1244115&r2=1244116&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java Tue Feb 14 16:37:29 2012
@@ -110,6 +110,13 @@ public abstract class JmsSendReceiveTest
         Thread.sleep(1000);
         messages.clear();
 
+        sendMessages();
+
+        assertMessagesAreReceived();
+        LOG.info("" + data.length + " messages(s) received, closing down connections");
+    }
+
+    protected void sendMessages() throws Exception {
         for (int i = 0; i < data.length; i++) {
             Message message = createMessage(i);
             configureMessage(message);
@@ -118,11 +125,8 @@ public abstract class JmsSendReceiveTest
             }
             sendMessage(i, message);
         }
-
-        assertMessagesAreReceived();
-        LOG.info("" + data.length + " messages(s) received, closing down connections");
     }
-    
+
     protected void sendMessage(int index, Message message) throws Exception {
     	producer.send(producerDestination, message);
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueSendReceiveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueSendReceiveTest.java?rev=1244116&r1=1244115&r2=1244116&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueSendReceiveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueSendReceiveTest.java Tue Feb 14 16:37:29 2012
@@ -16,20 +16,44 @@
  */
 package org.apache.activemq.usecases;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.network.DemandForwardingBridgeSupport;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * 
  */
 public class TwoBrokerQueueSendReceiveTest  extends TwoBrokerTopicSendReceiveTest {
 
-    protected ActiveMQConnectionFactory sendFactory;
-    protected ActiveMQConnectionFactory receiveFactory;
+    private static final Logger LOG = LoggerFactory.getLogger(TwoBrokerQueueSendReceiveTest.class);
 
     protected void setUp() throws Exception {
         topic = false;
         super.setUp();
     }
 
+    public void testReceiveOnXConsumersNoLeak() throws Exception {
+        consumer.close();
+        sendMessages();
+        for (int i=0; i<data.length; i++) {
+            consumer = createConsumer();
+            onMessage(consumer.receive(10000));
+            consumer.close();
+        }
+        this.assertMessagesAreReceived();
+
+        BrokerService broker = (BrokerService) brokers.get("receiver");
+        final DemandForwardingBridgeSupport bridge = (DemandForwardingBridgeSupport) broker.getNetworkConnectors().get(0).activeBridges().toArray()[0];
+        assertTrue("No extra, size:" + bridge.getLocalSubscriptionMap().size(), Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("local subs map size = " + bridge.getLocalSubscriptionMap().size());
+                return 1 == bridge.getLocalSubscriptionMap().size();
+            }
+        }));
+
+    }
     
 }