You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/02/02 16:28:13 UTC

svn commit: r502624 [4/5] - in /incubator/qpid/branches/perftesting/qpid/java: ./ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/protocol/ client/src/main/java/o...

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=502624&r1=502623&r2=502624
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Fri Feb  2 07:28:08 2007
@@ -20,344 +20,382 @@
  */
 package org.apache.qpid.requestreply;
 
+import java.io.IOException;
 import java.net.InetAddress;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.*;
 
 import org.apache.log4j.Logger;
 
+
+import org.apache.qpid.client.message.TestMessageFactory;
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQNoConsumersException;
 import org.apache.qpid.jms.MessageProducer;
 import org.apache.qpid.jms.Session;
-import org.apache.qpid.ping.AbstractPingProducer;
-import org.apache.qpid.ping.Throttle;
 import org.apache.qpid.topic.Config;
 
+import uk.co.thebadgerset.junit.extensions.BatchedThrottle;
+import uk.co.thebadgerset.junit.extensions.Throttle;
+
 /**
  * PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
- * client (see {@link PingPongBouncer} for the bounce back client). It is designed to be run from the command line
- * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session,
- * message producer and message consumer to run the ping-pong cycle on.
- * <p/>
+ * client (see {@link PingPongBouncer} for the bounce back client).
+ *
  * <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings.
  * This means that this class has to do some work to correlate pings with pongs; it expectes the original message
- * id in the ping to be bounced back in the correlation id. If a new temporary queue per ping were used, then
- * this correlation would not need to be done.
- * <p/>
+ * correlation id in the ping to be bounced back in the reply correlation id.
+ *
+ * <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor.
+ * It can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings
+ * within transactions; control the number of pings to send in each transaction; limit its sending rate; and perform
+ * failover testing.
+ *
  * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
  * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so
  * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
  * also registered to terminate the ping-pong loop cleanly.
- * <p/>
+ *
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Provide a ping and wait for response cycle.
+ * <tr><td> Provide a ping and wait for all responses cycle.
  * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
  * </table>
  *
- * @todo Make temp queue per ping a command line option.
- * @todo Make the queue name a command line option.
+ * @todo The use of a ping rate {@link #DEFAULT_RATE} and waits between pings {@link #DEFAULT_SLEEP_TIME} are overlapping.
+ *       Use the rate and throttling only. Ideally, optionally pass the rate throttle into the ping method, throttle to
+ *       be created and configured by the test runner from the -f command line option and made available through
+ *       the timing controller on timing aware tests or by throttling rate of calling tests methods on non-timing aware
+ *       tests.
+ *
+ * @todo Make shared or unique destinations a configurable option, hard coded to false.
+ *
+ * @todo Make acknowledge mode a test option.
+ *
+ * @todo Make the message listener a static for all replies to be sent to. It won't be any more of a bottle neck than
+ *       having one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process
+ *       messages concurrently for different ids. Needs to be static so that when using a chained message listener and
+ *       shared destinations between multiple PPPs, it gets notified about all replies, not just those that happen to
+ *       be picked up by the PPP that it is attached to.
+ *
+ * @todo Use read/write lock in the onmessage, not for reading/writing but to make use of a shared and exclusive lock
+ *       pair. Obtain read lock on all messages, before decrementing the message count. At the end of the on message
+ *       method add a block that obtains the write lock for the very last message, releases any waiting producer. Means
+ *       that the last message waits until all other messages have been handled before releasing producers but allows
+ *       messages to be processed concurrently, unlike the current synchronized block.
+ *
+ * @todo Set the timeout to be per message correlation id. Restart it every time a message is received (with matching id).
+ *       Means that the timeout measures whether a particular ping stream has paused for too long, rather than
+ *       the time to send an entire block of messages. This will be better because the timeout won't need to be adjusted
+ *       depending on the total number of messages being sent. Logic to be added to sendAndWait to recheck the timeout
+ *       whenever its wait expires.
+ *
+ * @todo Need to multiply up the number of expected messages for pubsub tests as each can be received by many consumers?
  */
