You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/29 23:33:55 UTC
qpid-broker-j git commit: QPID-6933: [System Tests] Move
CommitRollbackTest and RollbackOrderTest into JMS 1.1 system tests
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 37b6c9dbc -> f30122249
QPID-6933: [System Tests] Move CommitRollbackTest and RollbackOrderTest into JMS 1.1 system tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f3012224
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f3012224
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f3012224
Branch: refs/heads/master
Commit: f3012224957ef02cdc2b737002e6992f093b92fd
Parents: 37b6c9d
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Nov 29 23:33:21 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Nov 29 23:33:21 2017 +0000
----------------------------------------------------------------------
.../java/org/apache/qpid/systests/Utils.java | 33 +-
.../jms_1_1/acknowledge/AcknowledgeTest.java | 2 +-
.../jms_1_1/acknowledge/RecoverTest.java | 20 +-
.../jms_1_1/transaction/CommitRollbackTest.java | 656 ++++++++++++++++++
.../deliverycount/DeliveryCountTest.java | 2 +-
.../subscription/SharedSubscriptionTest.java | 14 +-
.../qpid/test/client/RollbackOrderTest.java | 193 ------
.../unit/transacted/CommitRollbackTest.java | 670 -------------------
test-profiles/CPPExcludes | 2 -
test-profiles/Java010Excludes | 3 -
test-profiles/Java10Excludes | 6 -
11 files changed, 700 insertions(+), 901 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
index dc975d6..c2dfe75 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
@@ -23,10 +23,12 @@ package org.apache.qpid.systests;
import java.util.ArrayList;
import java.util.List;
+import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
public class Utils
@@ -35,7 +37,36 @@ public class Utils
public static final String INDEX = "index";
private static final String DEFAULT_MESSAGE_PAYLOAD = createString(DEFAULT_MESSAGE_SIZE);
- public static List<Message> sendMessage(Session session, Destination destination, int count) throws Exception
+    /**
+     * Sends one text message to the destination through a temporary non-transacted,
+     * auto-acknowledge session created on the supplied connection. The session is closed
+     * before returning, whether or not the send succeeded.
+     *
+     * @param connection connection used to create the temporary session
+     * @param destination destination the message is sent to
+     * @param message text used as the body of the message
+     * @throws JMSException if creating the session/producer, sending, or closing fails
+     */
+    public static void sendTextMessage(final Connection connection, final Destination destination, String message)
+            throws JMSException
+    {
+        final Session temporarySession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try
+        {
+            final MessageProducer textProducer = temporarySession.createProducer(destination);
+            final Message textPayload = temporarySession.createTextMessage(message);
+            textProducer.send(textPayload);
+        }
+        finally
+        {
+            temporarySession.close();
+        }
+    }
+
+    /**
+     * Sends {@code messageNumber} messages to the destination by delegating to
+     * {@link #sendMessages(Session, Destination, int)} with a temporary non-transacted,
+     * auto-acknowledge session. The session is always closed before returning; the list
+     * returned by the delegate is discarded.
+     *
+     * @param connection connection used to create the temporary session
+     * @param destination destination the messages are sent to
+     * @param messageNumber number of messages to send
+     * @throws JMSException if creating the session, sending, or closing fails
+     */
+    public static void sendMessages(final Connection connection, final Destination destination, final int messageNumber)
+            throws JMSException
+    {
+        final Session temporarySession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try
+        {
+            sendMessages(temporarySession, destination, messageNumber);
+        }
+        finally
+        {
+            temporarySession.close();
+        }
+    }
+
+ public static List<Message> sendMessages(Session session, Destination destination, int count) throws JMSException
{
List<Message> messages = new ArrayList<>(count);
MessageProducer producer = session.createProducer(destination);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/AcknowledgeTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/AcknowledgeTest.java
index 65eecc4..ea50e38 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/AcknowledgeTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/AcknowledgeTest.java
@@ -54,7 +54,7 @@ public class AcknowledgeTest extends JmsTestBase
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
- Utils.sendMessage(session, queue, 2);
+ Utils.sendMessages(session, queue, 2);
Message message = consumer.receive(getReceiveTimeout());
assertNotNull("Message has not been received", message);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/RecoverTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/RecoverTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/RecoverTest.java
index cc41859..174eb31 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/RecoverTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/RecoverTest.java
@@ -60,7 +60,7 @@ public class RecoverTest extends JmsTestBase
{
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
- produceTestMessages(connection, queue, SENT_COUNT);
+ Utils.sendMessages(connection, queue, SENT_COUNT);
connection.start();
Message message = receiveAndValidateMessage(consumer, 0);
@@ -89,7 +89,7 @@ public class RecoverTest extends JmsTestBase
{
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
- produceTestMessages(connection, queue, SENT_COUNT);
+ Utils.sendMessages(connection, queue, SENT_COUNT);
connection.start();
int messageSeen = 0;
@@ -175,7 +175,7 @@ public class RecoverTest extends JmsTestBase
{
Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(queue);
- produceTestMessages(connection, queue, 1);
+ Utils.sendMessages(connection, queue, 1);
final CountDownLatch awaitMessages = new CountDownLatch(2);
final AtomicReference<Throwable> listenerCaughtException = new AtomicReference<>();
@@ -226,18 +226,4 @@ public class RecoverTest extends JmsTestBase
return message;
}
- private void produceTestMessages(final Connection connection, final Queue queue, final int messageNumber)
- throws Exception
- {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- try
- {
- Utils.sendMessage(session, queue, messageNumber);
- }
- finally
- {
- session.close();
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/transaction/CommitRollbackTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/transaction/CommitRollbackTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/transaction/CommitRollbackTest.java
new file mode 100644
index 0000000..d65822d
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/transaction/CommitRollbackTest.java
@@ -0,0 +1,656 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.systests.jms_1_1.transaction;
+
+import static junit.framework.TestCase.fail;
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.systests.JmsTestBase;
+import org.apache.qpid.systests.Utils;
+
+public class CommitRollbackTest extends JmsTestBase
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(CommitRollbackTest.class);
+
+    /**
+     * Sends "A" in a transacted session and closes the connection without committing, then
+     * sends "B" from a second connection and expects "B" to be the first message received
+     * from the queue.
+     */
+    @Test
+    public void produceMessageAndAbortTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+
+        final Connection abortingConnection = getConnection();
+        try
+        {
+            final Session transacted = abortingConnection.createSession(true, Session.SESSION_TRANSACTED);
+            // Deliberately neither committed nor rolled back before the connection is closed.
+            transacted.createProducer(queue).send(transacted.createTextMessage("A"));
+        }
+        finally
+        {
+            abortingConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            verifyingConnection.start();
+            final Session autoAck = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            autoAck.createProducer(queue).send(autoAck.createTextMessage("B"));
+
+            final MessageConsumer consumer = autoAck.createConsumer(queue);
+            final Message received = consumer.receive(getReceiveTimeout());
+            assertTrue("Text message should be received", received instanceof TextMessage);
+            assertEquals("Unexpected message received", "B", ((TextMessage) received).getText());
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Sends "A" in a transacted session and rolls the transaction back, then sends "B"
+     * through an auto-acknowledge session on a second connection and expects "B" to be the
+     * first message received from the queue.
+     */
+    @Test
+    public void produceMessageAndRollbackTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+
+        final Connection producingConnection = getConnection();
+        try
+        {
+            final Session transacted = producingConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageProducer rolledBackProducer = transacted.createProducer(queue);
+            rolledBackProducer.send(transacted.createTextMessage("A"));
+            transacted.rollback();
+        }
+        finally
+        {
+            producingConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            final Session autoAck = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            autoAck.createProducer(queue).send(autoAck.createTextMessage("B"));
+
+            final MessageConsumer consumer = autoAck.createConsumer(queue);
+            verifyingConnection.start();
+
+            final Message received = consumer.receive(getReceiveTimeout());
+            assertTrue("Text message should be received", received instanceof TextMessage);
+            assertEquals("Unexpected message received", "B", ((TextMessage) received).getText());
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Sends "A" in a transacted session, commits, and expects a consumer on a second
+     * connection to receive "A".
+     */
+    @Test
+    public void produceMessageAndCommitTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+
+        final Connection producingConnection = getConnection();
+        try
+        {
+            final Session transacted = producingConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageProducer committedProducer = transacted.createProducer(queue);
+            committedProducer.send(transacted.createTextMessage("A"));
+            transacted.commit();
+        }
+        finally
+        {
+            producingConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            final Session autoAck = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = autoAck.createConsumer(queue);
+            verifyingConnection.start();
+
+            final Message received = consumer.receive(getReceiveTimeout());
+            assertTrue("Text message should be received", received instanceof TextMessage);
+            assertEquals("Unexpected message received", "A", ((TextMessage) received).getText());
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Receives "A" in a transacted session and closes the connection without committing,
+     * then expects a consumer on a second connection to receive "A" again.
+     */
+    @Test
+    public void receiveMessageAndAbortTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+
+        final Connection abortingConnection = getConnection();
+        try
+        {
+            Utils.sendTextMessage(abortingConnection, queue, "A");
+
+            abortingConnection.start();
+            final Session transacted = abortingConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageConsumer transactedConsumer = transacted.createConsumer(queue);
+            // The receipt is deliberately left uncommitted when the connection is closed.
+            final Message firstDelivery = transactedConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message should be received", firstDelivery instanceof TextMessage);
+            assertEquals("Unexpected message received", "A", ((TextMessage) firstDelivery).getText());
+        }
+        finally
+        {
+            abortingConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            verifyingConnection.start();
+            final Session autoAck = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = autoAck.createConsumer(queue);
+
+            final Message redelivery = consumer.receive(getReceiveTimeout());
+            assertTrue("Text message should be received", redelivery instanceof TextMessage);
+            assertEquals("Unexpected message received", "A", ((TextMessage) redelivery).getText());
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Receives "A" in a transacted session and rolls the transaction back, then expects a
+     * consumer on a second connection to receive "A" again.
+     */
+    @Test
+    public void receiveMessageAndRollbackTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+
+        final Connection rollingBackConnection = getConnection();
+        try
+        {
+            Utils.sendTextMessage(rollingBackConnection, queue, "A");
+
+            rollingBackConnection.start();
+            final Session transacted = rollingBackConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageConsumer transactedConsumer = transacted.createConsumer(queue);
+            final Message firstDelivery = transactedConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message should be received", firstDelivery instanceof TextMessage);
+            assertEquals("Unexpected message received", "A", ((TextMessage) firstDelivery).getText());
+            transacted.rollback();
+        }
+        finally
+        {
+            rollingBackConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            final Session autoAck = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = autoAck.createConsumer(queue);
+            verifyingConnection.start();
+
+            final Message redelivery = consumer.receive(getReceiveTimeout());
+            assertTrue("Text message should be received", redelivery instanceof TextMessage);
+            assertEquals("Unexpected message received", "A", ((TextMessage) redelivery).getText());
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Sends two indexed messages, receives and commits the one with index 0 in a transacted
+     * session, then expects a consumer on a second connection to receive the message with
+     * index 1.
+     */
+    @Test
+    public void receiveMessageAndCommitTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+
+        final Connection committingConnection = getConnection();
+        try
+        {
+            Utils.sendMessages(committingConnection, queue, 2);
+
+            committingConnection.start();
+            final Session transacted = committingConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageConsumer transactedConsumer = transacted.createConsumer(queue);
+            final Message committed = transactedConsumer.receive(getReceiveTimeout());
+            assertNotNull("Message not received", committed);
+            assertEquals("Unexpected message received", 0, committed.getIntProperty(INDEX));
+            transacted.commit();
+        }
+        finally
+        {
+            committingConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            final Session autoAck = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = autoAck.createConsumer(queue);
+            verifyingConnection.start();
+
+            final Message next = consumer.receive(getReceiveTimeout());
+            assertNotNull("Message not received", next);
+            assertEquals("Unexpected message received", 1, next.getIntProperty(INDEX));
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Sends two indexed messages, receives the one with index 0 in a transacted session,
+     * closes the consumer and only then commits. A consumer on a second connection is then
+     * expected to receive the message with index 1.
+     */
+    @Test
+    public void receiveMessageCloseConsumerAndCommitTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+
+        final Connection committingConnection = getConnection();
+        try
+        {
+            Utils.sendMessages(committingConnection, queue, 2);
+
+            committingConnection.start();
+            final Session transacted = committingConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageConsumer transactedConsumer = transacted.createConsumer(queue);
+            final Message committed = transactedConsumer.receive(getReceiveTimeout());
+            assertNotNull("Message not received", committed);
+            assertEquals("Unexpected message received", 0, committed.getIntProperty(INDEX));
+            // The consumer is closed before the transaction that covers its delivery is committed.
+            transactedConsumer.close();
+            transacted.commit();
+        }
+        finally
+        {
+            committingConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            final Session autoAck = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = autoAck.createConsumer(queue);
+            verifyingConnection.start();
+
+            final Message next = consumer.receive(getReceiveTimeout());
+            assertNotNull("Message not received", next);
+            assertEquals("Unexpected message received", 1, next.getIntProperty(INDEX));
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Sends two indexed messages, receives the one with index 0 in a transacted session,
+     * closes the consumer and only then rolls back. A consumer on a second connection is
+     * then expected to receive the message with index 0 again.
+     */
+    @Test
+    public void receiveMessageCloseConsumerAndRollbackTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+
+        final Connection rollingBackConnection = getConnection();
+        try
+        {
+            Utils.sendMessages(rollingBackConnection, queue, 2);
+
+            rollingBackConnection.start();
+            final Session transacted = rollingBackConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageConsumer transactedConsumer = transacted.createConsumer(queue);
+            final Message rolledBack = transactedConsumer.receive(getReceiveTimeout());
+            assertNotNull("Message not received", rolledBack);
+            assertEquals("Unexpected message received", 0, rolledBack.getIntProperty(INDEX));
+            // The consumer is closed before the transaction that covers its delivery is rolled back.
+            transactedConsumer.close();
+            transacted.rollback();
+        }
+        finally
+        {
+            rollingBackConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            final Session autoAck = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = autoAck.createConsumer(queue);
+            verifyingConnection.start();
+
+            final Message redelivery = consumer.receive(getReceiveTimeout());
+            assertNotNull("Message not received", redelivery);
+            assertEquals("Unexpected message received", 0, redelivery.getIntProperty(INDEX));
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Uses two consumers on different queues that share one transacted session. The first
+     * message of each queue is received, the session is rolled back, and both first messages
+     * are expected again. After a commit the second message of each queue is expected.
+     */
+    @Test
+    public void transactionSharedByConsumers() throws Exception
+    {
+        final Queue queue1 = createQueue("Q1");
+        final Queue queue2 = createQueue("Q2");
+        final Connection connection = getConnection();
+        try
+        {
+            Utils.sendTextMessage(connection, queue1, "queue1Message1");
+            Utils.sendTextMessage(connection, queue1, "queue1Message2");
+            Utils.sendTextMessage(connection, queue2, "queue2Message1");
+            Utils.sendTextMessage(connection, queue2, "queue2Message2");
+
+            final Session sharedSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageConsumer firstQueueConsumer = sharedSession.createConsumer(queue1);
+            final MessageConsumer secondQueueConsumer = sharedSession.createConsumer(queue2);
+            connection.start();
+
+            // First delivery of the head of each queue.
+            Message fromFirstQueue = firstQueueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message not received from first queue", fromFirstQueue instanceof TextMessage);
+            final String firstQueueText = ((TextMessage) fromFirstQueue).getText();
+            assertEquals("Unexpected message received from first queue", "queue1Message1", firstQueueText);
+
+            Message fromSecondQueue = secondQueueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message not received from second queue", fromSecondQueue instanceof TextMessage);
+            final String secondQueueText = ((TextMessage) fromSecondQueue).getText();
+            assertEquals("Unexpected message received from second queue", "queue2Message1", secondQueueText);
+
+            sharedSession.rollback();
+
+            // After the rollback the same two messages are expected once more.
+            fromFirstQueue = firstQueueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message not received from first queue", fromFirstQueue instanceof TextMessage);
+            final String firstQueueRedelivered = ((TextMessage) fromFirstQueue).getText();
+            assertEquals("Unexpected message received from first queue", "queue1Message1", firstQueueRedelivered);
+
+            fromSecondQueue = secondQueueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message not received from second queue", fromSecondQueue instanceof TextMessage);
+            final String secondQueueRedelivered = ((TextMessage) fromSecondQueue).getText();
+            assertEquals("Unexpected message received from second queue", "queue2Message1", secondQueueRedelivered);
+
+            sharedSession.commit();
+
+            // With the first messages committed, the second message of each queue is expected.
+            final Message nextFromFirstQueue = firstQueueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message not received from first queue", nextFromFirstQueue instanceof TextMessage);
+            final String firstQueueNext = ((TextMessage) nextFromFirstQueue).getText();
+            assertEquals("Unexpected message received from first queue", "queue1Message2", firstQueueNext);
+
+            final Message nextFromSecondQueue = secondQueueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message not received from second queue", nextFromSecondQueue instanceof TextMessage);
+            final String secondQueueNext = ((TextMessage) nextFromSecondQueue).getText();
+            assertEquals("Unexpected message received from second queue", "queue2Message2", secondQueueNext);
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    /**
+     * Consumes messages through a {@code MessageListener} on a transacted session and commits
+     * the session from inside the listener on every delivery. Expects the listener to be
+     * invoked once per sent message, to raise no exception, and to commit once per message.
+     *
+     * NOTE(review): unlike every other test in this class, this method has no {@code @Test}
+     * annotation, so JUnit 4 presumably never runs it. Before enabling it, confirm that
+     * {@code Utils.sendMessages} really sets the JMS correlation id to "m1"; nothing visible
+     * here does, and the assertion in the listener would otherwise fail.
+     */
+    public void testCommitWithinOnMessage() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+        Connection connection = getConnection();
+        int messageNumber = 2;
+        try
+        {
+            Utils.sendMessages(connection, queue, messageNumber);
+            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            final CountDownLatch receiveLatch = new CountDownLatch(messageNumber);
+            final AtomicInteger commitCounter = new AtomicInteger();
+            final AtomicReference<Throwable> messageConsumerThrowable = new AtomicReference<>();
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+
+            messageConsumer.setMessageListener(message -> {
+                try
+                {
+                    LOGGER.info("received message {}", message);
+                    // JUnit's contract is (message, expected, actual); the expected literal goes first
+                    // so that a failure reports the two values the right way round.
+                    assertEquals("Wrong message received", "m1", message.getJMSCorrelationID());
+                    LOGGER.info("commit session");
+                    session.commit();
+                    commitCounter.incrementAndGet();
+                }
+                catch (Throwable e)
+                {
+                    // Failures on the listener thread are recorded and re-checked on the test thread.
+                    messageConsumerThrowable.set(e);
+                    LOGGER.error("Unexpected exception", e);
+                }
+                finally
+                {
+                    // Count down even on failure so the test thread does not wait for the full timeout.
+                    receiveLatch.countDown();
+                }
+            });
+            connection.start();
+
+            assertTrue("Messages not received in expected time",
+                       receiveLatch.await(getReceiveTimeout() * messageNumber, TimeUnit.MILLISECONDS));
+            assertNull("Unexpected exception: " + messageConsumerThrowable.get(), messageConsumerThrowable.get());
+            assertEquals("Unexpected number of commits", messageNumber, commitCounter.get());
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends one message more than the configured prefetch, receives {@code maxPrefetch}
+     * messages in a transacted session and rolls back. The same messages are then expected
+     * again in index order, and after a commit the one remaining message is expected.
+     */
+    @Test
+    public void exhaustedPrefetchInTransaction() throws Exception
+    {
+        final int maxPrefetch = 2;
+        final int messageNumber = maxPrefetch + 1;
+
+        final Queue queue = createQueue(getTestName());
+        final Connection prefetchLimitedConnection = getConnectionBuilder().setPrefetch(maxPrefetch).build();
+        try
+        {
+            Utils.sendMessages(prefetchLimitedConnection, queue, messageNumber);
+            final Session transacted = prefetchLimitedConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageConsumer consumer = transacted.createConsumer(queue);
+            prefetchLimitedConnection.start();
+
+            // First pass: take as many messages as the prefetch allows, then roll them back.
+            for (int index = 0; index < maxPrefetch; index++)
+            {
+                final Message received = consumer.receive(getReceiveTimeout());
+                assertNotNull(String.format("Message %d not received", index), received);
+                assertEquals("Unexpected message received", index, received.getIntProperty(INDEX));
+            }
+
+            transacted.rollback();
+
+            // Second pass: the rolled-back messages are expected again, in the same order.
+            for (int index = 0; index < maxPrefetch; index++)
+            {
+                final Message redelivered = consumer.receive(getReceiveTimeout());
+                assertNotNull(String.format("Message %d not received after rollback", index), redelivered);
+                assertEquals("Unexpected message received after rollback",
+                             index,
+                             redelivered.getIntProperty(INDEX));
+            }
+
+            transacted.commit();
+
+            final Message last = consumer.receive(getReceiveTimeout());
+            assertNotNull(String.format("Message %d not received", maxPrefetch), last);
+            assertEquals("Unexpected message received", maxPrefetch, last.getIntProperty(INDEX));
+        }
+        finally
+        {
+            prefetchLimitedConnection.close();
+        }
+    }
+
+    /**
+     * Verifies that message order is preserved across repeated rollbacks. For each expected
+     * index the head message and all remaining messages are received in order and the
+     * transaction is rolled back; this happens five times. On the next delivery of the head
+     * message the transaction is committed and the test moves on to the next index.
+     */
+    @Test
+    public void testMessageOrder() throws Exception
+    {
+        final int messageNumber = 4;
+        Queue queue = createQueue(getTestName());
+        Connection connection = getConnection();
+        try
+        {
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageConsumer consumer = session.createConsumer(queue);
+            Utils.sendMessages(connection, queue, messageNumber);
+            connection.start();
+
+            int messageSeen = 0;
+            int expectedIndex = 0;
+            while (expectedIndex < messageNumber)
+            {
+                Message message = consumer.receive(getReceiveTimeout());
+                assertNotNull(String.format("Expected message '%d' is not received", expectedIndex), message);
+                assertEquals("Received message out of order", expectedIndex, message.getIntProperty(INDEX));
+
+                // roll back the first 5 deliveries of the message; commit on the delivery after that
+                if (messageSeen < 5)
+                {
+                    // receive remaining
+                    for (int m = expectedIndex + 1; m < messageNumber; m++)
+                    {
+                        Message remaining = consumer.receive(getReceiveTimeout());
+                        // Check the message just received (not the head message again) so that a
+                        // missing redelivery fails here instead of as a NullPointerException below.
+                        assertNotNull(String.format("Expected remaining message '%d' is not received", m), remaining);
+                        assertEquals("Received remained message out of order", m, remaining.getIntProperty(INDEX));
+                    }
+
+                    LOGGER.debug("Rolling back transaction for message with index {}", expectedIndex);
+                    session.rollback();
+                    messageSeen++;
+                }
+                else
+                {
+                    LOGGER.debug("Committing transaction for message with index {}", expectedIndex);
+                    messageSeen = 0;
+                    expectedIndex++;
+                    session.commit();
+                }
+            }
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    /**
+     * Soak test for rollbacks with two competing transacted consumers on one connection.
+     * Every received message is rolled back until the shared rollback counter exceeds
+     * {@code maximumRollbacks}; after that each received message is committed. The test
+     * fails if the consumers have not committed {@code messageNumber} messages within the
+     * computed timeout.
+     */
+    @Test
+    public void testRollbackSoak() throws Exception
+    {
+        final int messageNumber = 20;
+        final int maximumRollbacks = messageNumber * 2;
+        final int numberOfConsumers = 2;
+        final long timeout = getReceiveTimeout() * (maximumRollbacks + messageNumber);
+        final AtomicInteger rollbackCounter = new AtomicInteger();
+        final AtomicInteger commitCounter = new AtomicInteger();
+        final AtomicBoolean shutdown = new AtomicBoolean();
+        final List<ListenableFuture<Void>> workers = new ArrayList<>(numberOfConsumers);
+        final Queue queue = createQueue(getTestName());
+        final Connection connection = getConnectionBuilder().setPrefetch(messageNumber / numberOfConsumers).build();
+        try
+        {
+            Utils.sendMessages(connection, queue, messageNumber);
+            final ListeningExecutorService workerPool =
+                    MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfConsumers));
+            try
+            {
+                for (int worker = 0; worker < numberOfConsumers; ++worker)
+                {
+                    final Session workerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+                    final MessageConsumer workerConsumer = workerSession.createConsumer(queue);
+                    workers.add(workerPool.submit(() -> {
+                        boolean keepConsuming;
+                        do
+                        {
+                            final Message received = workerConsumer.receive(getReceiveTimeout());
+                            if (received != null)
+                            {
+                                // The counter is bumped on every delivery, including the committed ones.
+                                final boolean rollbackBudgetLeft =
+                                        rollbackCounter.incrementAndGet() <= maximumRollbacks;
+                                if (rollbackBudgetLeft)
+                                {
+                                    workerSession.rollback();
+                                }
+                                else
+                                {
+                                    workerSession.commit();
+                                    commitCounter.incrementAndGet();
+                                }
+                            }
+                            keepConsuming = commitCounter.get() < messageNumber
+                                            && !Thread.currentThread().isInterrupted()
+                                            && !shutdown.get();
+                        }
+                        while (keepConsuming);
+                        return null;
+                    }));
+                }
+                connection.start();
+
+                final ListenableFuture<List<Void>> allWorkers = Futures.allAsList(workers);
+                try
+                {
+                    allWorkers.get(timeout, TimeUnit.MILLISECONDS);
+                }
+                catch (TimeoutException e)
+                {
+                    final String failure = String.format(
+                            "Test took more than %.1f seconds. All consumers probably starved."
+                            + " Performed %d rollbacks, consumed %d/%d messages",
+                            timeout / 1000.,
+                            rollbackCounter.get(),
+                            commitCounter.get(),
+                            messageNumber);
+                    fail(failure);
+                }
+                finally
+                {
+                    // Tell the workers to stop looping regardless of how the wait ended.
+                    shutdown.set(true);
+                }
+            }
+            finally
+            {
+                workerPool.shutdown();
+                workerPool.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+            }
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
index 39e3322..40b4848 100644
--- a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
+++ b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
@@ -60,7 +60,7 @@ public class DeliveryCountTest extends JmsTestBase
connection.start();
Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE);
_queue = session.createQueue(testQueueName);
- Utils.sendMessage(session, _queue, 1);
+ Utils.sendMessages(session, _queue, 1);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
index 32c60f5..f40ded7 100644
--- a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
+++ b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
@@ -57,7 +57,7 @@ public class SharedSubscriptionTest extends JmsTestBase
MessageConsumer consumer1 = subscriber1Session.createSharedConsumer(topic, "subscription");
MessageConsumer consumer2 = subscriber2Session.createSharedConsumer(topic, "subscription");
- Utils.sendMessage(publishingSession, topic, 2);
+ Utils.sendMessages(publishingSession, topic, 2);
connection.start();
@@ -94,7 +94,7 @@ public class SharedSubscriptionTest extends JmsTestBase
MessageConsumer consumer1 = subscriber1Session.createSharedDurableConsumer(topic, "subscription");
MessageConsumer consumer2 = subscriber2Session.createSharedDurableConsumer(topic, "subscription");
- Utils.sendMessage(publishingSession, topic, 4);
+ Utils.sendMessages(publishingSession, topic, 4);
connection.start();
@@ -224,7 +224,7 @@ public class SharedSubscriptionTest extends JmsTestBase
MessageConsumer consumer2 = subscriberSession.createSharedConsumer(topic, "testSharedSubscription");
connection.start();
- Utils.sendMessage(publishingSession, topic, 1);
+ Utils.sendMessages(publishingSession, topic, 1);
Message message1 = consumer1.receive(getReceiveTimeout());
Message message2 = consumer2.receive(getReceiveTimeout());
@@ -254,7 +254,7 @@ public class SharedSubscriptionTest extends JmsTestBase
connection.start();
connection2.start();
- Utils.sendMessage(publishingSession, topic, 1);
+ Utils.sendMessages(publishingSession, topic, 1);
Message message1 = consumer1.receive(getReceiveTimeout());
Message message2 = consumer2.receive(getReceiveTimeout());
@@ -283,7 +283,7 @@ public class SharedSubscriptionTest extends JmsTestBase
MessageConsumer consumer1 =
subscriber1Session.createSharedDurableConsumer(topic, "subscription", "index>1");
- Utils.sendMessage(publishingSession, topic, 4);
+ Utils.sendMessages(publishingSession, topic, 4);
connection.start();
connection2.start();
@@ -320,7 +320,7 @@ public class SharedSubscriptionTest extends JmsTestBase
"No message should be received as re-subscribing with different topic or selector is equivalent to unsubscribe/subscribe",
message2);
- Utils.sendMessage(publishingSession, topic, 4);
+ Utils.sendMessages(publishingSession, topic, 4);
Message message3 = consumer2.receive(getReceiveTimeout());
assertNotNull("Should receive message 3", message3);
@@ -335,7 +335,7 @@ public class SharedSubscriptionTest extends JmsTestBase
"No message should be received as re-subscribing with different topic or selector is equivalent to unsubscribe/subscribe",
message4);
- Utils.sendMessage(publishingSession, topic2, 4);
+ Utils.sendMessages(publishingSession, topic2, 4);
Message message5 = consumer3.receive(getReceiveTimeout());
assertEquals("Unexpected index for message 5", 3, message5.getIntProperty(Utils.INDEX));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/src/test/java/org/apache/qpid/test/client/RollbackOrderTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/client/RollbackOrderTest.java b/systests/src/test/java/org/apache/qpid/test/client/RollbackOrderTest.java
deleted file mode 100644
index ea3352e..0000000
--- a/systests/src/test/java/org/apache/qpid/test/client/RollbackOrderTest.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.client;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import junit.framework.AssertionFailedError;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-/**
- * RollbackOrderTest, QPID-1864, QPID-1871
- *
- * Description:
- *
- * The problem that this test is exposing is that the dispatcher used to be capable
- * of holding on to a message when stopped. This meant that when the rollback was
- * called and the dispatcher stopped it may have hold of a message. So after all
- * the local queues(preDeliveryQueue, SynchronousQueue, PostDeliveryTagQueue)
- * have been cleared the client still had a single message, the one the
- * dispatcher was holding on to.
- *
- * As a result the TxRollback operation would run and then release the dispatcher.
- * Whilst the dispatcher would then proceed to reject the message it was holding
- * the Broker would already have resent that message so the rejection would silently
- * fail.
- *
- * And the client would receive that single message 'early', depending on the
- * number of messages already received when rollback was called.
- *
- *
- * Aims:
- *
- * The tests puts 50 messages on to the queue.
- *
- * The test then tries to cause the dispatcher to stop whilst it is in the process
- * of moving a message from the preDeliveryQueue to a consumers sychronousQueue.
- *
- * To exercise this path we have 50 message flowing to the client to give the
- * dispatcher a bit of work to do moving messages.
- *
- * Then we loop - 10 times
- * - Validating that the first message received is always message 1.
- * - Receive a few more so that there are a few messages to reject.
- * - call rollback, to try and catch the dispatcher mid process.
- *
- * Outcome:
- *
- * The hope is that we catch the dispatcher mid process and cause a BasicReject
- * to fail. Which will be indicated in the log but will also cause that failed
- * rejected message to be the next to be delivered which will not be message 1
- * as expected.
- *
- * We are testing a race condition here but we can check through the log file if
- * the race condition occurred. However, performing that check will only validate
- * the problem exists and will not be suitable as part of a system test.
- *
- * @see org.apache.qpid.test.unit.transacted.CommitRollbackTest
- */
-public class RollbackOrderTest extends QpidBrokerTestCase
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(RollbackOrderTest.class);
- private Connection _connection;
- private Queue _queue;
- private Session _session;
- private MessageConsumer _consumer;
-
- @Override public void setUp() throws Exception
- {
- super.setUp();
- _connection = getConnection();
-
- _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
- _queue = createTestQueue(_session);
- _consumer = _session.createConsumer(_queue);
-
- //Send more messages so it is more likely that the dispatcher is
- // processing on rollback.
- sendMessage(_session, _queue, 50);
- _session.commit();
-
- }
-
- public void testOrderingAfterRollback() throws Exception
- {
- //Start the session now so we
- _connection.start();
-
- for (int i = 0; i < 20; i++)
- {
- Message msg = _consumer.receive();
- assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
-
- // Pull additional messages through so we have some reject work to do
- for (int m=1; m <= 5 ; m++)
- {
- msg = _consumer.receive();
- assertEquals("Incorrect Message Received (message " + m + ")", m, msg.getIntProperty(INDEX));
- }
-
- _session.rollback();
- }
- }
-
- public void testOrderingAfterRollbackOnMessage() throws Exception
- {
- final CountDownLatch count= new CountDownLatch(20);
- final Exception exceptions[] = new Exception[20];
- final AtomicBoolean failed = new AtomicBoolean(false);
-
- _consumer.setMessageListener(new MessageListener()
- {
-
- @Override
- public void onMessage(Message message)
- {
-
- Message msg = message;
- try
- {
- count.countDown();
- assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
-
- _session.rollback();
- }
- catch (JMSException e)
- {
- LOGGER.error("Error:" + e.getMessage(), e);
- exceptions[(int)count.getCount()] = e;
- }
- catch (AssertionFailedError cf)
- {
- // End Test if Equality test fails
- while (count.getCount() != 0)
- {
- count.countDown();
- }
-
- LOGGER.error("Error:" + cf.getMessage(), cf);
- failed.set(true);
- }
- }
- });
- //Start the session now so we
- _connection.start();
-
- count.await(10l, TimeUnit.SECONDS);
- assertEquals("Not all message received. Count should be 0.", 0, count.getCount());
-
- for (Exception e : exceptions)
- {
- if (e != null)
- {
- LOGGER.error("Encountered exception", e);
- failed.set(true);
- }
- }
-
- _connection.close();
-
- assertFalse("Exceptions thrown during test run, Check Std.err.", failed.get());
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
deleted file mode 100644
index c4eb3d5..0000000
--- a/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ /dev/null
@@ -1,670 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.test.unit.transacted;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.RejectBehaviour;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-/**
- * This class tests a number of commits and roll back scenarios
- *
- * Assumptions; - Assumes empty Queue
- *
- * @see org.apache.qpid.test.client.RollbackOrderTest
- */
-public class CommitRollbackTest extends QpidBrokerTestCase
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(CommitRollbackTest.class);
- private long _positiveTimeout;
- private long _negativeTimeout;
-
- private Connection _conn;
- private Session _session;
- private MessageProducer _publisher;
- private Session _pubSession;
- private MessageConsumer _consumer;
- private Queue _jmsQueue;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- _positiveTimeout = getReceiveTimeout();
- _negativeTimeout = getShortReceiveTimeout();
- }
-
- private void newConnection() throws Exception
- {
- LOGGER.debug("calling newConnection()");
- _conn = getConnection();
-
- _session = _conn.createSession(true, Session.SESSION_TRANSACTED);
-
- final String queueName = getTestQueueName();
- _jmsQueue = createTestQueue(_session);
- _session.commit();
- _consumer = _session.createConsumer(_jmsQueue);
-
- _pubSession = _conn.createSession(true, Session.SESSION_TRANSACTED);
-
- _publisher = _pubSession.createProducer(_pubSession.createQueue(queueName));
-
- _conn.start();
- }
-
- /**
- * PUT a text message, disconnect before commit, confirm it is gone.
- *
- * @throws Exception On error
- */
- public void testPutThenDisconnect() throws Exception
- {
- newConnection();
-
- assertTrue("session is not transacted", _session.getTransacted());
- assertTrue("session is not transacted", _pubSession.getTransacted());
-
- LOGGER.info("sending test message");
- String MESSAGE_TEXT = "testPutThenDisconnect";
- _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
- LOGGER.info("reconnecting without commit");
- _conn.close();
-
- newConnection();
-
- LOGGER.info("receiving result");
- Message result = _consumer.receive(_negativeTimeout);
-
- // commit to ensure message is removed from queue
- _session.commit();
-
- assertNull("test message was put and disconnected before commit, but is still present", result);
- }
-
-
- /**
- * PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different
- * session as producer
- *
- * @throws Exception On error
- */
- public void testPutThenRollback() throws Exception
- {
- newConnection();
-
- assertTrue("session is not transacted", _session.getTransacted());
- assertTrue("session is not transacted", _pubSession.getTransacted());
-
- LOGGER.info("sending test message");
- String MESSAGE_TEXT = "testPutThenRollback";
- _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
- LOGGER.info("rolling back");
- _pubSession.rollback();
-
- LOGGER.info("receiving result");
- Message result = _consumer.receive(_negativeTimeout);
-
- assertNull("test message was put and rolled back, but is still present", result);
- }
-
- /**
- * GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection
- *
- * @throws Exception On error
- */
- public void testGetThenDisconnect() throws Exception
- {
- newConnection();
-
- assertTrue("session is not transacted", _session.getTransacted());
- assertTrue("session is not transacted", _pubSession.getTransacted());
-
- LOGGER.info("sending test message");
- String MESSAGE_TEXT = "testGetThenDisconnect";
- _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
- _pubSession.commit();
-
- LOGGER.info("getting test message");
-
- Message msg = _consumer.receive(_positiveTimeout);
- assertNotNull("retrieved message is null", msg);
-
- LOGGER.info("closing connection");
- _conn.close();
-
- newConnection();
-
- LOGGER.info("receiving result");
- Message result = _consumer.receive(_negativeTimeout);
-
- _session.commit();
-
- assertNotNull("test message was consumed and disconnected before commit, but is gone", result);
- assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
- }
-
- /**
- * GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the
- * same connection but different session as producer
- *
- * @throws Exception On error
- */
- public void testGetThenCloseDisconnect() throws Exception
- {
- newConnection();
-
- assertTrue("session is not transacted", _session.getTransacted());
- assertTrue("session is not transacted", _pubSession.getTransacted());
-
- LOGGER.info("sending test message");
- String MESSAGE_TEXT = "testGetThenCloseDisconnect";
- _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
- _pubSession.commit();
-
- LOGGER.info("getting test message");
-
- Message msg = _consumer.receive(_positiveTimeout);
- assertNotNull("retrieved message is null", msg);
- assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText());
-
- LOGGER.info("reconnecting without commit");
- _consumer.close();
- _conn.close();
-
- newConnection();
-
- LOGGER.info("receiving result");
- Message result = _consumer.receive(_positiveTimeout);
-
- _session.commit();
-
- assertNotNull("test message was consumed and disconnected before commit, but is gone", result);
- assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
- }
-
- /**
- * GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt
- * session to the producer
- *
- * @throws Exception On error
- */
- public void testGetThenRollback() throws Exception
- {
- newConnection();
-
- assertTrue("session is not transacted", _session.getTransacted());
- assertTrue("session is not transacted", _pubSession.getTransacted());
-
- LOGGER.info("sending test message");
- String MESSAGE_TEXT = "testGetThenRollback";
- _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
- _pubSession.commit();
-
- LOGGER.info("getting test message");
-
- Message msg = _consumer.receive(_positiveTimeout);
-
- assertNotNull("retrieved message is null", msg);
- assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText());
-
- LOGGER.info("rolling back");
-
- _session.rollback();
-
- LOGGER.info("receiving result");
-
- Message result = _consumer.receive(_positiveTimeout);
-
- _session.commit();
- assertNotNull("test message was consumed and rolled back, but is gone", result);
- assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
- assertTrue("Message is not marked as redelivered", result.getJMSRedelivered());
- }
-
- /**
- * GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same
- * connection but different session as producer
- *
- * @throws Exception On error
- */
- public void testGetThenCloseRollback() throws Exception
- {
- newConnection();
-
- assertTrue("session is not transacted", _session.getTransacted());
- assertTrue("session is not transacted", _pubSession.getTransacted());
-
- LOGGER.info("sending test message");
- String MESSAGE_TEXT = "testGetThenCloseRollback";
- _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
- _pubSession.commit();
-
- LOGGER.info("getting test message");
-
- Message msg = _consumer.receive(_positiveTimeout);
-
- assertNotNull("retrieved message is null", msg);
- assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText());
-
- LOGGER.info("Closing consumer");
- _consumer.close();
-
- LOGGER.info("rolling back");
- _session.rollback();
-
- LOGGER.info("receiving result");
-
- _consumer = _session.createConsumer(_jmsQueue);
-
- Message result = _consumer.receive(_positiveTimeout);
-
- _session.commit();
- assertNotNull("test message was consumed and rolled back, but is gone", result);
- assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
- assertTrue("Message is not marked as redelivered", result.getJMSRedelivered());
- }
-
- /**
- * This test sends two messages receives one of them but doesn't ack it.
- * The consumer is then closed
- * the first message should be returned as redelivered.
- * the second message should be delivered normally.
- * @throws Exception
- */
- public void testSend2ThenCloseAfter1andTryAgain() throws Exception
- {
- newConnection();
-
- assertTrue("session is not transacted", _session.getTransacted());
- assertTrue("session is not transacted", _pubSession.getTransacted());
-
- LOGGER.info("sending two test messages");
- _publisher.send(_pubSession.createTextMessage("1"));
- _publisher.send(_pubSession.createTextMessage("2"));
- _pubSession.commit();
-
- LOGGER.info("getting test message");
- Message result = _consumer.receive(_positiveTimeout);
-
- assertNotNull("Message received should not be null", result);
- assertEquals("1", ((TextMessage) result).getText());
- assertTrue("Message is marked as redelivered" + result, !result.getJMSRedelivered());
-
- LOGGER.info("Closing Consumer");
-
- _consumer.close();
-
- LOGGER.info("Creating New consumer");
- _consumer = _session.createConsumer(_jmsQueue);
-
- LOGGER.info("receiving result");
-
-
- // Message 2 may be marked as redelivered if it was prefetched.
- result = _consumer.receive(_positiveTimeout);
- assertNotNull("Second message was not consumed, but is gone", result);
-
- // The first message back will be 2, message 1 has been received but not committed
- // Closing the consumer does not commit the session.
-
- // if this is message 1 then it should be marked as redelivered
- if("1".equals(((TextMessage) result).getText()))
- {
- fail("First message was received again");
- }
-
- result = _consumer.receive(_negativeTimeout);
- assertNull("test message should be null:" + result, result);
-
- _session.commit();
- }
-
- public void testPutThenRollbackThenGet() throws Exception
- {
- newConnection();
-
- assertTrue("session is not transacted", _session.getTransacted());
- assertTrue("session is not transacted", _pubSession.getTransacted());
-
- LOGGER.info("sending test message");
- String MESSAGE_TEXT = "testPutThenRollbackThenGet";
-
- _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
- _pubSession.commit();
-
- assertNotNull(_consumer.receive(_positiveTimeout));
-
- _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
- LOGGER.info("rolling back");
- _pubSession.rollback();
-
- LOGGER.info("receiving result");
- Message result = _consumer.receive(_negativeTimeout);
- assertNull("test message was put and rolled back, but is still present", result);
-
- _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
- _pubSession.commit();
-
- assertNotNull(_consumer.receive(_positiveTimeout));
-
- _session.commit();
- }
-
- /**
- * Qpid-1163
- * Check that when commit is called inside onMessage then
- * the last message is nor redelivered.
- *
- * @throws Exception
- */
- public void testCommitWithinOnMessage() throws Exception
- {
- newConnection();
-
- Queue queue = createTestQueue(_session,"example.queue");
- _session.commit();
-
- // create a consumer
- MessageConsumer cons = _session.createConsumer(queue);
- MessageProducer prod = _session.createProducer(queue);
- Message message = _session.createTextMessage("Message");
- message.setJMSCorrelationID("m1");
- prod.send(message);
- _session.commit();
- LOGGER.info("Sent message to queue");
- CountDownLatch cd = new CountDownLatch(1);
- cons.setMessageListener(new CommitWithinOnMessageListener(cd));
- _conn.start();
- cd.await(30, TimeUnit.SECONDS);
- if( cd.getCount() > 0 )
- {
- fail("Did not received message");
- }
- // Check that the message has been dequeued
- _session.close();
- _conn.close();
- _conn = getConnection();
- _conn.start();
- Session session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- cons = session.createConsumer(queue);
- message = cons.receiveNoWait();
- if(message != null)
- {
- if(message.getJMSCorrelationID().equals("m1"))
- {
- fail("received message twice");
- }
- else
- {
- fail("queue should have been empty, received message: " + message);
- }
- }
- }
-
- /**
- * This test ensures that after exhausting credit (prefetch), a {@link Session#rollback()} successfully
- * restores credit and allows the same messages to be re-received.
- */
- public void testRollbackSessionAfterCreditExhausted() throws Exception
- {
- final int maxPrefetch= 5;
-
- // We send more messages than prefetch size. This ensure that if the 0-10 client were to
- // complete the message commands before the rollback command is sent, the broker would
- // send additional messages utilising the release credit. This problem would manifest itself
- // as an incorrect message (or no message at all) being received at the end of the test.
-
- final int numMessages = maxPrefetch * 2;
-
- setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, String.valueOf(maxPrefetch));
-
- newConnection();
-
- assertEquals("Prefetch not reset", maxPrefetch, ((AMQSession<?, ?>)_session).getDefaultPrefetch());
-
- assertTrue("session is not transacted", _session.getTransacted());
- assertTrue("session is not transacted", _pubSession.getTransacted());
-
- sendMessage(_pubSession, _publisher.getDestination(), numMessages);
- _pubSession.commit();
-
- for (int i=0 ;i< maxPrefetch; i++)
- {
- final Message message = _consumer.receive(_positiveTimeout);
- assertNotNull("Received:" + i, message);
- assertEquals("Unexpected message received", i, message.getIntProperty(INDEX));
- }
-
- LOGGER.info("Rolling back");
- _session.rollback();
-
- LOGGER.info("Receiving messages");
-
- Message result = _consumer.receive(_positiveTimeout);
- assertNotNull("Message expected", result);
- // Expect the first message
- assertEquals("Unexpected message received", 0, result.getIntProperty(INDEX));
- }
-
- private class CommitWithinOnMessageListener implements MessageListener
- {
- private CountDownLatch _cd;
- private CommitWithinOnMessageListener(CountDownLatch cd)
- {
- _cd = cd;
- }
- @Override
- public void onMessage(Message message)
- {
- try
- {
- LOGGER.info("received message " + message);
- assertEquals("Wrong message received", message.getJMSCorrelationID(), "m1");
- LOGGER.info("commit session");
- _session.commit();
- _cd.countDown();
- }
- catch (JMSException e)
- {
- LOGGER.error("OnMessage error",e);
- }
- }
- }
-
- public void testRollbackSoak() throws Exception
- {
- newConnection();
- _consumer.close();
-
- final int rollbackTime = 2000;
- final int numberOfMessages = 1000;
- final int numberOfConsumers = 2;
- final long testTimeout = 60*1000L;
- sendMessage(_pubSession, _jmsQueue, numberOfMessages);
-
- List<ListenableFuture<Void >> consumerFutures = new ArrayList<>(numberOfConsumers);
- final ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfConsumers));
-
- try
- {
- final CountDownLatch modeFlippedLatch = new CountDownLatch(1);
- final AtomicInteger counter = new AtomicInteger();
- final AtomicInteger rollbackCounter = new AtomicInteger();
- final long flipModeTime = System.currentTimeMillis() + rollbackTime;
- final AtomicBoolean shutdown = new AtomicBoolean();
-
- for (int i = 0; i < numberOfConsumers; ++i)
- {
- consumerFutures.add(threadPool.submit(new Callable<Void>()
- {
- @Override
- public Void call() throws Exception
- {
- Session session = _conn.createSession(true, Session.SESSION_TRANSACTED);
- final MessageConsumer consumer = session.createConsumer(_jmsQueue);
-
- while(!shutdown.get())
- {
- Message m = consumer.receive(_positiveTimeout);
- if (m != null)
- {
- long currentTime = System.currentTimeMillis();
- if (currentTime < flipModeTime)
- {
- session.rollback();
- rollbackCounter.incrementAndGet();
- }
- else
- {
- modeFlippedLatch.countDown();
- counter.incrementAndGet();
- session.commit();
- }
- }
-
- if (counter.get() == numberOfMessages)
- {
- break;
- }
-
- if (Thread.currentThread().isInterrupted())
- {
- break;
- }
- }
-
- return null;
- }
- }));
- }
-
- final ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(consumerFutures);
- modeFlippedLatch.await(rollbackTime * 2, TimeUnit.MILLISECONDS);
- try
- {
- combinedFuture.get(testTimeout, TimeUnit.MILLISECONDS);
- LOGGER.debug("Performed {} rollbacks, consumed {}/{} messages",
- rollbackCounter.get(),
- counter.get(),
- numberOfMessages);
- }
- catch (TimeoutException e)
- {
- fail(String.format(
- "Test took more than %.1f seconds. All consumers probably starved. Performed %d rollbacks, consumed %d/%d messages",
- testTimeout / 1000.,
- rollbackCounter.get(),
- counter.get(),
- numberOfMessages));
- }
- finally
- {
- shutdown.set(true);
- }
- assertEquals(String.format(
- "Unexpected number of messages received. Performed %d rollbacks, consumed %d/%d messages",
- rollbackCounter.get(),
- counter.get(),
- numberOfMessages), numberOfMessages, counter.get());
- }
- finally
- {
- threadPool.shutdownNow();
- threadPool.awaitTermination(2 * _positiveTimeout, TimeUnit.SECONDS);
- }
- }
-
- public void testResendUnseenMessagesAfterRollback() throws Exception
- {
- resendAfterRollback();
- }
-
- public void testResendUnseenMessagesAfterRollbackWithServerReject() throws Exception
- {
- setTestSystemProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.SERVER.toString());
- resendAfterRollback();
- }
-
- private void resendAfterRollback() throws Exception
- {
- newConnection();
-
- assertTrue("session is not transacted", _session.getTransacted());
- assertTrue("session is not transacted", _pubSession.getTransacted());
-
- LOGGER.info("sending test message");
- String MESSAGE_TEXT = "message text";
-
- _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
- _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
- _pubSession.commit();
-
- assertNotNull("two messages were sent, but none has been received", _consumer.receive(_positiveTimeout));
-
- _session.rollback();
-
- LOGGER.info("receiving result");
-
- assertNotNull("two messages were sent, but none has been received", _consumer.receive(_positiveTimeout));
- assertNotNull("two messages were sent, but only one has been received", _consumer.receive(_positiveTimeout));
- assertNull("Only two messages were sent, but more have been received", _consumer.receive(_negativeTimeout));
-
- _session.commit();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index 973be36..7bea664 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -112,8 +112,6 @@ org.apache.qpid.client.failover.FailoverBehaviourTest#testFlowControlFlagResetOn
org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFailoverHandlerTimeoutExpires
org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFlowControlFlagResetOnFailover
-org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage#*
-
// Excluded because plugins from Qpid Broker-J are not used in CPP broker
org.apache.qpid.server.virtualhost.plugin.*
org.apache.qpid.info.test.*
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/test-profiles/Java010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java010Excludes b/test-profiles/Java010Excludes
index c67bc36..2846875 100755
--- a/test-profiles/Java010Excludes
+++ b/test-profiles/Java010Excludes
@@ -37,9 +37,6 @@ org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartConsumerFlowSt
// 0-10 is not supported by the MethodRegistry
org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
-//QPID-1864: rollback with subscriptions does not work in 0-10 yet
-org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage
-
// QPID-3133: On 0-10, the exception listener is currently not invoked when reconnection fails to occurs.
org.apache.qpid.server.failover.FailoverMethodTest#*
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/test-profiles/Java10Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10Excludes b/test-profiles/Java10Excludes
index 56ee05a..52e1ba4 100644
--- a/test-profiles/Java10Excludes
+++ b/test-profiles/Java10Excludes
@@ -36,12 +36,6 @@ org.apache.qpid.server.failover.FailoverMethodTest#*
// Testing that the 0-x implementation of a durable topic does not cause queue growth when messages are excluded by selectors
org.apache.qpid.test.unit.topic.TopicSessionTest#testNonMatchingMessagesHandledCorrectly
-// These tests explicitly meddle with prefetch - maybe there should be equivalents for AMQP 1.0
-org.apache.qpid.test.unit.transacted.CommitRollbackTest#testRollbackSessionAfterCreditExhausted
-
-// Excluded due to client issue QPIDJMS-231: Prefetched messages are not released on consumer close
-org.apache.qpid.test.unit.transacted.CommitRollbackTest#testSend2ThenCloseAfter1andTryAgain
-
// This test covers the client version specific mechanisms for restricting the types of Object which can be sent via an ObjectMessage
org.apache.qpid.client.message.ObjectMessageClassWhitelistingTest#*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org