You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/18 14:11:40 UTC

svn commit: r497425 - in /incubator/qpid/trunk/qpid/java: ./ etc/ perftests/ perftests/src/main/java/org/apache/qpid/ping/ perftests/src/main/java/org/apache/qpid/requestreply/ perftests/src/test/java/org/apache/qpid/ping/ perftests/src/test/java/org/a...

Author: rgreig
Date: Thu Jan 18 05:11:39 2007
New Revision: 497425

URL: http://svn.apache.org/viewvc?view=rev&rev=497425
Log:
(Patch submitted by Rupert Smith)
Restructured the ping tests, they now share common base classes to avoid cut and paste coding.

Added:
    incubator/qpid/trunk/qpid/java/etc/
    incubator/qpid/trunk/qpid/java/etc/log4j.xml
    incubator/qpid/trunk/qpid/java/perftests/jar-with-dependencies.xml
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
    incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/
    incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
Removed:
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java
Modified:
    incubator/qpid/trunk/qpid/java/perftests/pom.xml
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
    incubator/qpid/trunk/qpid/java/pom.xml

Added: incubator/qpid/trunk/qpid/java/etc/log4j.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/etc/log4j.xml?view=auto&rev=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/etc/log4j.xml (added)
+++ incubator/qpid/trunk/qpid/java/etc/log4j.xml Thu Jan 18 05:11:39 2007
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<!-- ===================================================================== -->
+<!--                                                                       -->
+<!--  Log4j Configuration                                                  -->
+<!--                                                                       -->
+<!-- ===================================================================== -->
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+   <!-- ============================== -->
+   <!-- Append messages to the console -->
+   <!-- ============================== -->
+
+   <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+      <param name="Target" value="System.out"/>
+      <param name="Threshold" value="ALL"/>
+
+      <layout class="org.apache.log4j.PatternLayout">
+         <!-- The default pattern: Date Priority [Category] Message\n -->
+         <param name="ConversionPattern" value="%m%n"/>
+      </layout>
+
+   </appender>
+
+   <!-- ================ -->
+   <!-- Limit categories -->
+   <!-- ================ -->
+
+   <category name="org.apache.qpid">
+     <priority value="${amqj.logging.level}" logger="CONSOLE"/>
+   </category>
+
+   <!-- ======================= -->
+   <!-- Setup the Root category -->
+   <!-- ======================= -->
+
+   <root level="${root.logging.level}">
+      <appender-ref ref="CONSOLE"/>
+   </root>
+
+</log4j:configuration>

Added: incubator/qpid/trunk/qpid/java/perftests/jar-with-dependencies.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/jar-with-dependencies.xml?view=auto&rev=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/jar-with-dependencies.xml (added)
+++ incubator/qpid/trunk/qpid/java/perftests/jar-with-dependencies.xml Thu Jan 18 05:11:39 2007
@@ -0,0 +1,29 @@
+<!-- This is an assembly descriptor that produces a jar file that contains all the
+     dependencies, fully expanded into a single jar, required to run the tests of
+     a maven project. 
+ -->
+<assembly>
+  <id>all-test-deps</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <dependencySets>
+    <dependencySet>
+      <outputDirectory></outputDirectory>
+      <outputFileNameMapping></outputFileNameMapping>
+      <unpack>true</unpack>
+      <scope>test</scope>
+    </dependencySet>
+  </dependencySets>
+  <fileSets>
+    <fileSet>
+      <directory>target/classes</directory>
+      <outputDirectory></outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>target/test-classes</directory>
+      <outputDirectory></outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>

Modified: incubator/qpid/trunk/qpid/java/perftests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/pom.xml?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/perftests/pom.xml Thu Jan 18 05:11:39 2007
@@ -97,7 +97,7 @@
 
 		 ./bin/script_name or ./bin/script_name.bat
 
-		 These scripts can find everything in the 'all test dependencies' jar.
+		 These scripts can find everything in the 'all test dependencies' jar created by the assembly:assembly goal.
 		 -->	
             <!--	 
             <plugin>
@@ -146,7 +146,7 @@
             <plugin>
               <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-assembly-plugin</artifactId>
-              <version>${assembly.version}</version>
+              <!--<version>2.2-SNAPSHOT</version>-->
               <configuration>
                 <descriptors>
                   <descriptor>jar-with-dependencies.xml</descriptor>

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java Thu Jan 18 05:11:39 2007
@@ -1,9 +1,13 @@
 package org.apache.qpid.ping;
 
