You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/29 23:33:55 UTC

qpid-broker-j git commit: QPID-6933: [System Tests] Move CommitRollbackTest and RollbackOrderTest into JMS 1.1 system tests

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 37b6c9dbc -> f30122249


QPID-6933: [System Tests] Move CommitRollbackTest and RollbackOrderTest into JMS 1.1 system tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f3012224
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f3012224
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f3012224

Branch: refs/heads/master
Commit: f3012224957ef02cdc2b737002e6992f093b92fd
Parents: 37b6c9d
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Nov 29 23:33:21 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Nov 29 23:33:21 2017 +0000

----------------------------------------------------------------------
 .../java/org/apache/qpid/systests/Utils.java    |  33 +-
 .../jms_1_1/acknowledge/AcknowledgeTest.java    |   2 +-
 .../jms_1_1/acknowledge/RecoverTest.java        |  20 +-
 .../jms_1_1/transaction/CommitRollbackTest.java | 656 ++++++++++++++++++
 .../deliverycount/DeliveryCountTest.java        |   2 +-
 .../subscription/SharedSubscriptionTest.java    |  14 +-
 .../qpid/test/client/RollbackOrderTest.java     | 193 ------
 .../unit/transacted/CommitRollbackTest.java     | 670 -------------------
 test-profiles/CPPExcludes                       |   2 -
 test-profiles/Java010Excludes                   |   3 -
 test-profiles/Java10Excludes                    |   6 -
 11 files changed, 700 insertions(+), 901 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
index dc975d6..c2dfe75 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
@@ -23,10 +23,12 @@ package org.apache.qpid.systests;
 import java.util.ArrayList;
 import java.util.List;
 
+import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 
 public class Utils
@@ -35,7 +37,36 @@ public class Utils
     public static final String INDEX = "index";
     private static final String DEFAULT_MESSAGE_PAYLOAD = createString(DEFAULT_MESSAGE_SIZE);
 
