You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/08/21 16:59:41 UTC
svn commit: r1375595 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/AdvisoryConsumer.java
test/java/org/apache/activemq/JmsTempDestinationTest.java
Author: gtully
Date: Tue Aug 21 14:59:41 2012
New Revision: 1375595
URL: http://svn.apache.org/viewvc?rev=1375595&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3985 - ActiveMQConnection temp advisory consumer should use asyncDispatch - can cause deadlock with slow consumers. Fix with test
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java?rev=1375595&r1=1375594&r2=1375595&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/AdvisoryConsumer.java Tue Aug 21 14:59:41 2012
@@ -45,6 +45,7 @@ public class AdvisoryConsumer implements
info.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
info.setPrefetchSize(1000);
info.setNoLocal(true);
+ info.setDispatchAsync(true);
this.connection.addDispatcher(info.getConsumerId(), this);
this.connection.syncSendPacket(this.info);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java?rev=1375595&r1=1375594&r2=1375595&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTempDestinationTest.java Tue Aug 21 14:59:41 2012
@@ -16,6 +16,7 @@
*/
package org.apache.activemq;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -23,6 +24,9 @@ import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@@ -38,12 +42,17 @@ import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import junit.framework.TestCase;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.vm.VMTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @version
*/
public class JmsTempDestinationTest extends TestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(JmsTempDestinationTest.class);
private Connection connection;
private ActiveMQConnectionFactory factory;
protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
@@ -293,4 +302,58 @@ public class JmsTempDestinationTest exte
assertTrue("failed to throw an exception", true);
}
}
+
+ public void testSlowConsumerDoesNotBlockFastTempUsers() throws Exception {
+ ActiveMQConnectionFactory advisoryConnFactory = new ActiveMQConnectionFactory("vm://localhost?asyncQueueDepth=20");
+ Connection connection = advisoryConnFactory.createConnection();
+ connections.add(connection);
+ connection.start();
+
+ final CountDownLatch done = new CountDownLatch(1);
+ final AtomicBoolean ok = new AtomicBoolean(true);
+ final AtomicBoolean first = new AtomicBoolean(true);
+ VMTransport t = ((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class);
+ t.setTransportListener(new TransportListener() {
+ @Override
+ public void onCommand(Object command) {
+ // block first dispatch for a while so broker backs up, but other connection should be able to proceed
+ if (first.compareAndSet(true, false)) {
+ try {
+ ok.set(done.await(35, TimeUnit.SECONDS));
+ LOG.info("Done waiting: " + ok.get());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public void onException(IOException error) {
+ }
+
+ @Override
+ public void transportInterupted() {
+ }
+
+ @Override
+ public void transportResumed() {
+ }
+ });
+
+ connection = factory.createConnection();
+ connections.add(connection);
+ ((ActiveMQConnection)connection).setWatchTopicAdvisories(false);
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ for (int i=0; i<2500; i++) {
+ TemporaryQueue queue = session.createTemporaryQueue();
+ MessageConsumer consumer = session.createConsumer(queue);
+ consumer.close();
+ queue.delete();
+ }
+ LOG.info("Done with work: " + ok.get());
+ done.countDown();
+ assertTrue("ok", ok.get());
+ }
}