You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/08/23 14:10:43 UTC

svn commit: r568952 [2/2] - in /incubator/qpid/trunk/qpid/java/perftests/src: main/java/org/apache/qpid/client/message/ main/java/org/apache/qpid/ping/ main/java/org/apache/qpid/requestreply/ test/

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=568952&r1=568951&r2=568952&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Thu Aug 23 05:10:42 2007
@@ -20,21 +20,8 @@
  */
 package org.apache.qpid.requestreply;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.*;
-
 import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.*;
@@ -51,6 +38,18 @@
 import uk.co.thebadgerset.junit.extensions.Throttle;
 import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 
+import javax.jms.*;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may
  * either be generated by another client (see {@link PingPongBouncer}, or an extension of it may be used that listens
@@ -86,10 +85,11 @@
  * <tr><td> username         <td> guest    <td> The username to access the broker with.
  * <tr><td> password         <td> guest    <td> The password to access the broker with.
  * <tr><td> selector         <td> null     <td> Not used. Defines a message selector to filter pings with.
- * <tr><td> destinationCount <td> 1        <td> The number of receivers listening to the pings.
+ * <tr><td> destinationCount <td> 1        <td> The number of destinations to send pings to.
+ * <tr><td> numConsumers     <td> 1        <td> The number of consumers on each destination.
  * <tr><td> timeout          <td> 30000    <td> In milliseconds. The timeout to stop waiting for replies.
  * <tr><td> commitBatchSize  <td> 1        <td> The number of messages per transaction in transactional mode.
- * <tr><td> uniqueDests      <td> true     <td> Whether each receiver only listens to one ping destination or all.
+ * <tr><td> uniqueDests      <td> true     <td> Whether each receivers only listens to one ping destination or all.
  * <tr><td> durableDests     <td> false    <td> Whether or not durable destinations are used.
  * <tr><td> ackMode          <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
  *                                               0 - SESSION_TRANSACTED
@@ -98,6 +98,10 @@
  *                                               3 - DUPS_OK_ACKNOWLEDGE
  *                                               257 - NO_ACKNOWLEDGE
  *                                               258 - PRE_ACKNOWLEDGE
+ * <tr><td> consTransacted   <td> false    <td> Whether or not consumers use transactions. Defaults to the same value
+ *                                              as the 'transacted' option if not seperately defined.
+ * <tr><td> consAckMode      <td> AUTO_ACK <td> The message acknowledgement mode for consumers. Defaults to the same
+ *                                              value as 'ackMode' if not seperately defined.
  * <tr><td> maxPending       <td> 0        <td> The maximum size in bytes, of messages sent but not yet received.
  *                                              Limits the volume of messages currently buffered on the client
  *                                              or broker. Can help scale test clients by limiting amount of buffered
@@ -131,8 +135,9 @@
  *       Instead make mina use a bounded blocking buffer, or other form of back pressure, to stop data being written
  *       faster than it can be sent.
  */
-public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
+public class PingPongProducer implements Runnable /*, MessageListener*/, ExceptionListener
 {
+    /** Used for debugging. */
     private static final Logger log = Logger.getLogger(PingPongProducer.class);
 
     /** Holds the name of the property to get the test message size from. */
@@ -159,6 +164,9 @@
     /** Holds the transactional mode to use for the test. */
     public static final boolean TRANSACTED_DEFAULT = false;
 
+    public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted";
+    public static final boolean CONSUMER_TRANSACTED_DEFAULT = false;
+
     /** Holds the name of the property to get the test broker url from. */
     public static final String BROKER_PROPNAME = "broker";
 
@@ -237,12 +245,18 @@
     /** Holds the default message selector. */
     public static final String SELECTOR_DEFAULT = "";
 
-    /** Holds the name of the proeprty to get the destination count from. */
+    /** Holds the name of the property to get the destination count from. */
     public static final String DESTINATION_COUNT_PROPNAME = "destinationCount";
 
     /** Defines the default number of destinations to ping. */
     public static final int DESTINATION_COUNT_DEFAULT = 1;
 
+    /** Holds the name of the property to get the number of consumers per destination from. */
+    public static final String NUM_CONSUMERS_PROPNAME = "numConsumers";
+
+    /** Defines the default number consumers per destination. */
+    public static final int NUM_CONSUMERS_DEFAULT = 1;
+
     /** Holds the name of the property to get the waiting timeout for response messages. */
     public static final String TIMEOUT_PROPNAME = "timeout";
 
@@ -270,6 +284,9 @@
     /** Defines the default message acknowledgement mode. */
     public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
 
+    public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode";
+    public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+
     public static final String MAX_PENDING_PROPNAME = "maxPending";
     public static final int MAX_PENDING_DEFAULT = 0;
 
@@ -297,8 +314,10 @@
         defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
         defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
         defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
+        defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT);
         defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT);
         defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+        defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT);
         defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT);
         defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
         defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
