You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/05/05 09:09:37 UTC

svn commit: r941174 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java

Author: dejanb
Date: Wed May  5 07:09:37 2010
New Revision: 941174

URL: http://svn.apache.org/viewvc?rev=941174&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2719 - order of redelivered messages

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=941174&r1=941173&r2=941174&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed May  5 07:09:37 2010
@@ -95,6 +95,7 @@ public class Queue extends BaseDestinati
     private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
     // Messages that are paged in but have not yet been targeted at a subscription
     private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
+    private List<QueueMessageReference> redeliveredWaitingDispatch = new ArrayList<QueueMessageReference>();
     private MessageGroupMap messageGroupOwners;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
@@ -377,7 +378,7 @@ public class Queue extends BaseDestinati
             wakeup();
         }
     }
-
+    
     public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) throws Exception {
         destinationStatistics.getConsumers().decrement();
         // synchronize with dispatch method so that no new messages are sent
@@ -406,7 +407,7 @@ public class Queue extends BaseDestinati
                 getMessageGroupOwners().removeConsumer(consumerId);
 
                 // redeliver inflight messages
-                List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
+                
                 for (MessageReference ref : sub.remove(context, this)) {
                     QueueMessageReference qmr = (QueueMessageReference) ref;
                     if (qmr.getLockOwner() == sub) {
@@ -416,11 +417,10 @@ public class Queue extends BaseDestinati
                             qmr.incrementRedeliveryCounter();
                         }
                     }
-                    list.add(qmr);
+                    redeliveredWaitingDispatch.add(qmr);
                 }
