You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/09/06 21:25:47 UTC

svn commit: r573342 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java

Author: chirino
Date: Thu Sep  6 12:25:46 2007
New Revision: 573342

URL: http://svn.apache.org/viewvc?rev=573342&view=rev
Log:
Fixed synchronization so that threads don't block each other's processing as much; the test now works fine without hanging.  See https://issues.apache.org/activemq/browse/AMQ-1251

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java?rev=573342&r1=573341&r2=573342&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java Thu Sep  6 12:25:46 2007
@@ -18,26 +18,32 @@
 package org.apache.activemq.bugs;
 
 import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
-import javax.jms.MessageListener;
-import javax.jms.Session;
 import javax.jms.Connection;
 import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.MessageProducer;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
 
+import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
 
-import junit.framework.TestCase;
-
 /**
  * Test case demonstrating situation where messages are not delivered to consumers.
  */
 public class QueueWorkerPrefetchTest extends TestCase implements MessageListener
 {
+    private static final long WAIT_TIMEOUT = 1000*10;
+
     /** The connection URL. */
     private static final String CONNECTION_URL = "tcp://localhost:61616";
 
@@ -57,10 +63,9 @@
     private MessageConsumer masterItemConsumer;
 
     /** The number of acks received by the master. */
-    private long acksReceived;
+    private AtomicLong acksReceived = new AtomicLong(0);
 
-    /** The expected number of acks the master should receive. */
-    private long expectedCount;
+    private AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>();
 
     /** Messages sent to the work-item queue. */
     private static class WorkMessage implements Serializable
@@ -75,7 +80,7 @@
     private static class Worker implements MessageListener
     {
         /** Counter shared between workers to decided when new work-item messages are created. */
-        private static Integer counter = new Integer(0);
+        private static AtomicInteger counter = new AtomicInteger(0);
 
         /** Session to use. */
         private Session session;
@@ -104,13 +109,9 @@
                 boolean sendMessage = false;
 
                 // Don't create a new work item for every 1000th message. */
-                synchronized (counter)
+                if (counter.incrementAndGet() % 1000 != 0)
                 {
-                    counter++;
-                    if (counter % 1000 != 0)
-                    {
-                        sendMessage = true;
-                    }
+                    sendMessage = true;
                 }
 
                 if (sendMessage)
@@ -140,16 +141,11 @@
     }
 
     /** Master message handler.  Process ack messages. */
-    public synchronized void onMessage(javax.jms.Message message)
+    public void onMessage(javax.jms.Message message)
     {
-        acksReceived++;
-        if (acksReceived == expectedCount)
-        {
-            // If expected number of acks are received, wake up the main process.
-            notify();
-        }
-        if (acksReceived % 100 == 0)
-        {
+        long acks = acksReceived.incrementAndGet();
+        latch.get().countDown();
+        if (acks % 100 == 0) {
             System.out.println("Master now has ack count of: " + acksReceived);
         }
     }
@@ -173,7 +169,7 @@
         super.tearDown();
     }
 
-    public synchronized void testActiveMQ()
+    public void testActiveMQ()
         throws Exception
     {
         // Create the connection to the broker.
@@ -198,30 +194,32 @@
         }
 
         // Send a message to the work queue, and wait for the 1000 acks from the workers.
-        expectedCount = 1000;
-        acksReceived = 0;
+        acksReceived.set(0);
+        latch.set(new CountDownLatch(1000));
         workItemProducer.send(masterSession.createObjectMessage(new WorkMessage()));
-        while (acksReceived != expectedCount)
-        {
-            wait();
+        
+        if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
+            fail("First batch only received " + acksReceived + " messages");
         }
+
         System.out.println("First batch received");
 
         // Send another message to the work queue, and wait for the next 1000 acks.  It is
         // at this point where the workers never get notified of this message, as they
         // have a large pending queue.  Creating a new worker at this point however will
         // receive this new message.
-        expectedCount = 2000;
+        acksReceived.set(0);
+        latch.set(new CountDownLatch(1000));
         workItemProducer.send(masterSession.createObjectMessage(new WorkMessage()));
-        while (acksReceived != expectedCount)
-        {
-            wait();
+        
+        if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
+            fail("Second batch only received " + acksReceived + " messages");
         }
+
         System.out.println("Second batch received");
 
         // Cleanup all JMS resources.
-        for (int i = 0; i < NUM_WORKERS; i++)
-        {
+        for (int i = 0; i < NUM_WORKERS; i++) {
             workers[i].close();
         }
         masterSession.close();