@@ -311,6 +330,7 @@
         defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT);
         defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
         defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT);
+        defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT);
         defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
         defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
         defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
@@ -323,12 +343,15 @@
     protected String _destinationName;
     protected String _selector;
     protected boolean _transacted;
+    protected boolean _consTransacted;
 
     /** Determines whether this producer sends persistent messages. */
     protected boolean _persistent;
 
     /** Holds the acknowledgement mode used for sending and receiving messages. */
-    private int _ackMode;
+    protected int _ackMode;
+
+    protected int _consAckMode;
 
     /** Determines what size of messages this producer sends. */
     protected int _messageSize;
@@ -363,7 +386,13 @@
     /** Holds the number of sends that should be performed in every transaction when using transactions. */
     protected int _txBatchSize;
 
+    /** Holds the number of destinations to ping. */
     protected int _noOfDestinations;
+
+    /** Holds the number of consumers per destination. */
+    protected int _noOfConsumers;
+
+    /** Holds the maximum send rate in herz. */
     protected int _rate;
 
     /**
@@ -373,7 +402,7 @@
     protected int _maxPendingSize;
 
     /**
-     * Holds a monitor which is used to synchronize sender and receiver threads, where the sender has elected
+     * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
      * to wait until the number of unreceived message is reduced before continuing to send.
      */
     protected Object _sendPauseMonitor = new Object();
@@ -397,10 +426,13 @@
     /** Holds the connection to the broker. */
     protected Connection _connection;
 
-    /** Holds the session on which ping replies are received. */
-    protected Session _consumerSession;
+    /** Holds the consumer connections. */
+    protected Connection[] _consumerConnection;
 
-    /** Holds the producer session, needed to create ping messages. */
+    /** Holds the controlSession on which ping replies are received. */
+    protected Session[] _consumerSession;
+
+    /** Holds the producer controlSession, needed to create ping messages. */
     protected Session _producerSession;
 
     /** Holds the destination where the response messages will arrive. */
@@ -434,18 +466,15 @@
     protected MessageProducer _producer;
 
     /** Holds the message consumer to receive the ping replies through. */
-    protected MessageConsumer _consumer;
-
-    /**
-     * Holds the number of consumers that will be attached to each topic. Each pings will result in a reply from each of the
-     * attached clients
-     */
-    static int _consumersPerTopic = 1;
+    protected MessageConsumer[] _consumer;
 
     /** The prompt to display when asking the user to kill the broker for failover testing. */
     private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
     private String _clientID;
 
+    /** Keeps count of the total messages sent purely for debugging purposes. */
+    private static AtomicInteger numSent = new AtomicInteger();
+
     /**
      * Creates a ping producer with the specified parameters, of which there are many. See the class level comments
      * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on
@@ -457,7 +486,7 @@
      */
     public PingPongProducer(Properties overrides) throws Exception
     {
-        log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
+        // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
 
         // Create a set of parsed properties from the defaults overriden by the passed in values.
         ParsedProperties properties = new ParsedProperties(defaults);
@@ -471,6 +500,7 @@
         _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
         _selector = properties.getProperty(SELECTOR_PROPNAME);
         _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+        _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME);
         _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
         _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
         _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME);
@@ -481,11 +511,13 @@
         _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME);
         _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME);
         _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME);
+        _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME);
         _rate = properties.getPropertyAsInteger(RATE_PROPNAME);
         _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
         _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
         _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
         _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
+        _consAckMode = properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
         _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
 
         // Check that one or more destinations were specified.
