You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/12/26 20:30:47 UTC

[1/2] qpid-broker-j git commit: QPID-8038: [Broker-J] [AMQP 0-x] Add protocol tests for basic.qos and channel.flow

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 8e78fbe60 -> 37365f918


QPID-8038: [Broker-J] [AMQP 0-x] Add protocol tests for basic.qos and channel.flow


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/988006b3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/988006b3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/988006b3

Branch: refs/heads/master
Commit: 988006b38d8dd10914357779966731dc1aa90530
Parents: 8e78fbe
Author: Keith Wall <ke...@gmail.com>
Authored: Mon Dec 25 17:35:12 2017 +0000
Committer: Keith Wall <ke...@gmail.com>
Committed: Mon Dec 25 17:36:58 2017 +0000

----------------------------------------------------------------------
 .../tests/protocol/v0_8/BasicInteraction.java   |   6 +
 .../qpid/tests/protocol/v0_8/BasicTest.java     | 132 +++++++++++++++++++
 2 files changed, 138 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/988006b3/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
index 2c1d1b3..c004e38 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
@@ -243,6 +243,12 @@ public class BasicInteraction
         return _interaction.sendPerformative(new BasicAckBody(_ackDeliveryTag, _ackMultiple));
     }
 
+    public BasicInteraction ackMultiple(final boolean multiple)
+    {
+        _ackMultiple = multiple;
+        return this;
+    }
+
     public BasicInteraction ackDeliveryTag(final long deliveryTag)
     {
         _ackDeliveryTag = deliveryTag;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/988006b3/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
index 79988c0..f1300fb 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
@@ -529,6 +529,138 @@ public class BasicTest extends BrokerAdminUsingTestBase
         }
     }
 
