You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by bh...@apache.org on 2007/01/24 12:05:39 UTC

svn commit: r499356 - in /incubator/qpid/trunk/qpid/java/perftests/src: main/java/org/apache/qpid/client/message/ main/java/org/apache/qpid/ping/ main/java/org/apache/qpid/requestreply/ main/java/org/apache/qpid/topic/ test/java/org/apache/qpid/ping/ t...

Author: bhupendrab
Date: Wed Jan 24 03:05:35 2007
New Revision: 499356

URL: http://svn.apache.org/viewvc?view=rev&rev=499356
Log:
updated the test classes to be used with Topics as well as Queues

Modified:
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
    incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
    incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java?view=diff&rev=499356&r1=499355&r2=499356
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java Wed Jan 24 03:05:35 2007
@@ -26,6 +26,9 @@
 import javax.jms.StreamMessage;
 import javax.jms.BytesMessage;
 import javax.jms.TextMessage;
+import javax.jms.Queue;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
 
 public class TestMessageFactory
 {
@@ -61,7 +64,39 @@
 
     public static ObjectMessage newObjectMessage(Session session, int size) throws JMSException
     {
-        return session.createObjectMessage(createMessagePayload(size));
+        if (size == 0)
+        {
+            return session.createObjectMessage();
+        }
+        else
+        {
+            return session.createObjectMessage(createMessagePayload(size));
+        }
+    }
+
+    /**
+     * Creates an ObjectMessage with given size and sets the JMS properties (JMSReplyTo and DeliveryMode)
+     * @param session
+     * @param replyDestination
+     * @param size
+     * @param persistent
+     * @return the new ObjectMessage
+     * @throws JMSException
+     */
+    public static ObjectMessage newObjectMessage(Session session, Destination replyDestination, int size, boolean persistent) throws JMSException
+    {
+        ObjectMessage msg = newObjectMessage(session, size);
+
+        // Set the messages persistent delivery flag.
+        msg.setJMSDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+        // Ensure that the temporary reply queue is set as the reply to destination for the message.
+        if (replyDestination != null)
+        {
+            msg.setJMSReplyTo(replyDestination);
+        }
+
+        return msg;
     }
 
     public static String createMessagePayload(int size)

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java?view=diff&rev=499356&r1=499355&r2=499356
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java Wed Jan 24 03:05:35 2007
@@ -29,6 +29,8 @@
 
     private static final Logger _logger = Logger.getLogger(TestPingClient.class);
     private AMQConnection _connection;
+    /** tells if the test is being done for pubsub or p2p */
+    private boolean _isPubSub = false;
 
     protected boolean _failBeforeCommit = false;
     protected boolean _failAfterCommit = false;
@@ -41,6 +43,16 @@
     public void setConnection(AMQConnection _connection)
     {
         this._connection = _connection;
+    }
+
+    public void setPubSub(boolean pubsub)
+    {
+        _isPubSub = pubsub;
+    }
+
+    public boolean isPubSub()
+    {
+        return _isPubSub;
     }
 
     /**

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java?view=diff&rev=499356&r1=499355&r2=499356
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java Wed Jan 24 03:05:35 2007
@@ -10,16 +10,14 @@
 import javax.jms.*;
 import javax.jms.Connection;
 import javax.jms.Message;
-import javax.jms.MessageProducer;
 
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.client.AMQNoConsumersException;
 import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.client.message.TestMessageFactory;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.*;
 import org.apache.qpid.jms.Session;
 
 /**
@@ -41,6 +39,8 @@
 {
     private static final Logger _logger = Logger.getLogger(AbstractPingProducer.class);
 
+    /** tells if the test is being done for pubsub or p2p */
+    private boolean _isPubSub = false;
     /**
      * Used to format time stamping output.
      */
@@ -65,11 +65,12 @@
     private Session _producerSession;
 
     /**
-     * Holds the number of queues the tests will be using to send messages. By default it will be 1
+     * Holds the number of destinations for multiple-destination test. By default it will be 1
      */
-    protected int _queueCount = 1;
+    protected int _destinationCount = 1;
 
-    private List<Queue> _queues = new ArrayList<Queue>();
+    /** list of all the destinations for multiple-destinations test */
+    private List<Destination> _destinations = new ArrayList<Destination>();
 
     /**
      * Holds the message producer to send the pings through.
@@ -86,6 +87,19 @@
     protected int _txBatchSize = 1;
 
     /**
+     * Sets the test for pubsub or p2p.
+     * @param value
+     */
+    public void setPubSub(boolean value)
+    {
+        _isPubSub = value;
+    }
+
+    public boolean isPubSub()
+    {
+        return _isPubSub;
+    }
+    /**
      * Convenience method for a short pause.
      *
      * @param sleepTime The time in milliseconds to pause for.
@@ -119,31 +133,11 @@
      *
      * @throws JMSException All underlying JMSException are allowed to fall through.
      */
-    public ObjectMessage getTestMessage(Queue replyQueue, int messageSize, boolean persistent) throws JMSException
+    public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
     {
-        ObjectMessage msg;
-
-        if (messageSize != 0)
-        {
-            msg = TestMessageFactory.newObjectMessage(_producerSession, messageSize);
-        }
-        else
-        {
-            msg = _producerSession.createObjectMessage();
-        }
-
-        // Set the messages persistent delivery flag.
-        msg.setJMSDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
+        ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
         // Timestamp the message.
         msg.setLongProperty("timestamp", System.currentTimeMillis());
-
-        // Ensure that the temporary reply queue is set as the reply to destination for the message.
-        if (replyQueue != null)
-        {
-            msg.setJMSReplyTo(replyQueue);
-        }
-
         return msg;
     }
 
@@ -217,14 +211,14 @@
         this._producerSession = session;
     }
 