-public class PingPongProducer extends AbstractPingProducer implements Runnable, MessageListener, ExceptionListener
+public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
 {
     private static final Logger _logger = Logger.getLogger(PingPongProducer.class);
 
-    /**
-     * Used to set up a default message size.
-     */
-    protected static final int DEFAULT_MESSAGE_SIZE = 0;
+    /** Holds the name of the property to get the test message size from. */
+    public static final String MESSAGE_SIZE_PROPNAME = "messagesize";
 
-    /**
-     * This is set and used when the test is for multiple-destinations
-     */
-    protected static final int DEFAULT_DESTINATION_COUNT = 0;
+    /** Holds the name of the property to get the ping queue name from. */
+    public static final String PING_QUEUE_NAME_PROPNAME = "destinationname";
 
-    protected static final int DEFAULT_RATE = 0;
+    /** Holds the name of the property to get the test delivery mode from. */
+    public static final String PERSISTENT_MODE_PROPNAME = "persistent";
 
-    /**
-     * Used to define how long to wait between pings.
-     */
-    protected static final long SLEEP_TIME = 250;
+    /** Holds the name of the property to get the test transactional mode from. */
+    public static final String TRANSACTED_PROPNAME = "transacted";
 
-    /**
-     * Used to define how long to wait before assuming that a ping has timed out.
-     */
-    protected static final long TIMEOUT = 9000;
+    /** Holds the name of the property to get the test broker url from. */
+    public static final String BROKER_PROPNAME = "broker";
 
-    /**
-     * Holds the name of the destination to send pings on.
-     */
-    protected static final String PING_DESTINATION_NAME = "ping";
+    /** Holds the name of the property to get the test broker virtual path. */
+    public static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
 
-    /**
-     * The batch size.
-     */
-    protected static final int DEFAULT_BATCH_SIZE = 100;
+    /** Holds the name of the property to get the message rate from. */
+    public static final String RATE_PROPNAME = "rate";
 
-    protected static final int PREFETCH = 100;
-    protected static final boolean NO_LOCAL = true;
-    protected static final boolean EXCLUSIVE = false;
+    public static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
 
-    /**
-     * The number of priming loops to run.
-     */
-    protected static final int PRIMING_LOOPS = 3;
+    /** Holds the name of the property to get the P2P or PubSub test mode from. */
+    public static final String IS_PUBSUB_PROPNAME = "pubsub";
 
-    /**
-     * A source for providing sequential unique correlation ids.
-     */
+    public static final String FAIL_AFTER_COMMIT_PROPNAME = "FailAfterCommit";
+
+    public static final String FAIL_BEFORE_COMMIT_PROPNAME = "FailBeforeCommit";
+
+    public static final String FAIL_AFTER_SEND_PROPNAME = "FailAfterSend";
+
+    public static final String FAIL_BEFORE_SEND_PROPNAME = "FailBeforeSend";
+
+    public static final String FAIL_ONCE_PROPNAME = "FailOnce";
+
+    public static final String USERNAME_PROPNAME = "username";
+
+    public static final String PASSWORD_PROPNAME = "password";
+
+    public static final String SELECTOR_PROPNAME = "selector";
+
+    public static final String PING_DESTINATION_COUNT_PROPNAME = "destinationscount";
+
+    /** Holds the name of the property to get the waiting timeout for response messages. */
+    public static final String TIMEOUT_PROPNAME = "timeout";
+
+    public static final String COMMIT_BATCH_SIZE_PROPNAME = "CommitBatchSize";
+
+    /** Used to set up a default message size. */
+    public static final int DEFAULT_MESSAGE_SIZE = 0;
+
+    /** Holds the name of the default destination to send pings on. */
+    public static final String DEFAULT_PING_DESTINATION_NAME = "ping";
+
+    /** Defines the default number of destinations to ping. */
+    public static final int DEFAULT_DESTINATION_COUNT = 1;
+
+    /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */
+    public static final int DEFAULT_RATE = 0;
+
+    /** Defines the default wait between pings. */
+    public static final long DEFAULT_SLEEP_TIME = 250;
+
+    /** Default time to wait before assuming that a ping has timed out. */
+    public static final long DEFAULT_TIMEOUT = 9000;
+
+    /** Defines the default number of pings to send in each transaction when running transactionally. */
+    public static final int DEFAULT_TX_BATCH_SIZE = 100;
+
+    /** Defines the default prefetch size to use when consuming messages. */
+    public static final int DEFAULT_PREFETCH = 100;
+
+    /** Defines the default value of the no local flag to use when consuming messages. */
+    public static final boolean DEFAULT_NO_LOCAL = false;
+
+    /** Defines the default value of the exclusive flag to use when consuming messages. */
+    public static final boolean DEFAULT_EXCLUSIVE = false;
+
+    /** Holds the message delivery mode to use for the test. */
+    public static final boolean DEFAULT_PERSISTENT_MODE = false;
+
+    /** Holds the transactional mode to use for the test. */
+    public static final boolean DEFAULT_TRANSACTED = false;
+
+    /** Holds the default broker url for the test. */
+    public static final String DEFAULT_BROKER = "tcp://localhost:5672";
+
+    /** Holds the default virtual path for the test. */
+    public static final String DEFAULT_VIRTUAL_PATH = "test";
+
+    /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */
+    public static final boolean DEFAULT_PUBSUB = false;
+
+    /** Holds the default broker log on username. */
+    public static final String DEFAULT_USERNAME = "guest";
+
+    /** Holds the default broker log on password. */
+    public static final String DEFAULT_PASSWORD = "guest";
+
+    /** Holds the default message selector. */
+    public static final String DEFAULT_SELECTOR = null;
+
+    /** Holds the default failover after commit test flag. */
+    public static final String DEFAULT_FAIL_AFTER_COMMIT = "false";
+
+    /** Holds the default failover before commit test flag. */
+    public static final String DEFAULT_FAIL_BEFORE_COMMIT = "false";
+
+    /** Holds the default failover after send test flag. */
+    public static final String DEFAULT_FAIL_AFTER_SEND = "false";
+
+    /** Holds the default failover before send test flag. */
+    public static final String DEFAULT_FAIL_BEFORE_SEND = "false";
+
+    /** Holds the default failover only once flag, true means only do one failover, false means failover on every commit cycle. */
+    public static final String DEFAULT_FAIL_ONCE = "true";
+
+    /** Holds the default verbose mode. */
+    public static final boolean DEFAULT_VERBOSE = false;
+
+    /** Holds the name of the property to store nanosecond timestamps in ping messages with. */
+    public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
+
+    /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
     private static AtomicLong idGenerator = new AtomicLong(0L);
 
     /**
-     * Holds a map from message ids to latches on which threads wait for replies.
+     * Holds a map from message ids to latches on which threads wait for replies. This map is shared across
+     * multiple ping producers on the same JVM.
      */
-    private static Map<String, CountDownLatch> trafficLights = new HashMap<String, CountDownLatch>();
+    private static Map<String, CountDownLatch> trafficLights =
+        Collections.synchronizedMap(new HashMap<String, CountDownLatch>());
 
-    /**
-     * Destination where the responses messages will arrive
-     */
-    private Destination _replyDestination;
+    /** A convenient formatter to use when time stamping output. */
+    protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
 
     /**
-     * Destination where the producer will be sending message to
+     * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
+     * creating multiple ping producers in the same JVM.
      */
-    private Destination _pingDestination;
+    protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
 
-    /**
-     * Determines whether this producer sends persistent messages from the run method.
-     */
+    /** Destination where the response messages will arrive. */
+    private Destination _replyDestination;
+
+    /** Determines whether this producer sends persistent messages. */
     protected boolean _persistent;
 
-    /**
-     * Holds the message size to send, from the run method.
-     */
+    /** Determines what size of messages this producer sends. */
     protected int _messageSize;
 
-    /**
-     * Used to indicate that the ping loop should print out whenever it pings.
-     */
+    /** Used to indicate that the ping loop should print out whenever it pings. */
     protected boolean _verbose = false;
 
+    /** Holds the session on which ping replies are received. */
     protected Session _consumerSession;
 
-    /**
-     * Used to restrict the sending rate to a specified limit.
-     */
-    private Throttle rateLimiter = null;
+    /** Used to restrict the sending rate to a specified limit. */
+    private Throttle _rateLimiter = null;
+
+    /** Holds a message listener that this message listener chains all its messages to. */
+    private ChainedMessageListener _chainedMessageListener = null;
+
+    /** Flag used to indicate if this is a point to point or pub/sub ping client. */
+    protected boolean _isPubSub = false;
 
     /**
-     * The throttler can only reliably restrict to a few hundred cycles per second, so a throttling batch size is used
-     * to group sends together into batches large enough that the throttler runs slower than that.
+     * This id generator is used to generate ids that are only unique within this pinger. Creating multiple pingers
+     * on the same JVM using this id generator will allow them to ping on the same queues.
      */
-    int _throttleBatchSize;
+    protected AtomicInteger _queueSharedId = new AtomicInteger();
 
-    private MessageListener _messageListener = null;
+    /** Used to tell the ping loop when to terminate, it only runs while this is true. */
+    protected boolean _publish = true;
 
-    private PingPongProducer(String brokerDetails, String username, String password, String virtualpath, boolean transacted,
-                             boolean persistent, int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
-                             boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize, int rate)
-            throws Exception
-    {
-        // Create a connection to the broker.
-        InetAddress address = InetAddress.getLocalHost();
-        String clientID = address.getHostName() + System.currentTimeMillis();
+    /** Holds the connection to the broker. */
+    private Connection _connection;
 
-        setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath));
+    /** Holds the producer session, needed to create ping messages. */
+    private Session _producerSession;
 
