You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/12/27 11:16:14 UTC

svn commit: r1426152 - in /qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest: client/ConsumerParticipant.java jms/QpidQueueCreator.java

Author: kwall
Date: Thu Dec 27 10:16:13 2012
New Revision: 1426152

URL: http://svn.apache.org/viewvc?rev=1426152&view=rev
Log:
NO-JIRA: [Java Broker] Perf Tests - tweak queue drain algorithm to better handle a slow broker (exposed by new batch size tests)

Modified:
    qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
    qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java

Modified: qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java?rev=1426152&r1=1426151&r2=1426152&view=diff
==============================================================================
--- qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java (original)
+++ qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java Thu Dec 27 10:16:13 2012
@@ -103,16 +103,22 @@ public class ConsumerParticipant impleme
         }
 
         Date end = new Date();
-        int numberOfMessagesSent = _totalNumberOfMessagesReceived.get();
+        int numberOfMessagesReceived = _totalNumberOfMessagesReceived.get();
         long totalPayloadSize = _totalPayloadSizeOfAllMessagesReceived.get();
         int payloadSize = getPayloadSizeForResultIfConstantOrZeroOtherwise(_allConsumedPayloadSizes);
 
+        if (LOGGER.isInfoEnabled())
+        {
+            LOGGER.info("Consumer {} finished consuming. Number of messages consumed: {}",
+                        getName(), numberOfMessagesReceived);
+        }
+
         ConsumerParticipantResult result = _resultFactory.createForConsumer(
                 getName(),
                 registeredClientName,
                 _command,
                 acknowledgeMode,
-                numberOfMessagesSent,
+                numberOfMessagesReceived,
                 payloadSize,
                 totalPayloadSize,
                 start, end, _messageLatencies);

Modified: qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java?rev=1426152&r1=1426151&r2=1426152&view=diff
==============================================================================
--- qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java (original)
+++ qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java Thu Dec 27 10:16:13 2012
@@ -25,7 +25,6 @@ import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
 
-import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.disttest.DistributedTestException;
@@ -34,12 +33,13 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.FieldTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 public class QpidQueueCreator implements QueueCreator
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(QpidQueueCreator.class);
     private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable();
     private static final String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = "qpid.disttest.queue.creator.drainPollTime";
-    private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 5000);
+    private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500);
 
     @Override
     public void createQueues(Connection connection, Session session, List<QueueConfig> configs)
@@ -61,10 +61,8 @@ public class QpidQueueCreator implements
 
             // drainQueue method is added because deletion of queue with a lot
             // of messages takes time and might cause the timeout exception
-            if (queueHasMessages(amqSession, destination))
-            {
-                drainQueue(connection, destination);
-            }
+            drainQueue(connection, destination);
+
             deleteQueue(amqSession, destination.getAMQQueueName());
         }
     }
@@ -81,13 +79,12 @@ public class QpidQueueCreator implements
         }
     }
 
-    private boolean queueHasMessages(AMQSession<?, ?> amqSession, AMQDestination destination)
+    private long getQueueDepth(AMQSession<?, ?> amqSession, AMQDestination destination)
     {
         try
         {
             long queueDepth = amqSession.getQueueDepth(destination);
-            LOGGER.info("Queue {} has {} message(s)", destination.getQueueName(), queueDepth);
-            return queueDepth > 0;
+            return queueDepth;
         }
         catch (Exception e)
         {
@@ -103,10 +100,19 @@ public class QpidQueueCreator implements
             LOGGER.debug("About to drain the queue {}", destination.getQueueName());
             noAckSession = connection.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
             MessageConsumer messageConsumer = noAckSession.createConsumer(destination);
+
+            long currentQueueDepth = getQueueDepth((AMQSession<?,?>)noAckSession, destination);
             int counter = 0;
-            while(messageConsumer.receive(_drainPollTimeout) != null)
+            while (currentQueueDepth > 0)
             {
-                counter++;
+                LOGGER.info("Queue {} has {} message(s)", destination.getQueueName(), currentQueueDepth);
+
+                while(messageConsumer.receive(_drainPollTimeout) != null)
+                {
+                    counter++;
+                }
+
+                currentQueueDepth = getQueueDepth((AMQSession<?,?>)noAckSession, destination);
             }
             LOGGER.info("Drained {} message(s) from queue {} ", counter, destination.getQueueName());
             messageConsumer.close();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org