-    public int getQueueCount()
+    public int getDestinationsCount()
     {
-        return _queueCount;
+        return _destinationCount;
     }
 
-    public void setQueueCount(int queueCount)
+    public void setDestinationsCount(int count)
     {
-        this._queueCount = queueCount;
+        this._destinationCount = count;
     }
 
     protected void commitTx() throws JMSException
@@ -233,31 +227,57 @@
     }
 
     /**
-     * Creates queues dynamically and adds to the queues list.  This is when the test is being done with
-     * multiple queues.
-     *
-     * @param queueCount
+     * Creates destinations dynamically and adds to the destinations list for multiple-destinations test
+     * @param count
      */
-    protected void createQueues(int queueCount)
+    protected void createDestinations(int count)
+    {
+        if (isPubSub())
+        {
+            createTopics(count);
+        }
+        else
+        {
+            createQueues(count);
+        }
+    }
+
+    private void createQueues(int count)
     {
-        for (int i = 0; i < queueCount; i++)
+        for (int i = 0; i < count; i++)
         {
             AMQShortString name =
-                new AMQShortString("Queue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
+                new AMQShortString("AMQQueue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
             AMQQueue queue = new AMQQueue(name, name, false, false, false);
 
-            _queues.add(queue);
+            _destinations.add(queue);
         }
     }
 
-    protected Queue getQueue(int index)
+    private void createTopics(int count)
     {
-        return _queues.get(index);
+        for (int i = 0; i < count; i++)
+        {
+            AMQShortString name =
+                new AMQShortString("AMQTopic_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
+            AMQTopic topic = new AMQTopic(name);
+
+            _destinations.add(topic);
+        }
+    }
+
+    /**
+     * Returns the destination from the destinations list with given index. This is for multiple-destinations test
+     * @param index
+     * @return Destination with given index
+     */
+    protected Destination getDestination(int index)
+    {
+        return _destinations.get(index);
     }
 
     /**
      * Convenience method to commit the transaction on the session associated with this pinger.
-     *
      * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
      */
     protected void commitTx(Session session) throws JMSException
@@ -336,7 +356,7 @@
         sendMessage(null, message);
     }
 
-    protected void sendMessage(Queue q, Message message) throws JMSException
+    protected void sendMessage(Destination destination, Message message) throws JMSException
     {
         if (_failBeforeSend)
         {
@@ -349,13 +369,13 @@
             doFailover();
         }
 
-        if (q == null)
+        if (destination == null)
         {
             _producer.send(message);
         }
         else
         {
-            _producer.send(q, message);
+            _producer.send(destination, message);
         }
 
         commitTx();

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java?view=diff&rev=499356&r1=499355&r2=499356
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java Wed Jan 24 03:05:35 2007
@@ -38,8 +38,8 @@
     private static final Logger _logger = Logger.getLogger(TestPingItself.class);
 
     /**
-     * If queueCount is <= 1 : There will be one Queue and one consumer instance for the test
-     * If queueCount is > 1 : This creats a client for tests with multiple queues. Creates as many consumer instances
+     * If noOfDestinations is <= 1 : There will be one Queue and one consumer instance for the test
+     * If noOfDestinations is > 1 : This creates a client for tests with multiple queues. Creates as many consumer instances
      * as there are queues, each listening to a Queue. A producer is created which picks up a queue from
      * the list of queues to send message
      *
@@ -59,20 +59,21 @@
      * @param beforeSend
      * @param failOnce
      * @param batchSize
-     * @param queueCount
+     * @param noOfDestinations
      * @throws Exception
      */
     public TestPingItself(String brokerDetails, String username, String password, String virtualpath, String queueName,
                           String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
                           boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
-                          int batchSize, int queueCount, int rate) throws Exception
+                          int batchSize, int noOfDestinations, int rate, boolean pubsub) throws Exception
     {
-        super(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent, messageSize,
-              verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize, queueCount, rate);
+        super(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent,
+              messageSize, verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
+              noOfDestinations, rate, pubsub);
 
-        if (queueCount > 1)
+        if (noOfDestinations > 1)
         {
-            createQueues(queueCount);
+            createDestinations(noOfDestinations);
 
             _persistent = persistent;
             _messageSize = messageSize;
@@ -83,17 +84,16 @@
         }
     }
 
-
-    /**
+     /**
      * Sets the replyQueue to be the same as ping queue.
      */
     @Override
     public void createConsumer(String selector) throws JMSException
     {
         // Create a message consumer to get the replies with and register this to be called back by it.
-        setReplyQueue(getPingQueue());
+        setReplyDestination(getPingDestination());
         MessageConsumer consumer =
-                getConsumerSession().createConsumer(getReplyQueue(), PREFETCH, false, EXCLUSIVE, selector);
+            getConsumerSession().createConsumer(getReplyDestination(), PREFETCH, false, EXCLUSIVE, selector);
         consumer.setMessageListener(this);
     }
 
@@ -123,6 +123,7 @@
         int queueCount = (config.getQueueCount() != 0) ? config.getQueueCount() : 1;
         int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : BATCH_SIZE;
         int rate = (config.getRate() != 0) ? config.getRate() : 0;
+        boolean pubsub = config.isPubSub();
 
         String queue = "ping_" + System.currentTimeMillis();
         _logger.info("Queue:" + queue + ", Transacted:" + transacted + ", persistent:" + persistent + ",MessageSize:"
@@ -169,9 +170,9 @@
 
         // Create a ping producer to handle the request/wait/reply cycle.
         TestPingItself pingItself = new TestPingItself(brokerDetails, "guest", "guest", virtualpath, queue, null,
-                                                       transacted, persistent, messageSize, verbose,
-                                                       afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-                                                       batchSize, queueCount, rate);
+                                                    transacted, persistent, messageSize, verbose, afterCommit,
+                                                    beforeCommit, afterSend, beforeSend, failOnce, batchSize,
+                                                    queueCount, rate, pubsub);
 
         pingItself.getConnection().start();
 
@@ -214,6 +215,4 @@
                            + "-messages   : no of messages to be sent (if 0, the ping loop will run indefinitely)");
         System.exit(0);
     }