-        // Create transactional or non-transactional sessions, based on the command line arguments.
-        setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE));
-        _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+    /** Holds the set of destinations that this ping producer pings. */
+    protected List<Destination> _pingDestinations = new ArrayList<Destination>();
 
-        _persistent = persistent;
-        _messageSize = messageSize;
-        _verbose = verbose;
+    /** Holds the message producer to send the pings through. */
+    protected MessageProducer _producer;
 
-        // Set failover interrupts
-        _failAfterCommit = afterCommit;
-        _failBeforeCommit = beforeCommit;
-        _failAfterSend = afterSend;
-        _failBeforeSend = beforeSend;
-        _failOnce = failOnce;
-        _txBatchSize = batchSize;
+    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
+    protected boolean _failBeforeCommit = false;
 
-        // Calculate a throttling batch size and rate such that the throttle runs slower than 100 cycles per second
-        // and batched sends within each cycle multiply up to give the desired rate.
-        //
-        // total rate = throttle rate * batch size.
-        // 1 < throttle rate < 100
-        // 1 < total rate < 20000
-        if (rate > DEFAULT_RATE)
-        {
-            // Log base 10 over 2 is used here to get a feel for what power of 100 the total rate is.
-            // As the total rate goes up the powers of 100 the batch size goes up by powers of 100 to keep the
-            // throttle rate back into the range 1 to 100.
-            int x = (int) (Math.log10(rate) / 2);
-            _throttleBatchSize = (int) Math.pow(100, x);
-            int throttleRate = rate / _throttleBatchSize;
-
-            _logger.debug("rate = " + rate);
-            _logger.debug("x = " + x);
-            _logger.debug("_throttleBatchSize = " + _throttleBatchSize);
-            _logger.debug("throttleRate = " + throttleRate);
-
-            rateLimiter = new Throttle();
-            rateLimiter.setRate(throttleRate);
-        }
-    }
+    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
+    protected boolean _failAfterCommit = false;
 
-    /**
-     * Creates a ping pong producer with the specified connection details and type.
-     *
-     * @param brokerDetails
-     * @param username
-     * @param password
-     * @param virtualpath
-     * @param transacted
-     * @throws Exception All allowed to fall through. This is only test code...
-     */
-    public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
-                            String destinationName, String selector, boolean transacted, boolean persistent,
-                            int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
-                            boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize,
-                            int noOfDestinations, int rate, boolean pubsub) throws Exception
-    {
-        this(brokerDetails, username, password, virtualpath, transacted, persistent, messageSize, verbose, afterCommit,
-             beforeCommit, afterSend, beforeSend, failOnce, batchSize, rate);
+    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
+    protected boolean _failBeforeSend = false;
 
-        _destinationCount = noOfDestinations;
-        setPubSub(pubsub);
+    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
+    protected boolean _failAfterSend = false;
 
-        if (noOfDestinations == DEFAULT_DESTINATION_COUNT)
-        {
-            if (destinationName != null)
-            {
-                createPingDestination(destinationName);
-                // Create producer and the consumer
-                createProducer();
-                createConsumer(selector);
-            }
-            else
-            {
-                _logger.error("Destination is not specified");
-                throw new IllegalArgumentException("Destination is not specified");
-            }
-        }
-    }
+    /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
+    protected boolean _failOnce = true;
 
-    private void createPingDestination(String name)
-    {
-        if (isPubSub())
-        {
-            _pingDestination = new AMQTopic(name);
-        }
-        else
-        {
-            _pingDestination = new AMQQueue(name);
-        }
-    }
+    /** Holds the number of sends that should be performed in every transaction when using transactions. */
+    protected int _txBatchSize = 1;
 
     /**
-     * Creates the producer to send the pings on.  If the tests are with nultiple-destinations, then producer
-     * is created with null destination, so that any destination can be specified while sending
+     * Creates a ping producer with the specified parameters, of which there are many. See their individual comments
+     * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on it,
+     * to send and receive its pings and replies on. The other options are kept, and control how this pinger behaves.
      *
-     * @throws JMSException
+     * @param brokerDetails    The URL of the broker to send pings to.
+     * @param username         The username to log onto the broker with.
+     * @param password         The password to log onto the broker with.
+     * @param virtualpath      The virtual host name to use on the broker.
+     * @param destinationName  The name (or root where multiple destinations are used) of the destination to send
+     *                         pings to.
+     * @param selector         The selector to filter replies with.
+     * @param transacted       Indicates whether or not pings are sent and received in transactions.
+     * @param persistent       Indicates whether pings are sent using persistent delivery.
+     * @param messageSize      Specifies the size of ping messages to send.
+     * @param verbose          Indicates that information should be printed to the console on every ping.
+     * @param afterCommit      Indicates that the user should be prompted to terminate a broker after commits to test failover.
+     * @param beforeCommit     Indicates that the user should be prompted to terminate a broker before commits to test failover.
+     * @param afterSend        Indicates that the user should be prompted to terminate a broker after sends to test failover.
+     * @param beforeSend       Indicates that the user should be prompted to terminate a broker before sends to test failover.
+     * @param failOnce         Indicates that the failover testing behaviour should only happen on the first commit, not all.
+     * @param txBatchSize      Specifies the number of pings to send in each transaction.
+     * @param noOfDestinations The number of destinations to ping. Must be 1 or more.
+     * @param rate             Specifies the number of pings per second to send. Setting this to 0 means send as fast as
+     *                         possible, with no rate restriction.
+     * @param pubsub           Indicates whether this is a pub/sub (true) or point to point (false) ping client.
+     *
+     * @throws Exception Any exceptions are allowed to fall through.
      */
