You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/08/21 16:59:41 UTC

svn commit: r1375595 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/AdvisoryConsumer.java test/java/org/apache/activemq/JmsTempDestinationTest.java

Author: gtully
Date: Tue Aug 21 14:59:41 2012
New Revision: 1375595

URL: http://svn.apache.org/viewvc?rev=1375595&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3985 - ActiveMQConnection temp advisory consumer should use asyncDispatch - can cause deadlock with slow consumers. Fix with test

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java?rev=1375595&r1=1375594&r2=1375595&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java Tue Aug 21 14:59:41 2012
@@ -45,6 +45,7 @@ public class AdvisoryConsumer implements
         info.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
         info.setPrefetchSize(1000);
         info.setNoLocal(true);
+        info.setDispatchAsync(true);
 
         this.connection.addDispatcher(info.getConsumerId(), this);
         this.connection.syncSendPacket(this.info);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java?rev=1375595&r1=1375594&r2=1375595&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java Tue Aug 21 14:59:41 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -23,6 +24,9 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -38,12 +42,17 @@ import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
 
 import junit.framework.TestCase;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.vm.VMTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @version
  */
 public class JmsTempDestinationTest extends TestCase {
 
+    private static final Logger LOG = LoggerFactory.getLogger(JmsTempDestinationTest.class);
     private Connection connection;
     private ActiveMQConnectionFactory factory;
     protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
@@ -293,4 +302,58 @@ public class JmsTempDestinationTest exte
             assertTrue("failed to throw an exception", true);
         }
     }
+
+    public void testSlowConsumerDoesNotBlockFastTempUsers() throws Exception {
+        ActiveMQConnectionFactory advisoryConnFactory = new ActiveMQConnectionFactory("vm://localhost?asyncQueueDepth=20");
+        Connection connection = advisoryConnFactory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final CountDownLatch done = new CountDownLatch(1);
+        final AtomicBoolean ok = new AtomicBoolean(true);
+        final AtomicBoolean first = new AtomicBoolean(true);
+        VMTransport t = ((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class);
+        t.setTransportListener(new TransportListener() {
+            @Override
+            public void onCommand(Object command) {
+                // block first dispatch for a while so broker backs up, but other connection should be able to proceed
+                if (first.compareAndSet(true, false)) {
+                    try {
+                        ok.set(done.await(35, TimeUnit.SECONDS));
+                        LOG.info("Done waiting: " + ok.get());
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+
+            @Override
+            public void onException(IOException error) {
+            }
+
+            @Override
+            public void transportInterupted() {
+            }
+
+            @Override
+            public void transportResumed() {
+            }
+        });
+
+        connection = factory.createConnection();
+        connections.add(connection);
+        ((ActiveMQConnection)connection).setWatchTopicAdvisories(false);
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        for (int i=0; i<2500; i++) {
+            TemporaryQueue queue = session.createTemporaryQueue();
+            MessageConsumer consumer = session.createConsumer(queue);
+            consumer.close();
+            queue.delete();
+        }
+        LOG.info("Done with work: " + ok.get());
+        done.countDown();
+        assertTrue("ok", ok.get());
+    }
 }