-
-
 }

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java?view=diff&rev=499356&r1=499355&r2=499356
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java Wed Jan 24 03:05:35 2007
@@ -29,6 +29,7 @@
 
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.jms.ConnectionListener;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.ping.AbstractPingClient;
@@ -75,6 +76,8 @@
     /** Determines whether this bounce back client bounces back messages persistently. */
     private boolean _persistent = false;
 
+    private Destination _consumerDestination;
+
     /** Keeps track of the response destination of the previous message for the last reply to producer cache. */
     private Destination _lastResponseDest;
 
@@ -91,24 +94,28 @@
      * Creates a PingPongBouncer on the specified producer and consumer sessions.
      *
      * @param brokerDetails The addresses of the brokers to connect to.
-     * @param username      The broker username.
-     * @param password      The broker password.
-     * @param virtualpath   The virtual host name within the broker.
-     * @param queueName     The name of the queue to receive pings on (or root of the queue name where many queues are generated).
-     * @param persistent    A flag to indicate that persistent message should be used.
-     * @param transacted    A flag to indicate that pings should be sent within transactions.
-     * @param selector      A message selector to filter received pings with.
-     * @param verbose       A flag to indicate that message timings should be sent to the console.
+     * @param username        The broker username.
+     * @param password        The broker password.
+     * @param virtualpath     The virtual host name within the broker.
+     * @param destinationName The name of the queue to receive pings on
+     *                        (or root of the queue name where many queues are generated).
+     * @param persistent      A flag to indicate that persistent message should be used.
+     * @param transacted      A flag to indicate that pings should be sent within transactions.
+     * @param selector        A message selector to filter received pings with.
+     * @param verbose         A flag to indicate that message timings should be sent to the console.
      *
      * @throws Exception All underlying exceptions allowed to fall through. This is only test code...
      */