-    public static List<Message> sendMessage(Session session, Destination destination, int count) throws Exception
+    /**
+     * Publishes a single text message to the given destination.
+     * <p>
+     * A non-transacted, auto-acknowledge session is created on the connection for
+     * the send and is closed again before the method returns, whether or not the
+     * send succeeded.
+     *
+     * @param connection  connection on which the temporary session is created
+     * @param destination destination the message is sent to
+     * @param message     text used as the body of the message
+     * @throws JMSException if creating the session, sending, or closing the session fails
+     */
+    public static void sendTextMessage(final Connection connection, final Destination destination, String message)
+            throws JMSException
+    {
+        final Session temporarySession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try
+        {
+            final MessageProducer textProducer = temporarySession.createProducer(destination);
+            final Message textMessage = temporarySession.createTextMessage(message);
+            textProducer.send(textMessage);
+        }
+        finally
+        {
+            temporarySession.close();
+        }
+    }
+
+    /**
+     * Sends {@code messageNumber} messages to the destination through a temporary
+     * non-transacted, auto-acknowledge session created on the given connection.
+     * <p>
+     * The work is delegated to the session-based {@code sendMessages} overload; the
+     * list it returns is discarded. The temporary session is always closed.
+     *
+     * @param connection    connection on which the temporary session is created
+     * @param destination   destination the messages are sent to
+     * @param messageNumber number of messages to send
+     * @throws JMSException if creating the session, sending, or closing the session fails
+     */
+    public static void sendMessages(final Connection connection, final Destination destination, final int messageNumber)
+            throws JMSException
+    {
+        final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try
+        {
+            sendMessages(producerSession, destination, messageNumber);
+        }
+        finally
+        {
+            producerSession.close();
+        }
+    }
+
+    public static List<Message> sendMessages(Session session, Destination destination, int count) throws JMSException
     {
         List<Message> messages = new ArrayList<>(count);
         MessageProducer producer = session.createProducer(destination);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/AcknowledgeTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/AcknowledgeTest.java
index 65eecc4..ea50e38 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/AcknowledgeTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/AcknowledgeTest.java
@@ -54,7 +54,7 @@ public class AcknowledgeTest extends JmsTestBase
             MessageConsumer consumer = session.createConsumer(queue);
             connection.start();
 
-            Utils.sendMessage(session, queue, 2);
+            Utils.sendMessages(session, queue, 2);
 
             Message message = consumer.receive(getReceiveTimeout());
             assertNotNull("Message has not been received", message);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/RecoverTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/RecoverTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/RecoverTest.java
index cc41859..174eb31 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/RecoverTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/RecoverTest.java
@@ -60,7 +60,7 @@ public class RecoverTest extends JmsTestBase
         {
             Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
             MessageConsumer consumer = session.createConsumer(queue);
-            produceTestMessages(connection, queue, SENT_COUNT);
+            Utils.sendMessages(connection, queue, SENT_COUNT);
             connection.start();
 
             Message message = receiveAndValidateMessage(consumer, 0);
@@ -89,7 +89,7 @@ public class RecoverTest extends JmsTestBase
         {
             Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
             MessageConsumer consumer = session.createConsumer(queue);
-            produceTestMessages(connection, queue, SENT_COUNT);
+            Utils.sendMessages(connection, queue, SENT_COUNT);
             connection.start();
 
             int messageSeen = 0;
@@ -175,7 +175,7 @@ public class RecoverTest extends JmsTestBase
         {
             Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             MessageConsumer consumer = consumerSession.createConsumer(queue);
-            produceTestMessages(connection, queue, 1);
+            Utils.sendMessages(connection, queue, 1);
 
             final CountDownLatch awaitMessages = new CountDownLatch(2);
             final AtomicReference<Throwable> listenerCaughtException = new AtomicReference<>();
@@ -226,18 +226,4 @@ public class RecoverTest extends JmsTestBase
         return message;
     }
 
-    private void produceTestMessages(final Connection connection, final Queue queue, final int messageNumber)
-            throws Exception
-    {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        try
-        {
-            Utils.sendMessage(session, queue, messageNumber);
-        }
-        finally
-        {
-            session.close();
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/transaction/CommitRollbackTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/transaction/CommitRollbackTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/transaction/CommitRollbackTest.java
new file mode 100644
index 0000000..d65822d
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/transaction/CommitRollbackTest.java
@@ -0,0 +1,656 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package org.apache.qpid.systests.jms_1_1.transaction;
+
+import static junit.framework.TestCase.fail;
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.systests.JmsTestBase;
+import org.apache.qpid.systests.Utils;
+
+public class CommitRollbackTest extends JmsTestBase
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(CommitRollbackTest.class);
+
+    /**
+     * Sends message "A" on a transacted session and closes the connection without
+     * committing or rolling back. A second connection then sends "B" on an
+     * auto-acknowledge session and asserts that "B" is the message it receives.
+     */
+    @Test
+    public void produceMessageAndAbortTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+        final Connection abortingConnection = getConnection();
+        try
+        {
+            final Session transactedSession = abortingConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageProducer transactedProducer = transactedSession.createProducer(queue);
+            transactedProducer.send(transactedSession.createTextMessage("A"));
+            // deliberately neither committed nor rolled back before the connection is closed
+        }
+        finally
+        {
+            abortingConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            verifyingConnection.start();
+            final Session autoAckSession = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            autoAckSession.createProducer(queue).send(autoAckSession.createTextMessage("B"));
+
+            final Message received = autoAckSession.createConsumer(queue).receive(getReceiveTimeout());
+            assertTrue("Text message should be received", received instanceof TextMessage);
+            assertEquals("Unexpected message received", "B", ((TextMessage) received).getText());
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Sends message "A" on a transacted session and rolls the transaction back
+     * before closing the connection. A second connection then sends "B" on an
+     * auto-acknowledge session and asserts that "B" is the message it receives.
+     */
+    @Test
+    public void produceMessageAndRollbackTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+        final Connection rollingBackConnection = getConnection();
+        try
+        {
+            final Session transactedSession = rollingBackConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageProducer transactedProducer = transactedSession.createProducer(queue);
+            transactedProducer.send(transactedSession.createTextMessage("A"));
+            transactedSession.rollback();
+        }
+        finally
+        {
+            rollingBackConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            final Session autoAckSession = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            autoAckSession.createProducer(queue).send(autoAckSession.createTextMessage("B"));
+
+            final MessageConsumer queueConsumer = autoAckSession.createConsumer(queue);
+            verifyingConnection.start();
+
+            final Message received = queueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message should be received", received instanceof TextMessage);
+            assertEquals("Unexpected message received", "B", ((TextMessage) received).getText());
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Sends message "A" on a transacted session and commits the transaction.
+     * A second connection then consumes from the queue on an auto-acknowledge
+     * session and asserts that "A" is received.
+     */
+    @Test
+    public void produceMessageAndCommitTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+        final Connection committingConnection = getConnection();
+        try
+        {
+            final Session transactedSession = committingConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageProducer transactedProducer = transactedSession.createProducer(queue);
+            transactedProducer.send(transactedSession.createTextMessage("A"));
+            transactedSession.commit();
+        }
+        finally
+        {
+            committingConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            final Session autoAckSession = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer queueConsumer = autoAckSession.createConsumer(queue);
+            verifyingConnection.start();
+
+            final Message received = queueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message should be received", received instanceof TextMessage);
+            assertEquals("Unexpected message received", "A", ((TextMessage) received).getText());
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Publishes "A", receives it on a transacted session and closes the connection
+     * without committing or rolling back. A second connection then asserts that
+     * "A" can be received again from the queue.
+     */
+    @Test
+    public void receiveMessageAndAbortTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+        final Connection abortingConnection = getConnection();
+        try
+        {
+            Utils.sendTextMessage(abortingConnection, queue, "A");
+
+            abortingConnection.start();
+            final Session transactedSession = abortingConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageConsumer transactedConsumer = transactedSession.createConsumer(queue);
+            final Message firstDelivery = transactedConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message should be received", firstDelivery instanceof TextMessage);
+            assertEquals("Unexpected message received", "A", ((TextMessage) firstDelivery).getText());
+            // deliberately neither committed nor rolled back before the connection is closed
+        }
+        finally
+        {
+            abortingConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            verifyingConnection.start();
+            final Session autoAckSession = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer queueConsumer = autoAckSession.createConsumer(queue);
+
+            final Message redelivery = queueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message should be received", redelivery instanceof TextMessage);
+            assertEquals("Unexpected message received", "A", ((TextMessage) redelivery).getText());
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Publishes "A", receives it on a transacted session and rolls the transaction
+     * back. A second connection then asserts that "A" can be received again from
+     * the queue.
+     */
+    @Test
+    public void receiveMessageAndRollbackTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+        final Connection rollingBackConnection = getConnection();
+        try
+        {
+            Utils.sendTextMessage(rollingBackConnection, queue, "A");
+
+            rollingBackConnection.start();
+            final Session transactedSession = rollingBackConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageConsumer transactedConsumer = transactedSession.createConsumer(queue);
+            final Message firstDelivery = transactedConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message should be received", firstDelivery instanceof TextMessage);
+            assertEquals("Unexpected message received", "A", ((TextMessage) firstDelivery).getText());
+            transactedSession.rollback();
+        }
+        finally
+        {
+            rollingBackConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            final Session autoAckSession = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer queueConsumer = autoAckSession.createConsumer(queue);
+            verifyingConnection.start();
+
+            final Message redelivery = queueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message should be received", redelivery instanceof TextMessage);
+            assertEquals("Unexpected message received", "A", ((TextMessage) redelivery).getText());
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Publishes two messages, receives the one with index 0 on a transacted
+     * session and commits. A second connection then asserts that the next message
+     * it receives from the queue has index 1.
+     */
+    @Test
+    public void receiveMessageAndCommitTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+        final Connection committingConnection = getConnection();
+        try
+        {
+            Utils.sendMessages(committingConnection, queue, 2);
+
+            committingConnection.start();
+            final Session transactedSession = committingConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageConsumer transactedConsumer = transactedSession.createConsumer(queue);
+            final Message committed = transactedConsumer.receive(getReceiveTimeout());
+            assertNotNull("Message not received", committed);
+            final int committedIndex = committed.getIntProperty(INDEX);
+            assertEquals("Unexpected message received", 0, committedIndex);
+            transactedSession.commit();
+        }
+        finally
+        {
+            committingConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            final Session autoAckSession = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer queueConsumer = autoAckSession.createConsumer(queue);
+            verifyingConnection.start();
+
+            final Message next = queueConsumer.receive(getReceiveTimeout());
+            assertNotNull("Message not received", next);
+            final int nextIndex = next.getIntProperty(INDEX);
+            assertEquals("Unexpected message received", 1, nextIndex);
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Publishes two messages, receives the one with index 0 on a transacted
+     * session, closes the consumer and only then commits. A second connection
+     * asserts that the next message it receives from the queue has index 1.
+     */
+    @Test
+    public void receiveMessageCloseConsumerAndCommitTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+        final Connection committingConnection = getConnection();
+        try
+        {
+            Utils.sendMessages(committingConnection, queue, 2);
+
+            committingConnection.start();
+            final Session transactedSession = committingConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageConsumer transactedConsumer = transactedSession.createConsumer(queue);
+            final Message committed = transactedConsumer.receive(getReceiveTimeout());
+            assertNotNull("Message not received", committed);
+            final int committedIndex = committed.getIntProperty(INDEX);
+            assertEquals("Unexpected message received", 0, committedIndex);
+            // the consumer is closed before the transaction that used it is completed
+            transactedConsumer.close();
+            transactedSession.commit();
+        }
+        finally
+        {
+            committingConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            final Session autoAckSession = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer queueConsumer = autoAckSession.createConsumer(queue);
+            verifyingConnection.start();
+
+            final Message next = queueConsumer.receive(getReceiveTimeout());
+            assertNotNull("Message not received", next);
+            final int nextIndex = next.getIntProperty(INDEX);
+            assertEquals("Unexpected message received", 1, nextIndex);
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Publishes two messages, receives the one with index 0 on a transacted
+     * session, closes the consumer and only then rolls back. A second connection
+     * asserts that the message with index 0 is received again from the queue.
+     */
+    @Test
+    public void receiveMessageCloseConsumerAndRollbackTransaction() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+        final Connection rollingBackConnection = getConnection();
+        try
+        {
+            Utils.sendMessages(rollingBackConnection, queue, 2);
+
+            rollingBackConnection.start();
+            final Session transactedSession = rollingBackConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageConsumer transactedConsumer = transactedSession.createConsumer(queue);
+            final Message rolledBack = transactedConsumer.receive(getReceiveTimeout());
+            assertNotNull("Message not received", rolledBack);
+            final int rolledBackIndex = rolledBack.getIntProperty(INDEX);
+            assertEquals("Unexpected message received", 0, rolledBackIndex);
+            // the consumer is closed before the transaction that used it is completed
+            transactedConsumer.close();
+            transactedSession.rollback();
+        }
+        finally
+        {
+            rollingBackConnection.close();
+        }
+
+        final Connection verifyingConnection = getConnection();
+        try
+        {
+            final Session autoAckSession = verifyingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer queueConsumer = autoAckSession.createConsumer(queue);
+            verifyingConnection.start();
+
+            final Message redelivery = queueConsumer.receive(getReceiveTimeout());
+            assertNotNull("Message not received", redelivery);
+            final int redeliveryIndex = redelivery.getIntProperty(INDEX);
+            assertEquals("Unexpected message received", 0, redeliveryIndex);
+        }
+        finally
+        {
+            verifyingConnection.close();
+        }
+    }
+
+    /**
+     * Exercises one transacted session shared by two consumers on two queues.
+     * <p>
+     * Two text messages are published to each queue. The first message of each
+     * queue is received and the session rolled back; the same two messages are
+     * then expected again and the session is committed; finally the second
+     * message of each queue is expected.
+     */
+    @Test
+    public void transactionSharedByConsumers() throws Exception
+    {
+        final Queue queue1 = createQueue("Q1");
+        final Queue queue2 = createQueue("Q2");
+        final Connection sharedConnection = getConnection();
+        try
+        {
+            Utils.sendTextMessage(sharedConnection, queue1, "queue1Message1");
+            Utils.sendTextMessage(sharedConnection, queue1, "queue1Message2");
+            Utils.sendTextMessage(sharedConnection, queue2, "queue2Message1");
+            Utils.sendTextMessage(sharedConnection, queue2, "queue2Message2");
+
+            final Session sharedSession = sharedConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageConsumer firstQueueConsumer = sharedSession.createConsumer(queue1);
+            final MessageConsumer secondQueueConsumer = sharedSession.createConsumer(queue2);
+            sharedConnection.start();
+
+            // first delivery of the head of each queue, undone by the rollback below
+            Message fromFirstQueue = firstQueueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message not received from first queue", fromFirstQueue instanceof TextMessage);
+            String firstQueueText = ((TextMessage) fromFirstQueue).getText();
+            assertEquals("Unexpected message received from first queue", "queue1Message1", firstQueueText);
+
+            Message fromSecondQueue = secondQueueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message not received from second queue", fromSecondQueue instanceof TextMessage);
+            String secondQueueText = ((TextMessage) fromSecondQueue).getText();
+            assertEquals("Unexpected message received from second queue", "queue2Message1", secondQueueText);
+
+            sharedSession.rollback();
+
+            // the same two messages are expected again after the rollback
+            fromFirstQueue = firstQueueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message not received from first queue", fromFirstQueue instanceof TextMessage);
+            firstQueueText = ((TextMessage) fromFirstQueue).getText();
+            assertEquals("Unexpected message received from first queue", "queue1Message1", firstQueueText);
+
+            fromSecondQueue = secondQueueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message not received from second queue", fromSecondQueue instanceof TextMessage);
+            secondQueueText = ((TextMessage) fromSecondQueue).getText();
+            assertEquals("Unexpected message received from second queue", "queue2Message1", secondQueueText);
+
+            sharedSession.commit();
+
+            // after the commit each consumer is expected to move on to the second message
+            final Message nextFromFirstQueue = firstQueueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message not received from first queue", nextFromFirstQueue instanceof TextMessage);
+            final String nextFirstQueueText = ((TextMessage) nextFromFirstQueue).getText();
+            assertEquals("Unexpected message received from first queue", "queue1Message2", nextFirstQueueText);
+
+            final Message nextFromSecondQueue = secondQueueConsumer.receive(getReceiveTimeout());
+            assertTrue("Text message not received from second queue", nextFromSecondQueue instanceof TextMessage);
+            final String nextSecondQueueText = ((TextMessage) nextFromSecondQueue).getText();
+            assertEquals("Unexpected message received from second queue", "queue2Message2", nextSecondQueueText);
+        }
+        finally
+        {
+            sharedConnection.close();
+        }
+    }
+
+    /**
+     * Verifies that a transacted session can be committed from inside a message
+     * listener.
+     * <p>
+     * Two messages are published up front. For each delivery the listener checks
+     * the message's {@code INDEX} property against the number of commits made so
+     * far, commits the session and counts the commit. Failures raised inside the
+     * listener are captured and asserted on the test thread, since they do not
+     * propagate to the thread running the test method.
+     */
+    @Test
+    public void testCommitWithinOnMessage() throws Exception
+    {
+        final Queue queue = createQueue(getTestName());
+        final Connection connection = getConnection();
+        final int messageNumber = 2;
+        try
+        {
+            Utils.sendMessages(connection, queue, messageNumber);
+            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            final CountDownLatch receiveLatch = new CountDownLatch(messageNumber);
+            final AtomicInteger commitCounter = new AtomicInteger();
+            final AtomicReference<Throwable> messageConsumerThrowable = new AtomicReference<>();
+            final MessageConsumer messageConsumer = session.createConsumer(queue);
+
+            messageConsumer.setMessageListener(message -> {
+                try
+                {
+                    LOGGER.info("received message " + message);
+                    // Messages are identified by INDEX, as in the other tests of this class:
+                    // the message delivered after n commits is expected to carry index n.
+                    assertEquals("Wrong message received", commitCounter.get(), message.getIntProperty(INDEX));
+                    LOGGER.info("commit session");
+                    session.commit();
+                    commitCounter.incrementAndGet();
+                }
+                catch (Throwable e)
+                {
+                    // keep the first failure; later ones are usually consequences of it
+                    messageConsumerThrowable.compareAndSet(null, e);
+                    LOGGER.error("Unexpected exception", e);
+                }
+                finally
+                {
+                    // always count down so the test thread does not wait out the whole timeout
+                    receiveLatch.countDown();
+                }
+            });
+            connection.start();
+
+            assertTrue("Messages not received in expected time",
+                       receiveLatch.await(getReceiveTimeout() * messageNumber, TimeUnit.MILLISECONDS));
+            assertNull("Unexpected exception: " + messageConsumerThrowable.get(), messageConsumerThrowable.get());
+            assertEquals("Unexpected number of commits", messageNumber, commitCounter.get());
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    /**
+     * Consumes exactly as many messages as the connection's prefetch inside a
+     * transaction, with one more message waiting on the queue.
+     * <p>
+     * The prefetch-sized batch is received and rolled back, expected again in the
+     * same order and committed; the one remaining message is then expected.
+     */
+    @Test
+    public void exhaustedPrefetchInTransaction() throws Exception
+    {
+        final int maxPrefetch = 2;
+        final int messageNumber = maxPrefetch + 1;
+
+        final Queue queue = createQueue(getTestName());
+        final Connection prefetchConnection = getConnectionBuilder().setPrefetch(maxPrefetch).build();
+        try
+        {
+            Utils.sendMessages(prefetchConnection, queue, messageNumber);
+            final Session transactedSession = prefetchConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageConsumer transactedConsumer = transactedSession.createConsumer(queue);
+            prefetchConnection.start();
+
+            // first pass: take a full prefetch worth of messages, then undo it
+            for (int index = 0; index < maxPrefetch; index++)
+            {
+                final Message delivered = transactedConsumer.receive(getReceiveTimeout());
+                assertNotNull(String.format("Message %d not received", index), delivered);
+                assertEquals("Unexpected message received", index, delivered.getIntProperty(INDEX));
+            }
+
+            transactedSession.rollback();
+
+            // second pass: the same messages are expected again, in the same order
+            for (int index = 0; index < maxPrefetch; index++)
+            {
+                final Message redelivered = transactedConsumer.receive(getReceiveTimeout());
+                assertNotNull(String.format("Message %d not received after rollback", index), redelivered);
+                assertEquals("Unexpected message received after rollback", index, redelivered.getIntProperty(INDEX));
+            }
+
+            transactedSession.commit();
+
+            // the message beyond the prefetch window is expected once the batch is committed
+            final Message lastMessage = transactedConsumer.receive(getReceiveTimeout());
+            assertNotNull(String.format("Message %d not received", maxPrefetch), lastMessage);
+            assertEquals("Unexpected message received", maxPrefetch, lastMessage.getIntProperty(INDEX));
+        }
+        finally
+        {
+            prefetchConnection.close();
+        }
+    }
+
+    /**
+     * Verifies that message order is preserved across repeated rollbacks.
+     * <p>
+     * Four messages are published. For each expected index the test receives that
+     * message and all messages after it, checks that they arrive in index order
+     * and rolls the session back. This is repeated five times; on the following
+     * delivery only the expected message is received and the session is committed,
+     * after which the test moves on to the next index.
+     */
+    @Test
+    public void testMessageOrder() throws Exception
+    {
+        final int messageNumber = 4;
+        final int rollbacksPerMessage = 5;
+        final Queue queue = createQueue(getTestName());
+        final Connection connection = getConnection();
+        try
+        {
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageConsumer consumer = session.createConsumer(queue);
+            Utils.sendMessages(connection, queue, messageNumber);
+            connection.start();
+
+            int messageSeen = 0;
+            int expectedIndex = 0;
+            while (expectedIndex < messageNumber)
+            {
+                Message message = consumer.receive(getReceiveTimeout());
+                assertNotNull(String.format("Expected message '%d' is not received", expectedIndex), message);
+                assertEquals("Received message out of order", expectedIndex, message.getIntProperty(INDEX));
+
+                // roll back the first five deliveries of this message; commit on the one after
+                if (messageSeen < rollbacksPerMessage)
+                {
+                    // drain the messages behind it so the rollback covers everything still on the queue
+                    for (int m = expectedIndex + 1; m < messageNumber; m++)
+                    {
+                        Message remaining = consumer.receive(getReceiveTimeout());
+                        // check the message just received, not the head message verified above
+                        assertNotNull(String.format("Expected remaining message '%d' is not received", m), remaining);
+                        assertEquals("Received remained message out of order", m, remaining.getIntProperty(INDEX));
+                    }
+
+                    LOGGER.debug(String.format("Rolling back transaction for message with index %d", expectedIndex));
+                    session.rollback();
+                    messageSeen++;
+                }
+                else
+                {
+                    LOGGER.debug(String.format("Committing transaction for message with index %d", expectedIndex));
+                    messageSeen = 0;
+                    expectedIndex++;
+                    session.commit();
+                }
+            }
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    @Test
+    public void testRollbackSoak() throws Exception
+    { // Soak test: concurrent transacted consumers roll back repeatedly, then must still commit every message.
+        final int messageNumber = 20;
+        final int maximumRollbacks = messageNumber * 2; // rollback budget shared by all consumers
+        final int numberOfConsumers = 2;
+        long timeout = getReceiveTimeout() * (maximumRollbacks + messageNumber); // one receive timeout per expected rollback and commit
+        final AtomicInteger rollbackCounter = new AtomicInteger(); // incremented for every received message, not only rolled-back ones
+        final AtomicInteger commitCounter = new AtomicInteger();
+        final AtomicBoolean shutdown = new AtomicBoolean(); // set once the wait below ends, to stop the consumer loops
+        List<ListenableFuture<Void>> consumerFutures = new ArrayList<>(numberOfConsumers);
+        final Queue queue = createQueue(getTestName());
+        final Connection connection = getConnectionBuilder().setPrefetch(messageNumber / numberOfConsumers).build(); // presumably an even per-consumer share; confirm setPrefetch semantics
+        try
+        {
+            Utils.sendMessages(connection, queue, messageNumber);
+            final ListeningExecutorService threadPool =
+                    MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfConsumers)); // one thread per consumer
+            try
+            {
+                for (int i = 0; i < numberOfConsumers; ++i)
+                {
+                    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); // each consumer gets its own transacted session
+                    final MessageConsumer consumer = session.createConsumer(queue);
+                    consumerFutures.add(threadPool.submit(() -> {
+                        do
+                        {
+                            Message m = consumer.receive(getReceiveTimeout());
+                            if (m != null)
+                            {
+                                if (rollbackCounter.incrementAndGet() <= maximumRollbacks) // the first maximumRollbacks deliveries are rolled back
+                                {
+                                    session.rollback();
+                                }
+                                else
+                                {
+                                    session.commit(); // later deliveries are committed one message at a time
+                                    commitCounter.incrementAndGet();
+                                }
+                            }
+                        }
+                        while (commitCounter.get() < messageNumber
+                               && !Thread.currentThread().isInterrupted()
+                               && !shutdown.get()); // loop ends when all messages are committed, the thread is interrupted, or shutdown is set
+                        return null;
+                    }));
+                }
+                connection.start(); // started only after every consumer task has been submitted
+
+                final ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(consumerFutures);
+                try
+                {
+                    combinedFuture.get(timeout, TimeUnit.MILLISECONDS); // wait for all consumer tasks, bounded by the overall timeout
+                }
+                catch (TimeoutException e)
+                {
+                    fail(String.format(
+                            "Test took more than %.1f seconds. All consumers probably starved."
+                            + " Performed %d rollbacks, consumed %d/%d messages",
+                            timeout / 1000.,
+                            rollbackCounter.get(), // NOTE(review): this value also includes committed deliveries once the rollback budget is spent
+                            commitCounter.get(),
+                            messageNumber));
+                }
+                finally
+                {
+                    shutdown.set(true); // runs on success, timeout and failure alike
+                }
+            }
+            finally
+            {
+                threadPool.shutdown();
+                threadPool.awaitTermination(timeout, TimeUnit.MILLISECONDS); // give consumer tasks time to finish before the connection is closed; the result is ignored
+            }
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
index 39e3322..40b4848 100644
--- a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
+++ b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/deliverycount/DeliveryCountTest.java
@@ -60,7 +60,7 @@ public class DeliveryCountTest extends JmsTestBase
             connection.start();
             Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE);
            _queue = session.createQueue(testQueueName);
-            Utils.sendMessage(session, _queue, 1);
+            Utils.sendMessages(session, _queue, 1);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
index 32c60f5..f40ded7 100644
--- a/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
+++ b/systests/qpid-systests-jms_2.0/src/test/java/org/apache/qpid/systests/jms_2_0/subscription/SharedSubscriptionTest.java
@@ -57,7 +57,7 @@ public class SharedSubscriptionTest extends JmsTestBase
             MessageConsumer consumer1 = subscriber1Session.createSharedConsumer(topic, "subscription");
             MessageConsumer consumer2 = subscriber2Session.createSharedConsumer(topic, "subscription");
 
-            Utils.sendMessage(publishingSession, topic, 2);
+            Utils.sendMessages(publishingSession, topic, 2);
 
             connection.start();
 
@@ -94,7 +94,7 @@ public class SharedSubscriptionTest extends JmsTestBase
             MessageConsumer consumer1 = subscriber1Session.createSharedDurableConsumer(topic, "subscription");
             MessageConsumer consumer2 = subscriber2Session.createSharedDurableConsumer(topic, "subscription");
 
-            Utils.sendMessage(publishingSession, topic, 4);
+            Utils.sendMessages(publishingSession, topic, 4);
 
             connection.start();
 
@@ -224,7 +224,7 @@ public class SharedSubscriptionTest extends JmsTestBase
             MessageConsumer consumer2 = subscriberSession.createSharedConsumer(topic, "testSharedSubscription");
             connection.start();
 
-            Utils.sendMessage(publishingSession, topic, 1);
+            Utils.sendMessages(publishingSession, topic, 1);
 
             Message message1 = consumer1.receive(getReceiveTimeout());
             Message message2 = consumer2.receive(getReceiveTimeout());
@@ -254,7 +254,7 @@ public class SharedSubscriptionTest extends JmsTestBase
             connection.start();
             connection2.start();
 
-            Utils.sendMessage(publishingSession, topic, 1);
+            Utils.sendMessages(publishingSession, topic, 1);
 
             Message message1 = consumer1.receive(getReceiveTimeout());
             Message message2 = consumer2.receive(getReceiveTimeout());
@@ -283,7 +283,7 @@ public class SharedSubscriptionTest extends JmsTestBase
             MessageConsumer consumer1 =
                     subscriber1Session.createSharedDurableConsumer(topic, "subscription", "index>1");
 
-            Utils.sendMessage(publishingSession, topic, 4);
+            Utils.sendMessages(publishingSession, topic, 4);
 
             connection.start();
             connection2.start();
@@ -320,7 +320,7 @@ public class SharedSubscriptionTest extends JmsTestBase
                     "No message should be received as re-subscribing with different topic or selector is equivalent to unsubscribe/subscribe",
                     message2);
 
-            Utils.sendMessage(publishingSession, topic, 4);
+            Utils.sendMessages(publishingSession, topic, 4);
 
             Message message3 = consumer2.receive(getReceiveTimeout());
             assertNotNull("Should receive message 3", message3);
@@ -335,7 +335,7 @@ public class SharedSubscriptionTest extends JmsTestBase
                     "No message should be received as re-subscribing with different topic or selector is equivalent to unsubscribe/subscribe",
                     message4);
 
-            Utils.sendMessage(publishingSession, topic2, 4);
+            Utils.sendMessages(publishingSession, topic2, 4);
 
             Message message5 = consumer3.receive(getReceiveTimeout());
             assertEquals("Unexpected index for message 5", 3, message5.getIntProperty(Utils.INDEX));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/src/test/java/org/apache/qpid/test/client/RollbackOrderTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/client/RollbackOrderTest.java b/systests/src/test/java/org/apache/qpid/test/client/RollbackOrderTest.java
deleted file mode 100644
index ea3352e..0000000
--- a/systests/src/test/java/org/apache/qpid/test/client/RollbackOrderTest.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.client;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import junit.framework.AssertionFailedError;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-/**
- * RollbackOrderTest, QPID-1864, QPID-1871
- *
- * Description:
- *
- * The problem that this test is exposing is that the dispatcher used to be capable
- * of holding on to a message when stopped. This meant that when the rollback was
- * called and the dispatcher stopped it may have hold of a message. So after all
- * the local queues(preDeliveryQueue, SynchronousQueue, PostDeliveryTagQueue)
- * have been cleared the client still had a single message, the one the
- * dispatcher was holding on to.
- *
- * As a result the TxRollback operation would run and then release the dispatcher.
- * Whilst the dispatcher would then proceed to reject the message it was holding
- * the Broker would already have resent that message so the rejection would silently
- * fail.
- *
- * And the client would receive that single message 'early', depending on the
- * number of messages already received when rollback was called.
- *
- *
- * Aims:
- *
- * The tests puts 50 messages on to the queue.
- *
- * The test then tries to cause the dispatcher to stop whilst it is in the process
- * of moving a message from the preDeliveryQueue to a consumers sychronousQueue.
- *
- * To exercise this path we have 50 message flowing to the client to give the
- * dispatcher a bit of work to do moving messages.
- *
- * Then we loop - 10 times
- *  - Validating that the first message received is always message 1.
- *  - Receive a few more so that there are a few messages to reject.
- *  - call rollback, to try and catch the dispatcher mid process.
- *
- * Outcome:
- *
- * The hope is that we catch the dispatcher mid process and cause a BasicReject
- * to fail. Which will be indicated in the log but will also cause that failed
- * rejected message to be the next to be delivered which will not be message 1
- * as expected.
- *
- * We are testing a race condition here but we can check through the log file if
- * the race condition occurred. However, performing that check will only validate
- * the problem exists and will not be suitable as part of a system test.
- *
- * @see org.apache.qpid.test.unit.transacted.CommitRollbackTest
- */
-public class RollbackOrderTest extends QpidBrokerTestCase
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(RollbackOrderTest.class);
-    private Connection _connection;
-    private Queue _queue;
-    private Session _session;
-    private MessageConsumer _consumer;
-
-    @Override public void setUp() throws Exception
-    {
-        super.setUp();
-        _connection = getConnection();
-
-        _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
-        _queue = createTestQueue(_session);
-        _consumer = _session.createConsumer(_queue);
-
-        //Send more messages so it is more likely that the dispatcher is
-        // processing on rollback.
-        sendMessage(_session, _queue, 50);
-        _session.commit();
-
-    }
-
-    public void testOrderingAfterRollback() throws Exception
-    {
-        //Start the session now so we
-        _connection.start();
-
-        for (int i = 0; i < 20; i++)
-        {
-            Message msg = _consumer.receive();
-            assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
-
-            // Pull additional messages through so we have some reject work to do
-            for (int m=1; m <= 5 ; m++)
-            {
-                msg = _consumer.receive();
-                assertEquals("Incorrect Message Received (message " + m + ")", m, msg.getIntProperty(INDEX));
-            }
-
-            _session.rollback();
-        }
-    }
-
-    public void testOrderingAfterRollbackOnMessage() throws Exception
-    {
-        final CountDownLatch count= new CountDownLatch(20);
-        final Exception exceptions[] = new Exception[20];
-        final AtomicBoolean failed = new AtomicBoolean(false);
-
-        _consumer.setMessageListener(new MessageListener()
-        {
-
-            @Override
-            public void onMessage(Message message)
-            {
-
-                Message msg = message;
-                try
-                {
-                    count.countDown();
-                    assertEquals("Incorrect Message Received", 0, msg.getIntProperty(INDEX));
-
-                    _session.rollback();
-                }
-                catch (JMSException e)
-                {
-                    LOGGER.error("Error:" + e.getMessage(), e);
-                    exceptions[(int)count.getCount()] = e;
-                }
-                catch (AssertionFailedError cf)
-                {
-                    // End Test if Equality test fails
-                    while (count.getCount() != 0)
-                    {
-                        count.countDown();
-                    }
-
-                    LOGGER.error("Error:" + cf.getMessage(), cf);
-                    failed.set(true);
-                }
-            }
-        });
-        //Start the session now so we
-        _connection.start();
-
-        count.await(10l, TimeUnit.SECONDS);
-        assertEquals("Not all message received.  Count should be 0.", 0, count.getCount());
-
-        for (Exception e : exceptions)
-        {
-            if (e != null)
-            {
-                LOGGER.error("Encountered exception", e);
-                failed.set(true);
-            }
-        }
-
-        _connection.close();
-
-        assertFalse("Exceptions thrown during test run, Check Std.err.", failed.get());
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
deleted file mode 100644
index c4eb3d5..0000000
--- a/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ /dev/null
@@ -1,670 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- *
- *
- */
-package org.apache.qpid.test.unit.transacted;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.RejectBehaviour;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-/**
- * This class tests a number of commits and roll back scenarios
- *
- * Assumptions; - Assumes empty Queue
- *
- * @see org.apache.qpid.test.client.RollbackOrderTest
- */
-public class CommitRollbackTest extends QpidBrokerTestCase
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(CommitRollbackTest.class);
-    private long _positiveTimeout;
-    private long _negativeTimeout;
-
-    private Connection _conn;
-    private Session _session;
-    private MessageProducer _publisher;
-    private Session _pubSession;
-    private MessageConsumer _consumer;
-    private Queue _jmsQueue;
-
-    @Override
-    public void setUp() throws Exception
-    {
-        super.setUp();
-        _positiveTimeout = getReceiveTimeout();
-        _negativeTimeout = getShortReceiveTimeout();
-    }
-
-    private void newConnection() throws Exception
-    {
-        LOGGER.debug("calling newConnection()");
-        _conn = getConnection();
-
-        _session = _conn.createSession(true, Session.SESSION_TRANSACTED);
-
-        final String queueName = getTestQueueName();
-        _jmsQueue = createTestQueue(_session);
-        _session.commit();
-        _consumer = _session.createConsumer(_jmsQueue);
-
-        _pubSession = _conn.createSession(true, Session.SESSION_TRANSACTED);
-
-        _publisher = _pubSession.createProducer(_pubSession.createQueue(queueName));
-
-        _conn.start();
-    }
-
-    /**
-     * PUT a text message, disconnect before commit, confirm it is gone.
-     *
-     * @throws Exception On error
-     */
-    public void testPutThenDisconnect() throws Exception
-    {
-        newConnection();
-
-        assertTrue("session is not transacted", _session.getTransacted());
-        assertTrue("session is not transacted", _pubSession.getTransacted());
-
-        LOGGER.info("sending test message");
-        String MESSAGE_TEXT = "testPutThenDisconnect";
-        _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
-        LOGGER.info("reconnecting without commit");
-        _conn.close();
-
-        newConnection();
-
-        LOGGER.info("receiving result");
-        Message result = _consumer.receive(_negativeTimeout);
-
-        // commit to ensure message is removed from queue
-        _session.commit();
-
-        assertNull("test message was put and disconnected before commit, but is still present", result);
-    }
-
-
-    /**
-     * PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different
-     * session as producer
-     *
-     * @throws Exception On error
-     */
-    public void testPutThenRollback() throws Exception
-    {
-        newConnection();
-
-        assertTrue("session is not transacted", _session.getTransacted());
-        assertTrue("session is not transacted", _pubSession.getTransacted());
-
-        LOGGER.info("sending test message");
-        String MESSAGE_TEXT = "testPutThenRollback";
-        _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
-        LOGGER.info("rolling back");
-        _pubSession.rollback();
-
-        LOGGER.info("receiving result");
-        Message result = _consumer.receive(_negativeTimeout);
-
-        assertNull("test message was put and rolled back, but is still present", result);
-    }
-
-    /**
-     * GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection
-     *
-     * @throws Exception On error
-     */
-    public void testGetThenDisconnect() throws Exception
-    {
-        newConnection();
-
-        assertTrue("session is not transacted", _session.getTransacted());
-        assertTrue("session is not transacted", _pubSession.getTransacted());
-
-        LOGGER.info("sending test message");
-        String MESSAGE_TEXT = "testGetThenDisconnect";
-        _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
-        _pubSession.commit();
-
-        LOGGER.info("getting test message");
-
-        Message msg = _consumer.receive(_positiveTimeout);
-        assertNotNull("retrieved message is null", msg);
-
-        LOGGER.info("closing connection");
-        _conn.close();
-
-        newConnection();
-
-        LOGGER.info("receiving result");
-        Message result = _consumer.receive(_negativeTimeout);
-
-        _session.commit();
-
-        assertNotNull("test message was consumed and disconnected before commit, but is gone", result);
-        assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
-    }
-
-    /**
-     * GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the
-     * same connection but different session as producer
-     *
-     * @throws Exception On error
-     */
-    public void testGetThenCloseDisconnect() throws Exception
-    {
-        newConnection();
-
-        assertTrue("session is not transacted", _session.getTransacted());
-        assertTrue("session is not transacted", _pubSession.getTransacted());
-
-        LOGGER.info("sending test message");
-        String MESSAGE_TEXT = "testGetThenCloseDisconnect";
-        _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
-        _pubSession.commit();
-
-        LOGGER.info("getting test message");
-
-        Message msg = _consumer.receive(_positiveTimeout);
-        assertNotNull("retrieved message is null", msg);
-        assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText());
-
-        LOGGER.info("reconnecting without commit");
-        _consumer.close();
-        _conn.close();
-
-        newConnection();
-
-        LOGGER.info("receiving result");
-        Message result = _consumer.receive(_positiveTimeout);
-
-        _session.commit();
-
-        assertNotNull("test message was consumed and disconnected before commit, but is gone", result);
-        assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
-    }
-
-    /**
-     * GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt
-     * session to the producer
-     *
-     * @throws Exception On error
-     */
-    public void testGetThenRollback() throws Exception
-    {
-        newConnection();
-
-        assertTrue("session is not transacted", _session.getTransacted());
-        assertTrue("session is not transacted", _pubSession.getTransacted());
-
-        LOGGER.info("sending test message");
-        String MESSAGE_TEXT = "testGetThenRollback";
-        _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
-        _pubSession.commit();
-
-        LOGGER.info("getting test message");
-
-        Message msg = _consumer.receive(_positiveTimeout);
-
-        assertNotNull("retrieved message is null", msg);
-        assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText());
-
-        LOGGER.info("rolling back");
-
-        _session.rollback();
-
-        LOGGER.info("receiving result");
-
-        Message result = _consumer.receive(_positiveTimeout);
-
-        _session.commit();
-        assertNotNull("test message was consumed and rolled back, but is gone", result);
-        assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
-        assertTrue("Message is not marked as redelivered", result.getJMSRedelivered());
-    }
-
-    /**
-     * GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same
-     * connection but different session as producer
-     *
-     * @throws Exception On error
-     */
-    public void testGetThenCloseRollback() throws Exception
-    {
-        newConnection();
-
-        assertTrue("session is not transacted", _session.getTransacted());
-        assertTrue("session is not transacted", _pubSession.getTransacted());
-
-        LOGGER.info("sending test message");
-        String MESSAGE_TEXT = "testGetThenCloseRollback";
-        _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
-        _pubSession.commit();
-
-        LOGGER.info("getting test message");
-
-        Message msg = _consumer.receive(_positiveTimeout);
-
-        assertNotNull("retrieved message is null", msg);
-        assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) msg).getText());
-
-        LOGGER.info("Closing consumer");
-        _consumer.close();
-
-        LOGGER.info("rolling back");
-        _session.rollback();
-
-        LOGGER.info("receiving result");
-
-        _consumer = _session.createConsumer(_jmsQueue);
-
-        Message result = _consumer.receive(_positiveTimeout);
-
-        _session.commit();
-        assertNotNull("test message was consumed and rolled back, but is gone", result);
-        assertEquals("test message was correct message", MESSAGE_TEXT, ((TextMessage) result).getText());
-        assertTrue("Message is not marked as redelivered", result.getJMSRedelivered());
-    }
-
-    /**
-     * This test sends two messages receives one of them but doesn't ack it.
-     * The consumer is then closed
-     * the first message should be returned as redelivered.
-     *  the second message should be delivered normally.
-     * @throws Exception
-     */
-    public void testSend2ThenCloseAfter1andTryAgain() throws Exception
-    {
-        newConnection();
-
-        assertTrue("session is not transacted", _session.getTransacted());
-        assertTrue("session is not transacted", _pubSession.getTransacted());
-
-        LOGGER.info("sending two test messages");
-        _publisher.send(_pubSession.createTextMessage("1"));
-        _publisher.send(_pubSession.createTextMessage("2"));
-        _pubSession.commit();
-
-        LOGGER.info("getting test message");
-        Message result = _consumer.receive(_positiveTimeout);
-
-        assertNotNull("Message received should not be null", result);
-        assertEquals("1", ((TextMessage) result).getText());
-        assertTrue("Message is marked as redelivered" + result, !result.getJMSRedelivered());
-
-        LOGGER.info("Closing Consumer");
-        
-        _consumer.close();
-
-        LOGGER.info("Creating New consumer");
-        _consumer = _session.createConsumer(_jmsQueue);
-
-        LOGGER.info("receiving result");
-
-
-        // Message 2 may be marked as redelivered if it was prefetched.
-        result = _consumer.receive(_positiveTimeout);
-        assertNotNull("Second message was not consumed, but is gone", result);
-
-        // The first message back will be 2, message 1 has been received but not committed
-        // Closing the consumer does not commit the session.
-
-        // if this is message 1 then it should be marked as redelivered
-        if("1".equals(((TextMessage) result).getText()))
-        {
-            fail("First message was received again");
-        }
-
-        result = _consumer.receive(_negativeTimeout);
-        assertNull("test message should be null:" + result, result);
-
-        _session.commit();
-    }
-
-    public void testPutThenRollbackThenGet() throws Exception
-    {
-        newConnection();
-
-        assertTrue("session is not transacted", _session.getTransacted());
-        assertTrue("session is not transacted", _pubSession.getTransacted());
-
-        LOGGER.info("sending test message");
-        String MESSAGE_TEXT = "testPutThenRollbackThenGet";
-
-        _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-        _pubSession.commit();
-
-        assertNotNull(_consumer.receive(_positiveTimeout));
-
-        _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
-        LOGGER.info("rolling back");
-        _pubSession.rollback();
-
-        LOGGER.info("receiving result");
-        Message result = _consumer.receive(_negativeTimeout);
-        assertNull("test message was put and rolled back, but is still present", result);
-
-        _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
-        _pubSession.commit();
-
-        assertNotNull(_consumer.receive(_positiveTimeout));
-
-        _session.commit();
-    }
-
-    /**
-     * Qpid-1163
-     * Check that when commit is called inside onMessage then
-     * the last message is nor redelivered. 
-     *
-     * @throws Exception
-     */
-    public void testCommitWithinOnMessage() throws Exception
-    {
-        newConnection();
-
-        Queue queue = createTestQueue(_session,"example.queue");
-        _session.commit();
-
-        // create a consumer
-        MessageConsumer cons = _session.createConsumer(queue);
-        MessageProducer prod = _session.createProducer(queue);
-        Message message =  _session.createTextMessage("Message");
-        message.setJMSCorrelationID("m1");
-        prod.send(message);
-        _session.commit();
-        LOGGER.info("Sent message to queue");
-        CountDownLatch cd = new CountDownLatch(1);
-        cons.setMessageListener(new CommitWithinOnMessageListener(cd));
-        _conn.start();
-        cd.await(30, TimeUnit.SECONDS);
-        if( cd.getCount() > 0 )
-        {
-           fail("Did not received message");
-        }
-        // Check that the message has been dequeued
-        _session.close();
-        _conn.close();
-        _conn = getConnection();
-        _conn.start();
-        Session session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        cons = session.createConsumer(queue);        
-        message = cons.receiveNoWait();
-        if(message != null)
-        {
-            if(message.getJMSCorrelationID().equals("m1"))
-            {
-                fail("received message twice");
-            }
-            else
-            {
-                fail("queue should have been empty, received message: " + message);
-            }
-        }
-    }
-
-    /**
-     * This test ensures that after exhausting credit (prefetch), a {@link Session#rollback()} successfully
-     * restores credit and allows the same messages to be re-received.
-     */
-    public void testRollbackSessionAfterCreditExhausted() throws Exception
-    {
-        final int maxPrefetch= 5;
-
-        // We send more messages than prefetch size.  This ensure that if the 0-10 client were to
-        // complete the message commands before the rollback command is sent, the broker would
-        // send additional messages utilising the release credit.  This problem would manifest itself
-        // as an incorrect message (or no message at all) being received at the end of the test.
-
-        final int numMessages = maxPrefetch * 2;
-
-        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, String.valueOf(maxPrefetch));
-
-        newConnection();
-
-        assertEquals("Prefetch not reset", maxPrefetch, ((AMQSession<?, ?>)_session).getDefaultPrefetch());
-
-        assertTrue("session is not transacted", _session.getTransacted());
-        assertTrue("session is not transacted", _pubSession.getTransacted());
-
-        sendMessage(_pubSession, _publisher.getDestination(), numMessages);
-        _pubSession.commit();
-
-        for (int i=0 ;i< maxPrefetch; i++)
-        {
-            final Message message = _consumer.receive(_positiveTimeout);
-            assertNotNull("Received:" + i, message);
-            assertEquals("Unexpected message received", i, message.getIntProperty(INDEX));
-        }
-
-        LOGGER.info("Rolling back");
-        _session.rollback();
-
-        LOGGER.info("Receiving messages");
-
-        Message result = _consumer.receive(_positiveTimeout);
-        assertNotNull("Message expected", result);
-        // Expect the first message
-        assertEquals("Unexpected message received", 0, result.getIntProperty(INDEX));
-    }
-
-    private class CommitWithinOnMessageListener implements MessageListener
-    {
-        private CountDownLatch _cd;
-        private CommitWithinOnMessageListener(CountDownLatch cd)
-        {
-            _cd = cd;
-        }
-        @Override
-        public void onMessage(Message message)
-        {
-            try
-            {
-                LOGGER.info("received message " + message);
-                assertEquals("Wrong message received", message.getJMSCorrelationID(), "m1");
-                LOGGER.info("commit session");
-                _session.commit();
-                _cd.countDown();
-            }
-            catch (JMSException e)
-            {
-                LOGGER.error("OnMessage error",e);
-            }
-        }
-    }
-
-    public void testRollbackSoak() throws Exception
-    {
-        newConnection();
-        _consumer.close();
-
-        final int rollbackTime = 2000;
-        final int numberOfMessages = 1000;
-        final int numberOfConsumers = 2;
-        final long testTimeout = 60*1000L;
-        sendMessage(_pubSession, _jmsQueue, numberOfMessages);
-
-        List<ListenableFuture<Void >> consumerFutures = new ArrayList<>(numberOfConsumers);
-        final ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfConsumers));
-
-        try
-        {
-            final CountDownLatch modeFlippedLatch = new CountDownLatch(1);
-            final AtomicInteger counter = new AtomicInteger();
-            final AtomicInteger rollbackCounter = new AtomicInteger();
-            final long flipModeTime = System.currentTimeMillis() + rollbackTime;
-            final AtomicBoolean shutdown = new AtomicBoolean();
-
-            for (int i = 0; i < numberOfConsumers; ++i)
-            {
-                consumerFutures.add(threadPool.submit(new Callable<Void>()
-                {
-                    @Override
-                    public Void call() throws Exception
-                    {
-                        Session session = _conn.createSession(true, Session.SESSION_TRANSACTED);
-                        final MessageConsumer consumer = session.createConsumer(_jmsQueue);
-
-                        while(!shutdown.get())
-                        {
-                            Message m = consumer.receive(_positiveTimeout);
-                            if (m != null)
-                            {
-                                long currentTime = System.currentTimeMillis();
-                                if (currentTime < flipModeTime)
-                                {
-                                    session.rollback();
-                                    rollbackCounter.incrementAndGet();
-                                }
-                                else
-                                {
-                                    modeFlippedLatch.countDown();
-                                    counter.incrementAndGet();
-                                    session.commit();
-                                }
-                            }
-
-                            if (counter.get() == numberOfMessages)
-                            {
-                                break;
-                            }
-
-                            if (Thread.currentThread().isInterrupted())
-                            {
-                                break;
-                            }
-                        }
-
-                        return null;
-                    }
-                }));
-            }
-
-            final ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(consumerFutures);
-            modeFlippedLatch.await(rollbackTime * 2, TimeUnit.MILLISECONDS);
-            try
-            {
-                combinedFuture.get(testTimeout, TimeUnit.MILLISECONDS);
-                LOGGER.debug("Performed {} rollbacks, consumed {}/{} messages",
-                              rollbackCounter.get(),
-                              counter.get(),
-                              numberOfMessages);
-            }
-            catch (TimeoutException e)
-            {
-                fail(String.format(
-                        "Test took more than %.1f seconds. All consumers probably starved. Performed %d rollbacks, consumed %d/%d messages",
-                        testTimeout / 1000.,
-                        rollbackCounter.get(),
-                        counter.get(),
-                        numberOfMessages));
-            }
-            finally
-            {
-                shutdown.set(true);
-            }
-            assertEquals(String.format(
-                    "Unexpected number of messages received. Performed %d rollbacks, consumed %d/%d messages",
-                    rollbackCounter.get(),
-                    counter.get(),
-                    numberOfMessages), numberOfMessages, counter.get());
-        }
-        finally
-        {
-            threadPool.shutdownNow();
-            threadPool.awaitTermination(2 * _positiveTimeout, TimeUnit.SECONDS);
-        }
-    }
-
-    public void testResendUnseenMessagesAfterRollback() throws Exception
-    {
-        resendAfterRollback();
-    }
-
-    public void testResendUnseenMessagesAfterRollbackWithServerReject() throws Exception
-    {
-        setTestSystemProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.SERVER.toString());
-        resendAfterRollback();
-    }
-
-    private void resendAfterRollback() throws Exception
-    {
-        newConnection();
-
-        assertTrue("session is not transacted", _session.getTransacted());
-        assertTrue("session is not transacted", _pubSession.getTransacted());
-
-        LOGGER.info("sending test message");
-        String MESSAGE_TEXT = "message text";
-
-        _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-        _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
-
-        _pubSession.commit();
-
-        assertNotNull("two messages were sent, but none has been received", _consumer.receive(_positiveTimeout));
-
-        _session.rollback();
-
-        LOGGER.info("receiving result");
-
-        assertNotNull("two messages were sent, but none has been received", _consumer.receive(_positiveTimeout));
-        assertNotNull("two messages were sent, but only one has been received", _consumer.receive(_positiveTimeout));
-        assertNull("Only two messages were sent, but more have been received", _consumer.receive(_negativeTimeout));
-
-        _session.commit();
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index 973be36..7bea664 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -112,8 +112,6 @@ org.apache.qpid.client.failover.FailoverBehaviourTest#testFlowControlFlagResetOn
 org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFailoverHandlerTimeoutExpires
 org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFlowControlFlagResetOnFailover
 
-org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage#*
-
 // Excluded because plugins from Qpid Broker-J are not used in CPP broker
 org.apache.qpid.server.virtualhost.plugin.*
 org.apache.qpid.info.test.*

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/test-profiles/Java010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java010Excludes b/test-profiles/Java010Excludes
index c67bc36..2846875 100755
--- a/test-profiles/Java010Excludes
+++ b/test-profiles/Java010Excludes
@@ -37,9 +37,6 @@ org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartConsumerFlowSt
 // 0-10 is not supported by the MethodRegistry
 org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
 
-//QPID-1864: rollback with subscriptions does not work in 0-10 yet
-org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage
-
 // QPID-3133: On 0-10, the exception listener is currently not invoked when reconnection fails to occurs.
 org.apache.qpid.server.failover.FailoverMethodTest#*
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f3012224/test-profiles/Java10Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10Excludes b/test-profiles/Java10Excludes
index 56ee05a..52e1ba4 100644
--- a/test-profiles/Java10Excludes
+++ b/test-profiles/Java10Excludes
@@ -36,12 +36,6 @@ org.apache.qpid.server.failover.FailoverMethodTest#*
 // Testing that the 0-x implementation of a durable topic does not cause queue growth when messages are excluded by selectors
 org.apache.qpid.test.unit.topic.TopicSessionTest#testNonMatchingMessagesHandledCorrectly
 
-// These tests explicitly meddle with prefetch - maybe there should be equivalents for AMQP 1.0
-org.apache.qpid.test.unit.transacted.CommitRollbackTest#testRollbackSessionAfterCreditExhausted
-
-// Excluded due to client issue QPIDJMS-231: Prefetched messages are not released on consumer close
-org.apache.qpid.test.unit.transacted.CommitRollbackTest#testSend2ThenCloseAfter1andTryAgain
-
 // This test covers the client version specific mechanisms for restricting the types of Object which can be sent via an ObjectMessage
 org.apache.qpid.client.message.ObjectMessageClassWhitelistingTest#*
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org