You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/04/05 13:47:51 UTC

svn commit: r525800 [2/3] - in /incubator/qpid/trunk/qpid: ./ java/integrationtests/src/main/java/org/apache/qpid/util/ java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ java/perftests/src/main/java/org/apache/qpid/ping/...

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=525800&r1=525799&r2=525800
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Thu Apr  5 04:47:50 2007
@@ -34,20 +34,22 @@
 
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQNoConsumersException;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.message.TestMessageFactory;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.jms.MessageProducer;
 import org.apache.qpid.jms.Session;
-import org.apache.qpid.topic.Config;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.util.CommandLineParser;
 
 import uk.co.thebadgerset.junit.extensions.BatchedThrottle;
 import uk.co.thebadgerset.junit.extensions.Throttle;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 
 /**
  * PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may
@@ -65,31 +67,37 @@
  * transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover
  * testing. A complete list of accepted parameters, default values and comments on their usage is provided here:
  *
- * <p/><table><caption>CRC Card</caption>
- * <tr><th> Parameter        <th> Default <th> Comments
- * <tr><td> messageSize      <td> 0       <td> Message size in bytes. Not including any headers.
- * <tr><td> destinationName  <td> ping    <td> The root name to use to generate destination names to ping.
- * <tr><td> persistent       <td> false   <td> Determines whether peristent delivery is used.
- * <tr><td> transacted       <td> false   <td> Determines whether messages are sent/received in transactions.
+ * <p/><table><caption>Parameters</caption>
+ * <tr><th> Parameter        <th> Default  <th> Comments
+ * <tr><td> messageSize      <td> 0        <td> Message size in bytes. Not including any headers.
+ * <tr><td> destinationName  <td> ping     <td> The root name to use to generate destination names to ping.
+ * <tr><td> persistent       <td> false    <td> Determines whether peristent delivery is used.
+ * <tr><td> transacted       <td> false    <td> Determines whether messages are sent/received in transactions.
  * <tr><td> broker           <td> tcp://localhost:5672 <td> Determines the broker to connect to.
- * <tr><td> virtualHost      <td> test    <td> Determines the virtual host to send all ping over.
- * <tr><td> rate             <td> 0       <td> The maximum rate (in hertz) to send messages at. 0 means no limit.
- * <tr><td> verbose          <td> false   <td> The verbose flag for debugging. Prints to console on every message.
- * <tr><td> pubsub           <td> false   <td> Whether to ping topics or queues. Uses p2p by default.
- * <tr><td> failAfterCommit  <td> false   <td> Whether to prompt user to kill broker after a commit batch.
- * <tr><td> failBeforeCommit <td> false   <td> Whether to prompt user to kill broker before a commit batch.
- * <tr><td> failAfterSend    <td> false   <td> Whether to prompt user to kill broker after a send.
- * <tr><td> failBeforeSend   <td> false   <td> Whether to prompt user to kill broker before a send.
- * <tr><td> failOnce         <td> true    <td> Whether to prompt for failover only once.
- * <tr><td> username         <td> guest   <td> The username to access the broker with.
- * <tr><td> password         <td> guest   <td> The password to access the broker with.
- * <tr><td> selector         <td> null    <td> Not used. Defines a message selector to filter pings with.
- * <tr><td> destinationCount <td> 1       <td> The number of receivers listening to the pings.
- * <tr><td> timeout          <td> 30000   <td> In milliseconds. The timeout to stop waiting for replies.
- * <tr><td> commitBatchSize  <td> 1       <td> The number of messages per transaction in transactional mode.
- * <tr><td> uniqueDests      <td> true    <td> Whether each receiver only listens to one ping destination or all.
- * <tr><td> ackMode          <td> NO_ACK  <td> The message acknowledgement mode.
- * <tr><td> pauseBatch       <td> 0       <td> In milliseconds. A pause to insert between transaction batches.
+ * <tr><td> virtualHost      <td> test     <td> Determines the virtual host to send all ping over.
+ * <tr><td> rate             <td> 0        <td> The maximum rate (in hertz) to send messages at. 0 means no limit.
+ * <tr><td> verbose          <td> false    <td> The verbose flag for debugging. Prints to console on every message.
+ * <tr><td> pubsub           <td> false    <td> Whether to ping topics or queues. Uses p2p by default.
+ * <tr><td> failAfterCommit  <td> false    <td> Whether to prompt user to kill broker after a commit batch.
+ * <tr><td> failBeforeCommit <td> false    <td> Whether to prompt user to kill broker before a commit batch.
+ * <tr><td> failAfterSend    <td> false    <td> Whether to prompt user to kill broker after a send.
+ * <tr><td> failBeforeSend   <td> false    <td> Whether to prompt user to kill broker before a send.
+ * <tr><td> failOnce         <td> true     <td> Whether to prompt for failover only once.
+ * <tr><td> username         <td> guest    <td> The username to access the broker with.
+ * <tr><td> password         <td> guest    <td> The password to access the broker with.
+ * <tr><td> selector         <td> null     <td> Not used. Defines a message selector to filter pings with.
+ * <tr><td> destinationCount <td> 1        <td> The number of receivers listening to the pings.
+ * <tr><td> timeout          <td> 30000    <td> In milliseconds. The timeout to stop waiting for replies.
+ * <tr><td> commitBatchSize  <td> 1        <td> The number of messages per transaction in transactional mode.
+ * <tr><td> uniqueDests      <td> true     <td> Whether each receiver only listens to one ping destination or all.
+ * <tr><td> ackMode          <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
+ *                                               0 - SESSION_TRANSACTED
+ *                                               1 - AUTO_ACKNOWLEDGE
+ *                                               2 - CLIENT_ACKNOWLEDGE
+ *                                               3 - DUPS_OK_ACKNOWLEDGE
+ *                                               257 - NO_ACKNOWLEDGE
+ *                                               258 - PRE_ACKNOWLEDGE
+ * <tr><td> pauseBatch       <td> 0        <td> In milliseconds. A pause to insert between transaction batches.
  * </table>
  *
  * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
@@ -121,7 +129,7 @@
  */
 public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
 {
-    private static final Logger _logger = Logger.getLogger(PingPongProducer.class);
+    private static final Logger log = Logger.getLogger(PingPongProducer.class);
 
     /** Holds the name of the property to get the test message size from. */
     public static final String MESSAGE_SIZE_PROPNAME = "messageSize";
@@ -181,31 +189,31 @@
     public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit";
 
     /** Holds the default failover after commit test flag. */
-    public static final String FAIL_AFTER_COMMIT_DEFAULT = "false";
+    public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false;
 
     /** Holds the name of the proeprty to get the fail before commit flag from. */
     public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit";
 
     /** Holds the default failover before commit test flag. */
-    public static final String FAIL_BEFORE_COMMIT_DEFAULT = "false";
+    public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false;
 
     /** Holds the name of the proeprty to get the fail after send flag from. */
     public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend";
 
     /** Holds the default failover after send test flag. */
-    public static final String FAIL_AFTER_SEND_DEFAULT = "false";
+    public static final boolean FAIL_AFTER_SEND_DEFAULT = false;
 
     /** Holds the name of the property to get the fail before send flag from. */
     public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend";
 
     /** Holds the default failover before send test flag. */
-    public static final String FAIL_BEFORE_SEND_DEFAULT = "false";
+    public static final boolean FAIL_BEFORE_SEND_DEFAULT = false;
 
     /** Holds the name of the property to get the fail once flag from. */
     public static final String FAIL_ONCE_PROPNAME = "failOnce";
 
     /** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */
-    public static final String FAIL_ONCE_DEFAULT = "true";
+    public static final boolean FAIL_ONCE_DEFAULT = true;
 
     /** Holds the name of the property to get the broker access username from. */
     public static final String USERNAME_PROPNAME = "username";
@@ -223,7 +231,7 @@
     public static final String SELECTOR_PROPNAME = "selector";
 
     /** Holds the default message selector. */
-    public static final String SELECTOR_DEFAULT = null;
+    public static final String SELECTOR_DEFAULT = "";
 
     /** Holds the name of the proeprty to get the destination count from. */
     public static final String DESTINATION_COUNT_PROPNAME = "destinationCount";
@@ -253,7 +261,7 @@
     public static final String ACK_MODE_PROPNAME = "ackMode";
 
     /** Defines the default message acknowledgement mode. */
-    public static final int ACK_MODE_DEFAULT = Session.NO_ACKNOWLEDGE;
+    public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
 
     /** Holds the name of the property to get the pause between batches property from. */
     public static final String PAUSE_AFTER_BATCH_PROPNAME = "pauseBatch";
@@ -273,6 +281,86 @@
     /** Holds the name of the property to store nanosecond timestamps in ping messages with. */
     public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
 
+    /** Holds the default configuration properties. */
+    public static ParsedProperties defaults = new ParsedProperties();
+
+    static
+    {
+        defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
+        defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT);
+        defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT);
+        defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT);
+        defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
+        defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
+        defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
+        defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT);
+        defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+        defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT);
+        defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
+        defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
+        defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT);
+        defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT);
+        defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT);
+        defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT);
+        defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT);
+        defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT);
+        defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
+        defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT);
+        defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
+        defaults.setPropertyIfNull(PAUSE_AFTER_BATCH_PROPNAME, PAUSE_AFTER_BATCH_DEFAULT);
+        defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
+    }
+
+    protected String _brokerDetails;
+    protected String _username;
+    protected String _password;
+    protected String _virtualpath;
+    protected String _destinationName;
+    protected String _selector;
+    protected boolean _transacted;
+
+    /** Determines whether this producer sends persistent messages. */
+    protected boolean _persistent;
+
+    /** Holds the acknowledgement mode used for sending and receiving messages. */
+    private int _ackMode;
+
+    /** Determines what size of messages this producer sends. */
+    protected int _messageSize;
+
+    /** Used to indicate that the ping loop should print out whenever it pings. */
+    protected boolean _verbose;
+
+    /** Flag used to indicate if this is a point to point or pub/sub ping client. */
+    protected boolean _isPubSub;
+
+    /** Flag used to indicate if the destinations should be unique client. */
+    protected boolean _isUnique;
+
+    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
+    protected boolean _failBeforeCommit;
+
+    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
+    protected boolean _failAfterCommit;
+
+    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
+    protected boolean _failBeforeSend;
+
+    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
+    protected boolean _failAfterSend;
+
+    /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
+    protected boolean _failOnce;
+
+    /** Holds the number of sends that should be performed in every transaction when using transactions. */
+    protected int _txBatchSize;
+
+    protected int _noOfDestinations;
+    protected int _rate;
+
+    /** Holds the wait time to insert between every batch of messages committed. */
+    private long _pauseBatch;
+
     /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
     private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
 