-    public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath, String queueName,
-                           boolean persistent, boolean transacted, String selector, boolean verbose) throws Exception
+    public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath,
+                           String destinationName, boolean persistent, boolean transacted, String selector,
+                           boolean verbose, boolean pubsub) throws Exception
     {
         // Create a client id to uniquely identify this client.
         InetAddress address = InetAddress.getLocalHost();
         String clientId = address.getHostName() + System.currentTimeMillis();
-
+        _verbose = verbose;
+        _persistent = persistent;
+        setPubSub(pubsub);
         // Connect to the broker.
         setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath));
         _logger.info("Connected with URL:" + getConnection().toURL());
@@ -122,21 +129,30 @@
         _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
 
         // Create the queue to listen for message on.
-        Queue q = new AMQQueue(queueName);
-        MessageConsumer consumer = _consumerSession.createConsumer(q, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
+        createConsumerDestination(destinationName);
+        MessageConsumer consumer = _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
 
         // Create a producer for the replies, without a default destination.
         _replyProducer = _producerSession.createProducer(null);
         _replyProducer.setDisableMessageTimestamp(true);
         _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
 
-        _verbose = verbose;
-        _persistent = persistent;
-
         // Set this up to listen for messages on the queue.
         consumer.setMessageListener(this);
     }
 
+    private void createConsumerDestination(String name)
+    {
+        if (isPubSub())
+        {
+            _consumerDestination = new AMQTopic(name);
+        }
+        else
+        {
+            _consumerDestination = new AMQQueue(name);
+        }
+    }
+
     /**
      * Starts a stand alone ping-pong client running in verbose mode.
      *
@@ -149,8 +165,9 @@
         // Display help on the command line.
         if (args.length < 5)
         {
-            System.err.println("Usage: <brokerdetails> <username> <password> <virtual-path> <serviceQueue> "
-                               + "[<P[ersistent]|N[onPersistent]> <T[ransacted]|N<onTransacted]>] [selector]");
+            System.err.println("Usage: <brokerdetails> <username> <password> <virtual-path> <serviceQueue> " +
+                               "[<P[ersistent]|N[onPersistent]> <T[ransacted]|N<onTransacted]>] " +
+                               "[selector] [pubsub(true/false)]");
             System.exit(1);
         }
 
@@ -160,14 +177,15 @@
         String password = args[2];
         String virtualpath = args[3];
         String queueName = args[4];
-        boolean persistent = ((args.length >= 6) && (args[5].toUpperCase().charAt(0) == 'P'));
-        boolean transacted = ((args.length >= 7) && (args[6].toUpperCase().charAt(0) == 'T'));
-        String selector = (args.length == 8) ? args[5] : null;
+        boolean persistent = ((args.length > 5) && (args[5].toUpperCase().charAt(0) == 'P'));
+        boolean transacted = ((args.length > 6) && (args[6].toUpperCase().charAt(0) == 'T'));
+        String selector = (args.length > 7) ? args[7] : null;
+        boolean pubsub = (args.length > 8) ? Boolean.parseBoolean(args[8]) : false;
 
         // Instantiate the ping pong client with the command line options and start it running.
         PingPongBouncer pingBouncer =
-            new PingPongBouncer(brokerDetails, username, password, virtualpath, queueName, persistent, transacted, selector,
-                                true);
+            new PingPongBouncer(brokerDetails, username, password, virtualpath, queueName, persistent, transacted,
+                                selector, true, pubsub);
         pingBouncer.getConnection().start();
 
         System.out.println("Waiting...");
@@ -185,7 +203,6 @@
         try
         {
             String messageCorrelationId = message.getJMSCorrelationID();
-
             if (_verbose)
             {
                 _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, "

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=499356&r1=499355&r2=499356
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Wed Jan 24 03:05:35 2007
@@ -34,6 +34,7 @@
 
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.jms.ConnectionListener;
 import org.apache.qpid.jms.MessageProducer;
 import org.apache.qpid.jms.Session;
@@ -85,9 +86,9 @@
     protected static final long TIMEOUT = 9000;
 
     /**
-     * Holds the name of the queue to send pings on.
+     * Holds the name of the destination to send pings on.
      */
-    protected static final String PING_QUEUE_NAME = "ping";
+    protected static final String PING_DESTINATION_NAME = "ping";
 
     /**
      * The batch size.
@@ -114,14 +115,14 @@
     private static Map<String, CountDownLatch> trafficLights = new HashMap<String, CountDownLatch>();
 
     /**
-     * Holds the queue to send the ping replies to.
+     * Destination where the response messages will arrive
      */
-    private Queue _replyQueue;
+    private Destination _replyDestination;
 
     /**
-     * Hold the known Queue where the producer will be sending message to
+     * Destination where the producer will be sending message to
      */