-    public void createProducer() throws JMSException
+    public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
+                            String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
+                            boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
+                            boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate,
+                            boolean pubsub) throws Exception
     {
-        if (getDestinationsCount() > DEFAULT_DESTINATION_COUNT)
+        // Check that one or more destinations were specified.
+        if (noOfDestinations < 1)
         {
-            // create producer with initial destination as null for test with multiple-destinations
-            // In this case, a different destination will be used while sending the message
-            _producer = (MessageProducer) getProducerSession().createProducer(null);
+            throw new IllegalArgumentException("There must be at least one destination.");
         }
-        else
-        {
-            // Create a producer with known destination to send the pings on.
-            _producer = (MessageProducer) getProducerSession().createProducer(_pingDestination);
 
-        }
-
-        _producer.setDisableMessageTimestamp(true);
-        _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-    }
+        // Create a connection to the broker.
+        InetAddress address = InetAddress.getLocalHost();
+        String clientID = address.getHostName() + System.currentTimeMillis();
 
-    /**
-     * Creates the temporary destination to listen to the responses
-     *
-     * @param selector
-     * @throws JMSException
-     */
-    public void createConsumer(String selector) throws JMSException
-    {
-        // Create a temporary destination to get the pongs on.
-        if (isPubSub())
-        {
-            _replyDestination = _consumerSession.createTemporaryTopic();
-        }
-        else
-        {
-            _replyDestination = _consumerSession.createTemporaryQueue();
-        }
+        _connection = new AMQConnection(brokerDetails, username, password, clientID, virtualpath);
 
-        // Create a message consumer to get the replies with and register this to be called back by it.
-        MessageConsumer consumer = _consumerSession.createConsumer(_replyDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
-        consumer.setMessageListener(this);
-    }
+        // Create transactional or non-transactional sessions, based on the command line arguments.
+        _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+        _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
 
-    /**
-     * Creates consumer instances for each destination. This is used when test is being done with multiple destinations.
-     *
-     * @param selector
-     * @throws JMSException
-     */
-    public void createConsumers(String selector) throws JMSException
-    {
-        for (int i = 0; i < getDestinationsCount(); i++)
+        // Set up a throttle to control the send rate, if a rate > 0 is specified.
+        if (rate > 0)
         {
-            MessageConsumer consumer =
-                    getConsumerSession().createConsumer(getDestination(i), PREFETCH, false, EXCLUSIVE, selector);
-            consumer.setMessageListener(this);
+            _rateLimiter = new BatchedThrottle();
+            _rateLimiter.setRate(rate);
         }
-    }
-
 
-    public Session getConsumerSession()
-    {
-        return _consumerSession;
-    }
+        // Create the temporary queue for replies.
+        _replyDestination = _consumerSession.createTemporaryQueue();
 
-    public Destination getPingDestination()
-    {
-        return _pingDestination;
-    }
+        // Create the producer and the consumers for all reply destinations.
+        createProducer();
+        createPingDestinations(noOfDestinations, selector, destinationName, true);
+        createReplyConsumers(getReplyDestinations(), selector);
 
-    protected void setPingDestination(Destination destination)
-    {
-        _pingDestination = destination;
+        // Keep all the remaining options.
+        _persistent = persistent;
+        _messageSize = messageSize;
+        _verbose = verbose;
+        _failAfterCommit = afterCommit;
+        _failBeforeCommit = beforeCommit;
+        _failAfterSend = afterSend;
+        _failBeforeSend = beforeSend;
+        _failOnce = failOnce;
+        _txBatchSize = txBatchSize;
+        _isPubSub = pubsub;
     }
 
     /**
-     * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs
+     * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs
      * to be started to bounce the pings back again.
-     * <p/>
-     * <p/>The command line takes from 2 to 4 arguments:
-     * <p/><table>
-     * <tr><td>brokerDetails <td> The broker connection string.
-     * <tr><td>virtualPath   <td> The virtual path.
-     * <tr><td>transacted    <td> A boolean flag, telling this client whether or not to use transactions.
-     * <tr><td>size          <td> The size of ping messages to use, in bytes.
-     * </table>
      *
-     * @param args The command line arguments as defined above.
+     * @param args The command line arguments.
      */
     public static void main(String[] args) throws Exception
     {
@@ -372,22 +410,22 @@
         }
 
         String brokerDetails = config.getHost() + ":" + config.getPort();
-        String virtualpath = "/test";
-        String selector = config.getSelector();
+        String virtualpath = "test";
+        String selector = (config.getSelector() == null) ? DEFAULT_SELECTOR : config.getSelector();
         boolean verbose = true;
         boolean transacted = config.isTransacted();
         boolean persistent = config.usePersistentMessages();
         int messageSize = (config.getPayload() != 0) ? config.getPayload() : DEFAULT_MESSAGE_SIZE;
         //int messageCount = config.getMessages();
         int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DEFAULT_DESTINATION_COUNT;
-        int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : DEFAULT_BATCH_SIZE;
+        int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : DEFAULT_TX_BATCH_SIZE;
         int rate = (config.getRate() != 0) ? config.getRate() : DEFAULT_RATE;
         boolean pubsub = config.isPubSub();
 
         String destName = config.getDestination();
         if (destName == null)
         {
-            destName = PING_DESTINATION_NAME;
+            destName = DEFAULT_PING_DESTINATION_NAME;
         }
 
         boolean afterCommit = false;
@@ -429,15 +467,13 @@
         }
 
         // Create a ping producer to handle the request/wait/reply cycle.
