You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/12/27 11:16:14 UTC
svn commit: r1426152 - in /qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest: client/ConsumerParticipant.java jms/QpidQueueCreator.java
Author: kwall
Date: Thu Dec 27 10:16:13 2012
New Revision: 1426152
URL: http://svn.apache.org/viewvc?rev=1426152&view=rev
Log:
NO-JIRA: [Java Broker] Perf Tests - tweak queue drain algorithm to better handle a slow broker (exposed by new batch size tests)
Modified:
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
Modified: qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java?rev=1426152&r1=1426151&r2=1426152&view=diff
==============================================================================
--- qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java (original)
+++ qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java Thu Dec 27 10:16:13 2012
@@ -103,16 +103,22 @@ public class ConsumerParticipant impleme
}
Date end = new Date();
- int numberOfMessagesSent = _totalNumberOfMessagesReceived.get();
+ int numberOfMessagesReceived = _totalNumberOfMessagesReceived.get();
long totalPayloadSize = _totalPayloadSizeOfAllMessagesReceived.get();
int payloadSize = getPayloadSizeForResultIfConstantOrZeroOtherwise(_allConsumedPayloadSizes);
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Consumer {} finished consuming. Number of messages consumed: {}",
+ getName(), numberOfMessagesReceived);
+ }
+
ConsumerParticipantResult result = _resultFactory.createForConsumer(
getName(),
registeredClientName,
_command,
acknowledgeMode,
- numberOfMessagesSent,
+ numberOfMessagesReceived,
payloadSize,
totalPayloadSize,
start, end, _messageLatencies);
Modified: qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java?rev=1426152&r1=1426151&r2=1426152&view=diff
==============================================================================
--- qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java (original)
+++ qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java Thu Dec 27 10:16:13 2012
@@ -25,7 +25,6 @@ import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
-import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.disttest.DistributedTestException;
@@ -34,12 +33,13 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.FieldTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class QpidQueueCreator implements QueueCreator
{
private static final Logger LOGGER = LoggerFactory.getLogger(QpidQueueCreator.class);
private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable();
private static final String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = "qpid.disttest.queue.creator.drainPollTime";
- private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 5000);
+ private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500);
@Override
public void createQueues(Connection connection, Session session, List<QueueConfig> configs)
@@ -61,10 +61,8 @@ public class QpidQueueCreator implements
// drainQueue method is added because deletion of queue with a lot
// of messages takes time and might cause the timeout exception
- if (queueHasMessages(amqSession, destination))
- {
- drainQueue(connection, destination);
- }
+ drainQueue(connection, destination);
+
deleteQueue(amqSession, destination.getAMQQueueName());
}
}
@@ -81,13 +79,12 @@ public class QpidQueueCreator implements
}
}
- private boolean queueHasMessages(AMQSession<?, ?> amqSession, AMQDestination destination)
+ private long getQueueDepth(AMQSession<?, ?> amqSession, AMQDestination destination)
{
try
{
long queueDepth = amqSession.getQueueDepth(destination);
- LOGGER.info("Queue {} has {} message(s)", destination.getQueueName(), queueDepth);
- return queueDepth > 0;
+ return queueDepth;
}
catch (Exception e)
{
@@ -103,10 +100,19 @@ public class QpidQueueCreator implements
LOGGER.debug("About to drain the queue {}", destination.getQueueName());
noAckSession = connection.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
MessageConsumer messageConsumer = noAckSession.createConsumer(destination);
+
+ long currentQueueDepth = getQueueDepth((AMQSession<?,?>)noAckSession, destination);
int counter = 0;
- while(messageConsumer.receive(_drainPollTimeout) != null)
+ while (currentQueueDepth > 0)
{
- counter++;
+ LOGGER.info("Queue {} has {} message(s)", destination.getQueueName(), currentQueueDepth);
+
+ while(messageConsumer.receive(_drainPollTimeout) != null)
+ {
+ counter++;
+ }
+
+ currentQueueDepth = getQueueDepth((AMQSession<?,?>)noAckSession, destination);
}
LOGGER.info("Drained {} message(s) from queue {} ", counter, destination.getQueueName());
messageConsumer.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org