@@ -516,7 +548,7 @@
      */
     public void establishConnection(boolean producer, boolean consumer) throws Exception
     {
-        log.debug("public void establishConnection(): called");
+        // log.debug("public void establishConnection(): called");
 
         // Generate a unique identifying name for this client, based on it ip address and the current time.
         InetAddress address = InetAddress.getLocalHost();
@@ -526,11 +558,17 @@
         createConnection(_clientID);
 
         // Create transactional or non-transactional sessions, based on the command line arguments.
-        _producerSession = (Session) getConnection().createSession(_transacted, _ackMode);
-        _consumerSession = (Session) getConnection().createSession(_transacted, _ackMode);
+        _producerSession = (Session) _connection.createSession(_transacted, _ackMode);
+
+        _consumerSession = new Session[_noOfConsumers];
+
+        for (int i = 0; i < _noOfConsumers; i++)
+        {
+            _consumerSession[i] = (Session) _consumerConnection[i].createSession(_consTransacted, _consAckMode);
+        }
 
         // Create the destinations to send pings to and receive replies from.
-        _replyDestination = _consumerSession.createTemporaryQueue();
+        _replyDestination = _consumerSession[0].createTemporaryQueue();
         createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
 
         // Create the message producer only if instructed to.
@@ -557,7 +595,20 @@
      */
     protected void createConnection(String clientID) throws AMQException, URLSyntaxException
     {
+        // log.debug("protected void createConnection(String clientID = " + clientID + "): called");
+
+        // log.debug("Creating a connection for the message producer.");
+
         _connection = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath);
+
+        // log.debug("Creating " + _noOfConsumers + " connections for the consumers.");
+
+        _consumerConnection = new Connection[_noOfConsumers];
+
+        for (int i = 0; i < _noOfConsumers; i++)
+        {
+            _consumerConnection[i] = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath);
+        }
     }
 
     /**
@@ -570,20 +621,21 @@
     {
         try
         {
-            Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
+            Properties options =
+                CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());
 
             // Create a ping producer overriding its defaults with all options passed on the command line.
             PingPongProducer pingProducer = new PingPongProducer(options);
             pingProducer.establishConnection(true, true);
 
             // Start the ping producers dispatch thread running.
-            pingProducer.getConnection().start();
+            pingProducer._connection.start();
 
             // Create a shutdown hook to terminate the ping-pong producer.
             Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
 
             // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
-            pingProducer.getConnection().setExceptionListener(pingProducer);
+            pingProducer._connection.setExceptionListener(pingProducer);
 
             // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
             Thread pingThread = new Thread(pingProducer);
@@ -624,12 +676,12 @@
      */
     public List<Destination> getReplyDestinations()
     {
-        log.debug("public List<Destination> getReplyDestinations(): called");
+        // log.debug("public List<Destination> getReplyDestinations(): called");
 
         List<Destination> replyDestinations = new ArrayList<Destination>();
         replyDestinations.add(_replyDestination);
 
-        log.debug("replyDestinations = " + replyDestinations);
+        // log.debug("replyDestinations = " + replyDestinations);
 
         return replyDestinations;
     }
@@ -642,12 +694,12 @@
      */
     public void createProducer() throws JMSException
     {
-        log.debug("public void createProducer(): called");
+        // log.debug("public void createProducer(): called");
 
         _producer = (MessageProducer) _producerSession.createProducer(null);
         _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
 
-        log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
+        // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
     }
 
     /**
@@ -665,14 +717,14 @@
     public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
         boolean durable) throws JMSException, AMQException
     {
-        log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
+        /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
             + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
-            + durable + "): called");
+            + durable + "): called");*/
 
         _pingDestinations = new ArrayList<Destination>();
 
         // Create the desired number of ping destinations and consumers for them.
-        log.debug("Creating " + noOfDestinations + " destinations to ping.");
+        // log.debug("Creating " + noOfDestinations + " destinations to ping.");
 
         for (int i = 0; i < noOfDestinations; i++)
         {
@@ -683,12 +735,12 @@
             // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
             if (unique)
             {
-                log.debug("Creating unique destinations.");
+                // log.debug("Creating unique destinations.");
                 id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID();
             }
             else
             {
-                log.debug("Creating shared destinations.");
+                // log.debug("Creating shared destinations.");
                 id = "_" + _queueSharedID.incrementAndGet();
             }
 
@@ -698,14 +750,14 @@
                 if (!durable)
                 {
                     destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
-                    log.debug("Created non-durable topic " + destination);
+                    // log.debug("Created non-durable topic " + destination);
                 }
                 else
                 {
                     destination =
                         AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
                             _clientID, (AMQConnection) _connection);
-                    log.debug("Created durable topic " + destination);
+                    // log.debug("Created durable topic " + destination);
                 }
             }
             // Otherwise this is a p2p pinger, in which case create queues.
@@ -719,7 +771,7 @@
                 ((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null,
                     ExchangeDefaults.DIRECT_EXCHANGE_NAME);
 
-                log.debug("Created queue " + destination);
+                // log.debug("Created queue " + destination);
             }
 
             // Keep the destination.
@@ -737,20 +789,36 @@
      */
     public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
     {
-        log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
-            + ", String selector = " + selector + "): called");
+        /*log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+            + ", String selector = " + selector + "): called");*/
 
