You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/06/01 16:45:00 UTC
qpid-broker-j git commit: NO-JIRA: [Java System Tests] Fix racey
ProducerFlowControlTest#testCapacityExceedsBlock
Repository: qpid-broker-j
Updated Branches:
refs/heads/master f42e83fdd -> bdb6338f3
NO-JIRA: [Java System Tests] Fix racey ProducerFlowControlTest#testCapacityExceedsBlock
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/bdb6338f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/bdb6338f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/bdb6338f
Branch: refs/heads/master
Commit: bdb6338f3b5afb7648506dbe35a39cf5f5564374
Parents: f42e83f
Author: Keith Wall <kw...@apache.org>
Authored: Thu Jun 1 17:40:43 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Thu Jun 1 17:44:37 2017 +0100
----------------------------------------------------------------------
.../server/queue/ProducerFlowControlTest.java | 64 ++++++--------------
1 file changed, 17 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bdb6338f/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
index 06472aa..ca75e35 100644
--- a/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
+++ b/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
@@ -41,7 +41,6 @@ import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.QpidException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.server.logging.AbstractTestLogging;
@@ -68,6 +67,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
private int _messageSizeIncludingHeader;
private Session _utilitySession;
+ @Override
public void setUp() throws Exception
{
getDefaultBrokerConfiguration().addHttpManagementConfiguration();
@@ -101,6 +101,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
_messageSizeIncludingHeader = getQueueDepthBytes(tmpQueueName);
}
+ @Override
public void tearDown() throws Exception
{
try
@@ -131,7 +132,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
_producer = _producerSession.createProducer(_queue);
// try to send 5 messages (should block after 4)
- CountDownLatch sendLatch = sendMessagesAsync(_producer, _producerSession, 5, 5L).getSendLatch();
+ CountDownLatch sendLatch = sendMessagesAsync(_producer, _producerSession, 5).getSendLatch();
assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 5000));
assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
@@ -164,7 +165,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
_producer = _producerSession.createProducer(_queue);
// try to send 5 messages (should block after 4)
- sendMessagesAsync(_producer, _producerSession, 5, 5L);
+ sendMessagesAsync(_producer, _producerSession, 5);
List<String> results = waitAndFindMatches("QUE-1003", 7000);
@@ -193,7 +194,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
// try to send 5 messages (should block after 4)
- CountDownLatch sendLatch = sendMessagesAsync(_producer, _producerSession, 5, 5L).getSendLatch();
+ CountDownLatch sendLatch = sendMessagesAsync(_producer, _producerSession, 5).getSendLatch();
assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true,5000));
@@ -236,7 +237,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer myproducer = session.createProducer(_queue);
- MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 5L);
+ MessageSender sender = sendMessagesAsync(myproducer, session, numMessages);
}
_consumer = _consumerSession.createConsumer(_queue);
@@ -282,7 +283,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
assertFalse("Queue should not be overfull", isFlowStopped(queueUrl));
// try to send 2 messages (should block after 1)
- sendMessagesAsync(_producer, _producerSession, 2, 50L);
+ sendMessagesAsync(_producer, _producerSession, 2);
waitForFlowControlAndMessageCount(queueUrl, 1, 2000);
@@ -372,7 +373,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
_producer = _producerSession.createProducer(_queue);
// try to send 5 messages (should block after 4)
- sendMessagesAsync(_producer, _producerSession, 5, 5L);
+ sendMessagesAsync(_producer, _producerSession, 5);
assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true,5000));
@@ -440,10 +441,9 @@ public class ProducerFlowControlTest extends AbstractTestLogging
private MessageSender sendMessagesAsync(final MessageProducer producer,
final Session producerSession,
- final int numMessages,
- long sleepPeriod)
+ final int numMessages)
{
- MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod);
+ MessageSender sender = new MessageSender(producer, producerSession, numMessages);
new Thread(sender).start();
return sender;
}
@@ -456,21 +456,20 @@ public class ProducerFlowControlTest extends AbstractTestLogging
private final int _numMessages;
private volatile JMSException _exception;
private CountDownLatch _sendLatch = new CountDownLatch(1);
- private long _sleepPeriod;
- public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+ public MessageSender(MessageProducer producer, Session producerSession, int numMessages)
{
_senderProducer = producer;
_senderSession = producerSession;
_numMessages = numMessages;
- _sleepPeriod = sleepPeriod;
}
+ @Override
public void run()
{
try
{
- sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod);
+ sendMessages(_senderProducer, _senderSession, _numMessages);
}
catch (JMSException e)
{
@@ -487,7 +486,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
return _sendLatch;
}
- private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+ private void sendMessages(MessageProducer producer, Session producerSession, int numMessages)
throws JMSException
{
@@ -496,38 +495,9 @@ public class ProducerFlowControlTest extends AbstractTestLogging
producer.send(nextMessage(msg, producerSession));
_sentMessages.incrementAndGet();
-
- try
- {
- if(!isBroker10())
- {
- ((AMQSession<?,?>)producerSession).sync();
- // TODO: sync a second time in order to ensure that the client has received the flow command
- // before continuing with the next message. This is required because the Broker may legally
- // send the flow command after the sync response. By sync'ing a second time we ensure that
- // the client will has seen/acted on the flow command. The test really ought not have this
- // level of information.
- ((AMQSession<?,?>)producerSession).sync();
- }
- else
- {
- producerSession.createTemporaryQueue().delete();
- }
- }
- catch (QpidException e)
- {
- _logger.error("Error performing sync", e);
- throw new RuntimeException(e);
- }
-
- try
- {
- Thread.sleep(sleepPeriod);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
+ // Cause work that causes a synchronous interaction on the wire. We need to be
+ // sure that the client has received the flow/message.stop etc.
+ producerSession.createTemporaryQueue().delete();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org