You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/12/09 12:56:42 UTC

svn commit: r1773369 - in /qpid/java/trunk: systests/src/test/java/org/apache/qpid/server/queue/ systests/src/test/java/org/apache/qpid/test/client/ test-profiles/

Author: lquack
Date: Fri Dec  9 12:56:41 2016
New Revision: 1773369

URL: http://svn.apache.org/viewvc?rev=1773369&view=rev
Log:
QPID-7546: fix producer flow control test and move test for client flow control behaviour into separate test class

Added:
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ConsumerFlowControlTest.java
      - copied, changed from r1773368, qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ProducerFlowControlTest.java
Removed:
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java
Modified:
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
    qpid/java/trunk/test-profiles/CPPExcludes
    qpid/java/trunk/test-profiles/Java010Excludes
    qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes
    qpid/java/trunk/test-profiles/Java10Excludes

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=1773369&r1=1773368&r2=1773369&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java Fri Dec  9 12:56:41 2016
@@ -53,8 +53,6 @@ public class ProducerFlowControlTest ext
 {
     private static final Logger _logger = LoggerFactory.getLogger(ProducerFlowControlTest.class);
 
-    private static final int TIMEOUT = 10000;
-
     private Connection _producerConnection;
     private Connection _consumerConnection;
     private Session _producerSession;
@@ -163,32 +161,6 @@ public class ProducerFlowControlTest ext
         assertEquals("Did not find correct number of UNDERFULL queue underfull messages", 1, results.size());
     }
 
-
-    public void testClientLogMessages() throws Exception
-    {
-        String queueName = getTestQueueName();
-
-        setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
-        setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
-
-        Session session = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-        createAndBindQueueWithFlowControlEnabled(session, queueName, 1000, 800);
-        _producer = session.createProducer(_queue);
-
-        // try to send 5 messages (should block after 4)
-        MessageSender sender = sendMessagesAsync(_producer, session, 5, 50L);
-
-        List<String> results = waitAndFindMatches("Message send delayed by", TIMEOUT);
-        assertTrue("No delay messages logged by client",results.size()!=0);
-
-        List<String> failedMessages = waitAndFindMatches("Message send failed due to timeout waiting on broker enforced"
-                                                  + " flow control", TIMEOUT);
-        assertEquals("Incorrect number of send failure messages logged by client (got " + results.size() + " delay "
-                     + "messages)",1,failedMessages.size());
-    }
-
-
     public void testFlowControlOnCapacityResumeEqual() throws Exception
     {
         String queueName = getTestQueueName();
@@ -266,30 +238,6 @@ public class ProducerFlowControlTest ext
 
     }
 
-    public void testSendTimeout() throws Exception
-    {
-        String queueName = getTestQueueName();
-        final String expectedMsg = isBroker010() ? "Exception when sending message:timed out waiting for message credit"
-                : "Unable to send message for 3 seconds due to broker enforced flow control";
-
-        setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
-        Session session = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 1000, 800);
-        _producer = session.createProducer(_queue);
-
-        // try to send 5 messages (should block after 4)
-        MessageSender sender = sendMessagesAsync(_producer, session, 5, 100L);
-
-        Exception e = sender.awaitSenderException(10000);
-
-        assertNotNull("No timeout exception on sending", e);
-
-
-        assertEquals("Unexpected exception reason", expectedMsg, e.getMessage());
-
-    }
-
     public void testFlowControlAttributeModificationViaREST() throws Exception
     {
         String queueName = getTestQueueName();
@@ -305,11 +253,11 @@ public class ProducerFlowControlTest ext
                      ((Number) queueAttributes.get(org.apache.qpid.server.model.Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES)).intValue());
         assertEquals("FlowResumeCapacity was not the expected value", 0,
                      ((Number) queueAttributes.get(org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_SIZE_BYTES)).intValue());
-        
+
         //set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent
         setFlowLimits(queueUrl, 250, 250);
         assertFalse("Queue should not be overfull", isFlowStopped(queueUrl));
-        
+
         // try to send 2 messages (should block after 1)
         sendMessagesAsync(_producer, _producerSession, 2, 50L);
 
@@ -318,9 +266,10 @@ public class ProducerFlowControlTest ext
         //check only 1 message was sent, and queue is overfull
         assertEquals("Incorrect number of message sent before blocking", 1, _sentMessages.get());
         assertTrue("Queue should be overfull", isFlowStopped(queueUrl));
-        
+
+        int queueDepthBytes = getQueueDepthBytes(queueName);
         //raise the attribute values, causing the queue to become underfull and allow the second message to be sent.