-    private Queue _pingQueue;
+    private Destination _pingDestination;
 
     /**
      * Determines whether this producer sends persistent messages from the run method.
@@ -213,20 +214,23 @@
      * @param transacted
      * @throws Exception All allowed to fall through. This is only test code...
      */
-    public PingPongProducer(String brokerDetails, String username, String password, String virtualpath, String queueName,
-                            String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
-                            boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend,
-                            boolean failOnce, int batchSize, int queueCount, int rate) throws Exception
+    public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
+                            String destinationName, String selector, boolean transacted, boolean persistent,
+                            int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
+                            boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize,
+                            int noOfDestinations, int rate, boolean pubsub) throws Exception
     {
         this(brokerDetails, username, password, virtualpath, transacted, persistent, messageSize, verbose, afterCommit,
              beforeCommit, afterSend, beforeSend, failOnce, batchSize, rate);
 
-        _queueCount = queueCount;
-        if (queueCount <= 1)
+        _destinationCount = noOfDestinations;
+        setPubSub(pubsub);
+        
+        if (noOfDestinations <= 1)
         {
-            if (queueName != null)
+            if (destinationName != null)
             {
-                _pingQueue = new AMQQueue(queueName);
+                createPingDestination(destinationName);
                 // Create producer and the consumer
                 createProducer();
                 createConsumer(selector);
@@ -239,57 +243,76 @@
         }
     }
 
+    private void createPingDestination(String name)
+    {
+        if (isPubSub())
+        {
+            _pingDestination = new AMQTopic(name);
+        }
+        else
+        {
+            _pingDestination = new AMQQueue(name);
+        }
+    }
+
     /**
-     * Creates the producer to send the pings on.  If the tests are with nultiple queues, then producer
+     * Creates the producer to send the pings on.  If the tests are with multiple destinations, then the producer
      * is created with null destination, so that any destination can be specified while sending
-     *
      * @throws JMSException
      */
     public void createProducer() throws JMSException
     {
-        if (getQueueCount() > 1)
+        if (getDestinationsCount() > 1)
         {
-            // create producer with initial destination as null for test with multiple queues
+            // create producer with initial destination as null for test with multiple-destinations
             // In this case, a different destination will be used while sending the message
             _producer = (MessageProducer) getProducerSession().createProducer(null);
         }
         else
         {
-            // Create a queue and producer to send the pings on.
-            _producer = (MessageProducer) getProducerSession().createProducer(_pingQueue);
+            // Create a producer with known destination to send the pings on.
+            _producer = (MessageProducer) getProducerSession().createProducer(_pingDestination);
 
         }
+
         _producer.setDisableMessageTimestamp(true);
         _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
     }
 
     /**
-     * Creates the temporary queue to listen to the responses
-     *
+     * Creates the temporary destination to listen to the responses
      * @param selector
      * @throws JMSException
      */
     public void createConsumer(String selector) throws JMSException
     {
-        // Create a temporary queue to get the pongs on.
-        _replyQueue = _consumerSession.createTemporaryQueue();
+        // Create a temporary destination to get the pongs on.
+        if (isPubSub())
+        {
+            _replyDestination = _consumerSession.createTemporaryTopic();
+        }
+        else
+        {
+            _replyDestination = _consumerSession.createTemporaryQueue();
+        }
 
         // Create a message consumer to get the replies with and register this to be called back by it.
-        MessageConsumer consumer = _consumerSession.createConsumer(_replyQueue, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
+        MessageConsumer consumer = _consumerSession.createConsumer(_replyDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
         consumer.setMessageListener(this);
     }
 
     /**
-     * Creates consumer instances for each queue. This is used when test is being done with multiple queues.
-     *
+     * Creates consumer instances for each destination. This is used when test is being done with multiple destinations.
+     * 
      * @param selector
      * @throws JMSException
      */
     public void createConsumers(String selector) throws JMSException
     {
-        for (int i = 0; i < getQueueCount(); i++)
+        for (int i = 0; i < getDestinationsCount(); i++)
         {
-            MessageConsumer consumer = getConsumerSession().createConsumer(getQueue(i), PREFETCH, false, EXCLUSIVE, selector);
+            MessageConsumer consumer =
+                getConsumerSession().createConsumer(getDestination(i), PREFETCH, false, EXCLUSIVE, selector);
             consumer.setMessageListener(this);
         }
     }
@@ -300,14 +323,14 @@
         return _consumerSession;
     }
 
-    public Queue getPingQueue()
+    public Destination getPingDestination()
     {
-        return _pingQueue;
+        return _pingDestination;
     }
 
-    protected void setPingQueue(Queue queue)
+    protected void setPingDestination(Destination destination)
     {
-        _pingQueue = queue;
+        _pingDestination = destination;
     }
 
     /**
@@ -329,9 +352,9 @@
         // Extract the command line.
         if (args.length < 2)
         {
-            System.err.println(
-                    "Usage: TestPingPublisher <brokerDetails> <virtual path> [verbose (true/false)] "
-                    + "[transacted (true/false)] [persistent (true/false)] [message size in bytes] [batchsize] [rate]");
+            System.err.println("Usage: TestPingPublisher <brokerDetails> <virtual path> [verbose (true/false)] " +
+                "[transacted (true/false)] [persistent (true/false)] [message size in bytes] [batchsize]" +
+                " [rate] [pubsub(true/false)]");
             System.exit(0);
         }
 
@@ -343,6 +366,7 @@
         int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE;
         int batchSize = (args.length >= 7) ? Integer.parseInt(args[6]) : 1;
         int rate = (args.length >= 8) ? Integer.parseInt(args[7]) : 0;
+        boolean ispubsub = (args.length >= 9) ?  Boolean.parseBoolean(args[8]) : false;
 
         boolean afterCommit = false;
         boolean beforeCommit = false;
@@ -383,21 +407,21 @@
         }
 
         // Create a ping producer to handle the request/wait/reply cycle.
-        PingPongProducer pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, null, transacted,
-                                                             persistent, messageSize, verbose,
-                                                             afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-                                                             batchSize, 0, rate);
+        PingPongProducer pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath,
+                                            PING_DESTINATION_NAME, null, transacted, persistent, messageSize, verbose,
+                                            afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
+                                            0, rate, ispubsub);
 
         pingProducer.getConnection().start();
 
         // Run a few priming pings to remove warm up time from test results.
-        pingProducer.prime(PRIMING_LOOPS);
+        //pingProducer.prime(PRIMING_LOOPS);
         // Create a shutdown hook to terminate the ping-pong producer.
         Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
 
         // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
         pingProducer.getConnection().setExceptionListener(pingProducer);
-
+        
         // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
         Thread pingThread = new Thread(pingProducer);
         pingThread.run();
@@ -405,13 +429,6 @@
     }
 
     /**
-     * Creates consumer instances for each queue. This is used when test is being done with multiple queues.
-     *
-     * @param selector
-     * @throws JMSException
-     */
-
-    /**
      * Primes the test loop by sending a few messages, then introduces a short wait. This allows the bounce back client
      * on the other end a chance to configure its reply producer on the reply to destination. It is also worth calling
      * this a few times, in order to prime the JVMs JIT compilation.
@@ -424,8 +441,7 @@
         for (int i = 0; i < x; i++)
         {
             // Create and send a small message.
-            Message first = getTestMessage(_replyQueue, 0, false);
-
+            Message first = getTestMessage(_replyDestination, 0, false);
             sendMessage(first);
 
             try
@@ -434,6 +450,7 @@
             }
             catch (InterruptedException ignore)
             {
+                
             }
         }
     }
@@ -524,10 +541,11 @@
             // Re-timestamp the message.
             message.setLongProperty("timestamp", System.currentTimeMillis());
 
-            // Check if the test is with multiple queues, in which case round robin the queues as the messages are sent.
-            if (getQueueCount() > 1)
+            // Check if the test is with multiple-destinations, in which case round robin the destinations
+            // as the messages are sent.
+            if (getDestinationsCount() > 1)
             {
-                sendMessage(getQueue(i % getQueueCount()), message);
+                sendMessage(getDestination(i % getDestinationsCount()), message);
             }
             else
             {
@@ -609,7 +627,7 @@
         try
         {
             // Generate a sample message and time stamp it.
-            ObjectMessage msg = getTestMessage(_replyQueue, _messageSize, _persistent);
+            ObjectMessage msg = getTestMessage(_replyDestination, _messageSize, _persistent);
             msg.setLongProperty("timestamp", System.currentTimeMillis());
 
             // Send the message and wait for a reply.
@@ -630,15 +648,14 @@
         }
     }
 
-    public Queue getReplyQueue()
+    public Destination getReplyDestination()
     {
-        return _replyQueue;
+        return _replyDestination;
     }
 
-
-    protected void setReplyQueue(Queue queue)
+    protected void setReplyDestination(Destination destination)
     {
-        _replyQueue = queue;
+        _replyDestination = destination;
     }
 
     /*
@@ -657,10 +674,10 @@
             // Re-timestamp the message.
             message.setLongProperty("timestamp", System.currentTimeMillis());
 
-            sendMessage(getQueue(queueIndex++), message);
+            sendMessage(getDestination(queueIndex++), message);
 
             // reset the counter to get the first queue
-            if (queueIndex == (getQueueCount() - 1))
+            if (queueIndex == (getDestinationsCount() - 1))
             {
                 queueIndex = 0;
             }

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java?view=diff&rev=499356&r1=499355&r2=499356
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java Wed Jan 24 03:05:35 2007
@@ -48,6 +48,7 @@
     private int noOfQueues;
     private int batchSize;
     private int rate;
+    private boolean ispubsub;
 
     public Config()
     {
@@ -178,6 +179,11 @@
         return transacted;
     }
 
+    public boolean isPubSub()
+    {
+        return ispubsub;
+    }
+
     public void setOption(String key, String value)
     {
         if("-host".equalsIgnoreCase(key))
@@ -254,6 +260,10 @@
         else if ("-rate".equalsIgnoreCase(key))
         {
             rate = parseInt("MEssage rate", value);
+        }
+        else if("-pubsub".equalsIgnoreCase(key))
+        {
+            ispubsub = "true".equalsIgnoreCase(value);
         }
         else
         {

Modified: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=diff&rev=499356&r1=499355&r2=499356
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Wed Jan 24 03:05:35 2007
@@ -42,17 +42,17 @@
     /**
      * Holds the name of the property to get the test message size from.
      */
-    protected static final String MESSAGE_SIZE_PROPNAME = "messageSize";
+    private static final String MESSAGE_SIZE_PROPNAME = "messagesize";
 
     /**
      * Holds the name of the property to get the ping queue name from.
      */
-    protected static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
+    private static final String PING_DESTINATION_NAME_PROPNAME = "destinationname";
 
     /**
      * holds the queue count, if the test is being performed with multiple queues
      */
-    protected static final String PING_QUEUE_COUNT_PROPNAME = "queues";
+    private static final String PING_DESTINATION_COUNT_PROPNAME = "destinationscount";
 
     /**
      * Holds the name of the property to get the test delivery mode from.
@@ -84,6 +84,8 @@
 
     protected static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
 
+    /** Holds the name of the property that selects whether the test is PubSub (true) or P2P (false). */
+    private static final String IS_PUBSUB_PROPNAME = "pubsub";
     /**
      * Holds the size of message body to attach to the ping messages.
      */
@@ -95,7 +97,7 @@
     /**
      * Holds the name of the queue to which pings are sent.
      */
-    protected static final String PING_QUEUE_NAME_DEFAULT = "ping";
+    private static final String PING_DESTINATION_NAME_DEFAULT = "ping";
 
     /**
      * Holds the message delivery mode to use for the test.
@@ -138,6 +140,8 @@
      */
     ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
 
+    Object _lock = new Object();
+
     // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
     // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
     // of the test parameters to log with the results.
@@ -158,15 +162,16 @@
         setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT));
         setSystemPropertyIfNull(COMMIT_BATCH_SIZE, Integer.toString(COMMIT_BATCH_SIZE_DEFAULT));
         setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