-        PingPongProducer pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath,
-                                                             destName, selector, transacted, persistent, messageSize, verbose,
-                                                             afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
-                                                             destCount, rate, pubsub);
+        PingPongProducer pingProducer =
+            new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
+                                 transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
+                                 beforeSend, failOnce, batchSize, destCount, rate, pubsub);
 
         pingProducer.getConnection().start();
 
-        // Run a few priming pings to remove warm up time from test results.
-        //pingProducer.prime(PRIMING_LOOPS);
         // Create a shutdown hook to terminate the ping-pong producer.
         Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
 
@@ -450,50 +486,105 @@
         pingThread.join();
     }
 
-    private static void usage()
+    /**
+     * Convenience method for a short pause.
+     *
+     * @param sleepTime The time in milliseconds to pause for.
+     */
+    public static void pause(long sleepTime)
+    {
+        if (sleepTime > 0)
+        {
+            try
+            {
+                Thread.sleep(sleepTime);
+            }
+            catch (InterruptedException ie)
+            { }
+        }
+    }
+
+    /**
+     * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply
+     * to destination of this pinger.
+     *
+     * @return The single reply to destination of this pinger, wrapped in a list.
+     */
+    public List<Destination> getReplyDestinations()
+    {
+        _logger.debug("public List<Destination> getReplyDestinations(): called");
+
+        List<Destination> replyDestinations = new ArrayList<Destination>();
+        replyDestinations.add(_replyDestination);
+
+        return replyDestinations;
+    }
+
+    /**
+     * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery
+     * flag is set according to the ping producer creation options.
+     *
+     * @throws JMSException Any JMSExceptions are allowed to fall through.
+     */
+    public void createProducer() throws JMSException
     {
-        System.err.println("Usage: TestPingPublisher \n" + "-host : broker host" + "-port : broker port" +
-                           "-destinationname : queue/topic name\n" +
-                           "-transacted : (true/false). Default is false\n" +
-                           "-persistent : (true/false). Default is false\n" +
-                           "-pubsub     : (true/false). Default is false\n" +
-                           "-selector   : selector string\n" +
-                           "-payload    : paylaod size. Default is 0\n" +
-                           //"-messages   : no of messages to be sent (if 0, the ping loop will run indefinitely)\n" +
-                           "-destinationscount : no of destinations for multi-destinations test\n" +
-                           "-batchsize  : batch size\n" +
-                           "-rate : thruput rate\n");
+        _logger.debug("public void createProducer(): called");
+
+        _producer = (MessageProducer) _producerSession.createProducer(null);
+        //_producer.setDisableMessageTimestamp(true);
+        _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
     }
 
     /**
-     * Primes the test loop by sending a few messages, then introduces a short wait. This allows the bounce back client
-     * on the other end a chance to configure its reply producer on the reply to destination. It is also worth calling
-     * this a few times, in order to prime the JVMs JIT compilation.
+     * Creates consumers for the specified number of destinations. The destinations themselves are also created by
+     * this method.
+     *
+     * @param noOfDestinations The number of destinations to create consumers for.
+     * @param selector         The message selector to filter the consumers with.
+     * @param rootName         The root of the name, or actual name if only one is being created.
+     * @param unique           <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share
+     *                         the numbering with all pingers on the same JVM.
      *
-     * @param x The number of priming loops to run.
-     * @throws JMSException All underlying exceptions are allowed to fall through.
+     * @throws JMSException Any JMSExceptions are allowed to fall through.
      */
-    public void prime(int x) throws JMSException
+    public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
+                                throws JMSException
     {
-        for (int i = 0; i < x; i++)
+        _logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations
+                      + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = "
+                      + unique + "): called");
+
+        // Create the desired number of ping destinations and consumers for them.
+        for (int i = 0; i < noOfDestinations; i++)
         {
-            // Create and send a small message.
-            Message first = getTestMessage(_replyDestination, 0, false);
-            sendMessage(first);
+            AMQDestination destination = null;
 
-            commitTx();
+            int id;
 
-            try
+            // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
+            if (unique)
             {
-                Thread.sleep(100);
+                id = _queueJVMSequenceID.incrementAndGet();
             }
-            catch (InterruptedException ignore)
+            else
             {
-
+                id = _queueSharedId.incrementAndGet();
             }
-        }
 
+            // Check if this is a pub/sub pinger, in which case create topics.
+            if (_isPubSub)
+            {
+                destination = new AMQTopic(rootName + id);
+            }
+            // Otherwise this is a p2p pinger, in which case create queues.
+            else
+            {
+                destination = new AMQQueue(rootName + id);
+            }
 
