You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/06/01 16:45:00 UTC

qpid-broker-j git commit: NO-JIRA: [Java System Tests] Fix racey ProducerFlowControlTest#testCapacityExceedsBlock

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master f42e83fdd -> bdb6338f3


NO-JIRA: [Java System Tests] Fix racey ProducerFlowControlTest#testCapacityExceedsBlock


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/bdb6338f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/bdb6338f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/bdb6338f

Branch: refs/heads/master
Commit: bdb6338f3b5afb7648506dbe35a39cf5f5564374
Parents: f42e83f
Author: Keith Wall <kw...@apache.org>
Authored: Thu Jun 1 17:40:43 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Thu Jun 1 17:44:37 2017 +0100

----------------------------------------------------------------------
 .../server/queue/ProducerFlowControlTest.java   | 64 ++++++--------------
 1 file changed, 17 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bdb6338f/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
index 06472aa..ca75e35 100644
--- a/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
+++ b/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
@@ -41,7 +41,6 @@ import javax.jms.Session;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.QpidException;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.server.logging.AbstractTestLogging;
@@ -68,6 +67,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
     private int _messageSizeIncludingHeader;
     private Session _utilitySession;
 
+    @Override
     public void setUp() throws Exception
     {
         getDefaultBrokerConfiguration().addHttpManagementConfiguration();
@@ -101,6 +101,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
         _messageSizeIncludingHeader = getQueueDepthBytes(tmpQueueName);
     }
 
+    @Override
     public void tearDown() throws Exception
     {
         try
@@ -131,7 +132,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
         _producer = _producerSession.createProducer(_queue);
 
         // try to send 5 messages (should block after 4)
-        CountDownLatch sendLatch = sendMessagesAsync(_producer, _producerSession, 5, 5L).getSendLatch();
+        CountDownLatch sendLatch = sendMessagesAsync(_producer, _producerSession, 5).getSendLatch();
 
         assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 5000));
         assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
@@ -164,7 +165,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
         _producer = _producerSession.createProducer(_queue);
 
         // try to send 5 messages (should block after 4)
-        sendMessagesAsync(_producer, _producerSession, 5, 5L);
+        sendMessagesAsync(_producer, _producerSession, 5);
 
         List<String> results = waitAndFindMatches("QUE-1003", 7000);
 
@@ -193,7 +194,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
 
 
         // try to send 5 messages (should block after 4)
-        CountDownLatch sendLatch = sendMessagesAsync(_producer, _producerSession, 5, 5L).getSendLatch();
+        CountDownLatch sendLatch = sendMessagesAsync(_producer, _producerSession, 5).getSendLatch();
 
         assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true,5000));
 
@@ -236,7 +237,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
             Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
 
             MessageProducer myproducer = session.createProducer(_queue);
-            MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 5L);
+            MessageSender sender = sendMessagesAsync(myproducer, session, numMessages);
         }
 
         _consumer = _consumerSession.createConsumer(_queue);
@@ -282,7 +283,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
         assertFalse("Queue should not be overfull", isFlowStopped(queueUrl));
 
         // try to send 2 messages (should block after 1)
-        sendMessagesAsync(_producer, _producerSession, 2, 50L);
+        sendMessagesAsync(_producer, _producerSession, 2);
 
         waitForFlowControlAndMessageCount(queueUrl, 1, 2000);
 
@@ -372,7 +373,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
         _producer = _producerSession.createProducer(_queue);
 
         // try to send 5 messages (should block after 4)
-        sendMessagesAsync(_producer, _producerSession, 5, 5L);
+        sendMessagesAsync(_producer, _producerSession, 5);
 
         assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true,5000));
 
@@ -440,10 +441,9 @@ public class ProducerFlowControlTest extends AbstractTestLogging
 
     private MessageSender sendMessagesAsync(final MessageProducer producer,
                                             final Session producerSession,
-                                            final int numMessages,
-                                            long sleepPeriod)
+                                            final int numMessages)
     {
-        MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod);
+        MessageSender sender = new MessageSender(producer, producerSession, numMessages);
         new Thread(sender).start();
         return sender;
     }
@@ -456,21 +456,20 @@ public class ProducerFlowControlTest extends AbstractTestLogging
         private final int _numMessages;
         private volatile JMSException _exception;
         private CountDownLatch _sendLatch = new CountDownLatch(1);
-        private long _sleepPeriod;
 
-        public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+        public MessageSender(MessageProducer producer, Session producerSession, int numMessages)
         {
             _senderProducer = producer;
             _senderSession = producerSession;
             _numMessages = numMessages;
-            _sleepPeriod = sleepPeriod;
         }
 
+        @Override
         public void run()
         {
             try
             {
-                sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod);
+                sendMessages(_senderProducer, _senderSession, _numMessages);
             }
             catch (JMSException e)
             {
@@ -487,7 +486,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
             return _sendLatch;
         }
 
-        private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+        private void sendMessages(MessageProducer producer, Session producerSession, int numMessages)
                 throws JMSException
         {
 
@@ -496,38 +495,9 @@ public class ProducerFlowControlTest extends AbstractTestLogging
                 producer.send(nextMessage(msg, producerSession));
                 _sentMessages.incrementAndGet();
 
-
-                try
-                {
-                    if(!isBroker10())
-                    {
-                        ((AMQSession<?,?>)producerSession).sync();
-                        // TODO: sync a second time in order to ensure that the client has received the flow command
-                        // before continuing with the next message.  This is required because the Broker may legally
-                        // send the flow command after the sync response. By sync'ing a second time we ensure that
-                        // the client will has seen/acted on the flow command.  The test really ought not have this
-                        // level of information.
-                        ((AMQSession<?,?>)producerSession).sync();
-                    }
-                    else
-                    {
-                        producerSession.createTemporaryQueue().delete();
-                    }
-                }
-                catch (QpidException e)
-                {
-                    _logger.error("Error performing sync", e);
-                    throw new RuntimeException(e);
-                }
-
-                try
-                {
-                    Thread.sleep(sleepPeriod);
-                }
-                catch (InterruptedException e)
-                {
-                    throw new RuntimeException(e);
-                }
+                // Cause work that causes a synchronous interaction on the wire.  We need to be
+                // sure that the client has received the flow/message.stop etc.
+                producerSession.createTemporaryQueue().delete();
             }
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org