+import java.text.SimpleDateFormat;
+
+import javax.jms.Connection;
 import javax.jms.JMSException;
 
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.jms.Session;
 
 /**
@@ -19,19 +23,20 @@
  */
 public abstract class AbstractPingClient
 {
+    /** Used to format time stamping output. */
+    protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
+
     private static final Logger _logger = Logger.getLogger(TestPingClient.class);
+    private AMQConnection _connection;
 
-    /** Used to keep a handle on the JMS session to send replies using. */
-    protected Session _session;
+    public AMQConnection getConnection()
+    {
+        return _connection;
+    }
 
-    /**
-     * Creates an abstract ping client to manage the specified transcation.
-     *
-     * @param session The session.
-     */
-    public AbstractPingClient(Session session)
+    public void setConnection(AMQConnection _connection)
     {
-        _session = session;
+        this._connection = _connection;
     }
 
     /**
@@ -39,13 +44,13 @@
      *
      * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
      */
-    protected void commitTx() throws JMSException
+    protected void commitTx(Session session) throws JMSException
     {
-        if (_session.getTransacted())
+        if (session.getTransacted())
         {
             try
             {
-                _session.commit();
+                session.commit();
                 _logger.trace("Session Commited.");
             }
             catch (JMSException e)
@@ -54,7 +59,7 @@
 
                 try
                 {
-                    _session.rollback();
+                    session.rollback();
                     _logger.debug("Message rolled back.");
                 }
                 catch (JMSException jmse)

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java Thu Jan 18 05:11:39 2007
@@ -1,5 +1,7 @@
 package org.apache.qpid.ping;
 
+import java.text.SimpleDateFormat;
+
 import javax.jms.*;
 
 import org.apache.log4j.Logger;
@@ -15,9 +17,10 @@
  *
  * <p><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Manage session.
+ * <tr><td> Manage the connection.
  * <tr><td> Provide clean shutdown on exception or shutdown hook.
  * <tr><td> Provide useable shutdown hook implementation.
+ * <tr><td> Run a ping loop.
  * </table>
  *
  * @author Rupert Smith
@@ -26,51 +29,66 @@
 {
     private static final Logger _logger = Logger.getLogger(AbstractPingProducer.class);
 
-    /** Holds the current Qpid session to send and receive pings on. */
-    protected Session _session;
+    /** Used to format time stamping output. */
+    protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
 
     /** Used to tell the ping loop when to terminate, it only runs while this is true. */
     protected boolean _publish = true;
 
+    /** Holds the connection handle to the broker. */
+    private Connection _connection;
+
+    /** Holds the producer session, need to create test messages. */
+    private Session _producerSession;
+
     /**
-     * Creates an AbstractPingProducer on a session.
+     * Convenience method for a short pause.
+     *
+     * @param sleepTime The time in milliseconds to pause for.
      */
-    public AbstractPingProducer(Session session)
+    public static void pause(long sleepTime)
     {
-        _session = session;
+        if (sleepTime > 0)
+        {
+            try
+            {
+                Thread.sleep(sleepTime);
+            }
+            catch (InterruptedException ie)
+            { }
+        }
     }
 
+    public abstract void pingLoop();
+
     /**
      * Generates a test message of the specified size.
      *
-     * @param session     The Qpid session under which to generate the message.
      * @param replyQueue  The reply-to destination for the message.
      * @param messageSize The desired size of the message in bytes.
-     * @param currentTime The timestamp to add to the message as a "timestamp" property.
      *
      * @return A freshly generated test message.
      *
      * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
      */
-    public static ObjectMessage getTestMessage(Session session, Queue replyQueue, int messageSize, long currentTime,
-                                               boolean persistent) throws JMSException
+    public ObjectMessage getTestMessage(Queue replyQueue, int messageSize, boolean persistent) throws JMSException
     {
         ObjectMessage msg;
 
         if (messageSize != 0)
         {
-            msg = TestMessageFactory.newObjectMessage(session, messageSize);
+            msg = TestMessageFactory.newObjectMessage(_producerSession, messageSize);
         }
         else
         {
-            msg = session.createObjectMessage();
+            msg = _producerSession.createObjectMessage();
         }
 
         // Set the messages persistent delivery flag.
         msg.setJMSDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
 
         // Timestamp the message.
-        msg.setLongProperty("timestamp", currentTime);
+        msg.setLongProperty("timestamp", System.currentTimeMillis());
 
         // Ensure that the temporary reply queue is set as the reply to destination for the message.
         if (replyQueue != null)
@@ -82,26 +100,6 @@
     }
 
     /**
-     * Convenience method for a short pause.
-     *
-     * @param sleepTime The time in milliseconds to pause for.
-     */
-    public static void pause(long sleepTime)
-    {
-        if (sleepTime > 0)
-        {
-            try
-            {
-                Thread.sleep(sleepTime);
-            }
-            catch (InterruptedException ie)
-            { }
-        }
-    }
-
-    public abstract void pingLoop();
-
-    /**
      * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
      * flag has been cleared.
      */
@@ -151,18 +149,38 @@
             });
     }
 
+    public Connection getConnection()
+    {
+        return _connection;
+    }
+
+    public void setConnection(Connection connection)
+    {
+        this._connection = connection;
+    }
+
+    public Session getProducerSession()
+    {
+        return _producerSession;
+    }
+
+    public void setProducerSession(Session session)
+    {
+        this._producerSession = session;
+    }
+
     /**
      * Convenience method to commit the transaction on the session associated with this pinger.
      *
      * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
      */
-    protected void commitTx() throws JMSException
+    protected void commitTx(Session session) throws JMSException
     {
-        if (_session.getTransacted())
+        if (session.getTransacted())
         {
             try
             {
-                _session.commit();
+                session.commit();
                 _logger.trace("Session Commited.");
             }
             catch (JMSException e)
@@ -177,7 +195,7 @@
 
                 try
                 {
-                    _session.rollback();
+                    session.rollback();
                     _logger.trace("Message rolled back.");
                 }
                 catch (JMSException jmse)

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java Thu Jan 18 05:11:39 2007
@@ -43,6 +43,8 @@
  * <tr><th> Responsibilities <th> Collaborations
  * <tr><td> Provide command line invocation to start the ping consumer on a configurable broker url.
  * </table>
+ *
+ * @todo Add a better command line interpreter to the main method. The command line is not very nice...
  */
 class TestPingClient extends AbstractPingClient implements MessageListener
 {
@@ -51,20 +53,42 @@
     /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
     private boolean _verbose = false;
 
+    /** The producer session. */
+    private Session _consumerSession;
+
     /**
-     * Creates a PingPongClient on the specified session.
+     * Creates a TestPingClient on the specified session.
+     *
+     * @param brokerDetails
+     * @param username
+     * @param password
+     * @param queueName
+     * @param virtualpath
+     * @param transacted
+     * @param selector
+     * @param verbose
      *
-     * @param session       The JMS Session for the ping pon client to run on.
-     * @param consumer      The message consumer to receive the messages with.
-     * @param verbose       If set to <tt>true</tt> will output timing information on every message.
+     * @throws Exception All underlying exceptions allowed to fall through. This is only test code...
      */
-    public TestPingClient(Session session, MessageConsumer consumer, boolean verbose) throws JMSException
+    public TestPingClient(String brokerDetails, String username, String password, String queueName, String virtualpath,
+                          boolean transacted, String selector, boolean verbose) throws Exception
     {
-        // Hang on to the session for the replies.
-        super(session);
-
-        // Set this up to listen for messages on the queue.
+        // Create a connection to the broker.
+        InetAddress address = InetAddress.getLocalHost();
+        String clientID = address.getHostName() + System.currentTimeMillis();
+
+        setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath));
+
+        // Create a transactional or non-transactional session depending on the command line parameter.
+        _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+        // Connect a consumer to the ping queue and register this to be called back by it.
+        Queue q = new AMQQueue(queueName);
+        MessageConsumer consumer = _consumerSession.createConsumer(q, 1, false, false, selector);
         consumer.setMessageListener(this);
+
+        // Hang on to the verbose flag setting.
+        _verbose = verbose;
     }
 
     /**
@@ -72,57 +96,32 @@
      *
      * @param args
      */
-    public static void main(String[] args)
+    public static void main(String[] args) throws Exception
     {
         _logger.info("Starting...");
 
         // Display help on the command line.
         if (args.length < 4)
         {
-            System.out.println("Usage: brokerdetails username password virtual-path [transacted] [selector]");
+            System.out.println(
+                "Usage: brokerdetails username password virtual-path [queueName] [verbose] [transacted] [selector]");
             System.exit(1);
         }
 
-        // Extract all comman line parameters.
+        // Extract all command line parameters.
         String brokerDetails = args[0];
         String username = args[1];
         String password = args[2];
         String virtualpath = args[3];
-        boolean transacted = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
-        String selector = (args.length == 6) ? args[5] : null;
-
-        try
-        {
-            InetAddress address = InetAddress.getLocalHost();
-
-            AMQConnection con1 = new AMQConnection(brokerDetails, username, password, address.getHostName(), virtualpath);
-
-            _logger.info("Connected with URL:" + con1.toURL());
-
-            // Create a transactional or non-transactional session depending on the command line parameter.
-            Session session = null;
-
-            if (transacted)
-            {
-                session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.SESSION_TRANSACTED);
-            }
-            else if (!transacted)
-            {
-                session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            }
-
-            Queue q = new AMQQueue("ping");
-
-            MessageConsumer consumer = session.createConsumer(q, 1, false, false, selector);
-            new TestPingClient(session, consumer, true);
-
-            con1.start();
-        }
-        catch (Throwable t)
-        {
-            System.err.println("Fatal error: " + t);
-            t.printStackTrace();
-        }
+        String queueName = (args.length >= 5) ? args[4] : "ping";
+        boolean verbose = (args.length >= 6) ? Boolean.parseBoolean(args[5]) : true;
+        boolean transacted = (args.length >= 7) ? Boolean.parseBoolean(args[6]) : false;
+        String selector = (args.length == 8) ? args[7] : null;
+
+        // Create the test ping client and set it running.
+        TestPingClient pingClient =
+            new TestPingClient(brokerDetails, username, password, queueName, virtualpath, transacted, selector, verbose);
+        pingClient.getConnection().start();
 
         System.out.println("Waiting...");
     }
@@ -145,12 +144,12 @@
                 if (timestamp != null)
                 {
                     long diff = System.currentTimeMillis() - timestamp;
-                    _logger.info("Ping time: " + diff);
+                    System.out.println("Ping time: " + diff);
                 }
             }
 
             // Commit the transaction if running in transactional mode.
