You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/05/31 15:44:17 UTC
svn commit: r949742 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
activemq-spring/src/test/java/org/apache/bugs/
activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java
Author: gtully
Date: Mon May 31 13:44:17 2010
New Revision: 949742
URL: http://svn.apache.org/viewvc?rev=949742&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2754 - fix issue with conduit sub removal
Added:
activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/
activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=949742&r1=949741&r2=949742&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Mon May 31 13:44:17 2010
@@ -79,7 +79,7 @@ public class ConduitBridge extends Deman
if (filter.matches(info.getDestination())) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " + ds.getRemoteInfo()
- + " with sub: " + info);
+ + " with sub: " + info.getConsumerId());
}
// add the interest in the subscription
// ds.add(ds.getRemoteInfo().getConsumerId());
@@ -96,7 +96,6 @@ public class ConduitBridge extends Deman
@Override
protected void removeDemandSubscription(ConsumerId id) throws IOException {
- super.removeDemandSubscription(id);
List<DemandSubscription> tmpList = new ArrayList<DemandSubscription>();
for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
Added: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java?rev=949742&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java (added)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java Mon May 31 13:44:17 2010
@@ -0,0 +1,148 @@
+package org.apache.bugs;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.pool.PooledConnectionFactory;
+//import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+public class AMQ2754Test extends TestCase {
+
+ public void testNetworkOfBrokers() throws Exception {
+ BrokerService brokerService1 = null;
+ BrokerService brokerService2 = null;
+
+ final int total = 100;
+ final CountDownLatch latch = new CountDownLatch(total);
+ final boolean conduitSubscriptions = true;
+ try {
+
+ {
+ brokerService1 = new BrokerService();
+ brokerService1.setBrokerName("consumer");
+ brokerService1.setUseJmx(false);
+ brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter());
+ brokerService1.addConnector("tcp://0.0.0.0:61616");
+ brokerService1.start();
+ }
+
+ {
+ brokerService2 = new BrokerService();
+ brokerService2.setBrokerName("producer");
+ brokerService2.setUseJmx(false);
+ brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter());
+ brokerService2.addConnector("tcp://0.0.0.0:51515");
+ NetworkConnector network2 = brokerService2.addNetworkConnector("static:(tcp://localhost:61616)");
+ network2.setName("network1");
+ network2.setDynamicOnly(true);
+ network2.setConduitSubscriptions(conduitSubscriptions);
+ network2.setNetworkTTL(3);
+ network2.setPrefetchSize(1);
+ brokerService2.start();
+ }
+
+ ExecutorService pool = Executors.newSingleThreadExecutor();
+
+ ActiveMQConnectionFactory connectionFactory1 =
+ new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
+
+ connectionFactory1.setWatchTopicAdvisories(false);
+ final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
+ container.setConnectionFactory(connectionFactory1);
+ container.setMaxConcurrentConsumers(10);
+ container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+ container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
+ container.setDestination(new ActiveMQQueue("testingqueue"));
+ container.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ latch.countDown();
+ }
+ });
+ container.setMaxMessagesPerTask(1);
+ container.afterPropertiesSet();
+ container.start();
+
+ pool.submit(new Callable<Object>() {
+ public Object call() throws Exception {
+ try {
+ final int batch = 10;
+ ActiveMQConnectionFactory connectionFactory2 =
+ new ActiveMQConnectionFactory("failover:(tcp://localhost:51515)");
+ PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory2);
+ connectionFactory2.setWatchTopicAdvisories(false);
+ JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
+ ActiveMQQueue queue = new ActiveMQQueue("testingqueue");
+ for(int b = 0; b < batch; b++) {
+ for(int i = 0; i < (total / batch); i++) {
+ final String id = ":batch=" + b + "i=" + i;
+ template.send(queue, new MessageCreator() {
+ public Message createMessage(Session session) throws JMSException {
+ TextMessage message = session.createTextMessage();
+ message.setText("Hello World!" + id);
+ return message;
+ }
+ });
+ }
+ // give spring time to scale back again
+ while(container.getActiveConsumerCount() > 1) {
+ System.out.println("active consumer count:" + container.getActiveConsumerCount());
+ System.out.println("concurrent consumer count: " + container.getConcurrentConsumers());
+ Thread.sleep(1000);
+ }
+ }
+ //pooledConnectionFactory.stop();
+ } catch(Throwable t) {
+ t.printStackTrace();
+ }
+ return null;
+ }
+ });
+
+ pool.shutdown();
+ pool.awaitTermination(10, TimeUnit.SECONDS);
+
+ int count = 0;
+
+ // give it 20 seconds
+ while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) {
+ System.out.println("count " + latch.getCount());
+ }
+
+
+ container.destroy();
+
+ } finally {
+ try { if(brokerService1 != null) {
+ brokerService1.stop();
+ }} catch(Throwable t) { t.printStackTrace(); }
+ try { if(brokerService2 != null) {
+ brokerService2.stop();
+ }} catch(Throwable t) { t.printStackTrace(); }
+ }
+
+ if(latch.getCount() > 0) {
+ fail("latch should have gone down to 0 but was " + latch.getCount());
+ }
+
+ }
+
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/AMQ2754Test.java
------------------------------------------------------------------------------
svn:keywords = Rev Date