@@ -286,83 +374,47 @@
     /** A convenient formatter to use when time stamping output. */
     protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
 
-    /**
-     * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
-     * creating multiple ping producers in the same JVM.
-     */
-    protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
-
-    /** Holds the destination where the response messages will arrive. */
-    private Destination _replyDestination;
-
-    /** Determines whether this producer sends persistent messages. */
-    protected boolean _persistent;
+    /** Holds the connection to the broker. */
+    protected Connection _connection;
 
-    /** Holds the acknowledgement mode used for sending and receiving messages. */
-    private int _ackMode = Session.NO_ACKNOWLEDGE;
+    /** Holds the session on which ping replies are received. */
+    protected Session _consumerSession;
 
-    /** Determines what size of messages this producer sends. */
-    protected int _messageSize;
+    /** Holds the producer session, needed to create ping messages. */
+    protected Session _producerSession;
 
-    /** Used to indicate that the ping loop should print out whenever it pings. */
-    protected boolean _verbose = VERBOSE_DEFAULT;
+    /** Holds the destination where the response messages will arrive. */
+    protected Destination _replyDestination;
 
-    /** Holds the session on which ping replies are received. */
-    protected Session _consumerSession;
+    /** Holds the set of destinations that this ping producer pings. */
+    protected List<Destination> _pingDestinations;
 
     /** Used to restrict the sending rate to a specified limit. */
-    private Throttle _rateLimiter = null;
+    protected Throttle _rateLimiter;
 
     /** Holds a message listener that this message listener chains all its messages to. */
-    private ChainedMessageListener _chainedMessageListener = null;
-
-    /** Flag used to indicate if this is a point to point or pub/sub ping client. */
-    protected boolean _isPubSub = PUBSUB_DEFAULT;
+    protected ChainedMessageListener _chainedMessageListener = null;
 
-    /** Flag used to indicate if the destinations should be unique client. */
-    protected static boolean _isUnique = UNIQUE_DESTS_DEFAULT;
+    /**
+     * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
+     * creating multiple ping producers in the same JVM.
+     */
+    protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
 
     /**
      * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
      * on the same JVM using this id generator will allow them to ping on the same queues.
      */
-    protected AtomicInteger _queueSharedId = new AtomicInteger();
+    protected AtomicInteger _queueSharedID = new AtomicInteger();
 
     /** Used to tell the ping loop when to terminate, it only runs while this is true. */
     protected boolean _publish = true;
 