-        setFlowLimits(queueUrl, 300, 300);
+        setFlowLimits(queueUrl, queueDepthBytes + 200, queueDepthBytes);
 
         waitForFlowControlAndMessageCount(queueUrl, 2, 2000);
 
@@ -329,7 +278,7 @@ public class ProducerFlowControlTest ext
         assertTrue("Queue should be overfull", isFlowStopped(queueUrl));
 
         //raise capacity above queue depth, check queue remains overfull as FlowResumeCapacity still exceeded
-        setFlowLimits(queueUrl, 700, 300);
+        setFlowLimits(queueUrl, 2 * queueDepthBytes + 100, queueDepthBytes);
         assertTrue("Queue should be overfull", isFlowStopped(queueUrl));
 
         //receive a message, check queue becomes underfull
@@ -337,7 +286,7 @@ public class ProducerFlowControlTest ext
         _consumer = _consumerSession.createConsumer(_queue);
         _consumerConnection.start();
         
-        _consumer.receive();
+        assertNotNull("Should have received first message", _consumer.receive(RECEIVE_TIMEOUT));
 
         if(!isBroker10())
         {
@@ -345,9 +294,17 @@ public class ProducerFlowControlTest ext
             ((AMQSession<?, ?>) _consumerSession).sync();
         }
 
-        assertFalse("Queue should not be overfull", isFlowStopped(queueUrl));
+        _restTestHelper.waitForAttributeChanged(queueUrl, org.apache.qpid.server.model.Queue.QUEUE_FLOW_STOPPED, false);
 
-        _consumer.receive();
+        assertNotNull("Should have received second message", _consumer.receive(RECEIVE_TIMEOUT));
+    }
+
+    private int getQueueDepthBytes(final String queueName) throws IOException
+    {
+        // On AMQP 1.0 the size of the message on the broker is not necessarily the size of the message we sent. Therefore, get the actual size from the broker
+        final String requestUrl = String.format("queue/%1$s/%1$s/%2$s/getStatistics?statistics=[\"queueDepthBytes\"]", TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName);
+        final Map<String, Object> queueAttributes = _restTestHelper.getJsonAsMap(requestUrl);
+        return ((Number) queueAttributes.get("queueDepthBytes")).intValue();
     }
 
     private void waitForFlowControlAndMessageCount(final String queueUrl, final int messageCount, final int timeout) throws InterruptedException, IOException
@@ -395,14 +352,13 @@ public class ProducerFlowControlTest ext
         {
             // delete queue with a consumer session
             ((AMQSession<?, ?>) _consumerSession).sendQueueDelete(queueName);
-
-            _consumer = _consumerSession.createConsumer(_queue);
         }
         else
         {
             deleteEntityUsingAmqpManagement(getTestQueueName(), _consumerSession, "org.apache.qpid.Queue");
             createTestQueue(_consumerSession);
         }
+        _consumer = _consumerSession.createConsumer(_queue);
         _consumerConnection.start();
 
         Message message = _consumer.receive(1000l);
@@ -455,60 +411,6 @@ public class ProducerFlowControlTest ext
         return sender;
     }
 