+            // Keep the destination.
+            _pingDestinations.add(destination);
+        }
     }
 
     /**
@@ -505,60 +596,72 @@
      */
     public void onMessage(Message message)
     {
+        _logger.debug("public void onMessage(Message message): called");
 
         try
         {
-
-            // Store the reply, if it has a correlation id that is expected.
+            // Extract the message's correlation id.
             String correlationID = message.getJMSCorrelationID();
+            _logger.debug("correlationID = " + correlationID);
 
-            if (_verbose)
-            {
-                _logger.info(timestampFormatter.format(new Date()) + ": Got reply with correlation id, " + correlationID);
-                //_logger.debug("Received from : " + message.getJMSDestination());
-            }
-
-            // Turn the traffic light to green.
+            // Countdown on the traffic light if there is one for the matching correlation id.
             CountDownLatch trafficLight = trafficLights.get(correlationID);
 
             if (trafficLight != null)
             {
-                if (_messageListener != null)
-                {
-                    synchronized (trafficLight)
-                    {
-                        _messageListener.onMessage(message);
-                        trafficLight.countDown();
-                    }
-                }
-                else
+                _logger.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+
+                // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
+                // method simultaneously with the same correlation id. Decrementing the latch in a synchronized block
+                // ensures that each thread will get a unique value for the remaining messages.
+                long trueCount = -1;
+                long remainingCount = -1;
+
+                synchronized (trafficLight)
                 {
                     trafficLight.countDown();
-                }
 
-                _logger.trace("Reply was expected, decrementing the latch for the id.");
+                    trueCount = trafficLight.getCount();
+                    remainingCount = trueCount - 1;
 
-                long remainingCount = trafficLight.getCount();
+                    _logger.debug("remainingCount = " + remainingCount);
+                    _logger.debug("trueCount = " + trueCount);
 
-                if ((remainingCount % _txBatchSize) == 0)
-                {
-                    commitTx(getConsumerSession());
-                }
+                    // Commit on transaction batch size boundaries. At this point in time the waiting producer remains
+                    // blocked, even on the last message.
+                    if ((remainingCount % _txBatchSize) == 0)
+                    {
+                        commitTx(_consumerSession);
+                    }
+
+                    // Forward the message and remaining count to any interested chained message listener.
+                    if (_chainedMessageListener != null)
+                    {
+                        _chainedMessageListener.onMessage(message, (int) remainingCount);
+                    }
 
+                    // Check if this is the last message, in which case release any waiting producers. This is done
+                    // after the transaction has been committed and any listeners notified.
+                    if (trueCount == 1)
+                    {
+                        trafficLight.countDown();
+                    }
+                }
             }
             else
             {
-                _logger.trace("There was no thread waiting for reply: " + correlationID);
+                _logger.debug("There was no thread waiting for reply: " + correlationID);
             }
 
+            // Print out ping times for every message in verbose mode only.
             if (_verbose)
             {
-                Long timestamp = message.getLongProperty("timestamp");
+                Long timestamp = message.getLongProperty(MESSAGE_TIMESTAMP_PROPNAME);
 
                 if (timestamp != null)
                 {
-                    long diff = System.currentTimeMillis() - timestamp;
-                    _logger.trace("Time for round trip: " + diff);
+                    long diff = System.nanoTime() - timestamp;
+                    _logger.trace("Time for round trip (nanos): " + diff);
                 }
             }
         }
@@ -566,32 +669,70 @@
         {
             _logger.warn("There was a JMSException: " + e.getMessage(), e);
         }
+
+        _logger.debug("public void onMessage(Message message): ending");
     }
 
     /**
      * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
-     * before a reply arrives, then a null reply is returned from this method.
+     * before a reply arrives, then a null reply is returned from this method. This method generates a new unique
+     * correlation id for the messages.
      *
      * @param message  The message to send.
      * @param numPings The number of ping messages to send.
      * @param timeout  The timeout in milliseconds.
+     *
      * @return The number of replies received. This may be less than the number sent if the timeout terminated the
      *         wait for all prematurely.
+     *
      * @throws JMSException All underlying JMSExceptions are allowed to fall through.
      */
     public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
     {
-        String messageCorrelationId = null;
+        _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+                      + timeout + "): called");
+
+        // Create a unique correlation id to put on the messages before sending them.
+        String messageCorrelationId = Long.toString(idGenerator.incrementAndGet());
+
+        return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId);
+    }
+
+    /**
+     * Sends the specified number of ping messages and then waits for all correlating replies. If the wait times out
+     * before all replies arrive, the number of replies received so far is returned. This method allows the caller to
+     * specify the correlation id.
+     *
+     * @param message              The message to send.
+     * @param numPings             The number of ping messages to send.
+     * @param timeout              The timeout in milliseconds.
+     * @param messageCorrelationId The message correlation id.
+     *
+     * @return The number of replies received. This may be less than the number sent if the timeout terminated the
+     *         wait for all prematurely.
+     *
+     * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+     */
+    public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
+                            throws JMSException, InterruptedException
+    {
+        _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+                      + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
 
         try
         {
-            // Put a unique correlation id on the message before sending it.
-            messageCorrelationId = Long.toString(getNewID());
+            // Create a count down latch to count the number of replies with. This is created before the messages are
+            // sent so that the replies cannot be received before the count down is created.
+            // One is added to this, so that the last reply becomes a special case. The special case is that the
+            // chained message listener must be called before this sender can be unblocked, but that decrementing the
+            // countdown needs to be done before the chained listener can be called.
+            CountDownLatch trafficLight = new CountDownLatch(numPings + 1);
+            trafficLights.put(messageCorrelationId, trafficLight);
 
+            // Send the specified number of messages.
             pingNoWaitForReply(message, numPings, messageCorrelationId);
 
-            CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
-            // Block the current thread until a reply to the message is received, or it times out.
+            // Block the current thread until replies to all the messages are received, or it times out.
             trafficLight.await(timeout, TimeUnit.MILLISECONDS);
 
             // Work out how many replies were receieved.
@@ -606,45 +747,37 @@
                 _logger.info("Got all replies on id, " + messageCorrelationId);
             }
 
-            commitTx(getConsumerSession());
+            commitTx(_consumerSession);
+
+            _logger.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
 
             return numReplies;
         }
+        // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived,
+        // so will be a memory leak if this is not done.
         finally
         {
-            removeLock(messageCorrelationId);
+            trafficLights.remove(messageCorrelationId);
         }
     }
 
-    public long getNewID()
-    {
-        return idGenerator.incrementAndGet();
-    }
-
-    public CountDownLatch removeLock(String correlationID)
-    {
-        return trafficLights.remove(correlationID);
-    }
-
-
-    /*
-    * Sends the specified ping message but does not wait for a correlating reply.
-    *
-    * @param message  The message to send.
-    * @param numPings The number of pings to send.
-    * @return The reply, or null if no reply arrives before the timeout.
-    * @throws JMSException All underlying JMSExceptions are allowed to fall through.
-    */
-    public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException, InterruptedException
+    /**
+     * Sends the specified number of ping messages and does not wait for correlating replies.
+     *
+     * @param message              The message to send.
+     * @param numPings             The number of pings to send.
+     * @param messageCorrelationId A correlation id to place on all messages sent.
+     *
+     * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+     */
+    public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
     {
-        // Create a count down latch to count the number of replies with. This is created before the message is sent
-        // so that the message is not received before the count down is created.
-        CountDownLatch trafficLight = new CountDownLatch(numPings);
-        trafficLights.put(messageCorrelationId, trafficLight);
+        _logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+                      + ", String messageCorrelationId = " + messageCorrelationId + "): called");
 
         message.setJMSCorrelationID(messageCorrelationId);
 
-        // Set up a committed flag to detect uncommitted message at the end of the send loop. This may occurr if the
+        // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occur if the
         // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is
         // needed.
         boolean committed = false;