-    /** Holds the connection to the broker. */
-    private Connection _connection;
-
-    /** Holds the producer session, needed to create ping messages. */
-    private Session _producerSession;
-
-    /** Holds the set of destinations that this ping producer pings. */
-    protected List<Destination> _pingDestinations = new ArrayList<Destination>();
-
     /** Holds the message producer to send the pings through. */
     protected MessageProducer _producer;
 
-    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
-    protected boolean _failBeforeCommit = false;
-
-    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
-    protected boolean _failAfterCommit = false;
-
-    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
-    protected boolean _failBeforeSend = false;
-
-    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
-    protected boolean _failAfterSend = false;
-
-    /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
-    protected boolean _failOnce = true;
-
-    /** Holds the number of sends that should be performed in every transaction when using transactions. */
-    protected int _txBatchSize = TX_BATCH_SIZE_DEFAULT;
-
-    /** Holds the wait time to insert between every batch of messages committed. */
-    private static long _pauseBatch = PAUSE_AFTER_BATCH_DEFAULT;
+    /** Holds the message consumer to receive the ping replies through. */
+    protected MessageConsumer _consumer;
 
     /**
      * Holds the number of consumers that will be attached to each topic. Each pings will result in a reply from each of the
@@ -370,202 +422,195 @@
      */
     static int _consumersPerTopic = 1;
 
+    /** The prompt to display when asking the user to kill the broker for failover testing. */
+    private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
+
     /**
-     * Creates a ping producer with the specified parameters, of which there are many. See their individual comments for
-     * details. This constructor creates a connection to the broker and creates producer and consumer sessions on it, to send
-     * and recieve its pings and replies on. The other options are kept, and control how this pinger behaves.
-     *
-     * @param  brokerDetails    The URL of the broker to send pings to.
-     * @param  username         The username to log onto the broker with.
-     * @param  password         The password to log onto the broker with.
-     * @param  virtualpath      The virtual host name to use on the broker.
-     * @param  destinationName  The name (or root where multiple destinations are used) of the desitination to send pings to.
-     * @param  selector         The selector to filter replies with.
-     * @param  transacted       Indicates whether or not pings are sent and received in transactions.
-     * @param  persistent       Indicates whether pings are sent using peristent delivery.
-     * @param  messageSize      Specifies the size of ping messages to send.
-     * @param  verbose          Indicates that information should be printed to the console on every ping.
-     * @param  afterCommit      Indicates that the user should be promted to terminate a broker after commits to test
-     *                          failover.
-     * @param  beforeCommit     Indicates that the user should be promted to terminate a broker before commits to test
-     *                          failover.
-     * @param  afterSend        Indicates that the user should be promted to terminate a broker after sends to test failover.
-     * @param  beforeSend       Indicates that the user should be promted to terminate a broker before sends to test
-     *                          failover.
-     * @param  failOnce         Indicates that the failover testing behaviour should only happen on the first commit, not
-     *                          all.
-     * @param  txBatchSize      Specifies the number of pings to send in each transaction.
-     * @param  noOfDestinations The number of destinations to ping. Must be 1 or more.
-     * @param  rate             Specified the number of pings per second to send. Setting this to 0 means send as fast as
-     *                          possible, with no rate restriction.
-     * @param  pubsub           True to ping topics, false to ping queues.
-     * @param  unique           True to use unique destinations for each ping pong producer, false to share.
+     * Creates a ping producer with the specified parameters, of which there are many. See the class level comments
+     * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on
+     * it, to send and recieve its pings and replies on.
+     *
+     * @param  overrides Properties containing any desired overrides to the defaults.
      *
      * @throws Exception Any exceptions are allowed to fall through.
      */
-    public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
-                            String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
-                            boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
-                            boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate,
-                            boolean pubsub, boolean unique, int ackMode, long pause) throws Exception
-    {
-        _logger.debug("public PingPongProducer(String brokerDetails = " + brokerDetails + ", String username = " + username
-                      + ", String password = " + password + ", String virtualpath = " + virtualpath
-                      + ", String destinationName = " + destinationName + ", String selector = " + selector
-                      + ", boolean transacted = " + transacted + ", boolean persistent = " + persistent
-                      + ", int messageSize = " + messageSize + ", boolean verbose = " + verbose + ", boolean afterCommit = "
-                      + afterCommit + ", boolean beforeCommit = " + beforeCommit + ", boolean afterSend = " + afterSend
-                      + ", boolean beforeSend = " + beforeSend + ", boolean failOnce = " + failOnce + ", int txBatchSize = "
-                      + txBatchSize + ", int noOfDestinations = " + noOfDestinations + ", int rate = " + rate
-                      + ", boolean pubsub = " + pubsub + ", boolean unique = " + unique + ", ackMode = " + ackMode
-                      + "): called");
-
-        // Keep all the relevant options.
-        _persistent = persistent;
-        _messageSize = messageSize;
-        _verbose = verbose;
-        _failAfterCommit = afterCommit;
-        _failBeforeCommit = beforeCommit;
-        _failAfterSend = afterSend;
-        _failBeforeSend = beforeSend;
-        _failOnce = failOnce;
-        _txBatchSize = txBatchSize;
-        _isPubSub = pubsub;
-        _isUnique = unique;
-        _pauseBatch = pause;
+    public PingPongProducer(Properties overrides) throws Exception
+    {
+        log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
 
-        if (ackMode != 0)
-        {
-            _ackMode = ackMode;
-        }
+        // Create a set of parsed properties from the defaults overriden by the passed in values.
+        ParsedProperties properties = new ParsedProperties(defaults);
+        properties.putAll(overrides);
+
+        // Extract the configuration properties to set the pinger up with.
+        _brokerDetails = properties.getProperty(BROKER_PROPNAME);
+        _username = properties.getProperty(USERNAME_PROPNAME);
+        _password = properties.getProperty(PASSWORD_PROPNAME);
+        _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME);
+        _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
+        _selector = properties.getProperty(SELECTOR_PROPNAME);
+        _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+        _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
+        _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
+        _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME);
+        _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME);
+        _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME);
+        _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME);
+        _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME);
+        _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME);
+        _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME);
+        _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME);
+        _rate = properties.getPropertyAsInteger(RATE_PROPNAME);
+        _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
+        _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
+        _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
+        _pauseBatch = properties.getPropertyAsLong(PAUSE_AFTER_BATCH_PROPNAME);
 
         // Check that one or more destinations were specified.
