You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/02/10 18:38:29 UTC

svn commit: r743027 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java

Author: gtully
Date: Tue Feb 10 17:38:28 2009
New Revision: 743027

URL: http://svn.apache.org/viewvc?rev=743027&view=rev
Log:
test for AMQ-2100

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=743027&r1=743026&r2=743027&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Tue Feb 10 17:38:28 2009
@@ -16,12 +16,18 @@
  */
 package org.apache.activemq;
 
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.BytesMessage;
 import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -36,6 +42,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import edu.emory.mathcs.backport.java.util.Collections;
+
 /**
  * Test cases used to test the JMS message consumer.
  * 
@@ -109,6 +117,73 @@
         assertEquals(2, counter.get());
     }
 
+
+    /**
+     * Test for AMQ-2100: closes the consumer from an executor thread while its listener is
+     * still being fed from a preloaded queue; fails if any thread recorded an exception.
+     */
+    public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception {
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch closeDone = new CountDownLatch(1);
+        final Map<Thread, Throwable> exceptions = Collections.synchronizedMap(new HashMap<Thread, Throwable>());
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+        // preload the queue
+        sendMessages(session, destination, 2000);
+        final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
+
+        final class AckAndClose implements Runnable {
+            private final Message message;
+            public AckAndClose(Message m) {
+                this.message = m;
+            }
+            public void run() {
+                try {
+                    int count = counter.incrementAndGet();
+                    if (count == 590) {
+                        // close in a separate thread is ok by jms
+                        consumer.close();
+                        closeDone.countDown();
+                    }
+                    if (count % 200 == 0) {
+                        // ack only every 200 so that some messages are always outstanding
+                        message.acknowledge();
+                    }
+                } catch (Exception e) {
+                    LOG.error("Exception on close or ack:", e);
+                    exceptions.put(Thread.currentThread(), e);
+                }
+            }
+        }
+
+        final ExecutorService executor = Executors.newCachedThreadPool();
+        final UncaughtExceptionHandler previousHandler = Thread.getDefaultUncaughtExceptionHandler();
+        Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+            public void uncaughtException(Thread t, Throwable e) {
+                LOG.error("Uncaught exception:", e);
+                exceptions.put(t, e);
+            }
+        });
+        try {
+            consumer.setMessageListener(new MessageListener() {
+                public void onMessage(Message m) {
+                    // ack and close eventually in separate thread
+                    executor.execute(new AckAndClose(m));
+                }
+            });
+            assertTrue(closeDone.await(20, TimeUnit.SECONDS));
+            // await possible exceptions
+            Thread.sleep(1000);
+            assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+        } finally {
+            // the handler is JVM wide and pool threads outlive the test: undo both even on failure
+            executor.shutdown();
+            Thread.setDefaultUncaughtExceptionHandler(previousHandler);
+        }
+    }
+
+    
     public void initCombosForTestMutiReceiveWithPrefetch1() {
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),