You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/02/10 18:38:29 UTC
svn commit: r743027 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
Author: gtully
Date: Tue Feb 10 17:38:28 2009
New Revision: 743027
URL: http://svn.apache.org/viewvc?rev=743027&view=rev
Log:
test for AMQ-2100
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=743027&r1=743026&r2=743027&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Tue Feb 10 17:38:28 2009
@@ -16,12 +16,18 @@
*/
package org.apache.activemq;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -36,6 +42,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.Collections;
+
/**
* Test cases used to test the JMS message consumer.
*
@@ -109,6 +117,73 @@
assertEquals(2, counter.get());
}
+
+ public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception {
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch closeDone = new CountDownLatch(1);
+
+ connection.start();
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+ // preload the queue
+ sendMessages(session, destination, 2000);
+
+
+ final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
+
+ final Map<Thread, Throwable> exceptions =
+ Collections.synchronizedMap(new HashMap<Thread, Throwable>());
+ Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.error("Uncaught exception:", e);
+ exceptions.put(t, e);
+ }
+ });
+
+ final class AckAndClose implements Runnable {
+ private Message message;
+
+ public AckAndClose(Message m) {
+ this.message = m;
+ }
+
+ public void run() {
+ try {
+ int count = counter.incrementAndGet();
+ if (count == 590) {
+ // close in a separate thread is ok by jms
+ consumer.close();
+ closeDone.countDown();
+ }
+ if (count % 200 == 0) {
+ // ensure there are some outstanding messages
+ // ack every 200
+ message.acknowledge();
+ }
+ } catch (Exception e) {
+ LOG.error("Exception on close or ack:", e);
+ exceptions.put(Thread.currentThread(), e);
+ }
+ }
+ };
+
+ final ExecutorService executor = Executors.newCachedThreadPool();
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ // ack and close eventually in separate thread
+ executor.execute(new AckAndClose(m));
+ }
+ });
+
+ assertTrue(closeDone.await(20, TimeUnit.SECONDS));
+ // await possible exceptions
+ Thread.sleep(1000);
+ assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+ }
+
+
public void initCombosForTestMutiReceiveWithPrefetch1() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),