-        if (noOfDestinations < 1)
+        if (_noOfDestinations < 1)
         {
             throw new IllegalArgumentException("There must be at least one destination.");
         }
 
-        // Create a connection to the broker.
+        // Set up a throttle to control the send rate, if a rate > 0 is specified.
+        if (_rate > 0)
+        {
+            _rateLimiter = new BatchedThrottle();
+            _rateLimiter.setRate(_rate);
+        }
+
+        // Create the connection and message producers/consumers.
+        // establishConnection(true, true);
+    }
+
+    /**
+     * Establishes a connection to the broker and creates message consumers and producers based on the parameters
+     * that this ping client was created with.
+     *
+     * @param producer Flag to indicate whether or not the producer should be set up.
+     * @param consumer Flag to indicate whether or not the consumers should be set up.
+     *
+     * @throws Exception Any exceptions are allowed to fall through.
+     */
+    public void establishConnection(boolean producer, boolean consumer) throws Exception
+    {
+        log.debug("public void establishConnection(): called");
+
+        // Generate a unique identifying name for this client, based on it ip address and the current time.
         InetAddress address = InetAddress.getLocalHost();
         String clientID = address.getHostName() + System.currentTimeMillis();
 
-        _connection = new AMQConnection(brokerDetails, username, password, clientID, virtualpath);
+        // Create a connection to the broker.
+        createConnection(clientID);
 
         // Create transactional or non-transactional sessions, based on the command line arguments.
-        _producerSession = (Session) getConnection().createSession(transacted, _ackMode);
-        _consumerSession = (Session) getConnection().createSession(transacted, _ackMode);
+        _producerSession = (Session) getConnection().createSession(_transacted, _ackMode);
+        _consumerSession = (Session) getConnection().createSession(_transacted, _ackMode);
 
-        // Set up a throttle to control the send rate, if a rate > 0 is specified.
-        if (rate > 0)
+        // Create the destinations to send pings to and receive replies from.
+        _replyDestination = _consumerSession.createTemporaryQueue();
+        createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique);
+
+        // Create the message producer only if instructed to.
+        if (producer)
         {
-            _rateLimiter = new BatchedThrottle();
-            _rateLimiter.setRate(rate);
+            createProducer();
         }
 
-        // Create the temporary queue for replies.
-        _replyDestination = _consumerSession.createTemporaryQueue();
+        // Create the message consumer only if instructed to.
+        if (consumer)
+        {
+            createReplyConsumers(getReplyDestinations(), _selector);
+        }
+    }
 
-        // Create the producer and the consumers for all reply destinations.
-        createProducer();
-        createPingDestinations(noOfDestinations, selector, destinationName, unique);
-        createReplyConsumers(getReplyDestinations(), selector);
+    /**
+     * Establishes a connection to the broker, based on the configuration parameters that this ping client was
+     * created with.
+     *
+     * @param clientID The clients identifier.
+     *
+     * @throws AMQException Any underlying exceptions are allowed to fall through.
+     * @throws URLSyntaxException Any underlying exceptions are allowed to fall through.
+     */
+    protected void createConnection(String clientID) throws AMQException, URLSyntaxException
+    {
+        _connection = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath);
     }
 
     /**
-     * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs to be
-     * started to bounce the pings back again.
+     * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs
+     * to be started to bounce the pings back again.
      *
      * @param  args The command line arguments.
-     *
-     * @throws Exception When something went wrong with the test
      */
-    public static void main(String[] args) throws Exception
+    public static void main(String[] args)
     {
-        // Extract the command line.
-        Config config = new Config();
-        config.setOptions(args);
-        if (args.length == 0)
+        try
         {
-            _logger.info("Running test with default values...");
-            // usage();
-            // System.exit(0);
-        }
+            Properties options = processCommandLine(args);
+
+            // Create a ping producer overriding its defaults with all options passed on the command line.
+            PingPongProducer pingProducer = new PingPongProducer(options);
+            pingProducer.establishConnection(true, true);
+
+            // Start the ping producers dispatch thread running.
+            pingProducer.getConnection().start();
 
-        String brokerDetails = config.getHost() + ":" + config.getPort();
-        String virtualpath = VIRTUAL_HOST_DEFAULT;
-        String selector = (config.getSelector() == null) ? SELECTOR_DEFAULT : config.getSelector();
-        boolean verbose = true;
-        boolean transacted = config.isTransacted();
-        boolean persistent = config.usePersistentMessages();
-        int messageSize = (config.getPayload() != 0) ? config.getPayload() : MESSAGE_SIZE_DEAFULT;
-        // int messageCount = config.getMessages();
-        int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DESTINATION_COUNT_DEFAULT;
-        int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : TX_BATCH_SIZE_DEFAULT;
-        int rate = (config.getRate() != 0) ? config.getRate() : RATE_DEFAULT;
-        boolean pubsub = config.isPubSub();
+            // Create a shutdown hook to terminate the ping-pong producer.
+            Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
 
-        String destName = config.getDestination();
-        if (destName == null)
+            // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
+            pingProducer.getConnection().setExceptionListener(pingProducer);
+
+            // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
+            Thread pingThread = new Thread(pingProducer);
+            pingThread.run();
+            pingThread.join();
+        }
+        catch (Exception e)
         {
-            destName = PING_QUEUE_NAME_DEFAULT;
+            System.err.println(e.getMessage());
+            log.error("Top level handler caught execption.", e);
+            System.exit(1);
         }