-        setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
+        setSystemPropertyIfNull(PING_DESTINATION_NAME_PROPNAME, PING_DESTINATION_NAME_DEFAULT);
         setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
         setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
         setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
         setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
         setSystemPropertyIfNull(TIMEOUT_PROPNAME, Long.toString(TIMEOUT_DEFAULT));
-        setSystemPropertyIfNull(PING_QUEUE_COUNT_PROPNAME, Integer.toString(1));
+        setSystemPropertyIfNull(PING_DESTINATION_COUNT_PROPNAME, Integer.toString(1));
         setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, Boolean.toString(false));
         setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT));
+        setSystemPropertyIfNull(IS_PUBSUB_PROPNAME, Boolean.toString(false));
     }
 
     /**
@@ -244,14 +249,15 @@
             String username = "guest";
             String password = "guest";
             String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
-            int queueCount = Integer.parseInt(testParameters.getProperty(PING_QUEUE_COUNT_PROPNAME));
-            String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME);
+            int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME));
+            String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME);
             boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
             boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
             String selector = null;
             boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
             int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
             int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
+            boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
 
             boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
             boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
@@ -263,14 +269,14 @@
 
             // This is synchronized because there is a race condition, which causes one connection to sleep if
             // all threads try to create connection concurrently
-            synchronized (this)
+            synchronized (_lock)
             {
-                // Establish a client to ping a Queue and listen the reply back from same Queue
+                // Establish a client to ping a Destination and listen for the reply back from the same Destination
                 perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
-                                                                      queueName, selector, transacted, persistent,
+                                                                      destinationname, selector, transacted, persistent,
                                                                       messageSize, verbose, afterCommit, beforeCommit,
-                                                                      afterSend, beforeSend, failOnce, batchSize, queueCount,
-                                                                      rate);
+                                                                      afterSend, beforeSend, failOnce, batchSize, destinationscount,
+                                                                      rate, pubsub);
             }
             // Start the client connection
             perThreadSetup._pingItselfClient.getConnection().start();

Modified: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java?view=diff&rev=499356&r1=499355&r2=499356
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java Wed Jan 24 03:05:35 2007
@@ -1,6 +1,5 @@
 package org.apache.qpid.requestreply;
 
-import java.net.InetAddress;
 import java.util.Properties;
 
 import javax.jms.*;
@@ -10,7 +9,6 @@
 import junit.framework.TestSuite;
 
 import org.apache.log4j.Logger;
-import org.apache.log4j.NDC;
 
 import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
 
@@ -47,12 +45,12 @@
     /**
      * Holds the name of the property to get the test message size from.
      */