+    /**
+     * The Qpid JMS AMQP 0-x client relies on being able to raise and lower qos count during a channel's lifetime
+     * to prevent channel starvation. This test supports this qos use-case.
+     */
+    @Test
+    public void qosCountResized() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "A", "B", "C", "D", "E", "F");
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            String consumerTag = "A";
+
+            interaction.openAnonymousConnection()
+                       .channel().open()
+                       .consumeResponse(ChannelOpenOkBody.class)
+                       .channel().flow(true)
+                       .consumeResponse(ChannelFlowOkBody.class)
+                       .basic().qosPrefetchCount(3)
+                       .qos()
+                       .consumeResponse(BasicQosOkBody.class)
+                       .basic().consumeConsumerTag(consumerTag)
+                       .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .consume()
+                       .consumeResponse(BasicConsumeOkBody.class);
+
+            final long deliveryTagA = receiveDeliveryHeaderAndBody(interaction, "A");
+            receiveDeliveryHeaderAndBody(interaction, "B");
+            final long deliveryTagC = receiveDeliveryHeaderAndBody(interaction, "C");
+
+            ensureSync(interaction);
+
+            // Raise qos count by one, expect D to arrive
+            interaction.basic().qosPrefetchCount(4).qos()
+                       .consumeResponse(BasicQosOkBody.class);
+
+            long deliveryTagD = receiveDeliveryHeaderAndBody(interaction, "D");
+            ensureSync(interaction);
+
+            // Ack A, expect E to arrive
+            interaction.basic().ackDeliveryTag(deliveryTagA).ack();
+
+            receiveDeliveryHeaderAndBody(interaction, "E");
+            ensureSync(interaction);
+
+            // Lower qos to 2 and ensure no more messages arrive (message credit will be negative at this point).
+            interaction.basic().qosPrefetchCount(2).qos()
+                       .consumeResponse(BasicQosOkBody.class);
+            ensureSync(interaction);
+
+            // Ack B and C and ensure still no more messages arrive (message credit will now be zero)
+            interaction.basic()
+                       .ackMultiple(true).ackDeliveryTag(deliveryTagC).ack();
+            ensureSync(interaction);
+
+            // Ack D and ensure F delivery arrives
+            interaction.basic()
+                       .ackMultiple(false).ackDeliveryTag(deliveryTagD).ack();
+
+            receiveDeliveryHeaderAndBody(interaction, "F");
+
+            interaction.channel().close().consumeResponse(ChannelCloseOkBody.class);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2)));
+        }
+    }
+
+    /**
+     * The Qpid JMS AMQP 0-x client is capable of polling for messages.  It does this using a combination of
+     * basic.qos (count one) and regulating the flow using channel.flow. This test supports this use-case.
+     */
+    @Test
+    public void pollingUsingFlow() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "A", "B", "C");
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            String consumerTag = "A";
+
+            interaction.openAnonymousConnection()
+                       .channel().open()
+                       .consumeResponse(ChannelOpenOkBody.class)
+                       .basic().qosPrefetchCount(1)
+                       .qos()
+                       .consumeResponse(BasicQosOkBody.class)
+                       .channel().flow(false)
+                       .consumeResponse(ChannelFlowOkBody.class)
+                       .basic().consumeConsumerTag(consumerTag)
+                       .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .consume()
+                       .consumeResponse(BasicConsumeOkBody.class);
+
+            ensureSync(interaction);
+
+            interaction.channel().flow(true)
+                       .consumeResponse(ChannelFlowOkBody.class);
+
+            long deliveryTagA = receiveDeliveryHeaderAndBody(interaction, "A");
+
+            interaction.channel().flow(false)
+                       .consumeResponse(ChannelFlowOkBody.class)
+                       .basic().ackDeliveryTag(deliveryTagA).ack();
+
+            ensureSync(interaction);
+
+            interaction.channel().flow(true)
+                       .consumeResponse(ChannelFlowOkBody.class);
+
+            long deliveryTagB = receiveDeliveryHeaderAndBody(interaction, "B");
+
+            interaction.channel().flow(false)
+                       .consumeResponse(ChannelFlowOkBody.class)
+                       .basic().ackDeliveryTag(deliveryTagB).ack()
+                       .channel().close().consumeResponse(ChannelCloseOkBody.class);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+        }
+    }
+
+    private long receiveDeliveryHeaderAndBody(final Interaction interaction, String expectedMessageContent) throws Exception
+    {
+        BasicDeliverBody delivery = interaction.consumeResponse().getLatestResponse(BasicDeliverBody.class);
+        ContentBody content = interaction.consumeResponse(ContentHeaderBody.class)
+                                         .consumeResponse().getLatestResponse(ContentBody.class);
+
+        assertThat(getContent(content), is(equalTo(expectedMessageContent)));
+        return delivery.getDeliveryTag();
+    }
+
     private void ensureSync(final Interaction interaction) throws Exception
     {
         interaction.exchange()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-broker-j git commit: QPID-6933: [System Tests] Refactor Prefetch tests

Posted by kw...@apache.org.
QPID-6933: [System Tests] Refactor Prefetch tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/37365f91
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/37365f91
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/37365f91

Branch: refs/heads/master
Commit: 37365f91801ffbf070f0580fa7fda7ec91f66fd0
Parents: 988006b
Author: Keith Wall <ke...@gmail.com>
Authored: Mon Dec 25 17:36:14 2017 +0000
Committer: Keith Wall <ke...@gmail.com>
Committed: Tue Dec 26 18:11:18 2017 +0000

----------------------------------------------------------------------
 .../java/org/apache/qpid/systests/Utils.java    |   4 +-
 .../extensions/prefetch/PrefetchTest.java       | 254 ++++++++++++++++
 .../client/prefetch/PrefetchBehaviourTest.java  | 297 -------------------
 .../qpid/systest/prefetch/ZeroPrefetchTest.java |  81 -----
 test-profiles/CPPExcludes                       |   2 -
 test-profiles/Java010Excludes                   |   2 -
 test-profiles/Java10UninvestigatedTestsExcludes |   4 +-
 test-profiles/JavaPre010Excludes                |   3 -
 8 files changed, 257 insertions(+), 390 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
index c2dfe75..01e75f1 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
@@ -52,13 +52,13 @@ public class Utils
         }
     }
 