+    }
+
+    /**
+     * Extracts all name=value pairs from the command line, sets them all as system properties and also returns
+     * a map of properties containing them.
+     *
+     * @param args The command line.
+     *
+     * @return A set of properties containing all name=value pairs from the command line.
+     *
+     * @todo This is a commonly used piece of code. Make it accept a command line definition and move it into the
+     *       CommandLineParser class.
+     */
+    protected static Properties processCommandLine(String[] args)
+    {
+        // Use the command line parser to evaluate the command line.
+        CommandLineParser commandLine = new CommandLineParser(new String[][] {});
 
-        boolean afterCommit = false;
-        boolean beforeCommit = false;
-        boolean afterSend = false;
-        boolean beforeSend = false;
-        boolean failOnce = false;
+        // Capture the command line arguments or display errors and correct usage and then exit.
+        Properties options = null;
 
-        for (String arg : args)
+        try
         {
-            if (arg.startsWith("failover:"))
-            {
-                // failover:<before|after>:<send:commit> | failover:once
-                String[] parts = arg.split(":");
-                if (parts.length == 3)
-                {
-                    if (parts[2].equals("commit"))
-                    {
-                        afterCommit = parts[1].equals("after");
-                        beforeCommit = parts[1].equals("before");
-                    }
-
-                    if (parts[2].equals("send"))
-                    {
-                        afterSend = parts[1].equals("after");
-                        beforeSend = parts[1].equals("before");
-                    }
+            options = commandLine.parseCommandLine(args);
 
-                    if (parts[1].equals("once"))
-                    {
-                        failOnce = true;
-                    }
-                }
-                else
-                {
-                    System.out.println("Unrecognized failover request:" + arg);
-                }
-            }
+            // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up
+            // overridden values from there.
+            commandLine.addCommandLineToSysProperties();
+        }
+        catch (IllegalArgumentException e)
+        {
+            System.out.println(commandLine.getErrors());
+            System.out.println(commandLine.getUsage());
+            System.exit(1);
         }
 
-        // Create a ping producer to handle the request/wait/reply cycle.
-        PingPongProducer pingProducer =
-            new PingPongProducer(brokerDetails, USERNAME_DEFAULT, PASSWORD_DEFAULT, virtualpath, destName, selector,
-                                 transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
-                                 beforeSend, failOnce, batchSize, destCount, rate, pubsub, false, 0, 0);
-
-        pingProducer.getConnection().start();
-
-        // Create a shutdown hook to terminate the ping-pong producer.
-        Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
-
-        // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
-        pingProducer.getConnection().setExceptionListener(pingProducer);
-
-        // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
-        Thread pingThread = new Thread(pingProducer);
-        pingThread.run();
-        pingThread.join();
+        return options;
     }
 
     /**
@@ -582,9 +627,7 @@
                 Thread.sleep(sleepTime);
             }
             catch (InterruptedException ie)
-            {
-                // ignore
-            }
+            { }
         }
     }
 
@@ -596,27 +639,30 @@
      */
     public List<Destination> getReplyDestinations()
     {
-        _logger.debug("public List<Destination> getReplyDestinations(): called");
+        log.debug("public List<Destination> getReplyDestinations(): called");
 
         List<Destination> replyDestinations = new ArrayList<Destination>();
         replyDestinations.add(_replyDestination);
 
+        log.debug("replyDestinations = " + replyDestinations);
+
         return replyDestinations;
     }
 
     /**
-     * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery flag
-     * is set accoring the ping producer creation options.
+     * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery
+     * flag is set accoring the ping producer creation options.
      *
      * @throws JMSException Any JMSExceptions are allowed to fall through.
      */
     public void createProducer() throws JMSException
     {
-        _logger.debug("public void createProducer(): called");
+        log.debug("public void createProducer(): called");
 
         _producer = (MessageProducer) _producerSession.createProducer(null);
-        // _producer.setDisableMessageTimestamp(true);
         _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+        log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
     }
 
     /**
@@ -632,13 +678,16 @@
      * @throws JMSException Any JMSExceptions are allowed to fall through.
      */
     public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
-                                throws JMSException
+        throws JMSException
     {
-        _logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations
-                      + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = "
-                      + unique + "): called");
+        log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
+            + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + "): called");
+
+        _pingDestinations = new ArrayList<Destination>();
 
         // Create the desired number of ping destinations and consumers for them.
+        log.debug("Creating " + noOfDestinations + " destinations to ping.");
+
         for (int i = 0; i < noOfDestinations; i++)
         {
             AMQDestination destination;
@@ -648,26 +697,26 @@
             // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
             if (unique)
             {
-                _logger.debug("Creating unique destinations.");
+                log.debug("Creating unique destinations.");
                 id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID();
             }
             else
             {
-                _logger.debug("Creating shared destinations.");
-                id = "_" + _queueSharedId.incrementAndGet();
+                log.debug("Creating shared destinations.");
+                id = "_" + _queueSharedID.incrementAndGet();
             }
 
             // Check if this is a pub/sub pinger, in which case create topics.
             if (_isPubSub)
             {
                 destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
-                _logger.debug("Created topic " + destination);
+                log.debug("Created topic " + destination);
             }
             // Otherwise this is a p2p pinger, in which case create queues.
             else
             {
                 destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id);
-                _logger.debug("Created queue " + destination);
+                log.debug("Created queue " + destination);
             }
 
             // Keep the destination.
@@ -676,6 +725,29 @@
     }
 
     /**
+     * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
+     *
+     * @param  destinations The destinations to listen to.
+     * @param  selector     A selector to filter the messages with.
+     *
+     * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
+     */
+    public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
+    {
+        log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+            + ", String selector = " + selector + "): called");
+
+        for (Destination destination : destinations)
+        {
+            // Create a consumer for the destination and set this pinger to listen to its messages.
+            _consumer =
+                _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
+                    selector);
+            _consumer.setMessageListener(this);
+        }
+    }
+
+    /**
      * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a correlating
      * reply may be waiting on. This is only done if the reply has a correlation id that is expected in the replies map.
      *
@@ -683,13 +755,13 @@
      */
     public void onMessage(Message message)
     {
-        _logger.debug("public void onMessage(Message message): called");
+        log.debug("public void onMessage(Message message): called");
 
         try
         {
             // Extract the messages correlation id.
             String correlationID = message.getJMSCorrelationID();
-            _logger.debug("correlationID = " + correlationID);
+            log.debug("correlationID = " + correlationID);
 
             // Countdown on the traffic light if there is one for the matching correlation id.
             PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
@@ -701,7 +773,7 @@
                 // Restart the timeout timer on every message.
                 perCorrelationId.timeOutStart = System.nanoTime();
 
-                _logger.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+                log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
 
                 // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
                 // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
@@ -716,20 +788,14 @@
                     trueCount = trafficLight.getCount();
                     remainingCount = trueCount - 1;
 
-                    _logger.debug("remainingCount = " + remainingCount);
-                    _logger.debug("trueCount = " + trueCount);
+                    log.debug("remainingCount = " + remainingCount);
+                    log.debug("trueCount = " + trueCount);
 
                     // Commit on transaction batch size boundaries. At this point in time the waiting producer remains
                     // blocked, even on the last message.
                     if ((remainingCount % _txBatchSize) == 0)
                     {
                         commitTx(_consumerSession);
-                        if (!_consumerSession.getTransacted() &&
-                            _consumerSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
-                        {
-                            // Acknowledge the messages when the session is not transacted but client_ack
-                            ((AMQSession) _consumerSession).acknowledge();
-                        }
                     }
 
                     // Forward the message and remaining count to any interested chained message listener.
@@ -748,7 +814,7 @@
             }
             else
             {
-                _logger.warn("Got unexpected message with correlationId: " + correlationID);
+                log.warn("Got unexpected message with correlationId: " + correlationID);
             }
 
             // Print out ping times for every message in verbose mode only.
@@ -759,48 +825,16 @@
                 if (timestamp != null)
                 {
                     long diff = System.nanoTime() - timestamp;
-                    _logger.trace("Time for round trip (nanos): " + diff);
+                    log.trace("Time for round trip (nanos): " + diff);
                 }
             }
         }
         catch (JMSException e)
         {
-            _logger.warn("There was a JMSException: " + e.getMessage(), e);
+            log.warn("There was a JMSException: " + e.getMessage(), e);
         }
 