-        log.debug("Creating " + destinations.size() + " reply consumers.");
+        // log.debug("There are " + destinations.size() + " destinations.");
+        // log.debug("Creating " + _noOfConsumers + " consumers on each destination.");
+        // log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers));
 
         for (Destination destination : destinations)
         {
-            // Create a consumer for the destination and set this pinger to listen to its messages.
-            _consumer =
-                _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
-                    selector);
-            _consumer.setMessageListener(this);
+            _consumer = new MessageConsumer[_noOfConsumers];
+
+            for (int i = 0; i < _noOfConsumers; i++)
+            {
+                // Create a consumer for the destination and set this pinger to listen to its messages.
+                _consumer[i] =
+                    _consumerSession[i].createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
+                        selector);
+
+                final int consumerNo = i;
+
+                _consumer[i].setMessageListener(new MessageListener()
+                    {
+                        public void onMessage(Message message)
+                        {
+                            onMessageWithConsumerNo(message, consumerNo);
+                        }
+                    });
 
-            log.debug("Set this to listen to replies sent to destination: " + destination);
+                // log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination);
+            }
         }
     }
 
@@ -761,97 +829,123 @@
      *
      * @param message The received message.
      */
-    public void onMessage(Message message)
+    public void onMessageWithConsumerNo(Message message, int consumerNo)
     {
-        // log.debug("public void onMessage(Message message): called");
-
+        // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called");
         try
         {
+            long now = System.nanoTime();
+            long timestamp = getTimestamp(message);
+            long pingTime = now - timestamp;
+
+            // NDC.push("cons" + consumerNo);
+
             // Extract the messages correlation id.
             String correlationID = message.getJMSCorrelationID();
             // log.debug("correlationID = " + correlationID);
 
-            // Countdown on the traffic light if there is one for the matching correlation id.
-            PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
+            int num = message.getIntProperty("MSG_NUM");
+            // log.info("Message " + num + " received.");
 
-            if (perCorrelationId != null)
+            boolean isRedelivered = message.getJMSRedelivered();
+            // log.debug("isRedelivered = " + isRedelivered);
+
+            if (!isRedelivered)
             {
-                CountDownLatch trafficLight = perCorrelationId.trafficLight;
+                // Countdown on the traffic light if there is one for the matching correlation id.
+                PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
+
+                if (perCorrelationId != null)
+                {
+                    CountDownLatch trafficLight = perCorrelationId.trafficLight;
 
-                // Restart the timeout timer on every message.
-                perCorrelationId.timeOutStart = System.nanoTime();
+                    // Restart the timeout timer on every message.
+                    perCorrelationId.timeOutStart = System.nanoTime();
 
-                // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+                    // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
 
-                // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
-                // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
-                // ensures that each thread will get a unique value for the remaining messages.
-                long trueCount = -1;
-                long remainingCount = -1;
+                    // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
+                    // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
+                    // ensures that each thread will get a unique value for the remaining messages.
+                    long trueCount = -1;
+                    long remainingCount = -1;
 
-                synchronized (trafficLight)
-                {
-                    trafficLight.countDown();
+                    synchronized (trafficLight)
+                    {
+                        trafficLight.countDown();
 
-                    trueCount = trafficLight.getCount();
-                    remainingCount = trueCount - 1;
+                        trueCount = trafficLight.getCount();
+                        remainingCount = trueCount - 1;
 
-                    // Decrement the count of sent but not yet received messages.
-                    int unreceived = _unreceived.decrementAndGet();
-                    int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
+                        // Decrement the count of sent but not yet received messages.
+                        int unreceived = _unreceived.decrementAndGet();
+                        int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
 
-                    // Release a waiting sender if there is one.
-                    synchronized (_sendPauseMonitor)
-                    {
-                        if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize))
-                        // && (_sendPauseBarrier.getNumberWaiting() == 1))
+                        // Release a waiting sender if there is one.
+                        synchronized (_sendPauseMonitor)
                         {
-                            log.debug("unreceived size estimate under limit = " + unreceivedSize);
-
-                            // Wait on the send pause barrier for the limit to be re-established.
-                            /*try
-                            {*/
-                            // _sendPauseBarrier.await();
-                            _sendPauseMonitor.notify();
-                            /*}
-                            catch (InterruptedException e)
+                            if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize))
+                            // && (_sendPauseBarrier.getNumberWaiting() == 1))
                             {
-                                throw new RuntimeException(e);
+                                // log.debug("unreceived size estimate under limit = " + unreceivedSize);
+
+                                // Wait on the send pause barrier for the limit to be re-established.
+                                /*try
+                                {*/
+                                // _sendPauseBarrier.await();
+                                _sendPauseMonitor.notify();
+                                /*}
+                                catch (InterruptedException e)
+                                {
+                                    throw new RuntimeException(e);
+                                }
+                                catch (BrokenBarrierException e)
+                                {
+                                    throw new RuntimeException(e);
+                                }*/
                             }
-                            catch (BrokenBarrierException e)
-                            {
-                                throw new RuntimeException(e);
-                            }*/
                         }
-                    }
 
-                    // log.debug("remainingCount = " + remainingCount);
-                    // log.debug("trueCount = " + trueCount);
+                        // NDC.push("/rem" + remainingCount);
 
-                    // Commit on transaction batch size boundaries. At this point in time the waiting producer remains
-                    // blocked, even on the last message.
-                    if ((remainingCount % _txBatchSize) == 0)
-                    {
-                        commitTx(_consumerSession);
-                    }
+                        // log.debug("remainingCount = " + remainingCount);
+                        // log.debug("trueCount = " + trueCount);
 
-                    // Forward the message and remaining count to any interested chained message listener.
-                    if (_chainedMessageListener != null)
-                    {
-                        _chainedMessageListener.onMessage(message, (int) remainingCount);
-                    }
+                        // Commit on transaction batch size boundaries. At this point in time the waiting producer remains
+                        // blocked, even on the last message.
+                        // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on
+                        // each batch boundary. For pub/sub each consumer gets every message so no division is done.
+                        long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers);
+                        // log.debug("commitCount = " + commitCount);
 
-                    // Check if this is the last message, in which case release any waiting producers. This is done
-                    // after the transaction has been committed and any listeners notified.
-                    if (trueCount == 1)
-                    {
-                        trafficLight.countDown();
+                        if ((commitCount % _txBatchSize) == 0)
+                        {
+                            // log.debug("Trying commit for consumer " + consumerNo + ".");
+                            commitTx(_consumerSession[consumerNo]);
+                        }
+
+                        // Forward the message and remaining count to any interested chained message listener.
+                        if (_chainedMessageListener != null)
+                        {
+                            _chainedMessageListener.onMessage(message, (int) remainingCount, pingTime);
+                        }
+
+                        // Check if this is the last message, in which case release any waiting producers. This is done
+                        // after the transaction has been committed and any listeners notified.
+                        if (trueCount == 1)
+                        {
+                            trafficLight.countDown();
+                        }
                     }
                 }
+                else
+                {
+                    log.warn("Got unexpected message with correlationId: " + correlationID);
+                }
             }
             else
             {
-                log.warn("Got unexpected message with correlationId: " + correlationID);
+                log.warn("Got redelivered message, ignoring.");
             }
 
             // Print out ping times for every message in verbose mode only.
