You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/05/05 09:09:37 UTC
svn commit: r941174 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/Queue.java
test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
Author: dejanb
Date: Wed May 5 07:09:37 2010
New Revision: 941174
URL: http://svn.apache.org/viewvc?rev=941174&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2719 - order of redelivered messages
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=941174&r1=941173&r2=941174&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed May 5 07:09:37 2010
@@ -95,6 +95,7 @@ public class Queue extends BaseDestinati
private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
// Messages that are paged in but have not yet been targeted at a subscription
private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
+ private List<QueueMessageReference> redeliveredWaitingDispatch = new ArrayList<QueueMessageReference>();
private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
@@ -377,7 +378,7 @@ public class Queue extends BaseDestinati
wakeup();
}
}
-
+
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) throws Exception {
destinationStatistics.getConsumers().decrement();
// synchronize with dispatch method so that no new messages are sent
@@ -406,7 +407,7 @@ public class Queue extends BaseDestinati
getMessageGroupOwners().removeConsumer(consumerId);
// redeliver inflight messages
- List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
+
for (MessageReference ref : sub.remove(context, this)) {
QueueMessageReference qmr = (QueueMessageReference) ref;
if (qmr.getLockOwner() == sub) {
@@ -416,11 +417,10 @@ public class Queue extends BaseDestinati
qmr.incrementRedeliveryCounter();
}
}
- list.add(qmr);
+ redeliveredWaitingDispatch.add(qmr);
}
-
- if (!list.isEmpty()) {
- doDispatch(list);
+ if (!redeliveredWaitingDispatch.isEmpty()) {
+ doDispatch(new ArrayList());
}
}
if (!(this.optimizedDispatch || isSlave())) {
@@ -1220,7 +1220,7 @@ public class Queue extends BaseDestinati
// Perhaps we should page always into the pagedInPendingDispatch list if
// !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
// then we do a dispatch.
- if (pageInMoreMessages || pendingBrowserDispatch != null) {
+ if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) {
try {
pageInMessages(pendingBrowserDispatch != null);
@@ -1494,8 +1494,12 @@ public class Queue extends BaseDestinati
synchronized (dispatchMutex) {
synchronized (pagedInPendingDispatch) {
+ if (!redeliveredWaitingDispatch.isEmpty()) {
+ // Try first to dispatch redelivered messages to keep a proper order
+ redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
+ }
if (!pagedInPendingDispatch.isEmpty()) {
- // Try to first dispatch anything that had not been
+ // Next dispatch anything that had not been
// dispatched before.
pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java?rev=941174&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java Wed May 5 07:09:37 2010
@@ -0,0 +1,118 @@
+package org.apache.activemq.bugs;
+
+import java.util.Date;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.bugs.AMQ1866.ConsumerThread;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class OutOfOrderTestCase extends TestCase {
+ // Test added with the AMQ-2719 change: messages on a queue must be received in send order across successive consumers.
+ private static final Log log = LogFactory.getLog(OutOfOrderTestCase.class);
+
+ public static final String BROKER_URL = "tcp://localhost:61616"; // address the test broker's connector is added on
+ private static final int PREFETCH = 10; // client prefetch size; same value as the per-batch receive count in consumeBatch()
+ private static final String CONNECTION_URL = BROKER_URL + "?jms.prefetchPolicy.all=" + PREFETCH; // client URL carrying the prefetch setting
+
+ public static final String QUEUE_NAME = "QUEUE"; // not referenced inside this class
+ private static final String DESTINATION = "QUEUE?consumer.exclusive=true"; // queue name with the consumer.exclusive option set
+
+ BrokerService brokerService; // broker created in setUp(), stopped in tearDown()
+ Session session; // shared by the producer and every consumer in this test
+ Connection connection;
+
+ int seq = 0; // index of the next message consumeBatch() expects; carried across batches
+ // Starts a BrokerService on BROKER_URL (JMX on, deleteAllMessages() called first) and opens a non-transacted CLIENT_ACKNOWLEDGE session.
+ public void setUp() throws Exception {
+ brokerService = new BrokerService();
+ brokerService.setUseJmx(true);
+ brokerService.addConnector(BROKER_URL);
+ brokerService.deleteAllMessages();
+ brokerService.start();
+
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
+ connection = connectionFactory.createConnection();
+ connection.start();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ }
+ // Closes the session and the connection, then stops the broker.
+ // NOTE(review): an exception from an earlier close() skips the remaining steps - consider try/finally.
+ protected void tearDown() throws Exception {
+ session.close();
+ connection.close();
+ brokerService.stop();
+ }
+ // Sends 30 text messages (all with JMSXGroupID "FOO") and then reads them back in
+ // three batches of 10, each batch through a newly created consumer, checking that
+ // the texts arrive in the order they were sent.
+ public void testOrder() throws Exception {
+
+ log.info("Producing messages 0-29 . . .");
+ Destination destination = session.createQueue(DESTINATION);
+ final MessageProducer messageProducer = session
+ .createProducer(destination);
+ try {
+ for (int i = 0; i < 30; ++i) {
+ final Message message = session
+ .createTextMessage(createMessageText(i));
+ message.setStringProperty("JMSXGroupID", "FOO");
+
+ messageProducer.send(message);
+ log.info("sent " + toString(message));
+ }
+ } finally {
+ messageProducer.close();
+ }
+
+ log.info("Consuming messages 0-9 . . .");
+ consumeBatch();
+
+ log.info("Consuming messages 10-19 . . .");
+ consumeBatch();
+
+ log.info("Consuming messages 20-29 . . .");
+ consumeBatch();
+ }
+ // Creates a consumer, receives 10 messages, asserts each text matches the next expected index (seq), acknowledges it, and finally closes the consumer.
+ protected void consumeBatch() throws Exception {
+ Destination destination = session.createQueue(DESTINATION);
+ final MessageConsumer messageConsumer = session.createConsumer(destination);
+ try {
+ for (int i = 0; i < 10; ++i) {
+ final Message message = messageConsumer.receive(1000L); // NOTE(review): presumably null on timeout; toString(message) below would then throw NullPointerException - confirm
+ log.info("received " + toString(message));
+ assertEquals("Message out of order", createMessageText(seq++), ((TextMessage) message).getText());
+ message.acknowledge();
+ }
+ } finally {
+ messageConsumer.close();
+ }
+ }
+ // Formats a message's text and JMS message ID for logging, appending " (redelivered)" when getJMSRedelivered() is true.
+ private String toString(final Message message) throws JMSException {
+ String ret = "received message '" + ((TextMessage) message).getText() + "' - " + message.getJMSMessageID(); // NOTE(review): prefix says "received" even when testOrder() logs a sent message
+ if (message.getJMSRedelivered())
+ ret += " (redelivered)";
+ return ret;
+
+ }
+ // Builds the text payload for the message with the given index.
+ private static String createMessageText(final int index) {
+ return "message #" + index;
+ }
+}
\ No newline at end of file