-        _logger.debug("public void onMessage(Message message): ending");
-    }
-
-    /**
-     * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out before a
-     * reply arrives, then a null reply is returned from this method. This method generates a new unqiue correlation id for
-     * the messages.
-     *
-     * @param  message  The message to send.
-     * @param  numPings The number of ping messages to send.
-     * @param  timeout  The timeout in milliseconds.
-     *
-     * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait for
-     *         all prematurely.
-     *
-     * @throws JMSException         All underlying JMSExceptions are allowed to fall through.
-     * @throws InterruptedException When interrupted by a timeout.
-     */
-    public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
-    {
-        _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
-                      + timeout + "): called");
-
-        // Create a unique correlation id to put on the messages before sending them.
-        String messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet());
-
-        return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId);
-    }
-
-    public int pingAndWaitForReply(int numPings, long timeout, String messageCorrelationId)
-                            throws JMSException, InterruptedException
-    {
-        return pingAndWaitForReply(null, numPings, timeout, messageCorrelationId);
+        log.debug("public void onMessage(Message message): ending");
     }
 
     /**
@@ -808,10 +842,10 @@
      * reply arrives, then a null reply is returned from this method. This method allows the caller to specify the
      * correlation id.
      *
-     * @param  message              The message to send.
+     * @param  message              The message to send. If this is null, one is generated.
      * @param  numPings             The number of ping messages to send.
      * @param  timeout              The timeout in milliseconds.
-     * @param  messageCorrelationId The message correlation id.
+     * @param  messageCorrelationId The message correlation id. If this is null, one is generated.
      *
      * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait for
      *         all prematurely.
@@ -820,10 +854,16 @@
      * @throws InterruptedException When interrupted by a timeout
      */
     public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
-                            throws JMSException, InterruptedException
+        throws JMSException, InterruptedException
     {
-        _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
-                      + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+        log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+            + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+
+        // Generate a unique correlation id to put on the messages before sending them, if one was not specified.
+        if (messageCorrelationId == null)
+        {
+            messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet());
+        }
 
         try
         {
@@ -858,31 +898,31 @@
 
                 allMessagesReceived = numReplies == getExpectedNumPings(numPings);
 
-                _logger.debug("numReplies = " + numReplies);
-                _logger.debug("allMessagesReceived = " + allMessagesReceived);
+                log.debug("numReplies = " + numReplies);
+                log.debug("allMessagesReceived = " + allMessagesReceived);
 
                 // Recheck the timeout condition.
                 long now = System.nanoTime();
                 long lastMessageReceievedAt = perCorrelationId.timeOutStart;
                 timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000);
 
-                _logger.debug("now = " + now);
-                _logger.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
+                log.debug("now = " + now);
+                log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
             }
             while (!timedOut && !allMessagesReceived);
 
             if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
             {
-                _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+                log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
             }
             else if (_verbose)
             {
-                _logger.info("Got all replies on id, " + messageCorrelationId);
+                log.info("Got all replies on id, " + messageCorrelationId);
             }
 
             commitTx(_consumerSession);
 
-            _logger.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
+            log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
 
             return numReplies;
         }
@@ -905,8 +945,8 @@
      */
     public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
     {
-        _logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
-                      + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+        log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+            + ", String messageCorrelationId = " + messageCorrelationId + "): called");
 
         if (message == null)
         {
@@ -923,40 +963,19 @@
         // Send all of the ping messages.
         for (int i = 0; i < numPings; i++)
         {
-            // Reset the committed flag to indicate that there are uncommitted messages.
+            // Reset the committed flag to indicate that there may be uncommitted messages.
             committed = false;
 
             // Re-timestamp the message.
             message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
 
-            // Round robin the destinations as the messages are sent.
-            // return _destinationCount;
-            sendMessage(_pingDestinations.get(i % _pingDestinations.size()), message);
-
-            // Apply message rate throttling if a rate limit has been set up.
-            if (_rateLimiter != null)
-            {
-                _rateLimiter.throttle();
-            }
-
-            // Call commit every time the commit batch size is reached.
-            if ((i % _txBatchSize) == 0)
-            {
-                commitTx(_producerSession);
-                committed = true;
-
-                /* This pause is required for some cases. eg in load testing when sessions are non-transacted the
-                   Mina IO layer can't clear the cache in time. So this pause gives enough time for mina to clear
-                   the cache (without this mina throws OutOfMemoryError). pause() will check if time is != 0
-                 */
-                pause(_pauseBatch);
-            }
+            // Send the message, passing in the message count.
+            committed = sendMessage(i, message);
 
             // Spew out per message timings on every message sonly in verbose mode.
             if (_verbose)
             {
-                _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, "
-                             + messageCorrelationId);
+                log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
             }
         }
 