@@ -870,8 +964,11 @@
         {
             log.warn("There was a JMSException: " + e.getMessage(), e);
         }
-
-        // log.debug("public void onMessage(Message message): ending");
+        finally
+        {
+            // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending");
+            // NDC.clear();
+        }
     }
 
     /**
@@ -893,8 +990,8 @@
     public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
         throws JMSException, InterruptedException
     {
-        log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
-            + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+        /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+            + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
 
         // Generate a unique correlation id to put on the messages before sending them, if one was not specified.
         if (messageCorrelationId == null)
@@ -904,6 +1001,8 @@
 
         try
         {
+            // NDC.push("prod");
+
             // Create a count down latch to count the number of replies with. This is created before the messages are
             // sent so that the replies cannot be received before the count down is created.
             // One is added to this, so that the last reply becomes a special case. The special case is that the
@@ -935,16 +1034,16 @@
 
                 allMessagesReceived = numReplies == getExpectedNumPings(numPings);
 
-                log.debug("numReplies = " + numReplies);
-                log.debug("allMessagesReceived = " + allMessagesReceived);
+                // log.debug("numReplies = " + numReplies);
+                // log.debug("allMessagesReceived = " + allMessagesReceived);
 
                 // Recheck the timeout condition.
                 long now = System.nanoTime();
                 long lastMessageReceievedAt = perCorrelationId.timeOutStart;
                 timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000);
 
-                log.debug("now = " + now);
-                log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
+                // log.debug("now = " + now);
+                // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
             }
             while (!timedOut && !allMessagesReceived);
 
@@ -957,9 +1056,9 @@
                 log.info("Got all replies on id, " + messageCorrelationId);
             }
 
-            commitTx(_consumerSession);
+            // commitTx(_consumerSession);
 
-            log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
+            // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
 
             return numReplies;
         }
@@ -967,6 +1066,7 @@
         // so will be a memory leak if this is not done.
         finally
         {
+            // NDC.pop();
             perCorrelationIds.remove(messageCorrelationId);
         }
     }
@@ -982,8 +1082,8 @@
      */
     public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
     {
-        log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
-            + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+        /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+            + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
 
         if (message == null)
         {
@@ -1067,7 +1167,7 @@
 
                 if (unreceivedSize > _maxPendingSize)
                 {
-                    log.debug("unreceived size estimate over limit = " + unreceivedSize);
+                    // log.debug("unreceived size estimate over limit = " + unreceivedSize);
 
                     // Wait on the send pause barrier for the limit to be re-established.
                     try
@@ -1096,11 +1196,19 @@
         // Send the message either to its round robin destination, or its default destination.
         if (destination == null)
         {
+            int num = numSent.incrementAndGet();
+            message.setIntProperty("MSG_NUM", num);
+            setTimestamp(message);
             _producer.send(message);
+            // log.info("Message " + num + " sent.");
         }
         else
         {
+            int num = numSent.incrementAndGet();
+            message.setIntProperty("MSG_NUM", num);
+            setTimestamp(message);
             _producer.send(destination, message);
+            // log.info("Message " + num + " sent.");
         }
 
         // Increase the unreceived size, this may actually happen aftern the message is recevied.
@@ -1118,6 +1226,7 @@
         // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
         if (((i + 1) % _txBatchSize) == 0)
         {
+            // log.debug("Trying commit on producer session.");
             committed = commitTx(_producerSession);
         }
 
@@ -1135,7 +1244,7 @@
         {
             // Generate a sample message and time stamp it.
             Message msg = getTestMessage(_replyDestination, _messageSize, _persistent);
-            setTimestamp(msg);
+            // setTimestamp(msg);
 
             // Send the message and wait for a reply.
             pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null);
@@ -1143,12 +1252,12 @@
         catch (JMSException e)
         {
             _publish = false;
-            log.debug("There was a JMSException: " + e.getMessage(), e);
+            // log.debug("There was a JMSException: " + e.getMessage(), e);
         }
         catch (InterruptedException e)
         {
             _publish = false;
-            log.debug("There was an interruption: " + e.getMessage(), e);
+            // log.debug("There was an interruption: " + e.getMessage(), e);
         }
     }
 
@@ -1186,7 +1295,7 @@
 
         // Timestamp the message in nanoseconds.
 
-        setTimestamp(msg);
+        // setTimestamp(msg);
 
         return msg;
     }
@@ -1227,6 +1336,16 @@
         _publish = false;
     }
 
