You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/05/18 09:10:30 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6286 - refine fix to distinguish multiple consumers in a transaction, verify insertion at head will preserve order

Repository: activemq
Updated Branches:
  refs/heads/master 2c3046b81 -> c2230fda4


https://issues.apache.org/jira/browse/AMQ-6286 - refine fix to distinguish multiple consumers in a transaction, verify insertion at head will preserve order


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c2230fda
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c2230fda
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c2230fda

Branch: refs/heads/master
Commit: c2230fda4bd8eed7bba3245ad4b11fcd8e3bd581
Parents: 2c3046b
Author: gtully <ga...@gmail.com>
Authored: Tue May 17 17:03:05 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed May 18 10:09:39 2016 +0100

----------------------------------------------------------------------
 .../cursors/QueueDispatchPendingList.java       |  10 +-
 .../QueueOrderSingleTransactedConsumerTest.java | 108 ++++++++++++++++---
 2 files changed, 102 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c2230fda/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
index a01795c..ae35b4e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
@@ -208,7 +208,7 @@ public class QueueDispatchPendingList implements PendingList {
     }
 
     public void addForRedelivery(List<MessageReference> list, boolean noConsumers) {
-        if (noConsumers && redeliveredWaitingDispatch instanceof OrderedPendingList) {
+        if (noConsumers && redeliveredWaitingDispatch instanceof OrderedPendingList && willBeInOrder(list)) {
             // a single consumer can expect repeatable redelivery order irrespective
             // of transaction or prefetch boundaries
             ((OrderedPendingList)redeliveredWaitingDispatch).insertAtHead(list);
@@ -218,4 +218,12 @@ public class QueueDispatchPendingList implements PendingList {
             }
         }
     }
+
+    private boolean willBeInOrder(List<MessageReference> list) {
+        // for a single consumer, inserting at head will be in order w.r.t. brokerSequence, but
+        // it will not be if there were multiple consumers in the mix, even if this is the last
+        // consumer to close (noConsumers==true)
+        return !redeliveredWaitingDispatch.isEmpty() && list != null && !list.isEmpty() &&
+            redeliveredWaitingDispatch.iterator().next().getMessageId().getBrokerSequenceId() > list.get(list.size() - 1).getMessageId().getBrokerSequenceId();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/c2230fda/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java
index 78c50b3..87a967c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java
@@ -25,6 +25,8 @@ import org.apache.activemq.command.ActiveMQQueue;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.jms.Connection;
 import javax.jms.Message;
@@ -39,13 +41,26 @@ import static org.junit.Assert.assertEquals;
 
 public class QueueOrderSingleTransactedConsumerTest {
 
+    private static final Logger LOG = LoggerFactory.getLogger(QueueOrderSingleTransactedConsumerTest.class);
+
     BrokerService broker = null;
     ActiveMQQueue dest = new ActiveMQQueue("Queue");
 
     @Test
     public void testSingleConsumerTxRepeat() throws Exception {
 
-        publishMessages(100);
+        // affect the broker sequence id, which is region wide
+        ActiveMQQueue dummyDest = new ActiveMQQueue("AnotherQueue");
+        publishMessagesWithOrderProperty(10, 0, dest);
+        publishMessagesWithOrderProperty(1, 0, dummyDest);
+
+        publishMessagesWithOrderProperty(10, 10, dest);
+        publishMessagesWithOrderProperty(1, 0, dummyDest);
+
+        publishMessagesWithOrderProperty(10, 20, dest);
+        publishMessagesWithOrderProperty(1, 0, dummyDest);
+
+        publishMessagesWithOrderProperty(5, 30, dest);
 
         consumeVerifyOrderRollback(20);
         consumeVerifyOrderRollback(10);
@@ -55,48 +70,105 @@ public class QueueOrderSingleTransactedConsumerTest {
     @Test
     public void testSingleSessionXConsumerTxRepeat() throws Exception {
 
-        publishMessages(100);
+        publishMessagesWithOrderProperty(50);
+
+        Connection connection = getConnectionFactory().createConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer messageConsumer = consumeVerifyOrder(session, 20);
+        messageConsumer.close();
+        session.rollback();
+        messageConsumer = consumeVerifyOrder(session, 10);
+        messageConsumer.close();
+        session.rollback();
+        messageConsumer = consumeVerifyOrder(session, 5);
+        messageConsumer.close();
+        session.commit();
+        connection.close();
+    }
+
+    @Test
+    public void tesXConsumerTxRepeat() throws Exception {
+
+        publishMessagesWithOrderProperty(10);
 
-        Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection();
+        Connection connection = getConnectionFactory().createConnection();
         connection.start();
         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        consumeVerifyOrder(session, 20);
+        MessageConsumer messageConsumer = consumeVerifyOrder(session, 6);
+        messageConsumer.close();
+        messageConsumer = consumeVerifyOrder(session, 4, 6);
+
+        // rollback before close, so there are two consumers in the mix
         session.rollback();
-        consumeVerifyOrder(session, 10);
+
+        messageConsumer.close();
+
+        messageConsumer = consumeVerifyOrder(session, 10);
+        session.commit();
+        messageConsumer.close();
+        connection.close();
+    }
+
+    @Test
+    public void testSingleTxXConsumerTxRepeat() throws Exception {
+
+        publishMessagesWithOrderProperty(10);
+
+        Connection connection = getConnectionFactory().createConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer messageConsumer = consumeVerifyOrder(session, 6);
+        messageConsumer.close();
+        messageConsumer = consumeVerifyOrder(session, 4, 6);
+        messageConsumer.close();
+
         session.rollback();
-        consumeVerifyOrder(session, 5);
+        messageConsumer = consumeVerifyOrder(session, 10);
         session.commit();
+        messageConsumer.close();
+        connection.close();
     }
 
     private void consumeVerifyOrderRollback(final int num) throws Exception {
-        Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection();
+        Connection connection = getConnectionFactory().createConnection();
         connection.start();
         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        consumeVerifyOrder(session, num);
+        MessageConsumer messageConsumer = consumeVerifyOrder(session, num);
+        messageConsumer.close();
         session.rollback();
         connection.close();
     }
 
-    private void consumeVerifyOrder(Session session, final int num) throws Exception {
+    private MessageConsumer consumeVerifyOrder(Session session, final int num) throws Exception {
+        return consumeVerifyOrder(session, num, 0);
+    }
+
+    private MessageConsumer consumeVerifyOrder(Session session, final int num, final int base) throws Exception {
         MessageConsumer messageConsumer = session.createConsumer(dest);
         for (int i=0; i<num; ) {
             Message message = messageConsumer.receive(4000);
             if (message != null) {
-                assertEquals(i, message.getIntProperty("Order"));
+                assertEquals(i + base, message.getIntProperty("Order"));
                 i++;
+                LOG.debug("Received:" + message.getJMSMessageID() + ", Order: " + message.getIntProperty("Order"));
             }
         }
-        messageConsumer.close();
+        return messageConsumer;
     }
 
-    private void publishMessages(int num) throws Exception {
-        Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection();
+    private void publishMessagesWithOrderProperty(int num) throws Exception {
+        publishMessagesWithOrderProperty(num, 0, dest);
+    }
+
+    private void publishMessagesWithOrderProperty(int num, int seqStart, ActiveMQQueue destination) throws Exception {
+        Connection connection = getConnectionFactory().createConnection();
         connection.start();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer messageProducer = session.createProducer(dest);
+        MessageProducer messageProducer = session.createProducer(destination);
         TextMessage textMessage = session.createTextMessage("A");
         for (int i=0; i<num; i++) {
-            textMessage.setIntProperty("Order", i);
+            textMessage.setIntProperty("Order", i + seqStart);
             messageProducer.send(textMessage);
         }
     }
@@ -130,4 +202,10 @@ public class QueueOrderSingleTransactedConsumerTest {
             broker.stop();
         }
     }
+
+
+    private ActiveMQConnectionFactory getConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
+    }
+
 }