-            commitTx();
+            commitTx(_consumerSession);
         }
         catch (JMSException e)
         {

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java Thu Jan 18 05:11:39 2007
@@ -21,6 +21,8 @@
 package org.apache.qpid.ping;
 
 import java.net.InetAddress;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 
 import javax.jms.*;
 
@@ -57,49 +59,49 @@
     /** Used to define how long to wait between pings. */
     private static final long SLEEP_TIME = 250;
 
-    /** Used to define how long to wait before assuming that a ping has timed out. */
-    private static final long TIMEOUT = 3000;
-
     /** Holds the name of the queue to send pings on. */
     private static final String PING_QUEUE_NAME = "ping";
+
     private static TestPingProducer _pingProducer;
 
     /** Holds the message producer to send the pings through. */
     private MessageProducer _producer;
 
     /** Determines whether this producer sends persistent messages from the run method. */
-    private boolean _persistent;
+    private boolean _persistent = false;
 
     /** Holds the message size to send, from the run method. */
-    private int _messageSize;
+    private int _messageSize = DEFAULT_MESSAGE_SIZE;
 
-    public TestPingProducer(Session session, MessageProducer producer) throws JMSException
-    {
-        super(session);
-        _producer = producer;
-    }
+    /** Used to indicate that the ping loop should print out whenever it pings. */
+    private boolean _verbose = false;
 
-    public TestPingProducer(Session session, MessageProducer producer, boolean persistent, int messageSize)
-                     throws JMSException
+    public TestPingProducer(String brokerDetails, String username, String password, String virtualpath, String queueName,
+                            boolean transacted, boolean persistent, int messageSize, boolean verbose) throws Exception
     {
-        this(session, producer);
+        // Create a connection to the broker.
+        InetAddress address = InetAddress.getLocalHost();
+        String clientID = address.getHostName() + System.currentTimeMillis();
+
+        setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath));
+
+        // Create a transactional or non-transactional session, based on the command line arguments.
+        setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE));
+
+        // Create a queue to send the pings on.
+        Queue pingQueue = new AMQQueue(queueName);
+        _producer = (MessageProducer) getProducerSession().createProducer(pingQueue);
 
         _persistent = persistent;
         _messageSize = messageSize;
+
+        _verbose = verbose;
     }
 
     /**
-     * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongClient} also needs
+     * Starts a ping-pong loop running from the command line. The bounce back client {@link TestPingClient} also needs
      * to be started to bounce the pings back again.
      *
-     * <p/>The command line takes from 2 to 4 arguments:
-     * <p/><table>
-     * <tr><td>brokerDetails <td> The broker connection string.
-     * <tr><td>virtualPath   <td> The virtual path.
-     * <tr><td>transacted    <td> A boolean flag, telling this client whether or not to use transactions.
-     * <tr><td>size          <td> The size of ping messages to use, in bytes.
-     * </table>
-     *
      * @param args The command line arguments as defined above.
      */
     public static void main(String[] args) throws Exception
@@ -108,53 +110,33 @@
         if (args.length < 2)
         {
             System.err.println(
-                "Usage: TestPingPublisher <brokerDetails> <virtual path> [transacted] [persistent] [message size in bytes]");
+                "Usage: TestPingPublisher <brokerDetails> <virtual path> [verbose] [transacted] [persistent] [message size in bytes]");
             System.exit(0);
         }
 
         String brokerDetails = args[0];
         String virtualpath = args[1];
-        boolean transacted = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : false;
-        boolean persistent = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
-        int messageSize = (args.length >= 5) ? Integer.parseInt(args[4]) : DEFAULT_MESSAGE_SIZE;
-
-        // Create a connection to the broker.
-        InetAddress address = InetAddress.getLocalHost();
-        String clientID = address.getHostName() + System.currentTimeMillis();
+        boolean verbose = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : true;
+        boolean transacted = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
+        boolean persistent = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
+        int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE;
+
+        // Create a ping producer to generate the pings.
+        _pingProducer = new TestPingProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, transacted,
+                                             persistent, messageSize, verbose);
 
-        Connection _connection = new AMQConnection(brokerDetails, "guest", "guest", clientID, virtualpath);
-
-        // Create a transactional or non-transactional session, based on the command line arguments.
-        Session session;
-
-        if (transacted)
-        {
-            session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
-        }
-        else
-        {
-            session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        }
-
-        // Create a queue to send the pings on.
-        Queue pingQueue = new AMQQueue(PING_QUEUE_NAME);
-        MessageProducer producer = (MessageProducer) session.createProducer(pingQueue);
-
-        // Create a ping producer to handle the request/wait/reply cycle.
-        _pingProducer = new TestPingProducer(session, producer, persistent, messageSize);
-
-        // Start the message consumers running.
-        _connection.start();
+        // Start the connection running.
+        _pingProducer.getConnection().start();
 
         // Create a shutdown hook to terminate the ping-pong producer.
         Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook());
 
-        // Start the ping loop running, ensuring that it is registered to listen for exceptions on the connection too.
-        _connection.setExceptionListener(_pingProducer);
+        // Ensure the ping loop execption listener is registered on the connection to terminate it on error.
+        _pingProducer.getConnection().setExceptionListener(_pingProducer);
+
+        // Start the ping loop running until it is interrupted.
         Thread pingThread = new Thread(_pingProducer);
         pingThread.run();
-
-        // Run until the ping loop is terminated.
         pingThread.join();
     }
 
@@ -174,16 +156,7 @@
 
         // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of
         // this method, as the message will not be sent until the transaction is committed.
