You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/02/14 17:37:30 UTC
svn commit: r1244116 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/network/ test/java/org/apache/activemq/test/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Tue Feb 14 16:37:29 2012
New Revision: 1244116
URL: http://svn.apache.org/viewvc?rev=1244116&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3716: A NetworkBridge with conduitSubscriptions=true leaks consumer info in the org.apache.activemq.network.DemandForwardingBridgeSupport#subscriptionMapByRemoteId map. Fixed, with a test.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueSendReceiveTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1244116&r1=1244115&r2=1244116&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Tue Feb 14 16:37:29 2012
@@ -611,6 +611,7 @@ public abstract class DemandForwardingBr
LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
}
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
+ subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
// continue removal in separate thread to free up this thread for outstanding responses
asyncTaskRunner.execute(new Runnable() {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java?rev=1244116&r1=1244115&r2=1244116&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java Tue Feb 14 16:37:29 2012
@@ -110,6 +110,13 @@ public abstract class JmsSendReceiveTest
Thread.sleep(1000);
messages.clear();
+ sendMessages();
+
+ assertMessagesAreReceived();
+ LOG.info("" + data.length + " messages(s) received, closing down connections");
+ }
+
+ protected void sendMessages() throws Exception {
for (int i = 0; i < data.length; i++) {
Message message = createMessage(i);
configureMessage(message);
@@ -118,11 +125,8 @@ public abstract class JmsSendReceiveTest
}
sendMessage(i, message);
}
-
- assertMessagesAreReceived();
- LOG.info("" + data.length + " messages(s) received, closing down connections");
}
-
+
protected void sendMessage(int index, Message message) throws Exception {
producer.send(producerDestination, message);
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueSendReceiveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueSendReceiveTest.java?rev=1244116&r1=1244115&r2=1244116&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueSendReceiveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueSendReceiveTest.java Tue Feb 14 16:37:29 2012
@@ -16,20 +16,44 @@
*/
package org.apache.activemq.usecases;
-import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.network.DemandForwardingBridgeSupport;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*
*/
public class TwoBrokerQueueSendReceiveTest extends TwoBrokerTopicSendReceiveTest {
- protected ActiveMQConnectionFactory sendFactory;
- protected ActiveMQConnectionFactory receiveFactory;
+ private static final Logger LOG = LoggerFactory.getLogger(TwoBrokerQueueSendReceiveTest.class);
protected void setUp() throws Exception {
topic = false;
super.setUp();
}
+ public void testReceiveOnXConsumersNoLeak() throws Exception {
+ consumer.close();
+ sendMessages();
+ for (int i=0; i<data.length; i++) {
+ consumer = createConsumer();
+ onMessage(consumer.receive(10000));
+ consumer.close();
+ }
+ this.assertMessagesAreReceived();
+
+ BrokerService broker = (BrokerService) brokers.get("receiver");
+ final DemandForwardingBridgeSupport bridge = (DemandForwardingBridgeSupport) broker.getNetworkConnectors().get(0).activeBridges().toArray()[0];
+ assertTrue("No extra, size:" + bridge.getLocalSubscriptionMap().size(), Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("local subs map size = " + bridge.getLocalSubscriptionMap().size());
+ return 1 == bridge.getLocalSubscriptionMap().size();
+ }
+ }));
+
+ }
}