@@ -652,55 +785,46 @@
         // Send all of the ping messages.
         for (int i = 0; i < numPings; i++)
         {
-            // Reset the committed flag to indicate that there are uncommitted message.
+            // Reset the committed flag to indicate that there are uncommitted messages.
             committed = false;
 
             // Re-timestamp the message.
-            message.setLongProperty("timestamp", System.currentTimeMillis());
+            message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
 
-            // Check if the test is with multiple-destinations, in which case round robin the destinations
-            // as the messages are sent.
-            if (getDestinationsCount() > DEFAULT_DESTINATION_COUNT)
-            {
-                sendMessage(getDestination(i % getDestinationsCount()), message);
-            }
-            else
-            {
-                sendMessage(message);
-            }
+            // Round robin the destinations as the messages are sent.
+            //return _destinationCount;
+            sendMessage(_pingDestinations.get(i % _pingDestinations.size()), message);
 
-            // Apply message rate throttling if a rate limit has been set up and the throttling batch limit has been
-            // reached. See the comment on the throttle batch size for information about the use of batches here.
-            if ((rateLimiter != null) && ((i % _throttleBatchSize) == 0))
+            // Apply message rate throttling if a rate limit has been set up.
+            if (_rateLimiter != null)
             {
-                rateLimiter.throttle();
+                _rateLimiter.throttle();
             }
 
             // Call commit every time the commit batch size is reached.
             if ((i % _txBatchSize) == 0)
             {
-                commitTx();
+                commitTx(_producerSession);
                 committed = true;
             }
+
+            // Spew out per message timings on every message, only in verbose mode.
+            if (_verbose)
+            {
+                _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, "
+                             + messageCorrelationId);
+            }
         }
 
         // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages.
         if (!committed)
         {
-            commitTx();
-        }
-
-        // Spew out per message timings only in verbose mode.
-        if (_verbose)
-        {
-            _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
+            commitTx(_producerSession);
         }
-
     }
 
     /**
-     * The ping loop implementation. This send out pings of the configured size, persistence and transactionality, and
-     * waits for replies and inserts short pauses in between each.
+     * The ping loop implementation. This sends out pings, waits for replies, and inserts short pauses in between each.
      */
     public void pingLoop()
     {
@@ -708,13 +832,13 @@
         {
             // Generate a sample message and time stamp it.
             ObjectMessage msg = getTestMessage(_replyDestination, _messageSize, _persistent);
-            msg.setLongProperty("timestamp", System.currentTimeMillis());
+            msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
 
             // Send the message and wait for a reply.
-            pingAndWaitForReply(msg, DEFAULT_BATCH_SIZE, TIMEOUT);
+            pingAndWaitForReply(msg, DEFAULT_TX_BATCH_SIZE, DEFAULT_TIMEOUT);
 
             // Introduce a short pause if desired.
-            pause(SLEEP_TIME);
+            pause(DEFAULT_SLEEP_TIME);
         }
         catch (JMSException e)
         {
@@ -728,79 +852,300 @@
         }
     }
 
-    public Destination getReplyDestination()
+    /*public Destination getReplyDestination()
     {
         return _replyDestination;
+    }*/
+
+    /**
+     * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set
+     * here.
+     *
+     * @param messageListener The chained message listener.
+     */
+    public void setChainedMessageListener(ChainedMessageListener messageListener)
+    {
+        _chainedMessageListener = messageListener;
     }
 
-    protected void setReplyDestination(Destination destination)
+    /**
+     * Removes any chained message listeners from this pinger.
+     */
+    public void removeChainedMessageListener()
     {
-        _replyDestination = destination;
+        _chainedMessageListener = null;
     }
 
-    public void setMessageListener(MessageListener messageListener)
+    /**
+     * Generates a test message of the specified size, with the specified reply-to destination and persistence flag.
+     *
+     * @param replyQueue  The reply-to destination for the message.
+     * @param messageSize The desired size of the message in bytes.
+     * @param persistent  <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
+     *
+     * @return A freshly generated test message.
+     *
+     * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
+     */
+    public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
     {
-        _messageListener = messageListener;
+        ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
+
+        // Timestamp the message in nanoseconds.
+        msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
+
+        return msg;
     }
 
-    public CountDownLatch getEndLock(String correlationID)
+    /**
+     * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
+     * flag has been cleared.
+     */
+    public void stop()
     {
-        return trafficLights.get(correlationID);
+        _publish = false;
     }
 
-    /*
-    * When the test is being performed with multiple queues, then this method will be used, which has a loop to
-    * pick up the next queue from the queues list and sends message to it.
-    *
-    * @param message
-    * @param numPings
-    * @throws JMSException
-    */
-    /*private void pingMultipleQueues(Message message, int numPings) throws JMSException
+    /**
+     * Implements a ping loop that repeatedly pings until the publish flag becomes false.
+     */
+    public void run()
     {
-        int queueIndex = 0;
-        for (int i = 0; i < numPings; i++)
+        // Keep running until the publish flag is cleared.
+        while (_publish)
         {
-            // Re-timestamp the message.
-            message.setLongProperty("timestamp", System.currentTimeMillis());
+            pingLoop();
+        }
+    }
 
-            sendMessage(getDestination(queueIndex++), message);
+    /**
+     * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
+     * connection, this clears the publish flag which in turn will halt the ping loop.
+     *
+     * @param e The exception that triggered this callback method.
+     */
+    public void onException(JMSException e)
+    {
+        _publish = false;
+        _logger.debug("There was a JMSException: " + e.getMessage(), e);
+    }
 
-            // reset the counter to get the first queue
-            if (queueIndex == (getDestinationsCount() - 1))
+    /**
+     * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered
+     * with the runtime system as a shutdown hook.
+     *
+     * @return A shutdown hook for the ping loop.
+     */
+    public Thread getShutdownHook()
+    {
+        return new Thread(new Runnable()
             {
-                queueIndex = 0;
-            }
+                public void run()
+                {
+                    stop();
+                }
+            });
+    }
+
+    /**
+     * Gets the underlying connection that this ping client is running on.
+     *
+     * @return The underlying connection that this ping client is running on.
+     */
+    public Connection getConnection()
+    {
+        return _connection;
+    }
+
+    /**
+     * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
+     *
+     * @param destinations The destinations to listen to.
+     * @param selector     A selector to filter the messages with.
+     *
+     * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
+     */
+    public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
+    {
+        _logger.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+                      + ", String selector = " + selector + "): called");
+
+        for (Destination destination : destinations)
+        {
+            // Create a consumer for the destination and set this pinger to listen to its messages.
+            MessageConsumer consumer =
+                _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE,
+                                                selector);
+            consumer.setMessageListener(this);
         }