-        commitTx();
-    }
-
-    /**
-     * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
-     * flag has been cleared.
-     */
-    public void stop()
-    {
-        _publish = false;
+        commitTx(getProducerSession());
     }
 
     /**
@@ -195,12 +168,16 @@
         try
         {
             // Generate a sample message and time stamp it.
-            ObjectMessage msg = getTestMessage(_session, null, _messageSize, System.currentTimeMillis(), _persistent);
+            ObjectMessage msg = getTestMessage(null, _messageSize, _persistent);
             msg.setLongProperty("timestamp", System.currentTimeMillis());
 
             // Send the message.
             ping(msg);
 
+            if (_verbose)
+            {
+                System.out.println("Pinged at: " + timestampFormatter.format(new Date())); //" + " with id: " + msg.getJMSMessageID());
+            }
             // Introduce a short pause if desired.
             pause(SLEEP_TIME);
         }

Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java?view=auto&rev=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java Thu Jan 18 05:11:39 2007
@@ -0,0 +1,283 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import java.net.InetAddress;
+import java.util.Date;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.ping.AbstractPingClient;
+
+/**
+ * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return
+ * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes
+ * too.
+ *
+ * <p/>The message id from the received message is extracted, and placed into the reply as the correlation id. Messages
+ * are bounced back to the reply-to destination. The original sender of the message has the option to use either a unique
+ * temporary queue or the correlation id to correlate the original message to the reply.
+ *
+ * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
+ * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
+ * be disabled for real timing tests as writing to the console will slow things down.
+ *
+ * <p/>When the a message is received, a reply to producer is created for it if it is not the same as the previous
+ * message. All subsequent replies are sent using that producer until a different reply to destination is
+ * encountered; effectively a last used cache of size 1. Fast because it saves creating the reply producer over and
+ * over again when the destination does not change. For a larger fixed set of reply to destinations could turn this
+ * into a cache with more elements.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Bounce back messages to their reply to destination.
+ * <tr><td> Provide command line invocation to start the bounce back on a configurable broker url.
+ * </table>
+ *
+ * @todo Replace the command line parsing with a neater tool.
+ *
+ * @todo Make verbose accept a number of messages, only prints to console every X messages.
+ */
+public class PingPongBouncer extends AbstractPingClient implements MessageListener
+{
+    private static final Logger _logger = Logger.getLogger(PingPongBouncer.class);
+
+    /** The default prefetch size for the message consumer. */
+    private static final int PREFETCH = 1;
+
+    /** The default no local flag for the message consumer. */
+    private static final boolean NO_LOCAL = true;
+
+    /** The default exclusive flag for the message consumer. */
+    private static final boolean EXCLUSIVE = false;
+
+    /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
+    private boolean _verbose = false;
+
+    /** Determines whether this bounce back client bounces back messages persistently. */
+    private boolean _persistent = false;
+
+    /** Keeps track of the response destination of the previous message for the last reply to producer cache. */
+    private Destination _lastResponseDest;
+
+    /** The cached, most recently used reply producer. */
+    private MessageProducer _cachedReplyProducer;
+
+    /** The consumer session. */
+    private Session _consumerSession;
+
+    /** The producer session. */
+    private Session _producerSession;
+
+    /**
+     * Creates a PingPongBouncer on the specified producer and consumer sessions.
+     *
+     * @param brokerDetails
+     * @param username
+     * @param password
+     * @param virtualpath
+     * @param queueName
+     * @param persistent
+     * @param transacted
+     * @param selector
+     * @param verbose
+     * @throws JMSException
+     *
+     * @throws Exception All underlying exceptions allowed to fall through. This is only test code...
+     */
+    public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath, String queueName,
+                           boolean persistent, boolean transacted, String selector, boolean verbose) throws Exception
+    {
+        // Create a client id to uniquely identify this client.
+        InetAddress address = InetAddress.getLocalHost();
+        String clientId = address.getHostName() + System.currentTimeMillis();
+
+        // Connect to the broker.
+        setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath));
+        _logger.info("Connected with URL:" + getConnection().toURL());
+
+        // Set up the failover notifier.
+        getConnection().setConnectionListener(new FailoverNotifier());
+
+        // Create a session to listen for messages on and one to send replies on, transactional depending on the
+        // command line option.
+        Session consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+        Session producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+        // Create the queue to listen for message on.
+        Queue q = new AMQQueue(queueName);
+        MessageConsumer consumer = consumerSession.createConsumer(q, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
+
+        // Hang on to the sessions for the messages and replies.
+        _consumerSession = consumerSession;
+        _producerSession = producerSession;
+
+        _verbose = verbose;
+        _persistent = persistent;
+
+        // Set this up to listen for messages on the queue.
+        consumer.setMessageListener(this);
+    }
+
+    /**
+     * Starts a stand alone ping-pong client running in verbose mode.
+     *
+     * @param args
+     */
+    public static void main(String[] args) throws Exception
+    {
+        System.out.println("Starting...");
+
+        // Display help on the command line.
+        if (args.length < 5)
+        {
+            System.err.println("Usage: <brokerdetails> <username> <password> <virtual-path> <serviceQueue> "
+                               + "[<P[ersistent]|N[onPersistent]> <T[ransacted]|N<onTransacted]>] [selector]");
+            System.exit(1);
+        }
+
+        // Extract all command line parameters.
+        String brokerDetails = args[0];
+        String username = args[1];
+        String password = args[2];
+        String virtualpath = args[3];
+        String queueName = args[4];
+        boolean persistent = ((args.length >= 6) && (args[5].toUpperCase().charAt(0) == 'P'));
+        boolean transacted = ((args.length >= 7) && (args[6].toUpperCase().charAt(0) == 'T'));
+        String selector = (args.length == 8) ? args[5] : null;
+
+        // Instantiate the ping pong client with the command line options and start it running.
+        PingPongBouncer pingBouncer =
+            new PingPongBouncer(brokerDetails, username, password, virtualpath, queueName, persistent, transacted, selector,
+                                true);
+        pingBouncer.getConnection().start();
+
+        System.out.println("Waiting...");
+    }
+
+    /**
+     * This is a callback method that is notified of all messages for which this has been registered as a message
+     * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to
+     * destination of the message.
+     *
+     * @param message The message that triggered this callback.
+     */
+    public void onMessage(Message message)
+    {
+        try
+        {
+            String messageCorrelationId = message.getJMSCorrelationID();
+
+            if (_verbose)
+            {
+                _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, "
+                             + messageCorrelationId);
+            }
+
+            // Get the reply to destination from the message and check it is set.
+            Destination responseDest = message.getJMSReplyTo();
+
+            if (responseDest == null)
+            {
+                _logger.debug("Producer not created because the response destination is null.");
+
+                return;
+            }
+
+            // Check if the reply to destination is different to the last message and create a new producer if so.
+            if (!responseDest.equals(_lastResponseDest))
+            {
+                _lastResponseDest = responseDest;
+
+                _logger.debug("About to create a producer.");
+                _cachedReplyProducer = _producerSession.createProducer(responseDest);
+                _cachedReplyProducer.setDisableMessageTimestamp(true);
+                _cachedReplyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+                _logger.debug("After create a producer.");
+            }
+
+            // Spew out some timing information if verbose mode is on.
+            if (_verbose)
+            {
+                Long timestamp = message.getLongProperty("timestamp");
+
+                if (timestamp != null)
+                {
+                    long diff = System.currentTimeMillis() - timestamp;
+                    _logger.info("Time to bounce point: " + diff);
+                }
+            }
+
+            // Correlate the reply to the original.
+            message.setJMSCorrelationID(messageCorrelationId);
+
+            // Send the receieved message as the pong reply.
+            _cachedReplyProducer.send(message);
+
+            if (_verbose)
+            {
+                _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, "
+                             + messageCorrelationId);
+            }
+
+            // Commit the transaction if running in transactional mode.
+            commitTx(_producerSession);
+        }
+        catch (JMSException e)
+        {
+            _logger.debug("There was a JMSException: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * A connection listener that logs out any failover complete events. Could do more interesting things with this
+     * at some point...
+     */
+    public static class FailoverNotifier implements ConnectionListener
+    {
+        public void bytesSent(long count)
+        { }
+
+        public void bytesReceived(long count)
+        { }
+
+        public boolean preFailover(boolean redirect)
+        {
+            return true;
+        }
+
+        public boolean preResubscribe()
+        {
+            return true;
+        }
+
+        public void failoverComplete()
+        {
+            _logger.info("App got failover complete callback.");
+        }
+    }
+}

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Thu Jan 18 05:11:39 2007
@@ -20,22 +20,28 @@
  */
 package org.apache.qpid.requestreply;
 
+import java.net.InetAddress;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.*;
+
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.jms.ConnectionListener;
 import org.apache.qpid.jms.MessageProducer;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.ping.AbstractPingProducer;
-import org.apache.qpid.util.concurrent.BooleanLatch;
-
-import javax.jms.*;
-import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
- * client (see {@link org.apache.qpid.requestreply.PingPongClient} for the bounce back client). It is designed to be run from the command line
+ * client (see {@link PingPongBouncer} for the bounce back client). It is designed to be run from the command line
  * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session,
  * message producer and message consumer to run the ping-pong cycle on.
  *
@@ -75,8 +81,20 @@
     /** Holds the name of the queue to send pings on. */
     private static final String PING_QUEUE_NAME = "ping";
 
+    /** The batch size. */
+    private static final int BATCH_SIZE = 100;
+
     /** Keeps track of the ping producer instance used in the run loop. */
     private static PingPongProducer _pingProducer;
+    private static final int PREFETCH = 100;
+    private static final boolean NO_LOCAL = true;
+    private static final boolean EXCLUSIVE = false;
+
+    /** The number of priming loops to run. */
+    private static final int PRIMING_LOOPS = 3;
+
+    /** A source for providing sequential unique correlation ids. */
+    private AtomicLong idGenerator = new AtomicLong(0L);
 
     /** Holds the message producer to send the pings through. */
     private MessageProducer _producer;
@@ -91,32 +109,65 @@
     private int _messageSize;
 
     /** Holds a map from message ids to latches on which threads wait for replies. */
-    private Map<String, BooleanLatch> trafficLights = new HashMap<String, BooleanLatch>();
+    private Map<String, CountDownLatch> trafficLights = new HashMap<String, CountDownLatch>();
+
+    /** Used to indicate that the ping loop should print out whenever it pings. */
+    private boolean _verbose = false;
 
-    /** Holds a map from message ids to correlated replies. */
-    private Map<String, Message> replies = new HashMap<String, Message>();
+    private Session _consumerSession;
 
-    public PingPongProducer(Session session, Queue replyQueue, MessageProducer producer, MessageConsumer consumer)
-                     throws JMSException
+    /**
+     * Creates a ping pong producer with the specified connection details and type.
+     *
+     * @param brokerDetails
+     * @param username
+     * @param password
+     * @param virtualpath
+     * @param transacted
+     * @param persistent
+     * @param messageSize
+     * @param verbose
+     *
+     * @throws Exception All allowed to fall through. This is only test code...
+     */
+    public PingPongProducer(String brokerDetails, String username, String password, String virtualpath, String queueName,
+                            String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose)
+                     throws Exception
     {
-        super(session);
-        _producer = producer;
-        _replyQueue = replyQueue;
+        // Create a connection to the broker.
+        InetAddress address = InetAddress.getLocalHost();
+        String clientID = address.getHostName() + System.currentTimeMillis();
+
+        setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath));
 
