You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/05/18 09:10:30 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6286 -
refine fix to distinguish multiple consumers in a transaction,
verify insertion at head will preserve order
Repository: activemq
Updated Branches:
refs/heads/master 2c3046b81 -> c2230fda4
https://issues.apache.org/jira/browse/AMQ-6286 - refine fix to distinguish multiple consumers in a transaction, verify insertion at head will preserve order
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c2230fda
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c2230fda
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c2230fda
Branch: refs/heads/master
Commit: c2230fda4bd8eed7bba3245ad4b11fcd8e3bd581
Parents: 2c3046b
Author: gtully <ga...@gmail.com>
Authored: Tue May 17 17:03:05 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed May 18 10:09:39 2016 +0100
----------------------------------------------------------------------
.../cursors/QueueDispatchPendingList.java | 10 +-
.../QueueOrderSingleTransactedConsumerTest.java | 108 ++++++++++++++++---
2 files changed, 102 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/c2230fda/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
index a01795c..ae35b4e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
@@ -208,7 +208,7 @@ public class QueueDispatchPendingList implements PendingList {
}
public void addForRedelivery(List<MessageReference> list, boolean noConsumers) {
- if (noConsumers && redeliveredWaitingDispatch instanceof OrderedPendingList) {
+ if (noConsumers && redeliveredWaitingDispatch instanceof OrderedPendingList && willBeInOrder(list)) {
// a single consumer can expect repeatable redelivery order irrespective
// of transaction or prefetch boundaries
((OrderedPendingList)redeliveredWaitingDispatch).insertAtHead(list);
@@ -218,4 +218,12 @@ public class QueueDispatchPendingList implements PendingList {
}
}
}
+
+ private boolean willBeInOrder(List<MessageReference> list) {
+ // for a single consumer inserting at head will be in order w.r.t brokerSequence but
+ // will not be if there were multiple consumers in the mix even if this is the last
+ // consumer to close (noConsumers==true)
+ return !redeliveredWaitingDispatch.isEmpty() && list != null && !list.isEmpty() &&
+ redeliveredWaitingDispatch.iterator().next().getMessageId().getBrokerSequenceId() > list.get(list.size() - 1).getMessageId().getBrokerSequenceId();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/c2230fda/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java
index 78c50b3..87a967c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueOrderSingleTransactedConsumerTest.java
@@ -25,6 +25,8 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.Message;
@@ -39,13 +41,26 @@ import static org.junit.Assert.assertEquals;
public class QueueOrderSingleTransactedConsumerTest {
+ private static final Logger LOG = LoggerFactory.getLogger(QueueOrderSingleTransactedConsumerTest.class);
+
BrokerService broker = null;
ActiveMQQueue dest = new ActiveMQQueue("Queue");
@Test
public void testSingleConsumerTxRepeat() throws Exception {
- publishMessages(100);
+ // effect the broker sequence id that is region wide
+ ActiveMQQueue dummyDest = new ActiveMQQueue("AnotherQueue");
+ publishMessagesWithOrderProperty(10, 0, dest);
+ publishMessagesWithOrderProperty(1, 0, dummyDest);
+
+ publishMessagesWithOrderProperty(10, 10, dest);
+ publishMessagesWithOrderProperty(1, 0, dummyDest);
+
+ publishMessagesWithOrderProperty(10, 20, dest);
+ publishMessagesWithOrderProperty(1, 0, dummyDest);
+
+ publishMessagesWithOrderProperty(5, 30, dest);
consumeVerifyOrderRollback(20);
consumeVerifyOrderRollback(10);
@@ -55,48 +70,105 @@ public class QueueOrderSingleTransactedConsumerTest {
@Test
public void testSingleSessionXConsumerTxRepeat() throws Exception {
- publishMessages(100);
+ publishMessagesWithOrderProperty(50);
+
+ Connection connection = getConnectionFactory().createConnection();
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer messageConsumer = consumeVerifyOrder(session, 20);
+ messageConsumer.close();
+ session.rollback();
+ messageConsumer = consumeVerifyOrder(session, 10);
+ messageConsumer.close();
+ session.rollback();
+ messageConsumer = consumeVerifyOrder(session, 5);
+ messageConsumer.close();
+ session.commit();
+ connection.close();
+ }
+
+ @Test
+ public void tesXConsumerTxRepeat() throws Exception {
+
+ publishMessagesWithOrderProperty(10);
- Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection();
+ Connection connection = getConnectionFactory().createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- consumeVerifyOrder(session, 20);
+ MessageConsumer messageConsumer = consumeVerifyOrder(session, 6);
+ messageConsumer.close();
+ messageConsumer = consumeVerifyOrder(session, 4, 6);
+
+ // rollback before close, so there are two consumers in the mix
session.rollback();
- consumeVerifyOrder(session, 10);
+
+ messageConsumer.close();
+
+ messageConsumer = consumeVerifyOrder(session, 10);
+ session.commit();
+ messageConsumer.close();
+ connection.close();
+ }
+
+ @Test
+ public void testSingleTxXConsumerTxRepeat() throws Exception {
+
+ publishMessagesWithOrderProperty(10);
+
+ Connection connection = getConnectionFactory().createConnection();
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer messageConsumer = consumeVerifyOrder(session, 6);
+ messageConsumer.close();
+ messageConsumer = consumeVerifyOrder(session, 4, 6);
+ messageConsumer.close();
+
session.rollback();
- consumeVerifyOrder(session, 5);
+ messageConsumer = consumeVerifyOrder(session, 10);
session.commit();
+ messageConsumer.close();
+ connection.close();
}
private void consumeVerifyOrderRollback(final int num) throws Exception {
- Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection();
+ Connection connection = getConnectionFactory().createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- consumeVerifyOrder(session, num);
+ MessageConsumer messageConsumer = consumeVerifyOrder(session, num);
+ messageConsumer.close();
session.rollback();
connection.close();
}
- private void consumeVerifyOrder(Session session, final int num) throws Exception {
+ private MessageConsumer consumeVerifyOrder(Session session, final int num) throws Exception {
+ return consumeVerifyOrder(session, num, 0);
+ }
+
+ private MessageConsumer consumeVerifyOrder(Session session, final int num, final int base) throws Exception {
MessageConsumer messageConsumer = session.createConsumer(dest);
for (int i=0; i<num; ) {
Message message = messageConsumer.receive(4000);
if (message != null) {
- assertEquals(i, message.getIntProperty("Order"));
+ assertEquals(i + base, message.getIntProperty("Order"));
i++;
+ LOG.debug("Received:" + message.getJMSMessageID() + ", Order: " + message.getIntProperty("Order"));
}
}
- messageConsumer.close();
+ return messageConsumer;
}
- private void publishMessages(int num) throws Exception {
- Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()).createConnection();
+ private void publishMessagesWithOrderProperty(int num) throws Exception {
+ publishMessagesWithOrderProperty(num, 0, dest);
+ }
+
+ private void publishMessagesWithOrderProperty(int num, int seqStart, ActiveMQQueue destination) throws Exception {
+ Connection connection = getConnectionFactory().createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer messageProducer = session.createProducer(dest);
+ MessageProducer messageProducer = session.createProducer(destination);
TextMessage textMessage = session.createTextMessage("A");
for (int i=0; i<num; i++) {
- textMessage.setIntProperty("Order", i);
+ textMessage.setIntProperty("Order", i + seqStart);
messageProducer.send(textMessage);
}
}
@@ -130,4 +202,10 @@ public class QueueOrderSingleTransactedConsumerTest {
broker.stop();
}
}
+
+
+ private ActiveMQConnectionFactory getConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
+ }
+
}