-
-                if (!list.isEmpty()) {
-                    doDispatch(list);
+                if (!redeliveredWaitingDispatch.isEmpty()) {
+                    doDispatch(new ArrayList());
                 }
             }
             if (!(this.optimizedDispatch || isSlave())) {
@@ -1220,7 +1220,7 @@ public class Queue extends BaseDestinati
             // Perhaps we should page always into the pagedInPendingDispatch list if 
             // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
             // then we do a dispatch.
-            if (pageInMoreMessages || pendingBrowserDispatch != null) {
+            if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) {
                 try {
                     pageInMessages(pendingBrowserDispatch != null);
 
@@ -1494,8 +1494,12 @@ public class Queue extends BaseDestinati
         synchronized (dispatchMutex) {
 
             synchronized (pagedInPendingDispatch) {
+                if (!redeliveredWaitingDispatch.isEmpty()) {
+                    // Try first to dispatch redelivered messages to keep a proper order
+                    redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
+                }
                 if (!pagedInPendingDispatch.isEmpty()) {
-                    // Try to first dispatch anything that had not been
+                    // Next dispatch anything that had not been
                     // dispatched before.
                     pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
                 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java?rev=941174&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java Wed May  5 07:09:37 2010
@@ -0,0 +1,172 @@
+package org.apache.activemq.bugs;
+
+import java.util.Date;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.bugs.AMQ1866.ConsumerThread;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Regression test added for AMQ-2719 (order of redelivered messages).
+ * <p>
+ * Sends a run of text messages to a queue and reads them back in several
+ * batches, each through a new exclusive consumer on a client-acknowledge
+ * session, asserting that the messages arrive in the order they were sent.
+ */
+public class OutOfOrderTestCase extends TestCase {
+
+    private static final Log LOG = LogFactory.getLog(OutOfOrderTestCase.class);
+
+    public static final String BROKER_URL = "tcp://localhost:61616";
+    private static final int PREFETCH = 10;
+    private static final String CONNECTION_URL = BROKER_URL + "?jms.prefetchPolicy.all=" + PREFETCH;
+
+    public static final String QUEUE_NAME = "QUEUE";
+    private static final String DESTINATION = QUEUE_NAME + "?consumer.exclusive=true";
+
+    // Number of messages each consumer receives and acknowledges before it is closed.
+    private static final int BATCH_SIZE = 10;
+    private static final int BATCH_COUNT = 3;
+    private static final int MESSAGE_COUNT = BATCH_SIZE * BATCH_COUNT;
+    private static final long RECEIVE_TIMEOUT_MS = 1000L;
+
+    BrokerService brokerService;
+    Session session;
+    Connection connection;
+
+    // Index of the next message consumeBatch() expects to receive.
+    int seq = 0;
+
+    /**
+     * Starts a broker listening on {@link #BROKER_URL} and opens a started
+     * connection with a client-acknowledge session against it.
+     */
+    @Override
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setUseJmx(true);
+        brokerService.addConnector(BROKER_URL);
+        brokerService.deleteAllMessages();
+        brokerService.start();
+
+        try {
+            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
+            connection = connectionFactory.createConnection();
+            connection.start();
+            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        } catch (Exception e) {
+            // JUnit 3 does not call tearDown() when setUp() throws, so stop the
+            // already-started broker here before propagating the failure.
+            try {
+                tearDown();
+            } catch (Exception cleanupFailure) {
+                LOG.warn("Cleanup after failed setUp also failed", cleanupFailure);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Closes the session and the connection and stops the broker. Every step
+     * is attempted even when an earlier one throws.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (session != null) {
+                session.close();
+            }
+        } finally {
+            try {
+                if (connection != null) {
+                    connection.close();
+                }
+            } finally {
+                if (brokerService != null) {
+                    brokerService.stop();
+                }
+            }
+        }
+    }
+
+    /**
+     * Sends {@link #MESSAGE_COUNT} messages of one message group and then
+     * consumes them batch by batch, expecting them in send order.
+     */
+    public void testOrder() throws Exception {
+        LOG.info("Producing messages 0-" + (MESSAGE_COUNT - 1) + " . . .");
+        Destination destination = session.createQueue(DESTINATION);
+        final MessageProducer messageProducer = session.createProducer(destination);
+        try {
+            for (int i = 0; i < MESSAGE_COUNT; ++i) {
+                final Message message = session.createTextMessage(createMessageText(i));
+                message.setStringProperty("JMSXGroupID", "FOO");
+
+                messageProducer.send(message);
+                LOG.info("sent " + toString(message));
+            }
+        } finally {
+            messageProducer.close();
+        }
+
+        for (int batch = 0; batch < BATCH_COUNT; ++batch) {
+            final int first = batch * BATCH_SIZE;
+            LOG.info("Consuming messages " + first + "-" + (first + BATCH_SIZE - 1) + " . . .");
+            consumeBatch();
+        }
+    }
+
+    /**
+     * Receives and acknowledges {@link #BATCH_SIZE} messages through a newly
+     * created consumer, asserting that each is the next expected message.
+     */
+    protected void consumeBatch() throws Exception {
+        Destination destination = session.createQueue(DESTINATION);
+        final MessageConsumer messageConsumer = session.createConsumer(destination);
+        try {
+            for (int i = 0; i < BATCH_SIZE; ++i) {
+                final Message message = messageConsumer.receive(RECEIVE_TIMEOUT_MS);
+                // receive(timeout) returns null when nothing arrives in time; fail
+                // with a clear message rather than a NullPointerException below.
+                assertNotNull("No message received while waiting for '" + createMessageText(seq) + "'", message);
+                LOG.info("received " + toString(message));
+                assertEquals("Message out of order", createMessageText(seq++), ((TextMessage) message).getText());
+                message.acknowledge();
+            }
+        } finally {
+            messageConsumer.close();
+        }
+    }
+
+    /**
+     * Describes a text message for logging: its text, its JMS message id and
+     * a marker when it is flagged as redelivered.
+     */
+    private String toString(final Message message) throws JMSException {
+        // Callers prepend "sent " or "received ", so no verb is added here.
+        String ret = "message '" + ((TextMessage) message).getText() + "' - " + message.getJMSMessageID();
+        if (message.getJMSRedelivered()) {
+            ret += " (redelivered)";
+        }
+        return ret;
+    }
+
+    /** Returns the body text used for the message with the given index. */
+    private static String createMessageText(final int index) {
+        return "message #" + index;
+    }
+}
\ No newline at end of file