+        // Create transactional or non-transactional sessions, based on the command line arguments.
+        setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE));
+        _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+        // Create a queue and producer to send the pings on.
+        Queue pingQueue = new AMQQueue(queueName);
+        _producer = (MessageProducer) getProducerSession().createProducer(pingQueue);
+        _producer.setDisableMessageTimestamp(true);
+        _producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+        // Create a temporary queue to get the pongs on.
+        _replyQueue = _consumerSession.createTemporaryQueue();
+
+        // Create a message consumer to get the replies with and register this to be called back by it.
+        MessageConsumer consumer = _consumerSession.createConsumer(_replyQueue, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
         consumer.setMessageListener(this);
-    }
 
-    public PingPongProducer(Session session, Queue replyQueue, MessageProducer producer, MessageConsumer consumer,
-                            boolean persistent, int messageSize) throws JMSException
-    {
-        this(session, replyQueue, producer, consumer);
+        // Run a few priming pings to remove warm up time from test results.
+        prime(PRIMING_LOOPS);
 
         _persistent = persistent;
         _messageSize = messageSize;
+
+        _verbose = verbose;
     }
 
     /**
-     * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongClient} also needs
+     * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs
      * to be started to bounce the pings back again.
      *
      * <p/>The command line takes from 2 to 4 arguments:
@@ -141,59 +192,59 @@
 
         String brokerDetails = args[0];
         String virtualpath = args[1];
-        boolean transacted = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : false;
-        boolean persistent = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
-        int messageSize = (args.length >= 5) ? Integer.parseInt(args[4]) : DEFAULT_MESSAGE_SIZE;
-
-        // Create a connection to the broker.
-        InetAddress address = InetAddress.getLocalHost();
-        String clientID = address.getHostName() + System.currentTimeMillis();
-
-        Connection _connection = new AMQConnection(brokerDetails, "guest", "guest", clientID, virtualpath);
-
-        // Create a transactional or non-transactional session, based on the command line arguments.
-        Session session;
-
-        if (transacted)
-        {
-            session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
-        }
-        else
-        {
-            session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        }
-
-        // Create a queue to send the pings on.
-        Queue pingQueue = new AMQQueue(PING_QUEUE_NAME);
-        MessageProducer producer = (MessageProducer) session.createProducer(pingQueue);
-
-        // Create a temporary queue to reply with the pongs on.
-        Queue replyQueue = session.createTemporaryQueue();
-
-        // Create a message consumer to get the replies with.
-        MessageConsumer consumer = session.createConsumer(replyQueue);
+        boolean verbose = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : true;
+        boolean transacted = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
+        boolean persistent = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
+        int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE;
 
         // Create a ping producer to handle the request/wait/reply cycle.
-        _pingProducer = new PingPongProducer(session, replyQueue, producer, consumer, persistent, messageSize);
-
-        // Start the message consumers running.
-        _connection.start();
+        _pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, null, transacted,
+                                             persistent, messageSize, verbose);
+        _pingProducer.getConnection().start();
 
         // Create a shutdown hook to terminate the ping-pong producer.
         Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook());
 
-        // Start the ping loop running, ensuring that it is registered to listen for exceptions on the connection too.
-        _connection.setExceptionListener(_pingProducer);
+        // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
+        _pingProducer.getConnection().setExceptionListener(_pingProducer);
+
+        // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
         Thread pingThread = new Thread(_pingProducer);
         pingThread.run();
-
-        // Run until the ping loop is terminated.
         pingThread.join();
     }
 
     /**
+     * Primes the test loop by sending a few messages, then introducing a short wait. This allows the bounce back client
+     * on the other end a chance to configure its reply producer on the reply to destination. It is also worth calling
+     * this a few times, in order to prime the JVMs JIT compilation.
+     *
+     * @param x The number of priming loops to run.
+     *
+     * @throws JMSException All underlying exceptions are allowed to fall through.
+     */
+    public void prime(int x) throws JMSException
+    {
+        for (int i = 0; i < x; i++)
+        {
+            // Create and send a small message.
+            Message first = getTestMessage(_replyQueue, 0, false);
+            _producer.send(first);
+            commitTx(getProducerSession());
+
+            try
+            {
+                Thread.sleep(100);
+            }
+            catch (InterruptedException ignore)
+            { }
+        }
+    }
+
+    /**
      * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a
-     * correlating reply may be waiting on.
+     * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected
+     * in the replies map.
      *
      * @param message The received message.
      */