@@ -968,7 +987,70 @@
     }
 
     /**
-     * The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each.
+     * Sends the sepcified message, applies rate limiting and possibly commits the current transaction. The count of
+     * messages sent so far must be specified and is used to round robin the ping destinations (where there are more
+     * than one), and to determine if the transaction batch size has been reached and the sent messages should be
+     * committed.
+     *
+     * @param i       The count of messages sent so far in a loop of multiple calls to this send method.
+     * @param message The message to send.
+     *
+     * @return <tt>true</tt> if the messages were committed, <tt>false</tt> otherwise.
+     *
+     * @throws JMSException All underlyiung JMSExceptions are allowed to fall through.
+     */
+    protected boolean sendMessage(int i, Message message) throws JMSException
+    {
+        // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called");
+        // log.debug("_txBatchSize = " + _txBatchSize);
+
+        // Round robin the destinations as the messages are sent.
+        Destination destination = _pingDestinations.get(i % _pingDestinations.size());
+
+        // Prompt the user to kill the broker when doing failover testing.
+        if (_failBeforeSend)
+        {
+            if (_failOnce)
+            {
+                _failBeforeSend = false;
+            }
+
+            log.trace("Failing Before Send");
+            waitForUser(KILL_BROKER_PROMPT);
+        }
+
+        // Send the message either to its round robin destination, or its default destination.
+        if (destination == null)
+        {
+            _producer.send(message);
+        }
+        else
+        {
+            _producer.send(destination, message);
+        }
+
+        // Apply message rate throttling if a rate limit has been set up.
+        if (_rateLimiter != null)
+        {
+            _rateLimiter.throttle();
+        }
+
+        // Call commit every time the commit batch size is reached.
+        boolean committed = false;
+
+        // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
+        if (((i + 1) % _txBatchSize) == 0)
+        {
+            committed = commitTx(_producerSession);
+        }
+
+        return committed;
+    }
+
+    /**
+     * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction
+     * batch size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared,
+     * which will terminate the pinger.
      */
     public void pingLoop()
     {
@@ -979,25 +1061,20 @@
             msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
 
             // Send the message and wait for a reply.
-            pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT);
+            pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null);
         }
         catch (JMSException e)
         {
             _publish = false;
-            _logger.debug("There was a JMSException: " + e.getMessage(), e);
+            log.debug("There was a JMSException: " + e.getMessage(), e);
         }
         catch (InterruptedException e)
         {
             _publish = false;
-            _logger.debug("There was an interruption: " + e.getMessage(), e);
+            log.debug("There was an interruption: " + e.getMessage(), e);
         }
     }
 
-    public Destination getReplyDestination()
-    {
-        return getReplyDestinations().get(0);
-    }
-
     /**
      * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set here.
      *
@@ -1038,8 +1115,8 @@
     }
 
     /**
-     * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this flag has
-     * been cleared.
+     * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
+     * flag has been cleared.
      */
     public void stop()
     {
@@ -1066,8 +1143,8 @@
      */
     public void onException(JMSException e)
     {
+        log.debug("public void onException(JMSException e = " + e + "): called", e);
         _publish = false;
-        _logger.debug("There was a JMSException: " + e.getMessage(), e);
     }
 
     /**
@@ -1079,12 +1156,12 @@
     public Thread getShutdownHook()
     {
         return new Thread(new Runnable()
-            {
-                public void run()
                 {
-                    stop();
-                }
-            });
+                    public void run()
+                    {
+                        stop();
+                    }
+                });
     }
 
     /**
@@ -1098,40 +1175,30 @@
     }
 
     /**
-     * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
-     *
-     * @param  destinations The destinations to listen to.
-     * @param  selector     A selector to filter the messages with.
-     *
-     * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
-     */
-    public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
-    {
-        _logger.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
-                      + ", String selector = " + selector + "): called");
-
-        for (Destination destination : destinations)
-        {
-            // Create a consumer for the destination and set this pinger to listen to its messages.
-            MessageConsumer consumer =
-                _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
-                                                selector);
-            consumer.setMessageListener(this);
-        }
-    }
-
-    /**
      * Closes the pingers connection.
      *
      * @throws JMSException All JMSException are allowed to fall through.
      */
     public void close() throws JMSException
     {
-        _logger.debug("public void close(): called");
+        log.debug("public void close(): called");
 
-        if (_connection != null)
+        try
+        {
+            if (_connection != null)
+            {
+                _connection.close();
+            }
+        }
+        finally
         {
-            _connection.close();
+            _connection = null;
+            _producerSession = null;
+            _consumerSession = null;
+            _producer = null;
+            _consumer = null;
+            _pingDestinations = null;
+            _replyDestination = null;
         }
     }
 
@@ -1150,25 +1217,29 @@
      *
      * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
      *
+     * @return <tt>true</tt> if the session was committed, <tt>false</tt> if it was not.
+     *
      * @todo   Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
      *         method, because commits only apply to transactional pingers, but fail after send applied to transactional and
      *         non-transactional alike.
      */
-    protected void commitTx(Session session) throws JMSException
+    protected boolean commitTx(Session session) throws JMSException
     {
-        _logger.debug("protected void commitTx(Session session): called");
+        log.debug("protected void commitTx(Session session): called");
+
+        boolean committed = false;
 
-        _logger.trace("Batch time reached");
+        log.trace("Batch time reached");
         if (_failAfterSend)
         {
-            _logger.trace("Batch size reached");
+            log.trace("Batch size reached");
             if (_failOnce)
             {
                 _failAfterSend = false;
             }
 
-            _logger.trace("Failing After Send");
-            doFailover();
+            log.trace("Failing After Send");
+            waitForUser(KILL_BROKER_PROMPT);
         }
 
         if (session.getTransacted())
@@ -1182,13 +1253,14 @@
                         _failBeforeCommit = false;
                     }
 
-                    _logger.trace("Failing Before Commit");
-                    doFailover();
+                    log.trace("Failing Before Commit");
+                    waitForUser(KILL_BROKER_PROMPT);
                 }
 
-                long l = System.currentTimeMillis();
+                long l = System.nanoTime();
                 session.commit();
-                _logger.debug("Time taken to commit :" + (System.currentTimeMillis() - l) + " ms");
+                committed = true;
+                log.debug("Time taken to commit :" + ((System.nanoTime() - l) / 1000000f) + " ms");
 
                 if (_failAfterCommit)
                 {
@@ -1197,84 +1269,56 @@
                         _failAfterCommit = false;
                     }
 
-                    _logger.trace("Failing After Commit");
-                    doFailover();
+                    log.trace("Failing After Commit");
+                    waitForUser(KILL_BROKER_PROMPT);
                 }
 
-                _logger.trace("Session Commited.");
+                log.trace("Session Commited.");
             }
             catch (JMSException e)
             {
-                _logger.trace("JMSException on commit:" + e.getMessage(), e);
+                log.trace("JMSException on commit:" + e.getMessage(), e);
 
                 // Warn that the bounce back client is not available.
                 if (e.getLinkedException() instanceof AMQNoConsumersException)
                 {
-                    _logger.debug("No consumers on queue.");
+                    log.debug("No consumers on queue.");
                 }
 
                 try
                 {
                     session.rollback();
-                    _logger.trace("Message rolled back.");
+                    log.trace("Message rolled back.");
                 }
                 catch (JMSException jmse)
                 {
-                    _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+                    log.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
 
                     // Both commit and rollback failed. Throw the rollback exception.
                     throw jmse;
                 }
             }
         }
+
+        return committed;
     }
 
     /**
-     * Sends the message to the specified destination. If the destination is null, it gets sent to the default destination of
-     * the ping producer. If an explicit destination is set, this overrides the default.
+     * Outputs a prompt to the console and waits for the user to press return.
      *
-     * @param  destination The destination to send to.
-     * @param  message     The message to send.
-     *
-     * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
+     * @param prompt The prompt to display on the console.
      */