-    private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
-            throws JMSException
-    {
-
-        for (int msg = 0; msg < numMessages; msg++)
-        {
-            producer.send(nextMessage(msg, producerSession));
-            _sentMessages.incrementAndGet();
-
-
-            try
-            {
-                if(!isBroker10())
-                {
-                    ((AMQSession<?,?>)producerSession).sync();
-                    // TODO: sync a second time in order to ensure that the client has received the flow command
-                    // before continuing with the next message.  This is required because the Broker may legally
-                    // send the flow command after the sync response. By sync'ing a second time we ensure that
-                    // the client will has seen/acted on the flow command.  The test really ought not have this
-                    // level of information.
-                    ((AMQSession<?,?>)producerSession).sync();
-                }
-                else
-                {
-                    producerSession.createTemporaryQueue().delete();
-                }
-            }
-            catch (QpidException e)
-            {
-                _logger.error("Error performing sync", e);
-                throw new RuntimeException(e);
-            }
-
-            try
-            {
-                Thread.sleep(sleepPeriod);
-            }
-            catch (InterruptedException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    private static final byte[] BYTE_300 = new byte[300];
-
-    private Message nextMessage(int msg, Session producerSession) throws JMSException
-    {
-        BytesMessage send = producerSession.createBytesMessage();
-        send.writeBytes(BYTE_300);
-        send.setIntProperty("msg", msg);
-
-        return send;
-    }
 
     private class MessageSender implements Runnable
     {
@@ -540,10 +442,59 @@ public class ProducerFlowControlTest ext
             }
         }
 
-        public Exception awaitSenderException(long timeout) throws InterruptedException
+        private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+                throws JMSException
+        {
+
+            for (int msg = 0; msg < numMessages; msg++)
+            {
+                producer.send(nextMessage(msg, producerSession));
+                _sentMessages.incrementAndGet();
+
+
+                try
+                {
+                    if(!isBroker10())
+                    {
+                        ((AMQSession<?,?>)producerSession).sync();
+                        // TODO: sync a second time in order to ensure that the client has received the flow command
+                        // before continuing with the next message.  This is required because the Broker may legally
+                        // send the flow command after the sync response. By sync'ing a second time we ensure that
+                        // the client will has seen/acted on the flow command.  The test really ought not have this
+                        // level of information.
+                        ((AMQSession<?,?>)producerSession).sync();
+                    }
+                    else
+                    {
+                        producerSession.createTemporaryQueue().delete();
+                    }
+                }
+                catch (QpidException e)
+                {
+                    _logger.error("Error performing sync", e);
+                    throw new RuntimeException(e);
+                }
+
+                try
+                {
+                    Thread.sleep(sleepPeriod);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        private final byte[] BYTE_300 = new byte[300];
+
+        private Message nextMessage(int msg, Session producerSession) throws JMSException
         {
-            _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS);
-            return _exception;
+            BytesMessage send = producerSession.createBytesMessage();
+            send.writeBytes(BYTE_300);
+            send.setIntProperty("msg", msg);
+
+            return send;
         }
     }
 }

Copied: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ConsumerFlowControlTest.java (from r1773368, qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ConsumerFlowControlTest.java?p2=qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ConsumerFlowControlTest.java&p1=qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java&r1=1773368&r2=1773369&rev=1773369&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ConsumerFlowControlTest.java Fri Dec  9 12:56:41 2016
@@ -35,9 +35,9 @@ import org.apache.qpid.client.AMQSession
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
-public class FlowControlTest extends QpidBrokerTestCase
+public class ConsumerFlowControlTest extends QpidBrokerTestCase
 {
-    private static final Logger _logger = LoggerFactory.getLogger(FlowControlTest.class);
+    private static final Logger _logger = LoggerFactory.getLogger(ConsumerFlowControlTest.class);
 
     private Connection _clientConnection;
     private Session _clientSession;
@@ -168,7 +168,7 @@ public class FlowControlTest extends Qpi
         assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
 
         r2.acknowledge();
-        r3.acknowledge();                                                                 
+        r3.acknowledge();
         recv1.close();
         recv2.close();
         consumerSession1.close();

Added: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ProducerFlowControlTest.java?rev=1773369&view=auto
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ProducerFlowControlTest.java (added)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/ProducerFlowControlTest.java Fri Dec  9 12:56:41 2016
@@ -0,0 +1,124 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.test.client;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.server.logging.AbstractTestLogging;
+import org.apache.qpid.systest.rest.RestTestHelper;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+
+public class ProducerFlowControlTest extends AbstractTestLogging
+{
+    private static final long TIMEOUT = 5000;
+    private Queue _queue;
+    private Connection _producerConnection;
+    private Session _producerSession;
+    private RestTestHelper _restTestHelper;
+    private String _queueUrl;
+    private MessageProducer _producer;
+    private BytesMessage _message;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        getDefaultBrokerConfiguration().addHttpManagementConfiguration();
+        super.setUp();
+
+        _restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
+
+        setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
+        setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
+
+        _producerConnection = getConnectionWithSyncPublishing();
+        _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        String queueName = getTestQueueName();
+        _queueUrl = String.format("queue/%1$s/%1$s/%2$s", TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName);
+        _queue = createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 1000, 800);
+
+        _producer = _producerSession.createProducer(_queue);
+        _message = _producerSession.createBytesMessage();
+        _message.writeBytes(new byte[1100]);
+
+        _monitor.markDiscardPoint();
+    }
+
+    public void testClientLogMessages() throws Exception
+    {
+        _producer.send(_message);
+        _restTestHelper.waitForAttributeChanged(_queueUrl, org.apache.qpid.server.model.Queue.QUEUE_FLOW_STOPPED, Boolean.TRUE);
+        try
+        {
+            _producer.send(_message);
+            fail("Producer should be blocked by flow control");
+        }
+        catch (JMSException e)
+        {
+            final String expectedMsg = isBroker010() ? "Exception when sending message:timed out waiting for message credit"
+                    : "Unable to send message for 3 seconds due to broker enforced flow control";
+            assertEquals("Unexpected exception reason", expectedMsg, e.getMessage());
+        }
+
+        List<String> results = waitAndFindMatches("Message send delayed by", TIMEOUT);
+        assertTrue("No delay messages logged by client",results.size()!=0);
+
+        List<String> failedMessages = waitAndFindMatches("Message send failed due to timeout waiting on broker enforced"
+                                                         + " flow control", TIMEOUT);
+        assertEquals("Incorrect number of send failure messages logged by client (got " + results.size() + " delay "
+                     + "messages)",1,failedMessages.size());
+    }
+
+
+    private Queue createAndBindQueueWithFlowControlEnabled(Session session,
+                                                           String queueName,
+                                                           int capacity,
+                                                           int resumeCapacity) throws Exception
+    {
+        final Map<String, Object> arguments = new HashMap<String, Object>();
+        arguments.put("x-qpid-capacity", capacity);
+        arguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
+        ((AMQSession<?, ?>) session).createQueue(queueName, true, false, false, arguments);
+        Queue queue = session.createQueue("direct://amq.direct/"
+                                    + queueName
+                                    + "/"
+                                    + queueName
+                                    + "?durable='"
+                                    + false
+                                    + "'&autodelete='"
+                                    + true
+                                    + "'");
+        ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue);
+        return queue;
+    }
+}
+

Modified: qpid/java/trunk/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/CPPExcludes?rev=1773369&r1=1773368&r2=1773369&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/CPPExcludes (original)
+++ qpid/java/trunk/test-profiles/CPPExcludes Fri Dec  9 12:56:41 2016
@@ -44,7 +44,7 @@ org.apache.qpid.server.queue.SortedQueue
 org.apache.qpid.test.unit.client.MaxDeliveryCountTest#*
 
 //this test checks explicitly for 0-8 flow control semantics
-org.apache.qpid.test.client.FlowControlTest#*
+org.apache.qpid.test.client.ConsumerFlowControlTest#*
 
 // 0-10 c++ broker doesn't implement virtual hosts, or those wackhy exchanges
 org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnresolvedVirtualHostFailure

Modified: qpid/java/trunk/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java010Excludes?rev=1773369&r1=1773368&r2=1773369&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java010Excludes (original)
+++ qpid/java/trunk/test-profiles/Java010Excludes Fri Dec  9 12:56:41 2016
@@ -24,7 +24,7 @@ org.apache.qpid.test.unit.client.connect
 org.apache.qpid.systest.rest.BrokerRestTest#testSetCloseOnNoRoute
 
 //this test checks explicitly for 0-8 flow control semantics
-org.apache.qpid.test.client.FlowControlTest#*
+org.apache.qpid.test.client.ConsumerFlowControlTest#*
 
 // 0-10 protocol doesn't support message bouncing
 org.apache.qpid.server.exchange.ReturnUnroutableMandatoryMessageTest#*

Modified: qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes?rev=1773369&r1=1773368&r2=1773369&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes (original)
+++ qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes Fri Dec  9 12:56:41 2016
@@ -83,7 +83,3 @@ org.apache.qpid.test.unit.topic.Temporar
 // These tests require some way to set properties on the link established by the client
 org.apache.qpid.server.queue.ConsumerPriorityTest#*
 org.apache.qpid.server.queue.ArrivalTimeFilterTest#*
-
-// Broker should issue drain to client when flow control is enforced, so that existing credit is used up (test will also need updating)
-org.apache.qpid.server.queue.ProducerFlowControlTest#*
-

Modified: qpid/java/trunk/test-profiles/Java10Excludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java10Excludes?rev=1773369&r1=1773368&r2=1773369&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java10Excludes (original)
+++ qpid/java/trunk/test-profiles/Java10Excludes Fri Dec  9 12:56:41 2016
@@ -108,7 +108,10 @@ org.apache.qpid.test.unit.basic.Property
 org.apache.qpid.test.unit.basic.PropertyValueTest#testLargeHeader_010_HeadersFillContentHeaderFrame
 
 // This test concerns 0-8/0-10 bytes limiting flow control
-org.apache.qpid.test.client.FlowControlTest#*
+org.apache.qpid.test.client.ConsumerFlowControlTest#*
+
+// Tests are tests of the 0-x client behaviour
+org.apache.qpid.test.client.ProducerFlowControlTest#*
 
 // Failover tests are tests of the 0-x client behaviour
 org.apache.qpid.client.failover.FailoverBehaviourTest#*



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org