+    public void start() throws JMSException
+    {
+        _connection.start();
+
+        for (int i = 0; i < _noOfConsumers; i++)
+        {
+            _consumerConnection[i].start();
+        }
+    }
+
     /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */
     public void run()
     {
@@ -1245,7 +1364,7 @@
      */
     public void onException(JMSException e)
     {
-        log.debug("public void onException(JMSException e = " + e + "): called", e);
+        // log.debug("public void onException(JMSException e = " + e + "): called", e);
         _publish = false;
     }
 
@@ -1267,30 +1386,29 @@
     }
 
     /**
-     * Gets the underlying connection that this ping client is running on.
-     *
-     * @return The underlying connection that this ping client is running on.
-     */
-    public Connection getConnection()
-    {
-        return _connection;
-    }
-
-    /**
      * Closes the pingers connection.
      *
      * @throws JMSException All JMSException are allowed to fall through.
      */
     public void close() throws JMSException
     {
-        log.debug("public void close(): called");
+        // log.debug("public void close(): called");
 
         try
         {
             if (_connection != null)
             {
                 _connection.close();
-                log.debug("Close connection.");
+                // log.debug("Close connection.");
+            }
+
+            for (int i = 0; i < _noOfConsumers; i++)
+            {
+                if (_consumerConnection[i] != null)
+                {
+                    _consumerConnection[i].close();
+                    // log.debug("Closed consumer connection.");
+                }
             }
         }
         finally
@@ -1298,6 +1416,7 @@
             _connection = null;
             _producerSession = null;
             _consumerSession = null;
+            _consumerConnection = null;
             _producer = null;
             _consumer = null;
             _pingDestinations = null;
@@ -1306,8 +1425,8 @@
     }
 
     /**
-     * Convenience method to commit the transaction on the specified session. If the session to commit on is not a
-     * transactional session, this method does nothing (unless the failover after send flag is set).
+     * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a
+     * transactional controlSession, this method does nothing (unless the failover after send flag is set).
      *
      * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is
      * applied. This flag applies whether the pinger is transactional or not.
@@ -1316,9 +1435,9 @@
      * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the
      * commit is applied. These flags will only apply if using a transactional pinger.
      *
-     * @param session The session to commit
+     * @param session The controlSession to commit
      *
-     * @return <tt>true</tt> if the session was committed, <tt>false</tt> if it was not.
+     * @return <tt>true</tt> if the controlSession was committed, <tt>false</tt> if it was not.
      *
      * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
      *
@@ -1347,6 +1466,8 @@
 
         if (session.getTransacted())
         {
+            // log.debug("Session is transacted.");
+
             try
             {
                 if (_failBeforeCommit)
@@ -1360,10 +1481,10 @@
                     waitForUser(KILL_BROKER_PROMPT);
                 }
 
-                // long l = System.nanoTime();
+                long start = System.nanoTime();
                 session.commit();
                 committed = true;
-                // log.debug("Time taken to commit :" + ((System.nanoTime() - l) / 1000000f) + " ms");
+                // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms");
 
                 if (_failAfterCommit)
                 {
@@ -1376,26 +1497,26 @@
                     waitForUser(KILL_BROKER_PROMPT);
                 }
 
-                // log.trace("Session Commited.");
+                // log.debug("Session Commited.");
             }
             catch (JMSException e)
             {
-                log.debug("JMSException on commit:" + e.getMessage(), e);
+                // log.debug("JMSException on commit:" + e.getMessage(), e);
 
                 // Warn that the bounce back client is not available.
                 if (e.getLinkedException() instanceof AMQNoConsumersException)
                 {
-                    log.debug("No consumers on queue.");
+                    // log.debug("No consumers on queue.");
                 }
 
                 try
                 {
                     session.rollback();
-                    log.debug("Message rolled back.");
+                    // log.debug("Message rolled back.");
                 }
                 catch (JMSException jmse)
                 {
-                    log.debug("JMSE on rollback:" + jmse.getMessage(), jmse);
+                    // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse);
 
                     // Both commit and rollback failed. Throw the rollback exception.
                     throw jmse;
@@ -1428,23 +1549,34 @@
     }
 
     /**
-     * This value will be changed by PingClient to represent the number of clients connected to each topic.
+     * Gets the number of consumers that are listening to each destination in the test.
      *
      * @return int The number of consumers subscribing to each topic.
      */
