You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/12/26 20:30:47 UTC
[1/2] qpid-broker-j git commit: QPID-8038: [Broker-J] [AMQP 0-x] Add
protocol tests for basic.qos and channel.flow
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 8e78fbe60 -> 37365f918
QPID-8038: [Broker-J] [AMQP 0-x] Add protocol tests for basic.qos and channel.flow
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/988006b3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/988006b3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/988006b3
Branch: refs/heads/master
Commit: 988006b38d8dd10914357779966731dc1aa90530
Parents: 8e78fbe
Author: Keith Wall <ke...@gmail.com>
Authored: Mon Dec 25 17:35:12 2017 +0000
Committer: Keith Wall <ke...@gmail.com>
Committed: Mon Dec 25 17:36:58 2017 +0000
----------------------------------------------------------------------
.../tests/protocol/v0_8/BasicInteraction.java | 6 +
.../qpid/tests/protocol/v0_8/BasicTest.java | 132 +++++++++++++++++++
2 files changed, 138 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/988006b3/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
index 2c1d1b3..c004e38 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
@@ -243,6 +243,12 @@ public class BasicInteraction
return _interaction.sendPerformative(new BasicAckBody(_ackDeliveryTag, _ackMultiple));
}
+ public BasicInteraction ackMultiple(final boolean multiple)
+ {
+ _ackMultiple = multiple;
+ return this;
+ }
+
public BasicInteraction ackDeliveryTag(final long deliveryTag)
{
_ackDeliveryTag = deliveryTag;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/988006b3/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
index 79988c0..f1300fb 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
@@ -529,6 +529,138 @@ public class BasicTest extends BrokerAdminUsingTestBase
}
}
+ /**
+ * The Qpid JMS AMQP 0-x client relies on being able to raise and lower qos count during a channel's lifetime
+ * to prevent channel starvation. This test supports this qos use-case.
+ */
+ @Test
+ public void qosCountResized() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "A", "B", "C", "D", "E", "F");
+
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ String consumerTag = "A";
+
+ interaction.openAnonymousConnection()
+ .channel().open()
+ .consumeResponse(ChannelOpenOkBody.class)
+ .channel().flow(true)
+ .consumeResponse(ChannelFlowOkBody.class)
+ .basic().qosPrefetchCount(3)
+ .qos()
+ .consumeResponse(BasicQosOkBody.class)
+ .basic().consumeConsumerTag(consumerTag)
+ .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .consume()
+ .consumeResponse(BasicConsumeOkBody.class);
+
+ final long deliveryTagA = receiveDeliveryHeaderAndBody(interaction, "A");
+ receiveDeliveryHeaderAndBody(interaction, "B");
+ final long deliveryTagC = receiveDeliveryHeaderAndBody(interaction, "C");
+
+ ensureSync(interaction);
+
+ // Raise qos count by one, expect D to arrive
+ interaction.basic().qosPrefetchCount(4).qos()
+ .consumeResponse(BasicQosOkBody.class);
+
+ long deliveryTagD = receiveDeliveryHeaderAndBody(interaction, "D");
+ ensureSync(interaction);
+
+ // Ack A, expect E to arrive
+ interaction.basic().ackDeliveryTag(deliveryTagA).ack();
+
+ receiveDeliveryHeaderAndBody(interaction, "E");
+ ensureSync(interaction);
+
+ // Lower qos to 2 and ensure no more messages arrive (message credit will be negative at this point).
+ interaction.basic().qosPrefetchCount(2).qos()
+ .consumeResponse(BasicQosOkBody.class);
+ ensureSync(interaction);
+
+ // Ack B and C and ensure still no more messages arrive (message credit will now be zero)
+ interaction.basic()
+ .ackMultiple(true).ackDeliveryTag(deliveryTagC).ack();
+ ensureSync(interaction);
+
+ // Ack D and ensure F delivery arrives
+ interaction.basic()
+ .ackMultiple(false).ackDeliveryTag(deliveryTagD).ack();
+
+ receiveDeliveryHeaderAndBody(interaction, "F");
+
+ interaction.channel().close().consumeResponse(ChannelCloseOkBody.class);
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2)));
+ }
+ }
+
+ /**
+ * The Qpid JMS AMQP 0-x client is capable of polling for messages. It does this using a combination of
+ * basic.qos (count one) and regulating the flow using channel.flow. This test supports this use-case.
+ */
+ @Test
+ public void pollingUsingFlow() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "A", "B", "C");
+
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ String consumerTag = "A";
+
+ interaction.openAnonymousConnection()
+ .channel().open()
+ .consumeResponse(ChannelOpenOkBody.class)
+ .basic().qosPrefetchCount(1)
+ .qos()
+ .consumeResponse(BasicQosOkBody.class)
+ .channel().flow(false)
+ .consumeResponse(ChannelFlowOkBody.class)
+ .basic().consumeConsumerTag(consumerTag)
+ .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .consume()
+ .consumeResponse(BasicConsumeOkBody.class);
+
+ ensureSync(interaction);
+
+ interaction.channel().flow(true)
+ .consumeResponse(ChannelFlowOkBody.class);
+
+ long deliveryTagA = receiveDeliveryHeaderAndBody(interaction, "A");
+
+ interaction.channel().flow(false)
+ .consumeResponse(ChannelFlowOkBody.class)
+ .basic().ackDeliveryTag(deliveryTagA).ack();
+
+ ensureSync(interaction);
+
+ interaction.channel().flow(true)
+ .consumeResponse(ChannelFlowOkBody.class);
+
+ long deliveryTagB = receiveDeliveryHeaderAndBody(interaction, "B");
+
+ interaction.channel().flow(false)
+ .consumeResponse(ChannelFlowOkBody.class)
+ .basic().ackDeliveryTag(deliveryTagB).ack()
+ .channel().close().consumeResponse(ChannelCloseOkBody.class);
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+ }
+ }
+
+ private long receiveDeliveryHeaderAndBody(final Interaction interaction, String expectedMessageContent) throws Exception
+ {
+ BasicDeliverBody delivery = interaction.consumeResponse().getLatestResponse(BasicDeliverBody.class);
+ ContentBody content = interaction.consumeResponse(ContentHeaderBody.class)
+ .consumeResponse().getLatestResponse(ContentBody.class);
+
+ assertThat(getContent(content), is(equalTo(expectedMessageContent)));
+ return delivery.getDeliveryTag();
+ }
+
private void ensureSync(final Interaction interaction) throws Exception
{
interaction.exchange()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-broker-j git commit: QPID-6933: [System Tests] Refactor
Prefetch tests
Posted by kw...@apache.org.
QPID-6933: [System Tests] Refactor Prefetch tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/37365f91
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/37365f91
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/37365f91
Branch: refs/heads/master
Commit: 37365f91801ffbf070f0580fa7fda7ec91f66fd0
Parents: 988006b
Author: Keith Wall <ke...@gmail.com>
Authored: Mon Dec 25 17:36:14 2017 +0000
Committer: Keith Wall <ke...@gmail.com>
Committed: Tue Dec 26 18:11:18 2017 +0000
----------------------------------------------------------------------
.../java/org/apache/qpid/systests/Utils.java | 4 +-
.../extensions/prefetch/PrefetchTest.java | 254 ++++++++++++++++
.../client/prefetch/PrefetchBehaviourTest.java | 297 -------------------
.../qpid/systest/prefetch/ZeroPrefetchTest.java | 81 -----
test-profiles/CPPExcludes | 2 -
test-profiles/Java010Excludes | 2 -
test-profiles/Java10UninvestigatedTestsExcludes | 4 +-
test-profiles/JavaPre010Excludes | 3 -
8 files changed, 257 insertions(+), 390 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
index c2dfe75..01e75f1 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/Utils.java
@@ -52,13 +52,13 @@ public class Utils
}
}
- public static void sendMessages(final Connection connection, final Destination destination, final int messageNumber)
+ public static void sendMessages(final Connection connection, final Destination destination, final int count)
throws JMSException
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try
{
- sendMessages(session, destination, messageNumber);
+ sendMessages(session, destination, count);
}
finally
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/prefetch/PrefetchTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/prefetch/PrefetchTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/prefetch/PrefetchTest.java
new file mode 100644
index 0000000..728148e
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/prefetch/PrefetchTest.java
@@ -0,0 +1,254 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systests.jms_1_1.extensions.prefetch;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.apache.qpid.server.model.Protocol.AMQP_0_8;
+import static org.apache.qpid.server.model.Protocol.AMQP_0_9;
+import static org.apache.qpid.server.model.Protocol.AMQP_0_9_1;
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assume.assumeThat;
+
+import java.util.EnumSet;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.NamingException;
+
+import org.junit.Test;
+
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.systests.JmsTestBase;
+import org.apache.qpid.systests.Utils;
+
+public class PrefetchTest extends JmsTestBase
+{
+ private static final EnumSet<Protocol> PRE_010_PROTOCOLS = EnumSet.of(AMQP_0_8, AMQP_0_9, AMQP_0_9_1);
+
+ @Test
+ public void prefetch() throws Exception
+ {
+ Connection connection1 = getConnectionBuilder().setPrefetch(3).build();
+ Queue queue = createQueue(getTestName());
+ try
+ {
+ connection1.start();
+
+ final Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer1 = session1.createConsumer(queue);
+
+ Utils.sendMessages(connection1, queue, 6);
+
+ final Message receivedMessage = consumer1.receive(getReceiveTimeout());
+ assertNotNull("First message was not received", receivedMessage);
+ assertEquals("Received message has unexpected index", 0, receivedMessage.getIntProperty(INDEX));
+
+ forceSync(session1);
+
+ observeNextAvailableMessage(queue, 4);
+ }
+ finally
+ {
+ connection1.close();
+ }
+ }
+
+ /**
+ * send two messages to the queue, consume and acknowledge one message on connection 1
+ * create a second connection and attempt to consume the second message - this will only be possible
+ * if the first connection has no prefetch
+ */
+ @Test
+ public void prefetchDisabled() throws Exception
+ {
+ Connection connection1 = getConnectionBuilder().setPrefetch(0).build();
+ Queue queue = createQueue(getTestName());
+ try
+ {
+ connection1.start();
+
+ final Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer1 = session1.createConsumer(queue);
+
+ Utils.sendMessages(connection1, queue, 2);
+
+ final Message receivedMessage = consumer1.receive(getReceiveTimeout());
+ assertNotNull("First message was not received", receivedMessage);
+ assertEquals("Message property was not as expected", 0, receivedMessage.getIntProperty(INDEX));
+
+ observeNextAvailableMessage(queue, 1);
+ }
+ finally
+ {
+ connection1.close();
+ }
+ }
+
+ @Test
+ public void connectionStopReleasesPrefetchedMessages() throws Exception
+ {
+ assumeThat("Only 0-10 implements this feature", getProtocol(), is(equalTo(Protocol.AMQP_0_10)));
+
+ Connection connection1 = getConnectionBuilder().setPrefetch(3).build();
+ Queue queue = createQueue(getTestName());
+ try
+ {
+ connection1.start();
+
+ final Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer1 = session1.createConsumer(queue);
+
+ Utils.sendMessages(connection1, queue, 6);
+
+ final Message receivedMessage = consumer1.receive(getReceiveTimeout());
+ assertNotNull("First message was not received", receivedMessage);
+ assertEquals("Received message has unexpected index", 0, receivedMessage.getIntProperty(INDEX));
+
+ forceSync(session1);
+
+ connection1.stop();
+
+ observeNextAvailableMessage(queue, 1);
+ }
+ finally
+ {
+ connection1.close();
+ }
+ }
+
+ @Test
+ public void consumerCloseReleasesPrefetchedMessages() throws Exception
+ {
+ assumeThat("Only 0-10 implements this feature", getProtocol(), is(equalTo(Protocol.AMQP_0_10)));
+
+ Connection connection1 = getConnectionBuilder().setPrefetch(3).build();
+ Queue queue = createQueue(getTestName());
+ try
+ {
+ connection1.start();
+
+ final Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer1 = session1.createConsumer(queue);
+
+ Utils.sendMessages(connection1, queue, 6);
+
+ final Message receivedMessage = consumer1.receive(getReceiveTimeout());
+ assertNotNull("First message was not received", receivedMessage);
+ assertEquals("Received message has unexpected index", 0, receivedMessage.getIntProperty(INDEX));
+
+ forceSync(session1);
+
+ consumer1.close();
+
+ observeNextAvailableMessage(queue, 1);
+ }
+ finally
+ {
+ connection1.close();
+ }
+ }
+
+ @Test
+ public void consumeBeyondPrefetch() throws Exception
+ {
+ Connection connection1 = getConnectionBuilder().setPrefetch(1).build();
+ Queue queue = createQueue(getTestName());
+ try
+ {
+ connection1.start();
+
+ final Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer1 = session1.createConsumer(queue);
+
+ Utils.sendMessages(connection1, queue, 5);
+
+ Message message = consumer1.receive(getReceiveTimeout());
+ assertNotNull(message);
+ assertEquals(0, message.getIntProperty(INDEX));
+
+ message = consumer1.receive(getReceiveTimeout());
+ assertNotNull(message);
+ assertEquals(1, message.getIntProperty(INDEX));
+ message = consumer1.receive(getReceiveTimeout());
+ assertNotNull(message);
+ assertEquals(2, message.getIntProperty(INDEX));
+
+ forceSync(session1);
+
+ // In pre 0-10, in a transacted session the client does not ack the message until the commit occurs
+ // so the message observed by another connection will have the index 3 rather than 4.
+ Connection connection2 = getConnection();
+ try
+ {
+ Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer2 = session2.createConsumer(queue);
+ connection2.start();
+
+ message = consumer2.receive(getReceiveTimeout());
+ assertNotNull(message);
+ assertEquals("Received message has unexpected index",
+ PRE_010_PROTOCOLS.contains(getProtocol()) ? 3 : 4,
+ message.getIntProperty(INDEX));
+
+ session2.rollback();
+ }
+ finally
+ {
+ connection2.close();
+ }
+ }
+ finally
+ {
+ connection1.close();
+ }
+ }
+
+ private void observeNextAvailableMessage(final Queue queue, final int expectedIndex) throws JMSException, NamingException
+ {
+ Connection connection2 = getConnection();
+ try
+ {
+ connection2.start();
+ final Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer2 = session2.createConsumer(queue);
+
+ final Message receivedMessage2 = consumer2.receive(getReceiveTimeout());
+ assertNotNull("Observer connection did not receive message", receivedMessage2);
+ assertEquals("Message received by the observer connection has unexpected index", expectedIndex, receivedMessage2.getIntProperty(INDEX));
+ }
+ finally
+ {
+ connection2.close();
+ }
+ }
+
+ private void forceSync(final Session session1) throws Exception
+ {
+ session1.createTemporaryQueue().delete();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
deleted file mode 100644
index 330012f..0000000
--- a/systests/src/test/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.client.prefetch;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-
-public class PrefetchBehaviourTest extends QpidBrokerTestCase
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(PrefetchBehaviourTest.class);
- private Connection _normalConnection;
- private AtomicBoolean _exceptionCaught;
- private CountDownLatch _processingStarted;
- private CountDownLatch _processingCompleted;
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- _normalConnection = getConnection();
- _exceptionCaught = new AtomicBoolean();
- _processingStarted = new CountDownLatch(1);
- _processingCompleted = new CountDownLatch(1);
- }
-
- /**
- * Verifies that a slow processing asynchronous transacted consumer with prefetch=1 only
- * gets 1 of the messages sent, with the second consumer picking up the others while the
- * slow consumer is processing, i.e that prefetch=1 actually does what it says on the tin.
- */
- public void testPrefetchOneWithAsynchronousTransactedConsumer() throws Exception
- {
- final long processingTime = 5000;
-
- //create a second connection with prefetch set to 1
- setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());
- Connection prefetch1Connection = getConnection();
-
- prefetch1Connection.start();
- _normalConnection.start();
-
- //create an asynchronous consumer with simulated slow processing
- final Session prefetch1session = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED);
- Queue queue = prefetch1session.createQueue(getTestQueueName());
- MessageConsumer prefetch1consumer = prefetch1session.createConsumer(queue);
- prefetch1consumer.setMessageListener(new MessageListener()
- {
- @Override
- public void onMessage(Message message)
- {
- try
- {
- LOGGER.debug("starting processing");
- _processingStarted.countDown();
- LOGGER.debug("processing started");
-
- //simulate message processing
- Thread.sleep(processingTime);
-
- prefetch1session.commit();
-
- _processingCompleted.countDown();
- }
- catch(Exception e)
- {
- LOGGER.error("Exception caught in message listener");
- _exceptionCaught.set(true);
- }
- }
- });
-
- //create producer and send 5 messages
- Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED);
- MessageProducer producer = producerSession.createProducer(queue);
-
- for (int i = 0; i < 5; i++)
- {
- producer.send(producerSession.createTextMessage("test"));
- }
- producerSession.commit();
-
- //wait for the first message to start being processed by the async consumer
- assertTrue("Async processing failed to start in allowed timeframe", _processingStarted.await(2000, TimeUnit.MILLISECONDS));
- LOGGER.debug("proceeding with test");
-
- //try to consumer the other messages with another consumer while the async procesisng occurs
- Session normalSession = _normalConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer normalConsumer = normalSession.createConsumer(queue);
-
- Message msg;
- // Check that other consumer gets the other 4 messages
- for (int i = 0; i < 4; i++)
- {
- msg = normalConsumer.receive(1500);
- assertNotNull("Consumer should receive 4 messages",msg);
- }
- msg = normalConsumer.receive(250);
- assertNull("Consumer should not have received a 5th message",msg);
-
- //wait for the other consumer to finish to ensure it completes ok
- LOGGER.debug("waiting for async consumer to complete");
- assertTrue("Async processing failed to complete in allowed timeframe", _processingStarted.await(processingTime + 2000, TimeUnit.MILLISECONDS));
- assertFalse("Unexpected exception during async message processing",_exceptionCaught.get());
- }
-
- /**
- * This test was originally known as AMQConnectionTest#testPrefetchSystemProperty.
- *
- */
- public void testMessagesAreDistributedBetweenConsumersWithLowPrefetch() throws Exception
- {
- // This test is flaky. There is no guarantee that the messages have been sent to
- // consumerA's prefetch buffer by the time consumerB calls receive().
- Queue queue = getTestQueue();
-
- setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());
-
- Connection connection = getConnection();
- connection.start();
- // Create Consumer A
- Session consSessA = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumerA = consSessA.createConsumer(queue);
-
- // ensure message delivery to consumer A is started (required for 0-8..0-9-1)
- final Message msg = consumerA.receiveNoWait();
- assertNull(msg);
-
- Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
- sendMessage(producerSession, queue, 3);
-
- // Create Consumer B
- MessageConsumer consumerB = null;
- if (isBroker010())
- {
- // 0-10 prefetch is per consumer so we create Consumer B on the same session as Consumer A
- consumerB = consSessA.createConsumer(queue);
- }
- else
- {
- // 0-8..0-9-1 prefetch is per session so we create Consumer B on a separate session
- Session consSessB = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumerB = consSessB.createConsumer(queue);
- }
-
- // As message delivery to consumer A is already started, the first two messages should
- // now be with consumer A. The last message will still be on the Broker as consumer A's
- // credit is exhausted and message delivery for consumer B is not yet running.
-
- // As described by QPID-3747, for 0-10 we *must* check Consumer B before Consumer A.
- // If we were to reverse the order, the SessionComplete will restore Consumer A's credit,
- // and the third message could be delivered to either Consumer A or Consumer B.
-
- // Check that consumer B gets the last (third) message.
- final Message msgConsumerB = consumerB.receive(1500);
- assertNotNull("Consumer B should have received a message", msgConsumerB);
- assertEquals("Consumer B received message with unexpected index", 2, msgConsumerB.getIntProperty(INDEX));
-
- // Now check that consumer A has indeed got the first two messages.
- for (int i = 0; i < 2; i++)
- {
- final Message msgConsumerA = consumerA.receive(1500);
- assertNotNull("Consumer A should have received a message " + i, msgConsumerA);
- assertEquals("Consumer A received message with unexpected index", i, msgConsumerA.getIntProperty(INDEX));
- }
- }
-
- /**
- * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer.
- * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them.
- * Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue.
- * Try to receive all 10 messages.
- */
- public void testConnectionStop() throws Exception
- {
- setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10");
- Connection con = getConnection();
- con.start();
- Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}");
-
- MessageProducer prod = ssn.createProducer(queue);
- for (int i=0; i<10;i++)
- {
- prod.send(ssn.createTextMessage("Msg" + i));
- }
-
- MessageConsumer consumer = ssn.createConsumer(queue);
- // This is to ensure we get the first client to prefetch.
- Message msg = consumer.receive(1000);
- assertNotNull("The first consumer should get one message",msg);
- con.stop();
-
- Connection con2 = getConnection();
- con2.start();
- Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer2 = ssn2.createConsumer(queue);
- for (int i=0; i<9;i++)
- {
- TextMessage m = (TextMessage)consumer2.receive(1000);
- assertNotNull("The second consumer should get 9 messages, but received only " + i,m);
- }
- }
-
- public void testPrefetchWindowExpandsOnReceiveTransaction() throws Exception
- {
-
- _normalConnection.start();
-
- //create a second connection with prefetch set to 1
-
- setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());
- Connection prefetch1Connection = getConnection();
- Session consumerSession = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getTestQueueName()));
-
-
- Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED);
- Queue queue = producerSession.createQueue(getTestQueueName());
- MessageProducer producer = producerSession.createProducer(queue);
-
- for (int i = 0; i < 5; i++)
- {
- producer.send(producerSession.createTextMessage("test"));
- }
- producerSession.commit();
-
-
- prefetch1Connection.start();
-
-
-
- Message message = consumer.receive(1000l);
- assertNotNull(message);
- message = consumer.receive(1000l);
- assertNotNull(message);
- message = consumer.receive(1000l);
- assertNotNull(message);
-
-
- Connection secondConsumerConnection = getConnection();
- Session secondConsumerSession = secondConsumerConnection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer secondConsumer = secondConsumerSession.createConsumer(consumerSession.createQueue(getTestQueueName()));
- secondConsumerConnection.start();
-
- message = secondConsumer.receive(1000l);
- assertNotNull(message);
-
- message = secondConsumer.receive(1000l);
- assertNotNull(message);
-
- consumerSession.commit();
- secondConsumerSession.commit();
-
- message = consumer.receive(1000l);
- assertNull(message);
-
- }
-
-
-}
-
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java b/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
deleted file mode 100644
index e313222..0000000
--- a/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.systest.prefetch;
-
-import java.util.UUID;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-public class ZeroPrefetchTest extends QpidBrokerTestCase
-{
-
- private static final String TEST_PROPERTY_NAME = "testProp";
-
- // send two messages to the queue, consume and acknowledge one message on connection 1
- // create a second connection and attempt to consume the second message - this will only be possible
- // if the first connection has no prefetch
- public void testZeroPrefetch() throws Exception
- {
- Connection prefetch1Connection = getConnectionWithPrefetch(0);
-
- prefetch1Connection.start();
-
- final Session prefetch1session = prefetch1Connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = createTestQueue(prefetch1session);
- MessageConsumer prefetch1consumer = prefetch1session.createConsumer(queue);
-
-
- Session producerSession = prefetch1Connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
- Message firstMessage = producerSession.createMessage();
- String firstPropertyValue = UUID.randomUUID().toString();
- firstMessage.setStringProperty(TEST_PROPERTY_NAME, firstPropertyValue);
- producer.send(firstMessage);
-
- Message secondMessage = producerSession.createMessage();
- String secondPropertyValue = UUID.randomUUID().toString();
- secondMessage.setStringProperty(TEST_PROPERTY_NAME, secondPropertyValue);
- producer.send(secondMessage);
-
-
- final Message receivedMessage = prefetch1consumer.receive(2000l);
- assertNotNull("First message was not received", receivedMessage);
- assertEquals("Message property was not as expected", firstPropertyValue, receivedMessage.getStringProperty(TEST_PROPERTY_NAME));
-
- Connection prefetch2Connection = getConnectionWithPrefetch(0);
-
- prefetch2Connection.start();
- final Session prefetch2session = prefetch2Connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer prefetch2consumer = prefetch2session.createConsumer(queue);
-
- final Message receivedMessage2 = prefetch2consumer.receive(2000l);
- assertNotNull("Second message was not received", receivedMessage2);
- assertEquals("Message property was not as expected", secondPropertyValue, receivedMessage2.getStringProperty(TEST_PROPERTY_NAME));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index 3c1ef49..dcfc337 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -167,8 +167,6 @@ org.apache.qpid.systest.MessageCompressionTest#*
org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNotExistLegacyBehaviour_08_091
-org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testPrefetchWindowExpandsOnReceiveTransaction
-
org.apache.qpid.server.queue.ArrivalTimeFilterTest#*
org.apache.qpid.server.queue.EnsureNondestructiveConsumersTest#*
org.apache.qpid.server.protocol.v0_8.*
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/test-profiles/Java010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java010Excludes b/test-profiles/Java010Excludes
index 4934dd3..d0448fa 100755
--- a/test-profiles/Java010Excludes
+++ b/test-profiles/Java010Excludes
@@ -55,8 +55,6 @@ org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange
org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNotExistLegacyBehaviour_08_091
-org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testPrefetchWindowExpandsOnReceiveTransaction
-
// QPID-6722: Race client side means that session close can end in exception when failover is in progress.
org.apache.qpid.client.failover.FailoverBehaviourTest#testConnectionCloseInterruptsFailover
org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testConnectionCloseInterruptsFailover
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/test-profiles/Java10UninvestigatedTestsExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10UninvestigatedTestsExcludes b/test-profiles/Java10UninvestigatedTestsExcludes
index 349fb71..4402248 100644
--- a/test-profiles/Java10UninvestigatedTestsExcludes
+++ b/test-profiles/Java10UninvestigatedTestsExcludes
@@ -20,9 +20,7 @@
// This file should eventually be removed as all the systests are moved to either
// working, defined as broken, or excluded as they test version specific functionality
-org.apache.qpid.client.prefetch.PrefetchBehaviourTest#*
-
-QPID-XXXX: It could be a broker bug. The issue requires further inevestigation
+QPID-XXXX: It could be a broker bug. The issue requires further investigation
org.apache.qpid.systest.AnonymousProducerTest#testPublishIntoNonExistingQueue
org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicyMessageDepth
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37365f91/test-profiles/JavaPre010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/JavaPre010Excludes b/test-profiles/JavaPre010Excludes
index d1883a9..d59fab3 100644
--- a/test-profiles/JavaPre010Excludes
+++ b/test-profiles/JavaPre010Excludes
@@ -55,9 +55,6 @@ org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnsupportedSASLMe
org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForSameUser
org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerificationForDifferentUsers
-// QPID-3604 This fix is applied only to the 0-10 code, hence this test does not work for pre 0-10.
-org.apache.qpid.client.prefetch.PrefetchBehaviourTest#testConnectionStop
-
// QPID-3396
org.apache.qpid.test.unit.client.connection.ConnectionTest#testExceptionWhenUserPassIsRequired
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org