-    private static final String MESSAGE_SIZE_PROPNAME = "messageSize";
+    private static final String MESSAGE_SIZE_PROPNAME = "messagesize";
 
     /**
      * Holds the name of the property to get the ping queue name from.
      */
-    private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
+    private static final String PING_QUEUE_NAME_PROPNAME = "destinationname";
 
     /**
      * Holds the name of the property to get the test delivery mode from.
@@ -79,6 +77,8 @@
      */
     private static final int MESSAGE_SIZE_DEFAULT = 0;
 
+    private static final int BATCH_SIZE_DEFAULT = 2;
+
     /**
      * Holds the name of the queue to which pings are sent.
      */
@@ -112,6 +112,11 @@
     /** Holds the name of the property to get the message rate from. */
     private static final String RATE_PROPNAME = "rate";
 
+    private static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
+    
+    /** Holds the name of the property that selects whether the test is PubSub (true) or P2P (false). */
+    private static final String IS_PUBSUB_PROPNAME = "pubsub";
+
     /** Holds the default rate. A value of zero means infinity, only values of 1 or greater are meaningfull. */
     private static final int RATE_DEFAULT = 0;
 
@@ -126,6 +131,7 @@
      * Thread local to hold the per-thread test setup fields.
      */
     ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
+    Object _lock = new Object();
 
     // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
     // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