-    public int getConsumersPerTopic()
+    public int getConsumersPerDestination()
     {
-        return _consumersPerTopic;
+        return _noOfConsumers;
     }
 
+    /**
+     * Calculates how many pings are expected to be received for the given number sent.
+     *
+     * @param numpings The number of pings that will be sent.
+     *
+     * @return The number that should be received, for the test to pass.
+     */
     public int getExpectedNumPings(int numpings)
     {
-        return numpings * getConsumersPerTopic();
+        // log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called");
+
+        // log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers.");
+
+        return numpings * (_isPubSub ? getConsumersPerDestination() : 1);
     }
 
     /**
      * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link
-     * PingPongProducer#onMessage} method is called, the chained listener set through the {@link
+     * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link
      * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of
      * messages with that correlation id.
      *
@@ -1454,7 +1586,17 @@
      */
     public static interface ChainedMessageListener
     {
-        public void onMessage(Message message, int remainingCount) throws JMSException;
+        /**
+         * Notifies interested listeners about message arrival and important test stats, the number of messages
+         * remaining in the test, and the messages send timestamp.
+         *
+         * @param message        The newly arrived message.
+         * @param remainingCount The number of messages left to complete the test.
+         * @param latency        The nanosecond latency of the message.
+         *
+         * @throws JMSException Any JMS exceptions is allowed to fall through.
+         */
+        public void onMessage(Message message, int remainingCount, long latency) throws JMSException;
     }
 
     /**

Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java?rev=568952&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java Thu Aug 23 05:10:42 2007
@@ -0,0 +1,251 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import junit.framework.Assert;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+import javax.jms.*;
+
+/**
+ * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run
+ * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from
+ * a producer to a conumer, then the consumer replies to the message on a temporary queue.
+ *
+ * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of the number
+ * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled
+ * up using a suitable JUnit test runner. See {@link uk.co.thebadgerset.junit.extensions.TKTestRunner} for more
+ * information on how to do this.
+ *
+ * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
+ * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads
+ * gets its own connection/producer/consumer, this is only re-established if the connection is lost.
+ *
+ * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
+ * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come
+ * back on the temporary queue.
+ *
+ * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ */
+public class PingPongTestPerf extends AsymptoticTestCase
+{
+    private static Logger _logger = Logger.getLogger(PingPongTestPerf.class);
+
+    /** Thread local to hold the per-thread test setup fields. */
+    ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
+
+    // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
+    // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
+    // of the test parameters to log with the results. It also providers some basic type parsing convenience methods.
+    // private Properties testParameters = System.getProperties();
+    private ParsedProperties testParameters =
+        TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/);
+
+    public PingPongTestPerf(String name)
+    {
+        super(name);
+
+        _logger.debug(testParameters);
+
+        // Sets up the test parameters with defaults.
+        /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
+            Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
+        testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
+            Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT));
+        testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
+            PingPongProducer.PING_QUEUE_NAME_DEFAULT);
+        testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
+            Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT));
+        testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
+            Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT));
+        testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT);
+        testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT);
+        testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT);
+        testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT);
+        testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME,
+            Boolean.toString(PingPongProducer.VERBOSE_DEFAULT));
+        testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT));
+        testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME,
+            Boolean.toString(PingPongProducer.PUBSUB_DEFAULT));
+        testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
+            Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
+        testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT));
+        testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME,
+            Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT));
+        testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
+            PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
+        testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
+            PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
+        testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
+            PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
+        testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
+            PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
+        testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT);
+        testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME,
+            Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT));
+        testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME,
+            Integer.toString(PingPongProducer.ACK_MODE_DEFAULT));
+        testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME,
+            PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/
+    }
+
+    /**
+     * Compile all the tests into a test suite.
+     */
+    public static Test suite()
+    {
+        // Build a new test suite
+        TestSuite suite = new TestSuite("Ping-Pong Performance Tests");
+
+        // Run performance tests in read committed mode.
+        suite.addTest(new PingPongTestPerf("testPingPongOk"));
+
+        return suite;
+    }
+
+    private static void setSystemPropertyIfNull(String propName, String propValue)
+    {
+        if (System.getProperty(propName) == null)
+        {
+            System.setProperty(propName, propValue);
+        }
+    }
+
+    public void testPingPongOk(int numPings) throws Exception
+    {
+        // Get the per thread test setup to run the test through.
+        PerThreadSetup perThreadSetup = threadSetup.get();
+
+        // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+        Message msg =
+            perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0),
+                testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+                testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+        // Send the message and wait for a reply.
+        int numReplies =
+            perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null);
+
+        // Fail the test if the timeout was exceeded.
+        if (numReplies != numPings)
+        {
+            Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings);
+        }
+    }
+
+    /**
+     * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+     */
+    public void threadSetUp()
+    {
+        try
+        {
+            PerThreadSetup perThreadSetup = new PerThreadSetup();
+
+            // Extract the test set up paramaeters.
+            String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
+            String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
+            String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
+            String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME);
+            String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
+            boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
+            boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
+            String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
+            boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
+            boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME);
+
+            synchronized (this)
+            {
+                // Establish a bounce back client on the ping queue to bounce back the pings.
+                perThreadSetup._testPingBouncer =
+                    new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent,
+                        transacted, selector, verbose, pubsub);
+
+                // Start the connections for client and producer running.
+                perThreadSetup._testPingBouncer.getConnection().start();
+
+                // Establish a ping-pong client on the ping queue to send the pings and receive replies with.
+                perThreadSetup._testPingProducer = new PingPongProducer(testParameters);
+                perThreadSetup._testPingProducer.establishConnection(true, true);
+                perThreadSetup._testPingProducer.start();
+            }
+
+            // Attach the per-thread set to the thread.
+            threadSetup.set(perThreadSetup);
+        }
+        catch (Exception e)
+        {
+            _logger.warn("There was an exception during per thread setup.", e);
+        }
+    }
+
+    /**
+     * Performs test fixture clean
+     */
+    public void threadTearDown()
+    {
+        _logger.debug("public void threadTearDown(): called");
+
+        try
+        {
+            // Get the per thread test fixture.
+            PerThreadSetup perThreadSetup = threadSetup.get();
+
+            // Close the pingers so that it cleans up its connection cleanly.
+            synchronized (this)
+            {
+                perThreadSetup._testPingProducer.close();
+                // perThreadSetup._testPingBouncer.close();
+            }
+
+            // Ensure the per thread fixture is reclaimed.
+            threadSetup.remove();
+        }
+        catch (JMSException e)
+        {
+            _logger.warn("There was an exception during per thread tear down.");
+        }
+    }
+
+    protected static class PerThreadSetup
+    {
+        /**
+         * Holds the test ping-pong producer.
+         */
+        private PingPongProducer _testPingProducer;
+
+        /**
+         * Holds the test ping client.
+         */
+        private PingPongBouncer _testPingBouncer;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java
------------------------------------------------------------------------------
    svn:eol-style = native