-    public static void sendMessages(final Connection connection, final Destination destination, final int messageNumber)
+    public static void sendMessages(final Connection connection, final Destination destination, final int count)
             throws JMSException
     {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         try
         {
-            sendMessages(session, destination, messageNumber);
+            sendMessages(session, destination, count);
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/prefetch/PrefetchTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/prefetch/PrefetchTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/prefetch/PrefetchTest.java
new file mode 100644
index 0000000..728148e
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/prefetch/PrefetchTest.java
@@ -0,0 +1,254 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systests.jms_1_1.extensions.prefetch;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.apache.qpid.server.model.Protocol.AMQP_0_8;
+import static org.apache.qpid.server.model.Protocol.AMQP_0_9;
+import static org.apache.qpid.server.model.Protocol.AMQP_0_9_1;
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assume.assumeThat;
+
+import java.util.EnumSet;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.NamingException;
+
+import org.junit.Test;
+
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.systests.JmsTestBase;
+import org.apache.qpid.systests.Utils;
+
+public class PrefetchTest extends JmsTestBase
+{
+    private static final EnumSet<Protocol> PRE_010_PROTOCOLS = EnumSet.of(AMQP_0_8, AMQP_0_9, AMQP_0_9_1);
+
+    @Test
+    public void prefetch() throws Exception
+    {
+        Connection connection1 = getConnectionBuilder().setPrefetch(3).build();
+        Queue queue = createQueue(getTestName());
+        try
+        {
+            connection1.start();
+
+            final Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer1 = session1.createConsumer(queue);
+
+            Utils.sendMessages(connection1, queue, 6);
+
+            final Message receivedMessage = consumer1.receive(getReceiveTimeout());
+            assertNotNull("First message was not received", receivedMessage);
+            assertEquals("Received message has unexpected index", 0, receivedMessage.getIntProperty(INDEX));
+
+            forceSync(session1);
+
+            observeNextAvailableMessage(queue, 4);
+        }
+        finally
+        {
+            connection1.close();
+        }
+    }
+
+    /**
+     * Sends two messages to the queue, then consumes and acknowledges one message on connection 1.
+     * A second connection then attempts to consume the second message; this will only be possible
+     * if the first connection has no prefetch.
+     */
+    @Test
+    public void prefetchDisabled() throws Exception
+    {
+        Connection connection1 = getConnectionBuilder().setPrefetch(0).build();
+        Queue queue = createQueue(getTestName());
+        try
+        {
+            connection1.start();
+
+            final Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer1 = session1.createConsumer(queue);
+
+            Utils.sendMessages(connection1, queue, 2);
+
+            final Message receivedMessage = consumer1.receive(getReceiveTimeout());
+            assertNotNull("First message was not received", receivedMessage);
+            assertEquals("Message property was not as expected", 0, receivedMessage.getIntProperty(INDEX));
+
+            observeNextAvailableMessage(queue, 1);
+        }
+        finally
+        {
+            connection1.close();
+        }
+    }
+
+    @Test
+    public void connectionStopReleasesPrefetchedMessages() throws Exception
+    {
+        assumeThat("Only 0-10 implements this feature", getProtocol(), is(equalTo(Protocol.AMQP_0_10)));
+
+        Connection connection1 = getConnectionBuilder().setPrefetch(3).build();
+        Queue queue = createQueue(getTestName());
+        try
+        {
+            connection1.start();
+
+            final Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer1 = session1.createConsumer(queue);
+
+            Utils.sendMessages(connection1, queue, 6);
+
+            final Message receivedMessage = consumer1.receive(getReceiveTimeout());
+            assertNotNull("First message was not received", receivedMessage);
+            assertEquals("Received message has unexpected index", 0, receivedMessage.getIntProperty(INDEX));
+
+            forceSync(session1);
+
+            connection1.stop();
+
+            observeNextAvailableMessage(queue, 1);
+        }
+        finally
+        {
+            connection1.close();
+        }
+    }
+
+    @Test
+    public void consumerCloseReleasesPrefetchedMessages() throws Exception
+    {
+        assumeThat("Only 0-10 implements this feature", getProtocol(), is(equalTo(Protocol.AMQP_0_10)));
+
+        Connection connection1 = getConnectionBuilder().setPrefetch(3).build();
+        Queue queue = createQueue(getTestName());
+        try
+        {
+            connection1.start();
+
+            final Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer1 = session1.createConsumer(queue);
+
+            Utils.sendMessages(connection1, queue, 6);
+
+            final Message receivedMessage = consumer1.receive(getReceiveTimeout());
+            assertNotNull("First message was not received", receivedMessage);
+            assertEquals("Received message has unexpected index", 0, receivedMessage.getIntProperty(INDEX));
+
+            forceSync(session1);
+
+            consumer1.close();
+
+            observeNextAvailableMessage(queue, 1);
+        }
+        finally
+        {
+            connection1.close();
+        }
+    }
+
+    @Test
+    public void consumeBeyondPrefetch() throws Exception
+    {
+        Connection connection1 = getConnectionBuilder().setPrefetch(1).build();
+        Queue queue = createQueue(getTestName());
+        try
+        {
+            connection1.start();
+
+            final Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED);
+            MessageConsumer consumer1 = session1.createConsumer(queue);
+
+            Utils.sendMessages(connection1, queue, 5);
+
+            Message message = consumer1.receive(getReceiveTimeout());
+            assertNotNull(message);
+            assertEquals(0, message.getIntProperty(INDEX));
+
+            message = consumer1.receive(getReceiveTimeout());
+            assertNotNull(message);
+            assertEquals(1, message.getIntProperty(INDEX));
+            message = consumer1.receive(getReceiveTimeout());
+            assertNotNull(message);
+            assertEquals(2, message.getIntProperty(INDEX));
+
+            forceSync(session1);
+
+            // In pre 0-10, in a transacted session the client does not ack messages until the commit occurs,
+            // so the message observed by another connection will have the index 3 rather than 4.
+            Connection connection2 = getConnection();
+            try
+            {
+                Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
+                MessageConsumer consumer2 = session2.createConsumer(queue);
+                connection2.start();
+
+                message = consumer2.receive(getReceiveTimeout());
+                assertNotNull(message);
+                assertEquals("Received message has unexpected index",
+                             PRE_010_PROTOCOLS.contains(getProtocol()) ? 3 : 4,
+                             message.getIntProperty(INDEX));
+
+                session2.rollback();
+            }
+            finally
+            {
+                connection2.close();
+            }
+        }
+        finally
+        {
+            connection1.close();
+        }
+    }
+
+    private void observeNextAvailableMessage(final Queue queue, final int expectedIndex) throws JMSException, NamingException
+    {
+        Connection connection2 = getConnection();
+        try
+        {
+            connection2.start();
+            final Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer2 = session2.createConsumer(queue);
+
+            final Message receivedMessage2 = consumer2.receive(getReceiveTimeout());
+            assertNotNull("Observer connection did not receive message", receivedMessage2);
+            assertEquals("Message received by the observer connection has unexpected index", expectedIndex, receivedMessage2.getIntProperty(INDEX));
+        }
+        finally
+        {
+            connection2.close();
+        }
+    }
+
+    private void forceSync(final Session session1) throws Exception
+    {
+        session1.createTemporaryQueue().delete();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
deleted file mode 100644
index 330012f..0000000
--- a/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.client.prefetch;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-
-public class PrefetchBehaviourTest extends QpidBrokerTestCase
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(PrefetchBehaviourTest.class);
-    private Connection _normalConnection;
-    private AtomicBoolean _exceptionCaught;
-    private CountDownLatch _processingStarted;
-    private CountDownLatch _processingCompleted;
-
-    @Override
-    protected void setUp() throws Exception
-    {
-        super.setUp();
-        _normalConnection = getConnection();
-        _exceptionCaught = new AtomicBoolean();
-        _processingStarted = new CountDownLatch(1);
-        _processingCompleted = new CountDownLatch(1);
-    }
-
-    /**
-     * Verifies that a slow processing asynchronous transacted consumer with prefetch=1 only
-     * gets 1 of the messages sent, with the second consumer picking up the others while the
-     * slow consumer is processing, i.e that prefetch=1 actually does what it says on the tin.
-     */
-    public void testPrefetchOneWithAsynchronousTransactedConsumer() throws Exception
-    {
-        final long processingTime = 5000;
-        
-        //create a second connection with prefetch set to 1
-        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());
-        Connection prefetch1Connection = getConnection();
-
-        prefetch1Connection.start();
-        _normalConnection.start();
-
-        //create an asynchronous consumer with simulated slow processing
-        final Session prefetch1session = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED);
-        Queue queue = prefetch1session.createQueue(getTestQueueName());
-        MessageConsumer prefetch1consumer = prefetch1session.createConsumer(queue);
-        prefetch1consumer.setMessageListener(new MessageListener()
-        {
-            @Override
-            public void onMessage(Message message)
-            {
-                try
-                {
-                    LOGGER.debug("starting processing");
-                    _processingStarted.countDown();
-                    LOGGER.debug("processing started");
-
-                    //simulate message processing
-                    Thread.sleep(processingTime);
-
-                    prefetch1session.commit();
-
-                    _processingCompleted.countDown();
-                }
-                catch(Exception e)
-                {
-                    LOGGER.error("Exception caught in message listener");
-                    _exceptionCaught.set(true);
-                }
-            }
-        });
-
-        //create producer and send 5 messages
-        Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED);
-        MessageProducer producer = producerSession.createProducer(queue);
-
-        for (int i = 0; i < 5; i++)
-        {
-            producer.send(producerSession.createTextMessage("test"));
-        }
-        producerSession.commit();
-
-        //wait for the first message to start being processed by the async consumer
-        assertTrue("Async processing failed to start in allowed timeframe", _processingStarted.await(2000, TimeUnit.MILLISECONDS));
-        LOGGER.debug("proceeding with test");
-
-        //try to consumer the other messages with another consumer while the async procesisng occurs
-        Session normalSession = _normalConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer normalConsumer = normalSession.createConsumer(queue);        
-        
-        Message msg;
-        // Check that other consumer gets the other 4 messages
-        for (int i = 0; i < 4; i++)
-        {
-            msg = normalConsumer.receive(1500);
-            assertNotNull("Consumer should receive 4 messages",msg);                
-        }
-        msg = normalConsumer.receive(250);
-        assertNull("Consumer should not have received a 5th message",msg);
-
-        //wait for the other consumer to finish to ensure it completes ok
-        LOGGER.debug("waiting for async consumer to complete");
-        assertTrue("Async processing failed to complete in allowed timeframe", _processingStarted.await(processingTime + 2000, TimeUnit.MILLISECONDS));
-        assertFalse("Unexpected exception during async message processing",_exceptionCaught.get());
-    }
-
-    /**
-     * This test was originally known as AMQConnectionTest#testPrefetchSystemProperty.
-     *
-     */
-    public void testMessagesAreDistributedBetweenConsumersWithLowPrefetch() throws Exception
-    {
-        // This test is flaky. There is no guarantee that the messages have been sent to
-        // consumerA's prefetch buffer by the time consumerB calls receive().
-        Queue queue = getTestQueue();
-
-        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());
-
-        Connection connection = getConnection();
-        connection.start();
-        // Create Consumer A
-        Session consSessA = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumerA = consSessA.createConsumer(queue);
-
-        // ensure message delivery to consumer A is started (required for 0-8..0-9-1)
-        final Message msg = consumerA.receiveNoWait();
-        assertNull(msg);
-
-        Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
-        sendMessage(producerSession, queue, 3);
-
-        // Create Consumer B
-        MessageConsumer consumerB = null;
-        if (isBroker010())
-        {
-            // 0-10 prefetch is per consumer so we create Consumer B on the same session as Consumer A
-            consumerB = consSessA.createConsumer(queue);
-        }
-        else
-        {
-            // 0-8..0-9-1 prefetch is per session so we create Consumer B on a separate session
-            Session consSessB = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            consumerB = consSessB.createConsumer(queue);
-        }
-
-        // As message delivery to consumer A is already started, the first two messages should
-        // now be with consumer A.  The last message will still be on the Broker as consumer A's
-        // credit is exhausted and message delivery for consumer B is not yet running.
-
-        // As described by QPID-3747, for 0-10 we *must* check Consumer B before Consumer A.
-        // If we were to reverse the order, the SessionComplete will restore Consumer A's credit,
-        // and the third message could be delivered to either Consumer A or Consumer B.
-
-        // Check that consumer B gets the last (third) message.
-        final Message msgConsumerB = consumerB.receive(1500);
-        assertNotNull("Consumer B should have received a message", msgConsumerB);
-        assertEquals("Consumer B received message with unexpected index", 2, msgConsumerB.getIntProperty(INDEX));
-
-        // Now check that consumer A has indeed got the first two messages.
-        for (int i = 0; i < 2; i++)
-        {
-            final Message msgConsumerA = consumerA.receive(1500);
-            assertNotNull("Consumer A should have received a message " + i, msgConsumerA);
-            assertEquals("Consumer A received message with unexpected index", i, msgConsumerA.getIntProperty(INDEX));
-        }
-    }
-
-    /**
-     * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer.
-     * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them.
-     *                Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue.
-     *                Try to receive all 10 messages.
-     */
-    public void testConnectionStop() throws Exception
-    {
-        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10");
-        Connection con = getConnection();
-        con.start();
-        Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}");
-
-        MessageProducer prod = ssn.createProducer(queue);
-        for (int i=0; i<10;i++)
-        {
-           prod.send(ssn.createTextMessage("Msg" + i));
-        }
-
-        MessageConsumer consumer = ssn.createConsumer(queue);
-        // This is to ensure we get the first client to prefetch.
-        Message msg = consumer.receive(1000);
-        assertNotNull("The first consumer should get one message",msg);
-        con.stop();
-
-        Connection con2 = getConnection();
-        con2.start();
-        Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer2 = ssn2.createConsumer(queue);
-        for (int i=0; i<9;i++)
-        {
-           TextMessage m = (TextMessage)consumer2.receive(1000);
-           assertNotNull("The second consumer should get 9 messages, but received only " + i,m);
-        }
-    }
-
-    public void testPrefetchWindowExpandsOnReceiveTransaction() throws Exception
-    {
-
-        _normalConnection.start();
-
-        //create a second connection with prefetch set to 1
-
-        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());
-        Connection prefetch1Connection = getConnection();
-        Session consumerSession = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED);
-        MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getTestQueueName()));
-
-
-        Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED);
-        Queue queue = producerSession.createQueue(getTestQueueName());
-        MessageProducer producer = producerSession.createProducer(queue);
-
-        for (int i = 0; i < 5; i++)
-        {
-            producer.send(producerSession.createTextMessage("test"));
-        }
-        producerSession.commit();
-
-
-        prefetch1Connection.start();
-
-
-
-        Message message = consumer.receive(1000l);
-        assertNotNull(message);
-        message = consumer.receive(1000l);
-        assertNotNull(message);
-        message = consumer.receive(1000l);
-        assertNotNull(message);
-
-
-        Connection secondConsumerConnection = getConnection();
-        Session secondConsumerSession = secondConsumerConnection.createSession(true, Session.SESSION_TRANSACTED);
-        MessageConsumer secondConsumer = secondConsumerSession.createConsumer(consumerSession.createQueue(getTestQueueName()));
-        secondConsumerConnection.start();
-
-        message = secondConsumer.receive(1000l);
-        assertNotNull(message);
-
-        message = secondConsumer.receive(1000l);
-        assertNotNull(message);
-
-        consumerSession.commit();
-        secondConsumerSession.commit();
-
-        message = consumer.receive(1000l);
-        assertNull(message);
-
-    }
-
-
-}
-

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java b/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
deleted file mode 100644
index e313222..0000000
--- a/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.systest.prefetch;
-
-import java.util.UUID;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-public class ZeroPrefetchTest extends QpidBrokerTestCase
-{
-
-    private static final String TEST_PROPERTY_NAME = "testProp";
-
-    // send two messages to the queue, consume and acknowledge one message on connection 1
-    // create a second connection and attempt to consume the second message - this will only be possible
-    // if the first connection has no prefetch
-    public void testZeroPrefetch() throws Exception
-    {
-        Connection prefetch1Connection = getConnectionWithPrefetch(0);
-
-        prefetch1Connection.start();
-
-        final Session prefetch1session = prefetch1Connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = createTestQueue(prefetch1session);
-        MessageConsumer prefetch1consumer = prefetch1session.createConsumer(queue);
-
-
-        Session producerSession = prefetch1Connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = producerSession.createProducer(queue);
-        Message firstMessage = producerSession.createMessage();
-        String firstPropertyValue = UUID.randomUUID().toString();
-        firstMessage.setStringProperty(TEST_PROPERTY_NAME, firstPropertyValue);
-        producer.send(firstMessage);
-
-        Message secondMessage = producerSession.createMessage();
-        String secondPropertyValue = UUID.randomUUID().toString();
-        secondMessage.setStringProperty(TEST_PROPERTY_NAME, secondPropertyValue);
-        producer.send(secondMessage);
-
-
-        final Message receivedMessage = prefetch1consumer.receive(2000l);
-        assertNotNull("First message was not received", receivedMessage);
-        assertEquals("Message property was not as expected", firstPropertyValue, receivedMessage.getStringProperty(TEST_PROPERTY_NAME));
-
-        Connection prefetch2Connection = getConnectionWithPrefetch(0);
-
-        prefetch2Connection.start();
-        final Session prefetch2session = prefetch2Connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer prefetch2consumer = prefetch2session.createConsumer(queue);
-
-        final Message receivedMessage2 = prefetch2consumer.receive(2000l);
-        assertNotNull("Second message was not received", receivedMessage2);
-        assertEquals("Message property was not as expected", secondPropertyValue, receivedMessage2.getStringProperty(TEST_PROPERTY_NAME));
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index 3c1ef49..dcfc337 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -167,8 +167,6 @@ org.apache.qpid.systest.MessageCompressionTest#*
 
 org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNotExistLegacyBehaviour_08_091
 
-org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testPrefetchWindowExpandsOnReceiveTransaction
-
 org.apache.qpid.server.queue.ArrivalTimeFilterTest#*
 org.apache.qpid.server.queue.EnsureNondestructiveConsumersTest#*
 org.apache.qpid.server.protocol.v0_8.*

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/test-profiles/Java010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java010Excludes b/test-profiles/Java010Excludes
index 4934dd3..d0448fa 100755
--- a/test-profiles/Java010Excludes
+++ b/test-profiles/Java010Excludes
@@ -55,8 +55,6 @@ org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange
 
 org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNotExistLegacyBehaviour_08_091
 
-org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testPrefetchWindowExpandsOnReceiveTransaction
-
 // QPID-6722: Race client side means that session close can end in exception when failover is in progress.
 org.apache.qpid.client.failover.FailoverBehaviourTest#testConnectionCloseInterruptsFailover
 org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testConnectionCloseInterruptsFailover

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/test-profiles/Java10UninvestigatedTestsExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10UninvestigatedTestsExcludes b/test-profiles/Java10UninvestigatedTestsExcludes
index 349fb71..4402248 100644
--- a/test-profiles/Java10UninvestigatedTestsExcludes
+++ b/test-profiles/Java10UninvestigatedTestsExcludes
@@ -20,9 +20,7 @@
 // This file should eventually be removed as all the systests are moved to either
 // working, defined as broken, or excluded as they test version specific functionality
 
-org.apache.qpid.client.prefetch.PrefetchBehaviourTest#*
-
-QPID-XXXX: It could be a broker bug. The issue requires further inevestigation
+QPID-XXXX: It could be a broker bug. The issue requires further investigation
 org.apache.qpid.systest.AnonymousProducerTest#testPublishIntoNonExistingQueue
 org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicyMessageDepth
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/test-profiles/JavaPre010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/JavaPre010Excludes b/test-profiles/JavaPre010Excludes
index d1883a9..d59fab3 100644
--- a/test-profiles/JavaPre010Excludes
+++ b/test-profiles/JavaPre010Excludes
@@ -55,9 +55,6 @@ org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnsupportedSASLMe
 org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForSameUser
 org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForDifferentUsers
 
-// QPID-3604 This fix is applied only to the 0-10 code, hence this test does not work for pre 0-10.
-org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testConnectionStop
-
 // QPID-3396
 org.apache.qpid.test.unit.client.connection.ConnectionTest#testExceptionWhenUserPassIsRequired
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org