-    protected void sendMessage(Destination destination, Message message) throws JMSException
+    protected void waitForUser(String prompt)
     {
-        if (_failBeforeSend)
-        {
-            if (_failOnce)
-            {
-                _failBeforeSend = false;
-            }
-
-            _logger.trace("Failing Before Send");
-            doFailover();
-        }
-
-        if (destination == null)
-        {
-            _producer.send(message);
-        }
-        else
-        {
-            _producer.send(destination, message);
-        }
-    }
+        System.out.println(prompt);
 
-    /**
-     * Prompts the user to terminate the broker, in order to test failover functionality. This method will block until the
-     * user supplied some input on the terminal.
-     */
-    protected void doFailover()
-    {
-        System.out.println("Kill Broker now then press return");
         try
         {
             System.in.read();
         }
         catch (IOException e)
         {
-            // ignore
+            // Ignored.
         }
 
         System.out.println("Continuing.");

Modified: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java?view=diff&rev=525800&r1=525799&r2=525800
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Thu Apr  5 04:47:50 2007
@@ -86,7 +86,7 @@
 
         // Sets up the test parameters with defaults.
         testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
-                                         Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT));
+            Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT));
     }
 
     /**
@@ -159,7 +159,7 @@
 
         // Send the requested number of messages, and wait until they have all been received.
         long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
-        int numReplies = pingClient.pingAndWaitForReply(numPings, timeout, perThreadSetup._correlationId);
+        int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId);
 
         // Check that all the replies were received and log a fail if they were not.
         if (numReplies < perCorrelationId._expectedCount)
@@ -247,8 +247,8 @@
                 String correlationId = message.getJMSCorrelationID();
 
                 _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount
-                              + "): called on batch boundary for message id: " + correlationId + " with thread id: "
-                              + Thread.currentThread().getId());
+                    + "): called on batch boundary for message id: " + correlationId + " with thread id: "
+                    + Thread.currentThread().getId());
 
                 // Get the details for the correlation id and check that they are not null. They can become null
                 // if a test times out.

Modified: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java?view=diff&rev=525800&r1=525799&r2=525800
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java Thu Apr  5 04:47:50 2007
@@ -175,7 +175,7 @@
 
         // Send the requested number of messages, and wait until they have all been received.
         long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
-        int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout);
+        int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
 
         // Check that all the replies were received and log a fail if they were not.
         if (numReplies < numPings)

Modified: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=diff&rev=525800&r1=525799&r2=525800
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Thu Apr  5 04:47:50 2007
@@ -66,19 +66,22 @@
     ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
 
     /** Holds a property reader to extract the test parameters from. */
-    protected ParsedProperties testParameters = new TestContextProperties(System.getProperties());
+    protected ParsedProperties testParameters =
+        TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/);
 
     public PingTestPerf(String name)
     {
         super(name);
 
+        _logger.debug("testParameters = " + testParameters);
+
         // Sets up the test parameters with defaults.
-        testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT);
+        /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT);
         testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, PingPongProducer.MESSAGE_SIZE_DEAFULT);
         testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
-                                         PingPongProducer.PING_QUEUE_NAME_DEFAULT);
+            PingPongProducer.PING_QUEUE_NAME_DEFAULT);
         testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
-                                         PingPongProducer.PERSISTENT_MODE_DEFAULT);
+            PingPongProducer.PERSISTENT_MODE_DEFAULT);
         testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, PingPongProducer.TRANSACTED_DEFAULT);
         testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT);
         testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT);
@@ -90,20 +93,20 @@
         testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT);
         testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, PingPongProducer.TIMEOUT_DEFAULT);
         testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME,
-                                         PingPongProducer.DESTINATION_COUNT_DEFAULT);
+            PingPongProducer.DESTINATION_COUNT_DEFAULT);
         testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
-                                         PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
+            PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
         testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
-                                         PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
+            PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
         testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
-                                         PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
+            PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
         testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
-                                         PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
+            PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
         testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT);
         testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, PingPongProducer.UNIQUE_DESTS_DEFAULT);
         testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, PingPongProducer.ACK_MODE_DEFAULT);
         testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME,
-                                         PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);
+            PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/
     }
 
     /**
@@ -139,20 +142,18 @@
         // Generate a sample message. This message is already time stamped and has its reply-to destination set.
         ObjectMessage msg =
             perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
-                                                      testParameters.getPropertyAsInteger(
-                                                          PingPongProducer.MESSAGE_SIZE_PROPNAME),
-                                                      testParameters.getPropertyAsBoolean(
-                                                          PingPongProducer.PERSISTENT_MODE_PROPNAME));
+                testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+                testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
 
         // start the test
         long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
-        int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout);
+        int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
 
         // Fail the test if the timeout was exceeded.
         if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings))
         {
             Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = "
-                        + numReplies);
+                + numReplies);
         }
     }
 
@@ -167,43 +168,13 @@
         {
             PerThreadSetup perThreadSetup = new PerThreadSetup();
 
-            // Extract the test set up paramaeters.
-            String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
-            String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
-            String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
-            String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME);
-            String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
-            boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
-            boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
-            String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
-            boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
-            int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME);
-            int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME);
-            boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME);
-            boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME);
-            boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME);
-            boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME);
-            boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
-            int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.TX_BATCH_SIZE_PROPNAME);
-            Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
-            boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_DESTS_PROPNAME);
-            int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME);
-            int pausetime = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME);
-
-            // Extract the test set up paramaeters.
-            int destinationscount =
-                Integer.parseInt(testParameters.getProperty(PingPongProducer.DESTINATION_COUNT_PROPNAME));
-
             // This is synchronized because there is a race condition, which causes one connection to sleep if
             // all threads try to create connection concurrently.
             synchronized (this)
             {
                 // Establish a client to ping a Destination and listen the reply back from same Destination
-                perThreadSetup._pingClient = new PingClient(brokerDetails, username, password, virtualPath, destinationName,
-                                                            selector, transacted, persistent, messageSize, verbose,
-                                                            failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend,
-                                                            failOnce, batchSize, destinationscount, rate, pubsub, unique,
-                                                            ackMode, pausetime);
+                perThreadSetup._pingClient = new PingClient(testParameters);
+                perThreadSetup._pingClient.establishConnection(true, true);
             }
             // Start the client connection
             perThreadSetup._pingClient.getConnection().start();