-    }*/
+    }
 
     /**
-     * A connection listener that logs out any failover complete events. Could do more interesting things with this
-     * at some point...
+     * Closes the pinger's connection.
+     *
+     * @throws JMSException All JMSException are allowed to fall through.
      */
-    public static class FailoverNotifier implements ConnectionListener
+    public void close() throws JMSException
     {
-        public void bytesSent(long count)
+        _logger.debug("public void close(): called");
+
+        if (_connection != null)
         {
+            _connection.close();
         }
+    }
+
+    /**
+     * Convenience method to commit the transaction on the specified session. If the session to commit on is not
+     * a transactional session, this method does nothing (unless the failover after send flag is set).
+     *
+     * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit
+     * is applied. This flag applies whether the pinger is transactional or not.
+     *
+     * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
+     * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
+     * after the commit is applied. These flags will only apply if using a transactional pinger.
+     *
+     * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+     *
+     * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
+     *       method, because commits only apply to transactional pingers, but fail after send applied to transactional
+     *       and non-transactional alike.
+     */
+    protected void commitTx(Session session) throws JMSException
+    {
+        _logger.debug("protected void commitTx(Session session): called");
 
-        public void bytesReceived(long count)
+        _logger.trace("Batch time reached");
+        if (_failAfterSend)
         {
+            _logger.trace("Batch size reached");
+            if (_failOnce)
+            {
+                _failAfterSend = false;
+            }
+
+            _logger.trace("Failing After Send");
+            doFailover();
         }
 
-        public boolean preFailover(boolean redirect)
+        if (session.getTransacted())
         {
-            return true; //Allow failover
+            try
+            {
+                if (_failBeforeCommit)
+                {
+                    if (_failOnce)
+                    {
+                        _failBeforeCommit = false;
+                    }
+
+                    _logger.trace("Failing Before Commit");
+                    doFailover();
+                }
+
+                session.commit();
+
+                if (_failAfterCommit)
+                {
+                    if (_failOnce)
+                    {
+                        _failAfterCommit = false;
+                    }
+
+                    _logger.trace("Failing After Commit");
+                    doFailover();
+                }
+
+                _logger.trace("Session Commited.");
+            }
+            catch (JMSException e)
+            {
+                _logger.trace("JMSException on commit:" + e.getMessage(), e);
+
+                // Warn that the bounce back client is not available.
+                if (e.getLinkedException() instanceof AMQNoConsumersException)
+                {
+                    _logger.debug("No consumers on queue.");
+                }
+
+                try
+                {
+                    session.rollback();
+                    _logger.trace("Message rolled back.");
+                }
+                catch (JMSException jmse)
+                {
+                    _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+
+                    // Both commit and rollback failed. Throw the rollback exception.
+                    throw jmse;
+                }
+            }
+        }
+    }
+
+    /**
+     * Sends the message to the specified destination. If the destination is null, it gets sent to the default destination
+     * of the ping producer. If an explicit destination is set, this overrides the default.
+     *
+     * @param destination The destination to send to.
+     * @param message     The message to send.
+     *
+     * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
+     */
+    protected void sendMessage(Destination destination, Message message) throws JMSException
+    {
+        if (_failBeforeSend)
+        {
+            if (_failOnce)
+            {
+                _failBeforeSend = false;
+            }
+
+            _logger.trace("Failing Before Send");
+            doFailover();
         }
 
-        public boolean preResubscribe()
+        if (destination == null)
+        {
+            _producer.send(message);
+        }
+        else
         {
-            return true; // Allow resubscription
+            _producer.send(destination, message);
         }
+    }
 
-        public void failoverComplete()
+    /**
+     * Prompts the user to terminate the broker, in order to test failover functionality. This method will block
+     * until the user supplies some input on the terminal.
+     */
+    protected void doFailover()
+    {
+        System.out.println("Kill Broker now then press return");
+        try
         {
-            _logger.info("App got failover complete callback.");
+            System.in.read();
         }
+        catch (IOException e)
+        { }
+
+        System.out.println("Continuing.");
+    }
+
+    /**
+     * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's
+     * {@link PingPongProducer#onMessage} method is called, the chained listener set through the
+     * {@link PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected
+     * count of messages with that correlation id.
+     *
+     * Provided only one pinger is producing messages with that correlation id, the chained listener will always be
+     * given unique message counts. It will always be called while the producer waiting for all messages to arrive is
+     * still blocked.
+     */
+    public static interface ChainedMessageListener
+    {
+        public void onMessage(Message message, int remainingCount) throws JMSException;
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j?view=diff&rev=502624&r1=502623&r2=502624
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j Fri Feb  2 07:28:08 2007
@@ -24,19 +24,19 @@
 
 log4j.logger.org.apache.qpid.requestreply=${amqj.test.logging.level}
 log4j.logger.org.apache.qpid.pingpong=${amqj.test.logging.level}
+log4j.logger.org.apache.qpid.ping=${amqj.test.logging.level}
 log4j.logger.org.apache.qpid.topic=${amqj.test.logging.level}
 
 
-log4j.logger.uk.co.thebadgerset.junit.extensions=info, console
+log4j.logger.uk.co.thebadgerset.junit.extensions=${badger.level}, console
 log4j.additivity.uk.co.thebadgerset.junit.extensions=false
-log4j.logger.uk.co.thebadgerset.junit.extensions=info
 
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.Threshold=all
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 
-#log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
-log4j.appender.console.layout.ConversionPattern=%p [%c] %m%n
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
+#log4j.appender.console.layout.ConversionPattern=%t %p [%c] %m%n
 
 log4j.appender.fileApp=org.apache.log4j.FileAppender
 log4j.appender.fileApp.file=${log.dir}/perftests.volumetest.log