@@ -201,21 +252,37 @@
     {
         try
         {
-            // Store the reply.
+            // Store the reply, if it has a correlation id that is expected.
             String correlationID = message.getJMSCorrelationID();
-            replies.put(correlationID, message);
+
+            if (_verbose)
+            {
+                _logger.info(timestampFormatter.format(new Date()) + ": Got reply with correlation id, " + correlationID);
+            }
 
             // Turn the traffic light to green.
-            BooleanLatch trafficLight = trafficLights.get(correlationID);
+            CountDownLatch trafficLight = trafficLights.get(correlationID);
 
             if (trafficLight != null)
             {
-                trafficLight.signal();
+                _logger.debug("Reply was expected, decrementing the latch for the id.");
+                trafficLight.countDown();
             }
             else
             {
                 _logger.debug("There was no thread waiting for reply: " + correlationID);
             }
+
+            if (_verbose)
+            {
+                Long timestamp = message.getLongProperty("timestamp");
+
+                if (timestamp != null)
+                {
+                    long diff = System.currentTimeMillis() - timestamp;
+                    _logger.info("Time for round trip: " + diff);
+                }
+            }
         }
         catch (JMSException e)
         {
@@ -224,50 +291,90 @@
     }
 
     /**
-     * Sends the specified ping message and then waits for a correlating reply. If the wait times out before a reply
-     * arrives, then a null reply is returned from this method.
+     * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
+     * before a reply arrives, then a null reply is returned from this method.
      *
-     * @param message The message to send.
-     * @param timeout The timeout in milliseconds.
+     * @param message  The message to send.
+     * @param numPings The number of ping messages to send.
+     * @param timeout  The timeout in milliseconds.
      *
-     * @return The reply, or null if no reply arrives before the timeout.
+     * @return The number of replies received. This may be less than the number sent if the timeout terminated the
+     *         wait for all prematurely.
      *
      * @throws JMSException All underlying JMSExceptions are allowed to fall through.
      */
-    public Message pingAndWaitForReply(Message message, long timeout) throws JMSException, InterruptedException
+    public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
     {
-        _producer.send(message);
+        // Put a unique correlation id on the message before sending it.
+        String messageCorrelationId = Long.toString(idGenerator.incrementAndGet());
+        message.setJMSCorrelationID(messageCorrelationId);
 
-        // Keep the messageId to correlate with the reply.
-        String messageId = message.getJMSMessageID();
+        for (int i = 0; i < numPings; i++)
+        {
+            // Re-timestamp the message.
+            message.setLongProperty("timestamp", System.currentTimeMillis());
+
+            _producer.send(message);
+        }
 
         // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of
         // this method, as the message will not be sent until the transaction is committed.
-        commitTx();
+        commitTx(getProducerSession());
+
+        // Keep the messageId to correlate with the reply.
+        //String messageId = message.getJMSMessageID();
+
+        if (_verbose)
+        {
+            _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
+        }
 
         // Block the current thread until a reply to the message is received, or it times out.
-        BooleanLatch trafficLight = new BooleanLatch();
-        trafficLights.put(messageId, trafficLight);
+        CountDownLatch trafficLight = new CountDownLatch(numPings);
+        trafficLights.put(messageCorrelationId, trafficLight);
 
         // Note that this call expects a timeout in nanoseconds, millisecond timeout is multiplied up.
-        trafficLight.await(timeout * 1000);
+        trafficLight.await(timeout, TimeUnit.MILLISECONDS);
 
-        // Check the replies to see if one was generated, if not then the reply timed out.
-        Message result = replies.get(messageId);
+        // Work out how many replies were receieved.
+        int numReplies = numPings - (int) trafficLight.getCount();
 
-        return result;
+        if ((numReplies < numPings) && _verbose)
+        {
+            _logger.info("Timed out before all replies received on id, " + messageCorrelationId);
+        }
+        else if (_verbose)
+        {
+            _logger.info("Got all replies on id, " + messageCorrelationId);
+        }
+
+        return numReplies;
     }
 
     /**
-     * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
-     * connection, this clears the publish flag which in turn will halt the ping loop.
+     * Sends the specified ping message but does not wait for a correlating reply.
+     *
+     * @param message  The message to send.
+     * @param numPings The number of pings to send.
      *
-     * @param e The exception that triggered this callback method.
+     * @return The reply, or null if no reply arrives before the timeout.
+     *
+     * @throws JMSException All underlying JMSExceptions are allowed to fall through.
      */
-    public void onException(JMSException e)
+    public void pingNoWaitForReply(Message message, int numPings) throws JMSException, InterruptedException
     {
-        _publish = false;
-        _logger.debug("There was a JMSException: " + e.getMessage(), e);
+        for (int i = 0; i < numPings; i++)
+        {
+            _producer.send(message);
+
+            if (_verbose)
+            {
+                _logger.info(timestampFormatter.format(new Date()) + ": Pinged at.");
+            }
+        }
+
+        // Commit the transaction if running in transactional mode, to force the send now.
+        commitTx(getProducerSession());
     }
 
     /**
@@ -279,11 +386,11 @@
         try
         {
             // Generate a sample message and time stamp it.
-            ObjectMessage msg = getTestMessage(_session, _replyQueue, _messageSize, System.currentTimeMillis(), _persistent);
+            ObjectMessage msg = getTestMessage(_replyQueue, _messageSize, _persistent);
             msg.setLongProperty("timestamp", System.currentTimeMillis());
 
             // Send the message and wait for a reply.
-            pingAndWaitForReply(msg, TIMEOUT);
+            pingAndWaitForReply(msg, BATCH_SIZE, TIMEOUT);
 
             // Introduce a short pause if desired.
             pause(SLEEP_TIME);
@@ -297,6 +404,39 @@
         {
             _publish = false;
             _logger.debug("There was an interruption: " + e.getMessage(), e);
+        }
+    }
+
+    public Queue getReplyQueue()
+    {
+        return _replyQueue;
+    }
+
+    /**
+     * A connection listener that logs out any failover complete events. Could do more interesting things with this
+     * at some point...
+     */
+    public static class FailoverNotifier implements ConnectionListener
+    {
+        public void bytesSent(long count)
+        { }
+
+        public void bytesReceived(long count)
+        { }
+
+        public boolean preFailover(boolean redirect)
+        {
+            return true;
+        }
+
+        public boolean preResubscribe()
+        {
+            return true;
+        }
+
+        public void failoverComplete()
+        {
+            _logger.info("App got failover complete callback.");
         }
     }
 }

