You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/12/01 17:49:19 UTC
qpid-jms git commit: add test to expose broker issue when committing
transacted message acks out of sequence with their original dispatch
Repository: qpid-jms
Updated Branches:
refs/heads/master b7e7ecdd5 -> b7f2b9ec1
add test to expose broker issue when committing transacted message acks out of sequence with their original dispatch
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b7f2b9ec
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b7f2b9ec
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b7f2b9ec
Branch: refs/heads/master
Commit: b7f2b9ec1dcf92225e70ca0f958e457c55e0ae90
Parents: b7e7ecd
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Dec 1 16:48:53 2014 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Dec 1 16:48:53 2014 +0000
----------------------------------------------------------------------
.../transactions/JmsTransactedConsumerTest.java | 67 ++++++++++++++++++++
1 file changed, 67 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b7f2b9ec/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
index 468997d..4d06a87 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
+import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -41,6 +42,8 @@ import org.junit.Test;
*/
public class JmsTransactedConsumerTest extends AmqpTestSupport {
+ private static final String MSG_NUM = "MSG_NUM";
+
@Test(timeout = 60000)
public void testCreateConsumerFromTxSession() throws Exception {
connection = createAmqpConnection();
@@ -283,4 +286,68 @@ public class JmsTransactedConsumerTest extends AmqpTestSupport {
assertEquals(3, jmsxDeliveryCount);
session.commit();
}
+
+ @Ignore //TODO: enable after fixing rollback issue on broker
+ @Test(timeout=30000)
+ public void testSessionTransactedCommitWithPriorityReordering() throws Exception { // receives 1, 3, 4, 2 in one transaction, then commits
+ connection = createAmqpConnection();
+ Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+
+ connection.start();
+
+ MessageProducer pr = session.createProducer(queue);
+ for (int i = 1; i <= 2; i++) {
+ Message m = session.createTextMessage("TestMessage" + i);
+ m.setIntProperty(MSG_NUM, i);
+ pr.send(m, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+ session.commit();
+
+ // Receive the first message only; message 2 has not been received yet at this point.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(5000);
+ assertNotNull(msg);
+ assertEquals(1, msg.getIntProperty(MSG_NUM));
+ assertEquals(Message.DEFAULT_PRIORITY, msg.getJMSPriority());
+
+ // Send two messages (3 and 4) at priority 5; expect them to 'overtake' the not-yet-received message 2 upon arrival at the consumer.
+ for (int i = 3; i <= 4; i++) {
+ Message m = session.createTextMessage("TestMessage" + i);
+ m.setIntProperty(MSG_NUM, i);
+ pr.send(m, DeliveryMode.NON_PERSISTENT, 5 , Message.DEFAULT_TIME_TO_LIVE);
+ }
+ session.commit();
+
+ // Fixed pause to give messages 3 and 4 time to arrive at the consumer before the short receive timeouts below.
+ Thread.sleep(3000);
+
+ // Receive the other messages. Expect the priority 5 messages (3, then 4) first, followed by message 2.
+ msg = consumer.receive(50);
+ assertNotNull(msg);
+ assertEquals(3, msg.getIntProperty(MSG_NUM));
+ assertEquals(5, msg.getJMSPriority());
+
+ msg = consumer.receive(50);
+ assertNotNull(msg);
+ assertEquals(4, msg.getIntProperty(MSG_NUM));
+ assertEquals(5, msg.getJMSPriority());
+
+ msg = consumer.receive(50);
+ assertNotNull(msg);
+ assertEquals(2, msg.getIntProperty(MSG_NUM));
+ assertEquals(Message.DEFAULT_PRIORITY, msg.getJMSPriority());
+
+ session.commit(); // commits the receipt of messages 1, 3, 4, 2, i.e. in a different order than they were sent
+
+ // Send two more messages (5 and 6) and commit, to check the session still works after the commit above.
+ for (int i = 5; i <= 6; i++) {
+ Message m = session.createTextMessage("TestMessage" + i);
+ m.setIntProperty(MSG_NUM, i);
+ pr.send(m, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY , Message.DEFAULT_TIME_TO_LIVE);
+ }
+ session.commit();
+
+ session.close();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org