You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/02/25 15:57:44 UTC

svn commit: r916304 - in /qpid/branches/0.5.x-dev/qpid/java/perftests: etc/scripts/ src/main/java/org/apache/qpid/ping/ src/main/java/org/apache/qpid/requestreply/

Author: ritchiem
Date: Thu Feb 25 14:57:44 2010
New Revision: 916304

URL: http://svn.apache.org/viewvc?rev=916304&view=rev
Log:
QPID-2421 : Augmented Async Performance test to take new 'preFill' value, that puts <n> messages onto the broker destination before the test begins.
When running on a non-TX'd producer session the use of the new 'delayBeforeConsume' will pause the client for <n> ms before the test starts, giving the producer session time to flush.
This new functionality can be explored with the new 'testWithPreFill' script.
The 'numConsumer' parameter was augmented to allow a 0 value which disables all the consumers. This can be seen with the 'fillBroker' script.
To complement that a new 'consumeOnly' boolean was added to disable sending messages. This can be seen with the 'drainBroker' scripts.

All scripts are located in java/perftests/etc/scripts


Added:
    qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/drainBroker.sh   (with props)
    qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/fillBroker.sh   (with props)
    qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/testWithPreFill.sh   (with props)
Modified:
    qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
    qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
    qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java

Added: qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/drainBroker.sh
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/drainBroker.sh?rev=916304&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/drainBroker.sh (added)
+++ qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/drainBroker.sh Thu Feb 25 14:57:44 2010
@@ -0,0 +1,41 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if [ -z "$QPID_HOME" ]; then
+    export QPID_HOME=$(dirname $(dirname $(dirname $(readlink -f $0))))
+    export PATH=${PATH}:${QPID_HOME}/bin
+fi
+
+# Parse arguements taking all - prefixed args as JAVA_OPTS
+for arg in "$@"; do
+    if [[ $arg == -java:* ]]; then
+        JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2`  "
+    else
+        ARGS="${ARGS}$arg "
+    fi
+done
+
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-all.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java        JAVA_VM=-server        JAVA_MEM=-Xmx1024m        QPID_CLASSPATH=$QPID_LIBS
+
+. qpid-run -Xms256m -Dlog4j.configuration=file://${QPID_HOME}/etc/perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=info -Damqj.logging.level=warn org.apache.qpid.junit.extensions.TKTestRunner -n PQBT-TX-Qpid-01   -s[1000]   -c[1]                   -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf   persistent=true  pubsub=false uniqueDests=true  numConsumers=1  transacted=true  consTransacted=true  consAckMode=0   commitBatchSize=1   batchSize=1000   messageSize=256   destinationCount=1  rate=0  maxPending=0  consumeOnly=true consumeOnly=true${ARGS}

Propchange: qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/drainBroker.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/fillBroker.sh
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/fillBroker.sh?rev=916304&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/fillBroker.sh (added)
+++ qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/fillBroker.sh Thu Feb 25 14:57:44 2010
@@ -0,0 +1,41 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if [ -z "$QPID_HOME" ]; then
+    export QPID_HOME=$(dirname $(dirname $(dirname $(readlink -f $0))))
+    export PATH=${PATH}:${QPID_HOME}/bin
+fi
+
+# Parse arguements taking all - prefixed args as JAVA_OPTS
+for arg in "$@"; do
+    if [[ $arg == -java:* ]]; then
+        JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2`  "
+    else
+        ARGS="${ARGS}$arg "
+    fi
+done
+
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-all.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java        JAVA_VM=-server        JAVA_MEM=-Xmx1024m        QPID_CLASSPATH=$QPID_LIBS
+
+. qpid-run -Xms256m -Dlog4j.configuration=file://${QPID_HOME}/etc/perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=info -Damqj.logging.level=warn org.apache.qpid.junit.extensions.TKTestRunner -n PQBT-TX-Qpid-01   -s[1000]   -c[1]                   -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf   persistent=true  pubsub=false uniqueDests=true  numConsumers=0   transacted=true  consTransacted=true  consAckMode=0   commitBatchSize=1   batchSize=1000   messageSize=256   destinationCount=1  rate=0  maxPending=0  ${ARGS}

