You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/08/21 12:01:12 UTC

svn commit: r687677 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/bugs/

Author: gtully
Date: Thu Aug 21 03:01:10 2008
New Revision: 687677

URL: http://svn.apache.org/viewvc?rev=687677&view=rev
Log:
fix for AMQ-1902 introduced by fix for AMQ-1866

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=687677&r1=687676&r2=687677&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Aug 21 03:01:10 2008
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -1166,19 +1167,24 @@
             // list anything that does not actually get dispatched.
             if (list != null && !list.isEmpty()) {
 //                System.out.println(getName()+": dispatching from paged in: "+list.size());
-                pagedInPendingDispatch.addAll(doActualDispatch(list));
+                if (pagedInPendingDispatch.isEmpty()) {
+                    pagedInPendingDispatch.addAll(doActualDispatch(list));
+                } else {
+                    pagedInPendingDispatch.addAll(list);
+                }
 //                System.out.println(getName()+": new pending list2: "+pagedInPendingDispatch.size());
             }
         } finally {
             dispatchLock.unlock();
         }
     }
-
+    
     /**
      * @return list of messages that could get dispatched to consumers if they were not full.
      */
     private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
         List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
+        Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
         List<Subscription> consumers;
         
         synchronized (this.consumers) {
@@ -1190,13 +1196,18 @@
             int interestCount=0;
             for (Subscription s : consumers) {
                 if (dispatchSelector.canSelect(s, node)) {
-                    if (!s.isFull()) {
-                        // Dispatch it.
-                        s.add(node);
-//                        System.out.println(getName()+" Dispatched to "+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId());
-                        target = s;
-                        break;
-                    } 
+                    if (!fullConsumers.contains(s)) {
+                        if (!s.isFull()) {
+                            // Dispatch it.
+                            s.add(node);
+                            //System.err.println(getName()+" Dispatched to "+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId());
+                            target = s;
+                            break;
+                        } else {
+                            // no further dispatch of list to a full consumer to avoid out of order message receipt 
+                            fullConsumers.add(s);
+                        }
+                    }
                     interestCount++;
                 }
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java?rev=687677&r1=687676&r2=687677&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java Thu Aug 21 03:01:10 2008
@@ -23,6 +23,8 @@
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
 
+import junit.framework.Test;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -62,4 +64,12 @@
         answer.addConnector(bindAddress);
         answer.setDeleteAllMessagesOnStartup(true);
     }
+    
+    public static Test suite() {
+        return suite(CursorQueueStoreTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java?rev=687677&r1=687676&r2=687677&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java Thu Aug 21 03:01:10 2008
@@ -23,6 +23,7 @@
 import java.util.concurrent.TimeUnit;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -31,19 +32,24 @@
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+
+import junit.framework.Test;
 import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerTest;
+import org.apache.activemq.broker.region.Queue;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision: 1.3 $
  */
-public abstract class CursorSupport extends TestCase {
+public abstract class CursorSupport extends CombinationTestSupport {
 
-    protected static final int MESSAGE_COUNT = 500;
-    protected static final int PREFETCH_SIZE = 50;
+    public int MESSAGE_COUNT = 500;
+    public int PREFETCH_SIZE = 50;
     private static final Log LOG = LogFactory.getLog(CursorSupport.class);
 
     protected BrokerService broker;
@@ -55,7 +61,7 @@
 
     protected abstract void configureBroker(BrokerService answer) throws Exception;
 
-    public void XtestSendFirstThenConsume() throws Exception {
+    public void testSendFirstThenConsume() throws Exception {
         ConnectionFactory factory = createConnectionFactory();
         Connection consumerConnection = getConsumerConnection(factory);
         MessageConsumer consumer = getConsumer(consumerConnection);
@@ -85,7 +91,15 @@
         consumerConnection.close();
     }
 
-    public void testSendWhilstConaume() throws Exception {
+
+    public void initCombosForTestSendWhilstConsume() {
+        addCombinationValues("MESSAGE_COUNT", new Object[] {Integer.valueOf(400),
+                                                           Integer.valueOf(500)});
+        addCombinationValues("PREFETCH_SIZE", new Object[] {Integer.valueOf(100),
+                Integer.valueOf(50)});
+    }
+
+    public void testSendWhilstConsume() throws Exception {
         ConnectionFactory factory = createConnectionFactory();
         Connection consumerConnection = getConsumerConnection(factory);
         // create durable subs
@@ -150,7 +164,7 @@
             assertEquals("This should be the same at pos " + i + " in the list", sent.getJMSMessageID(), consumed.getJMSMessageID());
         }
     }
-
+    
     protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException {
         Connection connection = fac.createConnection();
         connection.setClientID("testConsumer");

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java?rev=687677&r1=687676&r2=687677&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java Thu Aug 21 03:01:10 2008
@@ -21,13 +21,13 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Connection;
-import javax.jms.Destination;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import junit.framework.TestCase;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -89,19 +89,16 @@
         brokerService.stop();
     }
 
-    // Failing
     public void testConsumerSlowDownPrefetch0() throws Exception {
         ACTIVEMQ_BROKER_URI = "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0";
         doTestConsumerSlowDown();
     }
 
-    // Failing
     public void testConsumerSlowDownPrefetch10() throws Exception {
         ACTIVEMQ_BROKER_URI = "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=10";
         doTestConsumerSlowDown();
     }
     
-    // Passing
     public void testConsumerSlowDownDefaultPrefetch() throws Exception {
         ACTIVEMQ_BROKER_URI = "tcp://localhost:61616";
         doTestConsumerSlowDown();
@@ -137,15 +134,18 @@
         threads.add(c2);
         c2.start();
 
+        int totalReceived = 0;
         for ( int i=0; i < 30; i++) {
             Thread.sleep(1000);
             long c1Counter = c1.counter.getAndSet(0);
             long c2Counter = c2.counter.getAndSet(0);
             System.out.println("c1: "+c1Counter+", c2: "+c2Counter);
+            totalReceived += c1Counter;
+            totalReceived += c2Counter;
             
             // Once message have been flowing for a few seconds, start asserting that c2 always gets messages.  It should be receiving about 100 / sec
-            if( i > 3 ) {
-                assertTrue("Consumer 2 should be receiving new messages every second.", c2Counter > 0);
+            if( i > 10 ) {
+                assertTrue("Total received=" + totalReceived + ", Consumer 2 should be receiving new messages every second.", c2Counter > 0);
             }
         }
     }    
@@ -204,8 +204,8 @@
                         } else {
                             sleepingTime = 1; 
                         }
-                        Thread.sleep(sleepingTime);
                         counter.incrementAndGet();
+                        Thread.sleep(sleepingTime);
                     }
                 }