@@ -138,13 +144,16 @@
         super(name);
 
         // Sets up the test parameters with defaults.
+        setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT));
         setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
         setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
         setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
         setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
         setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
         setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
+        setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, Boolean.toString(false));
         setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT));
+        setSystemPropertyIfNull(IS_PUBSUB_PROPNAME, Boolean.toString(false));
     }
 
     /**
@@ -176,7 +185,7 @@
 
         // Generate a sample message. This message is already time stamped and has its reply-to destination set.
         ObjectMessage msg =
-            perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyQueue(),
+            perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestination(),
                                                             Integer.parseInt(testParameters.getProperty(
                                                                                  MESSAGE_SIZE_PROPNAME)),
                                                             Boolean.parseBoolean(testParameters.getProperty(
@@ -217,9 +226,10 @@
             boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
             boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
             String selector = null;
-            boolean verbose = false;
+            boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
             int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
             int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
+            boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
 
             boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
             boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
@@ -228,20 +238,23 @@
             int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
             Boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
 
-            // Establish a bounce back client on the ping queue to bounce back the pings.
-            perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualpath, queueName,
-                                                                  persistent, transacted, selector, verbose);
+            synchronized(_lock)
+            {
+                // Establish a bounce back client on the ping queue to bounce back the pings.
+                perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualpath,
+                                                          queueName, persistent, transacted, selector, verbose, pubsub);
 
-            // Start the connections for client and producer running.
-            perThreadSetup._testPingBouncer.getConnection().start();
+                // Start the connections for client and producer running.
+                perThreadSetup._testPingBouncer.getConnection().start();
 
-            // Establish a ping-pong client on the ping queue to send the pings with.
-            perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualpath,
+                // Establish a ping-pong client on the ping queue to send the pings with.
+
+                perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualpath,
                                                                     queueName, selector, transacted, persistent, messageSize,
                                                                     verbose, afterCommit, beforeCommit, afterSend,
-                                                                    beforeSend, failOnce, batchSize, 0, rate);
-
-            perThreadSetup._testPingProducer.getConnection().start();
+                                                                    beforeSend, failOnce, batchSize, 0, rate, pubsub);
+                perThreadSetup._testPingProducer.getConnection().start();
+            }
 
             // Attach the per-thread set to the thread.
             threadSetup.set(perThreadSetup);