Added: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=auto&rev=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Thu Jan 18 05:11:39 2007
@@ -0,0 +1,180 @@
+package org.apache.qpid.ping;
+
+import java.net.InetAddress;
+import java.util.Properties;
+
+import javax.jms.*;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.jms.Connection;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+
+/**
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public class PingTestPerf extends TestCase //implements TimingControllerAware
+{
+    private static Logger _logger = Logger.getLogger(PingTestPerf.class);
+
+    /** Holds the name of the property to get the test message size from. */
+    private static final String MESSAGE_SIZE_PROPNAME = "messageSize";
+
+    /** Holds the name of the property to get the ping queue name from. */
+    private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
+
+    /** Holds the name of the property to get the test delivery mode from. */
+    private static final String PERSISTENT_MODE_PROPNAME = "persistent";
+
+    /** Holds the name of the property to get the test transactional mode from. */
+    private static final String TRANSACTED_PROPNAME = "transacted";
+
+    /** Holds the name of the property to get the test broker url from. */
+    private static final String BROKER_PROPNAME = "broker";
+
+    /** Holds the name of the property to get the test broker virtual path. */
+    private static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
+
+    /** Holds the size of message body to attach to the ping messages. */
+    private static final int MESSAGE_SIZE_DEFAULT = 0;
+
+    /** Holds the name of the queue to which pings are sent. */
+    private static final String PING_QUEUE_NAME_DEFAULT = "ping";
+
+    /** Holds the message delivery mode to use for the test. */
+    private static final boolean PERSISTENT_MODE_DEFAULT = false;
+
+    /** Holds the transactional mode to use for the test. */
+    private static final boolean TRANSACTED_DEFAULT = false;
+
+    /** Holds the default broker url for the test. */
+    private static final String BROKER_DEFAULT = "tcp://localhost:5672";
+
+    /** Holds the default virtual path for the test. */
+    private static final String VIRTUAL_PATH_DEFAULT = "/test";
+
+    /** Sets a default ping timeout. */
+    private static final long TIMEOUT = 3000;
+
+    // Sets up the test parameters with defaults.
+    static
+    {
+        setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
+        setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
+        setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
+        setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
+        setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
+        setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
+    }
+
+    /** Holds the test ping-pong producer. */
+    private TestPingProducer _testPingProducer;
+
+    /** Holds the test ping client. */
+    private TestPingClient _testPingClient;
+
+    // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
+    // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
+    // of the test parameters to log with the results.
+    private Properties testParameters = System.getProperties();
+    //private Properties testParameters = new ContextualProperties(System.getProperties());
+
+    public PingTestPerf(String name)
+    {
+        super(name);
+    }
+
+    private static void setSystemPropertyIfNull(String propName, String propValue)
+    {
+        if (System.getProperty(propName) == null)
+        {
+            System.setProperty(propName, propValue);
+        }
+    }
+
+    public void testPingOk() throws Exception
+    {
+        // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+        ObjectMessage msg =
+            _testPingProducer.getTestMessage(null, Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)),
+                                             Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)));
+
+        // Use the test timing controller to reset the test timer now and obtain the current time.
+        // This can be used to remove the message creation time from the test.
+        //TestTimingController timingUtils = getTimingController();
+        //long startTime = timingUtils.restart();
+
+        // Send the message.
+        _testPingProducer.ping(msg);
+
+        // Fail the test if the timeout was exceeded.
+        /*if (reply == null)
+        {
+            Assert.fail("The ping timed out for message id: " + msg.getJMSMessageID());
+        }*/
+    }
+
+    protected void setUp() throws Exception
+    {
+        // Log4j will propagate the test name as a thread local in all log output.
+        NDC.push(getName());
+
+        // Ensure that the connection, session and ping queue are established, if they have not already been.
+        if (_testPingProducer == null)
+        {
+            // Extract the test set up paramaeters.
+            String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
+            String username = "guest";
+            String password = "guest";
+            String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
+            String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME);
+            boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
+            boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
+            String selector = null;
+            boolean verbose = false;
+            int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
+
+            // Establish a bounce back client on the ping queue to bounce back the pings.
+            _testPingClient = new TestPingClient(brokerDetails, username, password, queueName, virtualpath, transacted,
+                                                 selector, verbose);
+
+            // Establish a ping-pong client on the ping queue to send the pings with.
+            _testPingProducer = new TestPingProducer(brokerDetails, username, password, virtualpath, queueName, transacted,
+                                                     persistent, messageSize, verbose);
+
+            // Start the connections for client and producer running.
+            _testPingClient.getConnection().start();
+            _testPingProducer.getConnection().start();
+        }
+    }
+
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            if ((_testPingClient != null) && (_testPingClient.getConnection() != null))
+            {
+                _testPingClient.getConnection().close();
+            }
+
+            if ((_testPingProducer != null) && (_testPingProducer.getConnection() != null))
+            {
+                _testPingProducer.getConnection().close();
+            }
+        }
+        finally
+        {
+            NDC.pop();
+        }
+    }
+}

Modified: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java Thu Jan 18 05:11:39 2007
@@ -16,6 +16,7 @@
 import org.apache.qpid.jms.Connection;
 import org.apache.qpid.jms.MessageProducer;
 import org.apache.qpid.jms.Session;
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
 
 /**
  * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run
@@ -42,7 +43,7 @@
  *
  * @author Rupert Smith
  */