Propchange: qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/fillBroker.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/testWithPreFill.sh
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/testWithPreFill.sh?rev=916304&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/testWithPreFill.sh (added)
+++ qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/testWithPreFill.sh Thu Feb 25 14:57:44 2010
@@ -0,0 +1,41 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if [ -z "$QPID_HOME" ]; then
+    export QPID_HOME=$(dirname $(dirname $(dirname $(readlink -f $0))))
+    export PATH=${PATH}:${QPID_HOME}/bin
+fi
+
+# Parse arguements taking all - prefixed args as JAVA_OPTS
+for arg in "$@"; do
+    if [[ $arg == -java:* ]]; then
+        JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2`  "
+    else
+        ARGS="${ARGS}$arg "
+    fi
+done
+
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-all.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java        JAVA_VM=-server        JAVA_MEM=-Xmx1024m        QPID_CLASSPATH=$QPID_LIBS
+
+. qpid-run -Xms256m -Dlog4j.configuration=file://${QPID_HOME}/etc/perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=info -Damqj.logging.level=warn org.apache.qpid.junit.extensions.TKTestRunner -n PQBT-TX-Qpid-01   -s[1000]   -c[1]                   -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf   persistent=true  pubsub=false uniqueDests=true  numConsumers=1  transacted=true  consTransacted=true  consAckMode=0   commitBatchSize=1   batchSize=1000   messageSize=256   destinationCount=1  rate=0  maxPending=0  preFill=1000 delayBeforeConsume=1000 ${ARGS}

Propchange: qpid/branches/0.5.x-dev/qpid/java/perftests/etc/scripts/testWithPreFill.sh
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java?rev=916304&r1=916303&r2=916304&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java Thu Feb 25 14:57:44 2010
@@ -133,8 +133,11 @@
     {
         // _logger.debug("public void testAsyncPingOk(int numPings): called");
 
+        // get prefill count to update the expected count
+        int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME);
+
         // Ensure that at least one ping was requeusted.
-        if (numPings == 0)
+        if (numPings + preFill == 0)
         {
             _logger.error("Number of pings requested was zero.");
             fail("Number of pings requested was zero.");
@@ -149,16 +152,24 @@
         // String messageCorrelationId = perThreadSetup._correlationId;
         // _logger.debug("messageCorrelationId = " + messageCorrelationId);
 
+
         // Initialize the count and timing controller for the new correlation id.
         PerCorrelationId perCorrelationId = new PerCorrelationId();
         TimingController tc = getTimingController().getControllerForCurrentThread();
         perCorrelationId._tc = tc;
-        perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings);
+        perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings + preFill);
         perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId);
 
+        // Start the client that will have been paused due to preFill requirement.
+        // or if we have not yet started client because messages are sitting on broker. 
+        if (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))
+        {
+            pingClient.start();
+        }
+
         // Send the requested number of messages, and wait until they have all been received.
         long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
-        int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId);
+        int numReplies = pingClient.pingAndWaitForReply(null, numPings , preFill, timeout, perThreadSetup._correlationId);
 
         // Check that all the replies were received and log a fail if they were not.
         if (numReplies < perCorrelationId._expectedCount)

Modified: qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java?rev=916304&r1=916303&r2=916304&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java Thu Feb 25 14:57:44 2010
@@ -141,9 +141,65 @@
                 perThreadSetup._pingClient = new PingClient(testParameters);
                 perThreadSetup._pingClient.establishConnection(true, true);
             }
-            // Start the client connection
-            perThreadSetup._pingClient.start();
 
+            // Prefill the broker unless we are in consume only mode. 
+            int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME);
+            if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0)
+            {
+                // Manually set the correlation ID to 1. This is not ideal but it is the
+                // value that the main test loop will use.
+                perThreadSetup._pingClient.pingNoWaitForReply(null, preFill, "1");
+
+                // Note with a large preFill and non-tx session the messages will be
+                // rapidly pushed in to the mina buffers. OOM's are a real risk here.
+                // Should perhaps consider using a TX session for the prefill.
+
+                long delayBeforeConsume = testParameters.getPropertyAsLong(PingPongProducer.DELAY_BEFORE_CONSUME_PROPNAME);
+
+                //  Only delay if we have consumers and a delayBeforeConsume
+                if ((testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0)
+                    && delayBeforeConsume > 0)
+                {
+
+                    boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
+                    // Only do logging if in verbose mode.
+                    if (verbose)
+                    {
+                        if (delayBeforeConsume > 60000)
+                        {
+                            long minutes = delayBeforeConsume / 60000;
+                            long seconds = (delayBeforeConsume - (minutes * 60000)) / 1000;
+                            long ms = delayBeforeConsume - (minutes * 60000) - (seconds * 1000);
+                                _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test.");
+                        }
+                        else
+                        {
+                                _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test.");
+                        }
+                    }
+
+                    Thread.sleep(delayBeforeConsume);
+
+                    if (verbose)
+                    {
+                        _logger.info("Starting Test.");
+                    }
+                }
+
+                // We can't start the client's here as the test client has not yet been configured to receieve messages.
+                // only when the test method is executed will the correlationID map be set up and ready to consume
+                // the messages we have sent here.
+            }
+            else //Only start the consumer if we are not preFilling.
+            {
+                // Only start the consumer if we don't have messages waiting to be received.
+                // we need to set up the correlationID mapping first
+                if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))
+                {
+                    // Start the client connection
+                    perThreadSetup._pingClient.start();
+                }
+            }
             // Attach the per-thread set to the thread.
             threadSetup.set(perThreadSetup);
         }
@@ -157,7 +213,7 @@
      * Performs test fixture clean
      */
     public void threadTearDown()
-    {
+    {                                                                                       
         _logger.debug("public void threadTearDown(): called");
 
         try

Modified: qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=916304&r1=916303&r2=916304&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Thu Feb 25 14:57:44 2010
@@ -324,6 +324,25 @@
     /** Holds the name of the property to store nanosecond timestamps in ping messages with. */
     public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
 
+    /** Holds the name of the property to get the number of message to prefill the broker with before starting the main test. */
+    public static final String PREFILL_PROPNAME = "preFill";
+
+    /** Defines the default value for the number of messages to prefill. 0,default, no messages. */
+    public static final int PREFILL_DEFAULT = 0;
+
+    /** Holds the name of the property to get the delay to wait in ms before starting the main test after having prefilled. */
+    public static final String DELAY_BEFORE_CONSUME_PROPNAME = "delayBeforeConsume";
+
+    /** Defines the default value for delay in ms to wait before starting thet test run. 0,default, no delay. */
+    public static final long  DELAY_BEFORE_CONSUME = 0;
+
+    /** Holds the name of the property to get when no messasges should be sent. */
+    public static final String CONSUME_ONLY_PROPNAME = "consumeOnly";
+
+    /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */
+    public static final boolean CONSUME_ONLY_DEFAULT = false;
+
+
     /** Holds the default configuration properties. */
     public static ParsedProperties defaults = new ParsedProperties();
 
@@ -360,6 +379,9 @@
         defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
         defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
         defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
+        defaults.setPropertyIfNull(PREFILL_PROPNAME, PREFILL_DEFAULT);
+        defaults.setPropertyIfNull(DELAY_BEFORE_CONSUME_PROPNAME, DELAY_BEFORE_CONSUME);
+        defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT);        
     }
 
     /** Allows setting of client ID on the connection, rather than through the connection URL. */
@@ -455,6 +477,24 @@
      */
     protected int _maxPendingSize;
 
+    /**
+     * Holds the number of messages to send during the setup phase, before the clients start consuming.
+     */
+    private Integer _preFill;
+
+    /**
+     * Holds the time in ms to wait after preFilling before starting thet test.
+     */
+    private Long _delayBeforeConsume;
+
+    /**
+     * Holds a boolean value of wither this test should just consume, i.e. skips
+     * sending messages, but still expects to receive the specified number.
+     * Use in conjuction with numConsumers=0 to fill the broker.
+     */
+    private boolean _consumeOnly;
+
+
     /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
     private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
 
@@ -588,6 +628,9 @@
         _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
         _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
         _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
+        _preFill = properties.getPropertyAsInteger(PREFILL_PROPNAME);
+        _delayBeforeConsume = properties.getPropertyAsLong(DELAY_BEFORE_CONSUME_PROPNAME);
+        _consumeOnly = properties.getPropertyAsBoolean(CONSUME_ONLY_PROPNAME);
 
         // Check that one or more destinations were specified.
         if (_noOfDestinations < 1)
@@ -638,7 +681,10 @@
         }
 
         // Create the destinations to send pings to and receive replies from.
-        _replyDestination = _consumerSession[0].createTemporaryQueue();
+        if (_noOfConsumers > 0)
+        {
+            _replyDestination = _consumerSession[0].createTemporaryQueue();
+        }
         createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
 
         // Create the message producer only if instructed to.
@@ -871,6 +917,14 @@
         {
             _consumer = new MessageConsumer[_noOfConsumers];
 
+            // If we don't have consumers then ensure we have created the
+            // destination.   
+            if (_noOfConsumers == 0)
+            {
+                _producerSession.createConsumer(destination, selector,
+                                                NO_LOCAL_DEFAULT).close();
+            }
+
             for (int i = 0; i < _noOfConsumers; i++)
             {
                 // Create a consumer for the destination and set this pinger to listen to its messages.
@@ -980,6 +1034,11 @@
                         // When running in client ack mode, an ack is done instead of a commit, on the commit batch
                         // size boundaries.
                         long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers);
+                        // _noOfConsumers can be set to 0 on the command line but we will not get here to
+                        // divide by 0 as this is executed by the onMessage code when a message is recevied.
+                        // no consumers means no message reception.
+
+
                         // log.debug("commitCount = " + commitCount);
 
                         if ((commitCount % _txBatchSize) == 0)
@@ -1014,6 +1073,7 @@
                 else
                 {
                     log.warn("Got unexpected message with correlationId: " + correlationID);
+                    log.warn("Map contains:" + perCorrelationIds.entrySet());
                 }
             }
             else
@@ -1037,13 +1097,18 @@
      * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
      * the correlation id.
      *
+     * Can be augmented through a pre-fill property (PingPongProducer.PREFILL_PROPNAME) that will populate the destination
+     * with a set number of messages so the total pings sent and therefore expected will be PREFILL + numPings.
+     *
+     * If pre-fill is specified then the consumers will start paused to allow the prefilling to occur.
+     *
      * @param message              The message to send. If this is null, one is generated.
      * @param numPings             The number of ping messages to send.
      * @param timeout              The timeout in milliseconds.
      * @param messageCorrelationId The message correlation id. If this is null, one is generated.
      *
      * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait
-     *         for all prematurely.
+     *         for all prematurely. If we are running in noConsumer=0 so send only mode then it will return the no msgs sent.
      *
      * @throws JMSException         All underlying JMSExceptions are allowed to fall through.
      * @throws InterruptedException When interrupted by a timeout
@@ -1051,6 +1116,16 @@
     public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
         throws JMSException, InterruptedException
     {
+        return pingAndWaitForReply(message, numPings, 0, timeout, messageCorrelationId);
+    }
+
+    public int pingAndWaitForReply(Message message, int numPings, int preFill, long timeout, String messageCorrelationId)
+        throws JMSException, InterruptedException
+    {
+
+        // If we are runnning a consumeOnly test then don't send any messages
+
+
         /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
             + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
 
@@ -1071,29 +1146,41 @@
             // countdown needs to be done before the chained listener can be called.
             PerCorrelationId perCorrelationId = new PerCorrelationId();
 
-            perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1);
+            int totalPingsRequested = numPings + preFill;
+            perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(totalPingsRequested) + 1);
             perCorrelationIds.put(messageCorrelationId, perCorrelationId);
 
             // Set up the current time as the start time for pinging on the correlation id. This is used to determine
             // timeouts.
             perCorrelationId.timeOutStart = System.nanoTime();
 
-            // Send the specifed number of messages.
+            // Send the specifed number of messages for this test            
             pingNoWaitForReply(message, numPings, messageCorrelationId);
 
             boolean timedOut;
             boolean allMessagesReceived;
             int numReplies;
 
+            // We don't have a consumer so don't try and wait for the messages.
+            // this does mean that if the producerSession is !TXed then we may
+            // get to exit before all msgs have been received.
+            //
+            // Return the number of requested messages, this will let the test
+            // report a pass.
+            if (_noOfConsumers == 0)
+            {
+                return totalPingsRequested;
+            }
+
             do
             {
                 // Block the current thread until replies to all the messages are received, or it times out.
                 perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS);
 
                 // Work out how many replies were receieved.
-                numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount();
+                numReplies = getExpectedNumPings(totalPingsRequested) - (int) perCorrelationId.trafficLight.getCount();
 
-                allMessagesReceived = numReplies == getExpectedNumPings(numPings);
+                allMessagesReceived = numReplies == getExpectedNumPings(totalPingsRequested);
 
                 // log.debug("numReplies = " + numReplies);
                 // log.debug("allMessagesReceived = " + allMessagesReceived);
@@ -1108,7 +1195,7 @@
             }
             while (!timedOut && !allMessagesReceived);
 
-            if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
+            if ((numReplies < getExpectedNumPings(totalPingsRequested)) && _verbose)
             {
                 log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
             }
@@ -1146,6 +1233,12 @@
         /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
             + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
 
+        // If we are runnning a consumeOnly test then don't send any messages
+        if (_consumeOnly)
+        {
+            return;
+        }
+        
         if (message == null)
         {
             message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
@@ -1667,6 +1760,10 @@
     /**
      * Calculates how many pings are expected to be received for the given number sent.
      *
+     * Note : that if you have set noConsumers to 0 then this will also return 0
+     * in the case of PubSub testing. This is correct as without consumers there
+     * will be no-one to receive the sent messages so they will be unable to respond. 
+     *
      * @param numpings The number of pings that will be sent.
      *
      * @return The number that should be received, for the test to pass.



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org