You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/12/09 12:56:42 UTC
svn commit: r1773369 - in /qpid/java/trunk:
systests/src/test/java/org/apache/qpid/server/queue/
systests/src/test/java/org/apache/qpid/test/client/ test-profiles/
Author: lquack
Date: Fri Dec 9 12:56:41 2016
New Revision: 1773369
URL: http://svn.apache.org/viewvc?rev=1773369&view=rev
Log:
QPID-7546: fix producer flow control test and move test for client flow control behaviour into separate test class
Added:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ConsumerFlowControlTest.java
- copied, changed from r1773368, qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ProducerFlowControlTest.java
Removed:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
qpid/java/trunk/test-profiles/CPPExcludes
qpid/java/trunk/test-profiles/Java010Excludes
qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes
qpid/java/trunk/test-profiles/Java10Excludes
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=1773369&r1=1773368&r2=1773369&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java Fri Dec 9 12:56:41 2016
@@ -53,8 +53,6 @@ public class ProducerFlowControlTest ext
{
private static final Logger _logger = LoggerFactory.getLogger(ProducerFlowControlTest.class);
- private static final int TIMEOUT = 10000;
-
private Connection _producerConnection;
private Connection _consumerConnection;
private Session _producerSession;
@@ -163,32 +161,6 @@ public class ProducerFlowControlTest ext
assertEquals("Did not find correct number of UNDERFULL queue underfull messages", 1, results.size());
}
-
- public void testClientLogMessages() throws Exception
- {
- String queueName = getTestQueueName();
-
- setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
- setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
-
- Session session = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- createAndBindQueueWithFlowControlEnabled(session, queueName, 1000, 800);
- _producer = session.createProducer(_queue);
-
- // try to send 5 messages (should block after 4)
- MessageSender sender = sendMessagesAsync(_producer, session, 5, 50L);
-
- List<String> results = waitAndFindMatches("Message send delayed by", TIMEOUT);
- assertTrue("No delay messages logged by client",results.size()!=0);
-
- List<String> failedMessages = waitAndFindMatches("Message send failed due to timeout waiting on broker enforced"
- + " flow control", TIMEOUT);
- assertEquals("Incorrect number of send failure messages logged by client (got " + results.size() + " delay "
- + "messages)",1,failedMessages.size());
- }
-
-
public void testFlowControlOnCapacityResumeEqual() throws Exception
{
String queueName = getTestQueueName();
@@ -266,30 +238,6 @@ public class ProducerFlowControlTest ext
}
- public void testSendTimeout() throws Exception
- {
- String queueName = getTestQueueName();
- final String expectedMsg = isBroker010() ? "Exception when sending message:timed out waiting for message credit"
- : "Unable to send message for 3 seconds due to broker enforced flow control";
-
- setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
- Session session = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 1000, 800);
- _producer = session.createProducer(_queue);
-
- // try to send 5 messages (should block after 4)
- MessageSender sender = sendMessagesAsync(_producer, session, 5, 100L);
-
- Exception e = sender.awaitSenderException(10000);
-
- assertNotNull("No timeout exception on sending", e);
-
-
- assertEquals("Unexpected exception reason", expectedMsg, e.getMessage());
-
- }
-
public void testFlowControlAttributeModificationViaREST() throws Exception
{
String queueName = getTestQueueName();
@@ -305,11 +253,11 @@ public class ProducerFlowControlTest ext
((Number) queueAttributes.get(org.apache.qpid.server.model.Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES)).intValue());
assertEquals("FlowResumeCapacity was not the expected value", 0,
((Number) queueAttributes.get(org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_SIZE_BYTES)).intValue());
-
+
//set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent
setFlowLimits(queueUrl, 250, 250);
assertFalse("Queue should not be overfull", isFlowStopped(queueUrl));
-
+
// try to send 2 messages (should block after 1)
sendMessagesAsync(_producer, _producerSession, 2, 50L);
@@ -318,9 +266,10 @@ public class ProducerFlowControlTest ext
//check only 1 message was sent, and queue is overfull
assertEquals("Incorrect number of message sent before blocking", 1, _sentMessages.get());
assertTrue("Queue should be overfull", isFlowStopped(queueUrl));
-
+
+ int queueDepthBytes = getQueueDepthBytes(queueName);
//raise the attribute values, causing the queue to become underfull and allow the second message to be sent.
- setFlowLimits(queueUrl, 300, 300);
+ setFlowLimits(queueUrl, queueDepthBytes + 200, queueDepthBytes);
waitForFlowControlAndMessageCount(queueUrl, 2, 2000);
@@ -329,7 +278,7 @@ public class ProducerFlowControlTest ext
assertTrue("Queue should be overfull", isFlowStopped(queueUrl));
//raise capacity above queue depth, check queue remains overfull as FlowResumeCapacity still exceeded
- setFlowLimits(queueUrl, 700, 300);
+ setFlowLimits(queueUrl, 2 * queueDepthBytes + 100, queueDepthBytes);
assertTrue("Queue should be overfull", isFlowStopped(queueUrl));
//receive a message, check queue becomes underfull
@@ -337,7 +286,7 @@ public class ProducerFlowControlTest ext
_consumer = _consumerSession.createConsumer(_queue);
_consumerConnection.start();
- _consumer.receive();
+ assertNotNull("Should have received first message", _consumer.receive(RECEIVE_TIMEOUT));
if(!isBroker10())
{
@@ -345,9 +294,17 @@ public class ProducerFlowControlTest ext
((AMQSession<?, ?>) _consumerSession).sync();
}
- assertFalse("Queue should not be overfull", isFlowStopped(queueUrl));
+ _restTestHelper.waitForAttributeChanged(queueUrl, org.apache.qpid.server.model.Queue.QUEUE_FLOW_STOPPED, false);
- _consumer.receive();
+ assertNotNull("Should have received second message", _consumer.receive(RECEIVE_TIMEOUT));
+ }
+
+ private int getQueueDepthBytes(final String queueName) throws IOException
+ {
+ // On AMQP 1.0 the size of the message on the broker is not necessarily the size of the message we sent. Therefore, get the actual size from the broker
+ final String requestUrl = String.format("queue/%1$s/%1$s/%2$s/getStatistics?statistics=[\"queueDepthBytes\"]", TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName);
+ final Map<String, Object> queueAttributes = _restTestHelper.getJsonAsMap(requestUrl);
+ return ((Number) queueAttributes.get("queueDepthBytes")).intValue();
}
private void waitForFlowControlAndMessageCount(final String queueUrl, final int messageCount, final int timeout) throws InterruptedException, IOException
@@ -395,14 +352,13 @@ public class ProducerFlowControlTest ext
{
// delete queue with a consumer session
((AMQSession<?, ?>) _consumerSession).sendQueueDelete(queueName);
-
- _consumer = _consumerSession.createConsumer(_queue);
}
else
{
deleteEntityUsingAmqpManagement(getTestQueueName(), _consumerSession, "org.apache.qpid.Queue");
createTestQueue(_consumerSession);
}
+ _consumer = _consumerSession.createConsumer(_queue);
_consumerConnection.start();
Message message = _consumer.receive(1000l);
@@ -455,60 +411,6 @@ public class ProducerFlowControlTest ext
return sender;
}
- private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
- throws JMSException
- {
-
- for (int msg = 0; msg < numMessages; msg++)
- {
- producer.send(nextMessage(msg, producerSession));
- _sentMessages.incrementAndGet();
-
-
- try
- {
- if(!isBroker10())
- {
- ((AMQSession<?,?>)producerSession).sync();
- // TODO: sync a second time in order to ensure that the client has received the flow command
- // before continuing with the next message. This is required because the Broker may legally
- // send the flow command after the sync response. By sync'ing a second time we ensure that
- // the client will has seen/acted on the flow command. The test really ought not have this
- // level of information.
- ((AMQSession<?,?>)producerSession).sync();
- }
- else
- {
- producerSession.createTemporaryQueue().delete();
- }
- }
- catch (QpidException e)
- {
- _logger.error("Error performing sync", e);
- throw new RuntimeException(e);
- }
-
- try
- {
- Thread.sleep(sleepPeriod);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-
- private static final byte[] BYTE_300 = new byte[300];
-
- private Message nextMessage(int msg, Session producerSession) throws JMSException
- {
- BytesMessage send = producerSession.createBytesMessage();
- send.writeBytes(BYTE_300);
- send.setIntProperty("msg", msg);
-
- return send;
- }
private class MessageSender implements Runnable
{
@@ -540,10 +442,59 @@ public class ProducerFlowControlTest ext
}
}
- public Exception awaitSenderException(long timeout) throws InterruptedException
+ private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+ throws JMSException
+ {
+
+ for (int msg = 0; msg < numMessages; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+ _sentMessages.incrementAndGet();
+
+
+ try
+ {
+ if(!isBroker10())
+ {
+ ((AMQSession<?,?>)producerSession).sync();
+ // TODO: sync a second time in order to ensure that the client has received the flow command
+ // before continuing with the next message. This is required because the Broker may legally
+ // send the flow command after the sync response. By sync'ing a second time we ensure that
+ // the client will has seen/acted on the flow command. The test really ought not have this
+ // level of information.
+ ((AMQSession<?,?>)producerSession).sync();
+ }
+ else
+ {
+ producerSession.createTemporaryQueue().delete();
+ }
+ }
+ catch (QpidException e)
+ {
+ _logger.error("Error performing sync", e);
+ throw new RuntimeException(e);
+ }
+
+ try
+ {
+ Thread.sleep(sleepPeriod);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private final byte[] BYTE_300 = new byte[300];
+
+ private Message nextMessage(int msg, Session producerSession) throws JMSException
{
- _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS);
- return _exception;
+ BytesMessage send = producerSession.createBytesMessage();
+ send.writeBytes(BYTE_300);
+ send.setIntProperty("msg", msg);
+
+ return send;
}
}
}
Copied: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ConsumerFlowControlTest.java (from r1773368, qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ConsumerFlowControlTest.java?p2=qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ConsumerFlowControlTest.java&p1=qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java&r1=1773368&r2=1773369&rev=1773369&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ConsumerFlowControlTest.java Fri Dec 9 12:56:41 2016
@@ -35,9 +35,9 @@ import org.apache.qpid.client.AMQSession
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
-public class FlowControlTest extends QpidBrokerTestCase
+public class ConsumerFlowControlTest extends QpidBrokerTestCase
{
- private static final Logger _logger = LoggerFactory.getLogger(FlowControlTest.class);
+ private static final Logger _logger = LoggerFactory.getLogger(ConsumerFlowControlTest.class);
private Connection _clientConnection;
private Session _clientSession;
@@ -168,7 +168,7 @@ public class FlowControlTest extends Qpi
assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
r2.acknowledge();
- r3.acknowledge();
+ r3.acknowledge();
recv1.close();
recv2.close();
consumerSession1.close();
Added: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ProducerFlowControlTest.java?rev=1773369&view=auto
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ProducerFlowControlTest.java (added)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ProducerFlowControlTest.java Fri Dec 9 12:56:41 2016
@@ -0,0 +1,124 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.test.client;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.server.logging.AbstractTestLogging;
+import org.apache.qpid.systest.rest.RestTestHelper;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+
+public class ProducerFlowControlTest extends AbstractTestLogging
+{
+ private static final long TIMEOUT = 5000;
+ private Queue _queue;
+ private Connection _producerConnection;
+ private Session _producerSession;
+ private RestTestHelper _restTestHelper;
+ private String _queueUrl;
+ private MessageProducer _producer;
+ private BytesMessage _message;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ getDefaultBrokerConfiguration().addHttpManagementConfiguration();
+ super.setUp();
+
+ _restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
+
+ setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
+ setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
+
+ _producerConnection = getConnectionWithSyncPublishing();
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String queueName = getTestQueueName();
+ _queueUrl = String.format("queue/%1$s/%1$s/%2$s", TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName);
+ _queue = createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 1000, 800);
+
+ _producer = _producerSession.createProducer(_queue);
+ _message = _producerSession.createBytesMessage();
+ _message.writeBytes(new byte[1100]);
+
+ _monitor.markDiscardPoint();
+ }
+
+ public void testClientLogMessages() throws Exception
+ {
+ _producer.send(_message);
+ _restTestHelper.waitForAttributeChanged(_queueUrl, org.apache.qpid.server.model.Queue.QUEUE_FLOW_STOPPED, Boolean.TRUE);
+ try
+ {
+ _producer.send(_message);
+ fail("Producer should be blocked by flow control");
+ }
+ catch (JMSException e)
+ {
+ final String expectedMsg = isBroker010() ? "Exception when sending message:timed out waiting for message credit"
+ : "Unable to send message for 3 seconds due to broker enforced flow control";
+ assertEquals("Unexpected exception reason", expectedMsg, e.getMessage());
+ }
+
+ List<String> results = waitAndFindMatches("Message send delayed by", TIMEOUT);
+ assertTrue("No delay messages logged by client",results.size()!=0);
+
+ List<String> failedMessages = waitAndFindMatches("Message send failed due to timeout waiting on broker enforced"
+ + " flow control", TIMEOUT);
+ assertEquals("Incorrect number of send failure messages logged by client (got " + results.size() + " delay "
+ + "messages)",1,failedMessages.size());
+ }
+
+
+ private Queue createAndBindQueueWithFlowControlEnabled(Session session,
+ String queueName,
+ int capacity,
+ int resumeCapacity) throws Exception
+ {
+ final Map<String, Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity", capacity);
+ arguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
+ ((AMQSession<?, ?>) session).createQueue(queueName, true, false, false, arguments);
+ Queue queue = session.createQueue("direct://amq.direct/"
+ + queueName
+ + "/"
+ + queueName
+ + "?durable='"
+ + false
+ + "'&autodelete='"
+ + true
+ + "'");
+ ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue);
+ return queue;
+ }
+}
+
Modified: qpid/java/trunk/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/CPPExcludes?rev=1773369&r1=1773368&r2=1773369&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/CPPExcludes (original)
+++ qpid/java/trunk/test-profiles/CPPExcludes Fri Dec 9 12:56:41 2016
@@ -44,7 +44,7 @@ org.apache.qpid.server.queue.SortedQueue
org.apache.qpid.test.unit.client.MaxDeliveryCountTest#*
//this test checks explicitly for 0-8 flow control semantics
-org.apache.qpid.test.client.FlowControlTest#*
+org.apache.qpid.test.client.ConsumerFlowControlTest#*
// 0-10 c++ broker doesn't implement virtual hosts, or those wackhy exchanges
org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnresolvedVirtualHostFailure
Modified: qpid/java/trunk/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java010Excludes?rev=1773369&r1=1773368&r2=1773369&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java010Excludes (original)
+++ qpid/java/trunk/test-profiles/Java010Excludes Fri Dec 9 12:56:41 2016
@@ -24,7 +24,7 @@ org.apache.qpid.test.unit.client.connect
org.apache.qpid.systest.rest.BrokerRestTest#testSetCloseOnNoRoute
//this test checks explicitly for 0-8 flow control semantics
-org.apache.qpid.test.client.FlowControlTest#*
+org.apache.qpid.test.client.ConsumerFlowControlTest#*
// 0-10 protocol doesn't support message bouncing
org.apache.qpid.server.exchange.ReturnUnroutableMandatoryMessageTest#*
Modified: qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes?rev=1773369&r1=1773368&r2=1773369&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes (original)
+++ qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes Fri Dec 9 12:56:41 2016
@@ -83,7 +83,3 @@ org.apache.qpid.test.unit.topic.Temporar
// These tests require some way to set properties on the link established by the client
org.apache.qpid.server.queue.ConsumerPriorityTest#*
org.apache.qpid.server.queue.ArrivalTimeFilterTest#*
-
-// Broker should issue drain to client when flow control is enforced, so that existing credit is used up (test will also need updating)
-org.apache.qpid.server.queue.ProducerFlowControlTest#*
-
Modified: qpid/java/trunk/test-profiles/Java10Excludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java10Excludes?rev=1773369&r1=1773368&r2=1773369&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java10Excludes (original)
+++ qpid/java/trunk/test-profiles/Java10Excludes Fri Dec 9 12:56:41 2016
@@ -108,7 +108,10 @@ org.apache.qpid.test.unit.basic.Property
org.apache.qpid.test.unit.basic.PropertyValueTest#testLargeHeader_010_HeadersFillContentHeaderFrame
// This test concerns 0-8/0-10 bytes limiting flow control
-org.apache.qpid.test.client.FlowControlTest#*
+org.apache.qpid.test.client.ConsumerFlowControlTest#*
+
+// Tests are tests of the 0-x client behaviour
+org.apache.qpid.test.client.ProducerFlowControlTest#*
// Failover tests are tests of the 0-x client behaviour
org.apache.qpid.client.failover.FailoverBehaviourTest#*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org