-public class PingPongTestPerf extends TestCase implements ExceptionListener //, TimingControllerAware
+public class PingPongTestPerf extends TestCase //implements TimingControllerAware
 {
     private static Logger _logger = Logger.getLogger(PingPongTestPerf.class);
 
@@ -99,36 +100,15 @@
     /** Holds the test ping-pong producer. */
     private PingPongProducer _testPingProducer;
 
+    /** Holds the test ping client. */
+    private PingPongBouncer _testPingBouncer;
+
     // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
     // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
-    // of the test parameters to log with the results.
+    // of the test parameters to log with the results. It also providers some basic type parsing convenience methods.
     private Properties testParameters = System.getProperties();
     //private Properties testParameters = new ContextualProperties(System.getProperties());
 
-    /** Holds the connection to the broker. */
-    private Connection _connection = null;
-
-    /** Holds the current session to the broker. */
-    private Session _session;
-
-    /** Holds the destination to send the ping messages to. */
-    private Queue _pingQueue;
-
-    /** Holds the destination to send replies to. */
-    private Queue _replyQueue;
-
-    /** Holds a message producer, set up on the ping destination, to send messages through. */
-    private MessageProducer _producer;
-
-    /** Holds a message consumer, set up on the ping destination, to receive pings through. */
-    private MessageConsumer _pingConsumer;
-
-    /** Holds a message consumer, set up on the pong destination, to receive replies through. */
-    private MessageConsumer _pongConsumer;
-
-    /** Holds a failure flag, which gets set if the connection to the broker goes down. */
-    private boolean _failure;
-
     public PingPongTestPerf(String name)
     {
         super(name);
@@ -146,9 +126,8 @@
     {
         // Generate a sample message. This message is already time stamped and has its reply-to destination set.
         ObjectMessage msg =
-            PingPongProducer.getTestMessage(_session, _replyQueue,
+            _testPingProducer.getTestMessage(_testPingProducer.getReplyQueue(),
                                             Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)),
-                                            System.currentTimeMillis(),
                                             Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)));
 
         // Use the test timing controller to reset the test timer now and obtain the current time.
@@ -157,77 +136,46 @@
         //long startTime = timingUtils.restart();
 
         // Send the message and wait for a reply.
-        Message reply = _testPingProducer.pingAndWaitForReply(msg, TIMEOUT);
+        int numReplies = _testPingProducer.pingAndWaitForReply(msg, 1, TIMEOUT);
 
         // Fail the test if the timeout was exceeded.
-        if (reply == null)
+        if (numReplies != 1)
         {
             Assert.fail("The ping timed out for message id: " + msg.getJMSMessageID());
         }
     }
 
-    /**
-     * This is a callback method that is registered to receive any JMSExceptions that occurr on the connection to
-     * the broker. It sets a failure flag to indicate that there is an error condition.
-     *
-     * @param e The JMSException that triggered this callback method.
-     *
-     * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
-     */
-    public void onException(JMSException e)
-    {
-        // Set the failure flag.
-        _failure = true;
-
-        _logger.debug("There was a JMSException: " + e.getMessage(), e);
-    }
-
     protected void setUp() throws Exception
     {
         // Log4j will propagate the test name as a thread local in all log output.
         NDC.push(getName());
 
         // Ensure that the connection, session and ping queue are established, if they have not already been.
-        if (_connection == null)
+        if (_testPingProducer == null)
         {
-            // Create a client id that identifies the client machine.
-            String clientID = InetAddress.getLocalHost().getHostName() + System.currentTimeMillis();
-
-            // Connect to the broker.
-            _connection = new AMQConnection(testParameters.getProperty(BROKER_PROPNAME), "guest", "guest", clientID,
-                                            testParameters.getProperty(VIRTUAL_PATH_PROPNAME));
-            _connection.setExceptionListener(this);
-
-            // Create a transactional or non-transactional session, based on the test properties, if a session has not
-            // already been created.
-            if (Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)))
-            {
-                _session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
-            }
-            else
-            {
-                _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            }
-
-            // Create a queue to send the pings on.
-            _pingQueue = new AMQQueue(testParameters.getProperty(PING_QUEUE_NAME_PROPNAME));
-            _producer = (MessageProducer) _session.createProducer(_pingQueue);
-
-            // Create a temporary queue to reply with the pongs on.
-            _replyQueue = _session.createTemporaryQueue();
-
-            // Create the ping and pong consumers on their respective destinations.
-            _pingConsumer = _session.createConsumer(_pingQueue);
-            _pongConsumer = _session.createConsumer(_replyQueue);
+            // Extract the test set up paramaeters.
+            String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
+            String username = "guest";
+            String password = "guest";
+            String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
+            String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME);
+            boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
+            boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
+            String selector = null;
+            boolean verbose = false;
+            int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
 
             // Establish a bounce back client on the ping queue to bounce back the pings.
-            new org.apache.qpid.requestreply.PingPongClient(_session, _pingConsumer, false);
+            _testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualpath, queueName, persistent,
+                                                 transacted, selector, verbose);
 
-            // Establish a ping-pong client on the ping queue to send pings and wait for replies.
-            _testPingProducer = new org.apache.qpid.requestreply.PingPongProducer(_session, _replyQueue, _producer,
-                                                                                  _pongConsumer);
-
-            _connection.start();
+            // Establish a ping-pong client on the ping queue to send the pings with.
+            _testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualpath, queueName, selector,
+                                                     transacted, persistent, messageSize, verbose);
+
+            // Start the connections for client and producer running.
+            _testPingBouncer.getConnection().start();
+            _testPingProducer.getConnection().start();
         }
     }
 
@@ -235,7 +183,15 @@
     {
         try
         {
-            _connection.close();
+            if ((_testPingBouncer != null) && (_testPingBouncer.getConnection() != null))
+            {
+                _testPingBouncer.getConnection().close();
+            }
+
+            if ((_testPingProducer != null) && (_testPingProducer.getConnection() != null))
+            {
+                _testPingProducer.getConnection().close();
+            }
         }
         finally
         {

Modified: incubator/qpid/trunk/qpid/java/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/pom.xml?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/pom.xml Thu Jan 18 05:11:39 2007
@@ -99,7 +99,7 @@
 
         <!--Versions for various plugins and features -->
         <antrun.version>1.2-SNAPSHOT</antrun.version>
-        <assembly.version>2.1</assembly.version>
+        <assembly.version>2.2-SNAPSHOT</assembly.version>
         <cobertura.version>2.0</cobertura.version>
         <compiler.version>2.0.1</compiler.version>
         <dependency.plugin.version>1.0</dependency.plugin.version>
@@ -284,6 +284,8 @@
                             <property>
                                 <name>amqj.logging.level</name>
                                 <value>${amqj.logging.level}</value>
+				<!--<name>log4j.configuration</name>
+				<value>file:/${topDirectoryLocation}/etc/log4j.xml</value>-->
                             </property>
                         </systemproperties>
                     </configuration>