You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/23 18:08:04 UTC

svn commit: r499083 - in /incubator/qpid/trunk/qpid/java: ./ perftests/ perftests/src/main/java/org/apache/qpid/ping/ perftests/src/main/java/org/apache/qpid/requestreply/ perftests/src/main/java/org/apache/qpid/topic/ perftests/src/test/java/org/apach...

Author: rgreig
Date: Tue Jan 23 09:08:03 2007
New Revision: 499083

URL: http://svn.apache.org/viewvc?view=rev&rev=499083
Log:
(Patch submitted by Rupert Smith) Added the ability to limit the rate at which messages are sent by the ping tests.

Modified:
    incubator/qpid/trunk/qpid/java/perftests/pom.xml
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
    incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
    incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
    incubator/qpid/trunk/qpid/java/pom.xml

Modified: incubator/qpid/trunk/qpid/java/perftests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/pom.xml?view=diff&rev=499083&r1=499082&r2=499083
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/perftests/pom.xml Tue Jan 23 09:08:03 2007
@@ -35,6 +35,7 @@
 
     <properties>
         <topDirectoryLocation>..</topDirectoryLocation>
+	<log4j.perftests>perftests.log4j</log4j.perftests>	
     </properties>
 
     <dependencies>
@@ -91,7 +92,7 @@
 
 		 To generate the scripts do:
 
-		 mvn uk.co.thebadgerset:junit-toolkit-maven-plugin:scripts
+		 mvn uk.co.thebadgerset:junit-toolkit-maven-plugin:tkscriptgen
 
 		 Then to run the scripts do (after doing assembly:assembly goal):
 
@@ -99,7 +100,7 @@
 
 		 These scripts can find everything in the 'all test dependencies' jar created by the assembly:assembly goal.
 		 -->	
-	    <!--
+            <!--
             <plugin>
               <groupId>uk.co.thebadgerset</groupId>
               <artifactId>junit-toolkit-maven-plugin</artifactId> 
@@ -108,10 +109,20 @@
               <configuration>
 	        <scriptOutDirectory>target</scriptOutDirectory>
 	        <testJar>${project.build.finalName}-all-test-deps.jar</testJar>
+
+		<systemproperties>
+                  <property>
+                    <name>log4j.configuration</name>
+                    <value>${log4j.perftests}</value>
+                  </property>
+                </systemproperties>
+
                 <commands>
                   <!## Run the ping pong test once. This is just to check toolkit test runner is working. Real tests follow. ##>
                   <PingOnce>-n PingOnce -s [1] -r 1 -t testPingOk -o . org.apache.qpid.ping.PingTestPerf</PingOnce>
-		  <ThrottleTest>-n ThrottleTest -s [10,10000],samples=8,exp -t testThrottle -o . org.apache.qpid.ping.ThrottleTestPerf</ThrottleTest>
+
+		  <!## Tests the accuracy of the throttle implementation at different speeds. Throttle is used to restrict message rate in some tsts. ##>
+		  <ThrottleTest>-n ThrottleTest -r 5 -s [10,10000],samples=100,exp -t testThrottle -o . org.apache.qpid.ping.ThrottleTestPerf</ThrottleTest>
 
 		  <!##
 		     Skim Tests.
@@ -123,7 +134,7 @@
 		  <Skim-Many>-n Skim-Many -s [1] -c [4] -o . -t testPingOk org.apache.qpid.ping.PingTestPerf</Skim-Many>
 		  <Skim-Queues>-n Skim-Queues -s [1000] -o . -t testPingOk org.apache.qpid.ping.PingTestPerf numQueues=10</Skim-Queues>
 		  <Skim-Duration>-n Skim-Duration -s [1000] -d10S -o . -t testPingOk org.apache.qpid.ping.PingTestPerf</Skim-Duration>
-		  <Skim-Rate>-n Skim-Rate -s [1000] -d10S -o . -t testPingOk org.apache.qpid.ping.PingTestPerf numQueues=10 rate=100</Skim-Rate>
+		  <Skim-Rate>-n Skim-Rate -s [1000] -d10S -o . -t testPingOk org.apache.qpid.ping.PingTestPerf rate=100</Skim-Rate>
 
 		  <!## P2P Volume Tests. ##>
 		  <VQ-Qpid-1>-n VQ-Qpid-1 -s [900000] -o . -t testPingOk org.apache.qpid.ping.PingTestPerf messageSize=1024 transacted=true</VQ-Qpid-1>
@@ -149,8 +160,8 @@
 		  <PQ-Qpid-14>-n PQ-Qpid-14 -s [15000] -c[1000] -D1M -o . -t testPingOk org.apache.qpid.ping.PingTestPerf messageSize=1024</PQ-Qpid-14>
 
 		  <!## Increasing Message Payload Tests. ##>
-		  <LT-Qpid-3-521b>-n LT-Qpid-3-521b -s [900000] -o . -t testPingOk org.apache.qpid.ping.PingTestPerf messageSize=512 transacted=true</LT-Qpid-3-521b>
-		  <LT-Qpid-4-521b>-n LT-Qpid-4-521b -s [900000] -o . -t testPingOk org.apache.qpid.ping.PingTestPerf messageSize=512 transacted=true</LT-Qpid-4-521b>
+		  <LT-Qpid-3-512b>-n LT-Qpid-3-512b -s [900000] -o . -t testPingOk org.apache.qpid.ping.PingTestPerf messageSize=512 transacted=true</LT-Qpid-3-512b>
+		  <LT-Qpid-4-512b>-n LT-Qpid-4-512b -s [900000] -o . -t testPingOk org.apache.qpid.ping.PingTestPerf messageSize=512 transacted=true</LT-Qpid-4-512b>
 		  <LT-Qpid-3-1K>-n LT-Qpid-3-1K -s [900000] -o . -t testPingOk org.apache.qpid.ping.PingTestPerf messageSize=1024 transacted=true</LT-Qpid-3-1K>
 		  <LT-Qpid-4-1K>-n LT-Qpid-4-1K -s [900000] -o . -t testPingOk org.apache.qpid.ping.PingTestPerf messageSize=1024 transacted=true</LT-Qpid-4-1K>
 		  <LT-Qpid-3-5K>-n LT-Qpid-3-5K -s [900000] -o . -t testPingOk org.apache.qpid.ping.PingTestPerf messageSize=5120 transacted=true</LT-Qpid-3-5K>
@@ -214,8 +225,8 @@
 
         </plugins>
 
-        <!-- Include source files in built jar -->
         <resources>
+	    <!-- Include source files in built jar -->
             <resource>
                 <targetPath>src/</targetPath>
                 <filtering>false</filtering>
@@ -224,8 +235,9 @@
                     <include>**/*.java</include>
                 </includes>
             </resource>
+	    <!-- Include a log4j configuration in the jar at the root level (don't name this log4j.properties though as won't be able to override it). -->
             <resource>
-                <targetPath>src/</targetPath>
+                <targetPath>/</targetPath>
                 <filtering>false</filtering>
                 <directory>src/main/java</directory>
                 <includes>

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java?view=diff&rev=499083&r1=499082&r2=499083
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java Tue Jan 23 09:08:03 2007
@@ -1,10 +1,10 @@
 package org.apache.qpid.ping;
 
+import java.io.IOException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.io.IOException;
-import java.util.List;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.*;
@@ -15,12 +15,12 @@
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.BasicMessageProducer;
 import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.BasicMessageProducer;
 import org.apache.qpid.client.message.TestMessageFactory;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.jms.*;
 import org.apache.qpid.jms.Session;
-import org.apache.qpid.framing.AMQShortString;
 
 /**
  * This abstract class captures functionality that is common to all ping producers. It provides functionality to
@@ -46,6 +46,9 @@
      */
     protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
 
+    /** This id generator is used to generate ids to append to the queue name to ensure that queues are unique. */
+    private static AtomicInteger _queueSequenceID = new AtomicInteger();
+
     /**
      * Used to tell the ping loop when to terminate, it only runs while this is true.
      */
@@ -61,12 +64,11 @@
      */
     private Session _producerSession;
 
-
     /**
-     * holds the no of queues the tests will be using to send messages. By default it will be 1
+     * Holds the number of queues the tests will be using to send messages. By default it will be 1
      */
-    protected int _queueCount;
-    private static AtomicInteger _queueSequenceID = new AtomicInteger();
+    protected int _queueCount = 1;
+
     private List<Queue> _queues = new ArrayList<Queue>();
 
     /**
@@ -80,9 +82,8 @@
     protected boolean _failAfterSend = false;
     protected boolean _failOnce = true;
 
-    protected int _sentMessages = 0;
-    protected int _batchSize = 1;
-
+    /** Holds the number of sends that should be performed in every transaction when using transactions. */
+    protected int _txBatchSize = 1;
 
     /**
      * Convenience method for a short pause.
@@ -98,11 +99,14 @@
                 Thread.sleep(sleepTime);
             }
             catch (InterruptedException ie)
-            {
-            }
+            { }
         }
     }
 
+    /**
+     * Implementations should provide this method to perform a single ping cycle (which may send many messages). The
+     * run loop will repeatedly call this method until the publish flag is set to false.
+     */
     public abstract void pingLoop();
 
     /**
@@ -110,8 +114,10 @@
      *
      * @param replyQueue  The reply-to destination for the message.
      * @param messageSize The desired size of the message in bytes.
+     *
      * @return A freshly generated test message.
-     * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
+     *
+     * @throws JMSException All underlying JMSException are allowed to fall through.
      */
     public ObjectMessage getTestMessage(Queue replyQueue, int messageSize, boolean persistent) throws JMSException
     {
@@ -183,12 +189,12 @@
     public Thread getShutdownHook()
     {
         return new Thread(new Runnable()
-        {
-            public void run()
             {
-                stop();
-            }
-        });
+                public void run()
+                {
+                    stop();
+                }
+            });
     }
 
     public Connection getConnection()
@@ -211,12 +217,6 @@
         this._producerSession = session;
     }
 
-
-    protected void commitTx() throws JMSException
-    {
-        commitTx(getProducerSession());
-    }
-
     public int getQueueCount()
     {
         return _queueCount;
@@ -227,6 +227,11 @@
         this._queueCount = queueCount;
     }
 
+    protected void commitTx() throws JMSException
+    {
+        commitTx(getProducerSession());
+    }
+
     /**
      * Creates queues dynamically and adds to the queues list.  This is when the test is being done with
      * multiple queues.
@@ -237,7 +242,8 @@
     {
         for (int i = 0; i < queueCount; i++)
         {
-            AMQShortString name = new AMQShortString("Queue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
+            AMQShortString name =
+                new AMQShortString("Queue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
             AMQQueue queue = new AMQQueue(name, name, false, false, false);
 
             _queues.add(queue);
@@ -256,69 +262,69 @@
      */
     protected void commitTx(Session session) throws JMSException
     {
-        if ((++_sentMessages % _batchSize) == 0)
+        _logger.trace("Batch time reached");
+        if (_failAfterSend)
         {
-            _logger.trace("Batch time reached");
-            if (_failAfterSend)
+            if (_failOnce)
             {
-                if (_failOnce)
-                {
-                    _failAfterSend = false;
-                }
-                _logger.trace("Failing After Send");
-                doFailover();
+                _failAfterSend = false;
             }
 
+            _logger.trace("Failing After Send");
+            doFailover();
+        }
 
-            if (session.getTransacted())
+        if (session.getTransacted())
+        {
+            try
             {
-                try
+                if (_failBeforeCommit)
                 {
-                    if (_failBeforeCommit)
+                    if (_failOnce)
                     {
-                        if (_failOnce)
-                        {
-                            _failBeforeCommit = false;
-                        }
-                        _logger.trace("Failing Before Commit");
-                        doFailover();
+                        _failBeforeCommit = false;
                     }
 
-                    session.commit();
+                    _logger.trace("Failing Before Commit");
+                    doFailover();
+                }
+
+                session.commit();
 
-                    if (_failAfterCommit)
+                if (_failAfterCommit)
+                {
+                    if (_failOnce)
                     {
-                        if (_failOnce)
-                        {
-                            _failAfterCommit = false;
-                        }
-                        _logger.trace("Failing After Commit");
-                        doFailover();
+                        _failAfterCommit = false;
                     }
-                    _logger.trace("Session Commited.");
+
+                    _logger.trace("Failing After Commit");
+                    doFailover();
                 }
-                catch (JMSException e)
-                {
-                    _logger.trace("JMSException on commit:" + e.getMessage(), e);
 
-                    // Warn that the bounce back client is not available.
-                    if (e.getLinkedException() instanceof AMQNoConsumersException)
-                    {
-                        _logger.debug("No consumers on queue.");
-                    }
+                _logger.trace("Session Commited.");
+            }
+            catch (JMSException e)
+            {
+                _logger.trace("JMSException on commit:" + e.getMessage(), e);
 
-                    try
-                    {
-                        session.rollback();
-                        _logger.trace("Message rolled back.");
-                    }
-                    catch (JMSException jmse)
-                    {
-                        _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+                // Warn that the bounce back client is not available.
+                if (e.getLinkedException() instanceof AMQNoConsumersException)
+                {
+                    _logger.debug("No consumers on queue.");
+                }
 
-                        // Both commit and rollback failed. Throw the rollback exception.
-                        throw jmse;
-                    }
+                try
+                {
+                    session.rollback();
+                    _logger.trace("Message rolled back.");
+                }
+                catch (JMSException jmse)
+                {
+                    _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+
+                    // Both commit and rollback failed. Throw the rollback exception.
+                    throw jmse;
                 }
             }
         }
@@ -337,6 +343,7 @@
             {
                 _failBeforeSend = false;
             }
+
             _logger.trace("Failing Before Send");
             doFailover();
         }
@@ -361,8 +368,8 @@
             System.in.read();
         }
         catch (IOException e)
-        {
-        }
+        { }
+
         System.out.println("Continuing.");
     }
 
@@ -374,8 +381,8 @@
             System.in.read();
         }
         catch (IOException e)
-        {
-        }
+        { }
+
         System.out.println("Continuing.");
 
     }

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java?view=diff&rev=499083&r1=499082&r2=499083
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java Tue Jan 23 09:08:03 2007
@@ -17,14 +17,15 @@
  */
 package org.apache.qpid.ping;
 
-import org.apache.log4j.Logger;
-import org.apache.qpid.requestreply.PingPongProducer;
-import org.apache.qpid.topic.Config;
-
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.ObjectMessage;
 
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+import org.apache.qpid.topic.Config;
+
 /**
  * This class is used to test sending and receiving messages to (pingQueue) and from a queue (replyQueue).
  * The producer and consumer created by this test send and receive messages to and from the same Queue. ie.
@@ -62,11 +63,10 @@
     public TestPingItself(String brokerDetails, String username, String password, String virtualpath, String queueName,
                           String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
                           boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
-                          int batchSize, int queueCount)
-            throws Exception
+                          int batchSize, int queueCount, int rate) throws Exception
     {
         super(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent, messageSize,
-              verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize, queueCount);
+              verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize, queueCount, rate);
 
         if (queueCount > 1)
         {
@@ -82,18 +82,6 @@
     }
 
     /**
-     * Sets the replyQueue to be the same as ping queue.
-     */
-    @Override
-    public void createConsumer(String selector) throws JMSException
-    {
-        // Create a message consumer to get the replies with and register this to be called back by it.
-        setReplyQueue(getPingQueue());
-        MessageConsumer consumer = getConsumerSession().createConsumer(getReplyQueue(), PREFETCH, false, EXCLUSIVE, selector);
-        consumer.setMessageListener(this);
-    }
-
-    /**
      * Starts a ping-pong loop running from the command line.
      *
      * @param args The command line arguments as defined above.
@@ -109,15 +97,15 @@
         boolean verbose = false;
         boolean transacted = config.isTransacted();
         boolean persistent = config.usePersistentMessages();
-        int messageSize = config.getPayload() != 0 ? config.getPayload() : DEFAULT_MESSAGE_SIZE;
+        int messageSize = (config.getPayload() != 0) ? config.getPayload() : DEFAULT_MESSAGE_SIZE;
         int messageCount = config.getMessages();
-        int queueCount = config.getQueueCount() != 0 ? config.getQueueCount() : 1;
-        int batchSize = config.getBatchSize() != 0 ? config.getBatchSize() : BATCH_SIZE;
+        int queueCount = (config.getQueueCount() != 0) ? config.getQueueCount() : 1;
+        int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : BATCH_SIZE;
+        int rate = (config.getRate() != 0) ? config.getRate() : 0;
 
         String queue = "ping_" + System.currentTimeMillis();
-        _logger.info("Queue:" + queue + ", Transacted:" + transacted + ", persistent:" + persistent +
-                     ",MessageSize:" + messageSize + " bytes");
-
+        _logger.info("Queue:" + queue + ", Transacted:" + transacted + ", persistent:" + persistent + ",MessageSize:"
+                     + messageSize + " bytes");
 
         boolean afterCommit = false;
         boolean beforeCommit = false;
@@ -144,6 +132,7 @@
                         afterSend = parts[1].equals("after");
                         beforeSend = parts[1].equals("before");
                     }
+
                     if (parts[1].equals("once"))
                     {
                         failOnce = true;
@@ -158,10 +147,10 @@
         }
 
         // Create a ping producer to handle the request/wait/reply cycle.
-        TestPingItself pingItself = new TestPingItself(brokerDetails, "guest", "guest", virtualpath, queue, null,
-                                                    transacted, persistent, messageSize, verbose,
-                                                    afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-                                                    batchSize, queueCount);
+        TestPingItself pingItself =
+            new TestPingItself(brokerDetails, "guest", "guest", virtualpath, queue, null, transacted, persistent,
+                               messageSize, verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
+                               queueCount, rate);
 
         pingItself.getConnection().start();
 
@@ -192,19 +181,29 @@
             pingThread.run();
             pingThread.join();
         }
+
         pingItself.getConnection().close();
     }
 
     private static void usage()
     {
-        System.err.println("Usage: TestPingPublisher \n" +
-                           "-host : broker host" +
-                           "-port : broker port" +
-                           "-transacted : (true/false). Default is false" +
-                           "-persistent : (true/false). Default is false" +
-                           "-payload    : paylaod size. Default is 0" +
-                           "-queues     : no of queues" +
-                           "-messages   : no of messages to be sent (if 0, the ping loop will run indefinitely)");
+        System.err.println("Usage: TestPingPublisher \n" + "-host : broker host" + "-port : broker port"
+                           + "-transacted : (true/false). Default is false" + "-persistent : (true/false). Default is false"
+                           + "-payload    : paylaod size. Default is 0" + "-queues     : no of queues"
+                           + "-messages   : no of messages to be sent (if 0, the ping loop will run indefinitely)");
         System.exit(0);
+    }
+
+    /**
+     * Sets the replyQueue to be the same as ping queue.
+     */
+    @Override
+    public void createConsumer(String selector) throws JMSException
+    {
+        // Create a message consumer to get the replies with and register this to be called back by it.
+        setReplyQueue(getPingQueue());
+        MessageConsumer consumer =
+            getConsumerSession().createConsumer(getReplyQueue(), PREFETCH, false, EXCLUSIVE, selector);
+        consumer.setMessageListener(this);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java?view=diff&rev=499083&r1=499082&r2=499083
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java Tue Jan 23 09:08:03 2007
@@ -85,11 +85,10 @@
      */
     private boolean _verbose = false;
 
-
     public TestPingProducer(String brokerDetails, String username, String password, String virtualpath, String queueName,
                             boolean transacted, boolean persistent, int messageSize, boolean verbose, boolean afterCommit,
-                            boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
-                            int batchSize) throws Exception
+                            boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize)
+                     throws Exception
     {
         // Create a connection to the broker.
         InetAddress address = InetAddress.getLocalHost();
@@ -114,8 +113,7 @@
         _failBeforeCommit = beforeCommit;
         _failAfterSend = afterSend;
         _failBeforeSend = beforeSend;
-        _sentMessages = 0;
-        _batchSize = batchSize;
+        _txBatchSize = batchSize;
         _failOnce = failOnce;
     }
 
@@ -131,8 +129,8 @@
         if (args.length < 2)
         {
             System.err.println(
-                    "Usage: TestPingPublisher <brokerDetails> <virtual path> " +
-                    "[<verbose(true|false)> <transacted(true|false))> <persistent(true|false)> <message size in bytes> <batchsize>");
+                "Usage: TestPingPublisher <brokerDetails> <virtual path> "
+                + "[<verbose(true|false)> <transacted(true|false))> <persistent(true|false)> <message size in bytes> <batchsize>");
             System.exit(0);
         }
 
@@ -144,7 +142,6 @@
         int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE;
         int batchSize = (args.length >= 7) ? Integer.parseInt(args[6]) : 1;
 
-
         boolean afterCommit = false;
         boolean beforeCommit = false;
         boolean afterSend = false;
@@ -170,6 +167,7 @@
                         afterSend = parts[1].equals("after");
                         beforeSend = parts[1].equals("before");
                     }
+
                     if (parts[1].equals("once"))
                     {
                         failOnce = true;
@@ -183,10 +181,9 @@
         }
 
         // Create a ping producer to generate the pings.
-        _pingProducer = new TestPingProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME,
-                                             transacted, persistent, messageSize, verbose,
-                                             afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-                                             batchSize);
+        _pingProducer = new TestPingProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, transacted,
+                                             persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
+                                             beforeSend, failOnce, batchSize);
 
         // Start the connection running.
         _pingProducer.getConnection().start();
@@ -217,7 +214,7 @@
         String messageId = message.getJMSMessageID();
 
         // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of
-        // this method, as the message will not be sent until the transaction is committed.        
+        // this method, as the message will not be sent until the transaction is committed.
         commitTx();
     }
 

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java?view=diff&rev=499083&r1=499082&r2=499083
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java Tue Jan 23 09:08:03 2007
@@ -61,6 +61,7 @@
         }
 
         // Keep the time of the last call to this method to calculate the next cycle.
-        lastTimeNanos = currentTimeNanos;
+        //lastTimeNanos = currentTimeNanos;
+        lastTimeNanos = System.nanoTime();
     }
 }

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java?view=diff&rev=499083&r1=499082&r2=499083
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java Tue Jan 23 09:08:03 2007
@@ -38,20 +38,14 @@
  * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes
  * too.
  *
- * <p/>The message id from the received message is extracted, and placed into the reply as the correlation id. Messages
- * are bounced back to the reply-to destination. The original sender of the message has the option to use either a unique
+ * <p/>The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages
+ * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique
  * temporary queue or the correlation id to correlate the original message to the reply.
  *
  * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
  * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
  * be disabled for real timing tests as writing to the console will slow things down.
  *
- * <p/>When the a message is received, a reply to producer is created for it if it is not the same as the previous
- * message. All subsequent replies are sent using that producer until a different reply to destination is
- * encountered; effectively a last used cache of size 1. Fast because it saves creating the reply producer over and
- * over again when the destination does not change. For a larger fixed set of reply to destinations could turn this
- * into a cache with more elements.
- *
  * <p><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * <tr><td> Bounce back messages to their reply to destination.
@@ -84,8 +78,8 @@
     /** Keeps track of the response destination of the previous message for the last reply to producer cache. */
     private Destination _lastResponseDest;
 
-    /** The cached, most recently used reply producer. */
-    private MessageProducer _cachedReplyProducer;
+    /** The producer for sending replies with. */
+    private MessageProducer _replyProducer;
 
     /** The consumer session. */
     private Session _consumerSession;
@@ -96,16 +90,15 @@
     /**
      * Creates a PingPongBouncer on the specified producer and consumer sessions.
      *
-     * @param brokerDetails
-     * @param username
-     * @param password
-     * @param virtualpath
-     * @param queueName
-     * @param persistent
-     * @param transacted
-     * @param selector
-     * @param verbose
-     * @throws JMSException
+     * @param brokerDetails The addresses of the brokers to connect to.
+     * @param username      The broker username.
+     * @param password      The broker password.
+     * @param virtualpath   The virtual host name within the broker.
+     * @param queueName     The name of the queue to receive pings on (or root of the queue name where many queues are generated).
+     * @param persistent    A flag to indicate that persistent message should be used.
+     * @param transacted    A flag to indicate that pings should be sent within transactions.
+     * @param selector      A message selector to filter received pings with.
+     * @param verbose       A flag to indicate that message timings should be sent to the console.
      *
      * @throws Exception All underlying exceptions allowed to fall through. This is only test code...
      */
@@ -132,6 +125,11 @@
         Queue q = new AMQQueue(queueName);
         MessageConsumer consumer = _consumerSession.createConsumer(q, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
 
+        // Create a producer for the replies, without a default destination.
+        _replyProducer = _producerSession.createProducer(null);
+        _replyProducer.setDisableMessageTimestamp(true);
+        _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
         _verbose = verbose;
         _persistent = persistent;
 
@@ -199,23 +197,11 @@
 
             if (responseDest == null)
             {
-                _logger.debug("Producer not created because the response destination is null.");
+                _logger.debug("Cannot send reply because reply-to destination is null.");
 
                 return;
             }
 
-            // Check if the reply to destination is different to the last message and create a new producer if so.
-            if (!responseDest.equals(_lastResponseDest))
-            {
-                _lastResponseDest = responseDest;
-
-                _logger.debug("About to create a producer.");
-                _cachedReplyProducer = _producerSession.createProducer(responseDest);
-                _cachedReplyProducer.setDisableMessageTimestamp(true);
-                _cachedReplyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-                _logger.debug("After create a producer.");
-            }
-
             // Spew out some timing information if verbose mode is on.
             if (_verbose)
             {
@@ -232,7 +218,7 @@
             message.setJMSCorrelationID(messageCorrelationId);
 
             // Send the receieved message as the pong reply.
-            _cachedReplyProducer.send(message);
+            _replyProducer.send(responseDest, message);
 
             if (_verbose)
             {

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=499083&r1=499082&r2=499083
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Tue Jan 23 09:08:03 2007
@@ -38,6 +38,7 @@
 import org.apache.qpid.jms.MessageProducer;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.ping.AbstractPingProducer;
+import org.apache.qpid.ping.Throttle;
 
 /**
  * PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
@@ -108,6 +109,11 @@
     private static AtomicLong idGenerator = new AtomicLong(0L);
 
     /**
+     * Holds a map from message ids to latches on which threads wait for replies.
+     */
+    private static Map<String, CountDownLatch> trafficLights = new HashMap<String, CountDownLatch>();
+
+    /**
      * Holds the queue to send the ping replies to.
      */
     private Queue _replyQueue;
@@ -128,22 +134,25 @@
     protected int _messageSize;
 
     /**
-     * Holds a map from message ids to latches on which threads wait for replies.
-     */
-    private static Map<String, CountDownLatch> trafficLights = new HashMap<String, CountDownLatch>();
-
-    /**
      * Used to indicate that the ping loop should print out whenever it pings.
      */
     protected boolean _verbose = false;
 
     protected Session _consumerSession;
 
-    private PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
-                             boolean transacted, boolean persistent, int messageSize, boolean verbose,
-                             boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
-                             int batchSize)
-            throws Exception
+    /** Used to restrict the sending rate to a specified limit. */
+    private Throttle rateLimiter = null;
+
+    /**
+     * The throttler can only reliably restrict to a few hundred cycles per second, so a throttling batch size is used
+     * to group sends together into batches large enough that the throttler runs slower than that.
+     */
+    int _throttleBatchSize;
+
+    private PingPongProducer(String brokerDetails, String username, String password, String virtualpath, boolean transacted,
+                             boolean persistent, int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
+                             boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize, int rate)
+                      throws Exception
     {
         // Create a connection to the broker.
         InetAddress address = InetAddress.getLocalHost();
@@ -165,8 +174,31 @@
         _failAfterSend = afterSend;
         _failBeforeSend = beforeSend;
         _failOnce = failOnce;
-        _batchSize = batchSize;
-        _sentMessages = 0;
+        _txBatchSize = batchSize;
+
+        // Calculate a throttling batch size and rate such that the throttle runs slower than 100 cycles per second
+        // and batched sends within each cycle multiply up to give the desired rate.
+        //
+        // total rate = throttle rate * batch size.
+        // 1 < throttle rate < 100
+        // 1 < total rate < 20000
+        if (rate > 0)
+        {
+            // Log base 10 over 2 is used here to get a feel for what power of 100 the total rate is.
+            // As the total rate goes up the powers of 100 the batch size goes up by powers of 100 to keep the
+            // throttle rate back into the range 1 to 100.
+            int x = (int) (Math.log10(rate) / 2);
+            _throttleBatchSize = (int) Math.pow(100, x);
+            int throttleRate = rate / _throttleBatchSize;
+
+            _logger.info("rate = " + rate);
+            _logger.info("x = " + x);
+            _logger.info("_throttleBatchSize = " + _throttleBatchSize);
+            _logger.info("throttleRate = " + throttleRate);
+
+            rateLimiter = new Throttle();
+            rateLimiter.setRate(throttleRate);
+        }
     }
 
     /**
@@ -181,12 +213,11 @@
      */
     public PingPongProducer(String brokerDetails, String username, String password, String virtualpath, String queueName,
                             String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
-                            boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
-                            int batchSize, int queueCount)
-            throws Exception
+                            boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend,
+                            boolean failOnce, int batchSize, int queueCount, int rate) throws Exception
     {
-        this(brokerDetails, username, password, virtualpath, transacted, persistent, messageSize, verbose,
-             afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize);
+        this(brokerDetails, username, password, virtualpath, transacted, persistent, messageSize, verbose, afterCommit,
+             beforeCommit, afterSend, beforeSend, failOnce, batchSize, rate);
 
         _queueCount = queueCount;
         if (queueCount <= 1)
@@ -207,76 +238,6 @@
     }
 
     /**
-     * Creates the producer to send the pings on.  If the tests are with nultiple queues, then producer
-     * is created with null destination, so that any destination can be specified while sending
-     *
-     * @throws JMSException
-     */
-    public void createProducer() throws JMSException
-    {
-        if (getQueueCount() > 1)
-        {
-            // create producer with initial destination as null for test with multiple queues
-            // In this case, a different destination will be used while sending the message
-            _producer = (MessageProducer) getProducerSession().createProducer(null);
-        }
-        else
-        {
-            // Create a queue and producer to send the pings on.
-            _producer = (MessageProducer) getProducerSession().createProducer(_pingQueue);
-
-        }
-        _producer.setDisableMessageTimestamp(true);
-        _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-    }
-
-    /**
-     * Creates the temporary queue to listen to the responses
-     *
-     * @param selector
-     * @throws JMSException
-     */
-    public void createConsumer(String selector) throws JMSException
-    {
-        // Create a temporary queue to get the pongs on.
-        _replyQueue = _consumerSession.createTemporaryQueue();
-
-        // Create a message consumer to get the replies with and register this to be called back by it.
-        MessageConsumer consumer = _consumerSession.createConsumer(_replyQueue, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
-        consumer.setMessageListener(this);
-    }
-
-    /**
-     * Creates consumer instances for each queue. This is used when test is being done with multiple queues.
-     *
-     * @param selector
-     * @throws JMSException
-     */
-    public void createConsumers(String selector) throws JMSException
-    {
-        for (int i = 0; i < getQueueCount(); i++)
-        {
-            MessageConsumer consumer = getConsumerSession().createConsumer(getQueue(i), PREFETCH, false, EXCLUSIVE, selector);
-            consumer.setMessageListener(this);
-        }
-    }
-
-    protected Session getConsumerSession()
-    {
-        return _consumerSession;
-    }
-
-    public Queue getPingQueue()
-    {
-        return _pingQueue;
-    }
-
-    protected void setPingQueue(Queue queue)
-    {
-        _pingQueue = queue;
-    }
-
-    /**
      * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs
      * to be started to bounce the pings back again.
      * <p/>
@@ -295,8 +256,9 @@
         // Extract the command line.
         if (args.length < 2)
         {
-            System.err.println("Usage: TestPingPublisher <brokerDetails> <virtual path> [verbose (true/false)] " +
-                               "[transacted (true/false)] [persistent (true/false)] [message size in bytes]");
+            System.err.println(
+                "Usage: TestPingPublisher <brokerDetails> <virtual path> [verbose (true/false)] "
+                + "[transacted (true/false)] [persistent (true/false)] [message size in bytes] [batchsize] [rate]");
             System.exit(0);
         }
 
@@ -307,7 +269,7 @@
         boolean persistent = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
         int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE;
         int batchSize = (args.length >= 7) ? Integer.parseInt(args[6]) : 1;
-
+        int rate = (args.length >= 8) ? Integer.parseInt(args[7]) : 0;
 
         boolean afterCommit = false;
         boolean beforeCommit = false;
@@ -334,6 +296,7 @@
                         afterSend = parts[1].equals("after");
                         beforeSend = parts[1].equals("before");
                     }
+
                     if (parts[1].equals("once"))
                     {
                         failOnce = true;
@@ -347,10 +310,10 @@
         }
 
         // Create a ping producer to handle the request/wait/reply cycle.
-        PingPongProducer pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, null, transacted,
-                                             persistent, messageSize, verbose,
-                                             afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-                                             batchSize, 0);
+        PingPongProducer pingProducer =
+            new PingPongProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, null, transacted, persistent,
+                                 messageSize, verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
+                                 0, rate);
 
         pingProducer.getConnection().start();
 
@@ -369,6 +332,68 @@
     }
 
     /**
+     * Creates the producer to send the pings on.  If the tests are with nultiple queues, then producer
+     * is created with null destination, so that any destination can be specified while sending
+     *
+     * @throws JMSException
+     */
+    public void createProducer() throws JMSException
+    {
+        if (getQueueCount() > 1)
+        {
+            // create producer with initial destination as null for test with multiple queues
+            // In this case, a different destination will be used while sending the message
+            _producer = (MessageProducer) getProducerSession().createProducer(null);
+        }
+        else
+        {
+            // Create a queue and producer to send the pings on.
+            _producer = (MessageProducer) getProducerSession().createProducer(_pingQueue);
+
+        }
+
+        _producer.setDisableMessageTimestamp(true);
+        _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+    }
+
+    /**
+     * Creates the temporary queue to listen to the responses
+     *
+     * @param selector
+     * @throws JMSException
+     */
+    public void createConsumer(String selector) throws JMSException
+    {
+        // Create a temporary queue to get the pongs on.
+        _replyQueue = _consumerSession.createTemporaryQueue();
+
+        // Create a message consumer to get the replies with and register this to be called back by it.
+        MessageConsumer consumer = _consumerSession.createConsumer(_replyQueue, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
+        consumer.setMessageListener(this);
+    }
+
+    /**
+     * Creates consumer instances for each queue. This is used when test is being done with multiple queues.
+     *
+     * @param selector
+     * @throws JMSException
+     */
+    public void createConsumers(String selector) throws JMSException
+    {
+        for (int i = 0; i < getQueueCount(); i++)
+        {
+            MessageConsumer consumer =
+                getConsumerSession().createConsumer(getQueue(i), PREFETCH, false, EXCLUSIVE, selector);
+            consumer.setMessageListener(this);
+        }
+    }
+
+    public Queue getPingQueue()
+    {
+        return _pingQueue;
+    }
+
+    /**
      * Primes the test loop by sending a few messages, then introduces a short wait. This allows the bounce back client
      * on the other end a chance to configure its reply producer on the reply to destination. It is also worth calling
      * this a few times, in order to prime the JVMs JIT compilation.
@@ -390,8 +415,7 @@
                 Thread.sleep(100);
             }
             catch (InterruptedException ignore)
-            {
-            }
+            { }
         }
     }
 
@@ -452,8 +476,10 @@
      * @param message  The message to send.
      * @param numPings The number of ping messages to send.
      * @param timeout  The timeout in milliseconds.
+     *
      * @return The number of replies received. This may be less than the number sent if the timeout terminated the
      *         wait for all prematurely.
+     *
      * @throws JMSException All underlying JMSExceptions are allowed to fall through.
      */
     public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
@@ -467,24 +493,52 @@
         CountDownLatch trafficLight = new CountDownLatch(numPings);
         trafficLights.put(messageCorrelationId, trafficLight);
 
-        if (getQueueCount() > 1)
-        {
-            // If test is with multiple queues
-            pingMultipleQueues(message, numPings);
-        }
-        else
+        // Set up a committed flag to detect uncommitted message at the end of the send loop. This may occurr if the
+        // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is
+        // needed.
+        boolean committed = false;
+
+        // Send all of the ping messages.
+        for (int i = 0; i < numPings; i++)
         {
-            // If test is with one Queue only
-            for (int i = 0; i < numPings; i++)
+            // Reset the committed flag to indicate that there are uncommitted message.
+            committed = false;
+
+            // Re-timestamp the message.
+            message.setLongProperty("timestamp", System.currentTimeMillis());
+
+            // Check if the test is with multiple queues, in which case round robin the queues as the messages are sent.
+            if (getQueueCount() > 1)
+            {
+                sendMessage(getQueue(i % getQueueCount()), message);
+            }
+            else
             {
-                // Re-timestamp the message.
-                message.setLongProperty("timestamp", System.currentTimeMillis());
                 sendMessage(message);
             }
+
+            // Apply message rate throttling if a rate limit has been set up and the throttling batch limit has been
+            // reached. See the comment on the throttle batch size for information about the use of batches here.
+            if ((rateLimiter != null) && ((i % _throttleBatchSize) == 0))
+            {
+                rateLimiter.throttle();
+            }
+
+            // Call commit every time the commit batch size is reached.
+            if ((i % _txBatchSize) == 0)
+            {
+                commitTx();
+                committed = true;
+            }
+        }
+
+        // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages.
+        if (!committed)
+        {
+            commitTx();
         }
 
-        // Keep the messageId to correlate with the reply.
-        //String messageId = message.getJMSMessageID();
+        // Spew out per message timings only in verbose mode.
         if (_verbose)
         {
             _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
@@ -498,7 +552,7 @@
 
         if ((numReplies < numPings) && _verbose)
         {
-            _logger.info("Timed out (" + timeout  + " ms) before all replies received on id, " + messageCorrelationId);
+            _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
         }
         else if (_verbose)
         {
@@ -508,33 +562,7 @@
         return numReplies;
     }
 
-    /**
-     * When the test is being performed with multiple queues, then this method will be used, which has a loop to
-     * pick up the next queue from the queues list and sends message to it.
-     *
-     * @param message
-     * @param numPings
-     * @throws JMSException
-     */
-    private void pingMultipleQueues(Message message, int numPings) throws JMSException
-    {
-        int queueIndex = 0;
-        for (int i = 0; i < numPings; i++)
-        {
-            // Re-timestamp the message.
-            message.setLongProperty("timestamp", System.currentTimeMillis());
-
-            sendMessage(getQueue(queueIndex++), message);
-
-            // reset the counter to get the first queue
-            if (queueIndex == getQueueCount() - 1)
-            {
-                queueIndex = 0;
-            }
-        }
-    }
-
-    /**
+    /*
      * Sends the specified ping message but does not wait for a correlating reply.
      *
      * @param message  The message to send.
@@ -542,7 +570,7 @@
      * @return The reply, or null if no reply arrives before the timeout.
      * @throws JMSException All underlying JMSExceptions are allowed to fall through.
      */
-    public void pingNoWaitForReply(Message message, int numPings) throws JMSException, InterruptedException
+    /*public void pingNoWaitForReply(Message message, int numPings) throws JMSException, InterruptedException
     {
         for (int i = 0; i < numPings; i++)
         {
@@ -553,7 +581,7 @@
                 _logger.info(timestampFormatter.format(new Date()) + ": Pinged at.");
             }
         }
-    }
+    }*/
 
     /**
      * The ping loop implementation. This send out pings of the configured size, persistence and transactionality, and
@@ -590,11 +618,47 @@
         return _replyQueue;
     }
 
+    protected Session getConsumerSession()
+    {
+        return _consumerSession;
+    }
+
+    protected void setPingQueue(Queue queue)
+    {
+        _pingQueue = queue;
+    }
+
     protected void setReplyQueue(Queue queue)
     {
         _replyQueue = queue;
     }
 
+    /*
+     * When the test is being performed with multiple queues, then this method will be used, which has a loop to
+     * pick up the next queue from the queues list and sends message to it.
+     *
+     * @param message
+     * @param numPings
+     * @throws JMSException
+     */
+    /*private void pingMultipleQueues(Message message, int numPings) throws JMSException
+    {
+        int queueIndex = 0;
+        for (int i = 0; i < numPings; i++)
+        {
+            // Re-timestamp the message.
+            message.setLongProperty("timestamp", System.currentTimeMillis());
+
+            sendMessage(getQueue(queueIndex++), message);
+
+            // reset the counter to get the first queue
+            if (queueIndex == (getQueueCount() - 1))
+            {
+                queueIndex = 0;
+            }
+        }
+    }*/
+
     /**
      * A connection listener that logs out any failover complete events. Could do more interesting things with this
      * at some point...
@@ -602,12 +666,10 @@
     public static class FailoverNotifier implements ConnectionListener
     {
         public void bytesSent(long count)
-        {
-        }
+        { }
 
         public void bytesReceived(long count)
-        {
-        }
+        { }
 
         public boolean preFailover(boolean redirect)
         {

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java?view=diff&rev=499083&r1=499082&r2=499083
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java Tue Jan 23 09:08:03 2007
@@ -47,6 +47,7 @@
     private boolean transacted;
     private int noOfQueues;
     private int batchSize;
+    private int rate;
 
     public Config()
     {
@@ -92,6 +93,11 @@
         return batchSize;
     }
 
+    public int getRate()
+    {
+        return rate;
+    }
+
     public int getQueueCount()
     {
         return noOfQueues;
@@ -244,6 +250,10 @@
         else if ("-batchsize".equalsIgnoreCase(key))
         {
             batchSize = parseInt("Bad batch size", value);
+        }
+        else if ("-rate".equalsIgnoreCase(key))
+        {
+            rate = parseInt("MEssage rate", value);
         }
         else
         {

Modified: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=diff&rev=499083&r1=499082&r2=499083
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Tue Jan 23 09:08:03 2007
@@ -75,10 +75,13 @@
     private static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
 
     /**
-     * Holds the waiting timeout for response messages
+     * Holds the name of the property to get the waiting timeout for response messages.
      */
     private static final String TIMEOUT_PROPNAME = "timeout";
 
+    /** Holds the name of the property to get the message rate from. */
+    private static final String RATE_PROPNAME = "rate";
+
     private static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
 
     /**
@@ -118,6 +121,8 @@
      */
     private static final long TIMEOUT_DEFAULT = 3000;
 
+    /** Holds the default rate. A value of zero means infinity, only values of 1 or greater are meaningfull. */
+    private static final int RATE_DEFAULT = 0;
 
     private static final String FAIL_AFTER_COMMIT = "FailAfterCommit";
     private static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit";
@@ -126,7 +131,6 @@
     private static final String BATCH_SIZE = "BatchSize";
     private static final String FAIL_ONCE = "FailOnce";
 
-
     /**
      * Thread local to hold the per-thread test setup fields.
      */
@@ -138,13 +142,11 @@
     private Properties testParameters = System.getProperties();
     //private Properties testParameters = new ContextualProperties(System.getProperties());
 
-
     public PingTestPerf(String name)
     {
         super(name);
         // Sets up the test parameters with defaults.
 
-
         setSystemPropertyIfNull(FAIL_AFTER_COMMIT, "false");
         setSystemPropertyIfNull(FAIL_BEFORE_COMMIT, "false");
         setSystemPropertyIfNull(FAIL_AFTER_SEND, "false");
@@ -161,6 +163,7 @@
         setSystemPropertyIfNull(TIMEOUT_PROPNAME, Long.toString(TIMEOUT_DEFAULT));
         setSystemPropertyIfNull(PING_QUEUE_COUNT_PROPNAME, Integer.toString(1));
         setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, Boolean.toString(false));
+        setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT));
     }
 
     /**
@@ -175,7 +178,7 @@
         suite.addTest(new PingTestPerf("testPingOk"));
 
         return suite;
-        //return new junit.framework.TestSuite(PingTestPerf.class);
+               //return new junit.framework.TestSuite(PingTestPerf.class);
     }
 
     private static void setSystemPropertyIfNull(String propName, String propValue)
@@ -202,11 +205,11 @@
 
         // Generate a sample message. This message is already time stamped and has its reply-to destination set.
         ObjectMessage msg =
-                perThreadSetup._pingItselfClient.getTestMessage(null,
-                                                                Integer.parseInt(testParameters.getProperty(
-                                                                        MESSAGE_SIZE_PROPNAME)),
-                                                                Boolean.parseBoolean(testParameters.getProperty(
-                                                                        PERSISTENT_MODE_PROPNAME)));
+            perThreadSetup._pingItselfClient.getTestMessage(null,
+                                                            Integer.parseInt(testParameters.getProperty(
+                                                                                 MESSAGE_SIZE_PROPNAME)),
+                                                            Boolean.parseBoolean(testParameters.getProperty(
+                                                                                     PERSISTENT_MODE_PROPNAME)));
 
         // start the test
         long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
@@ -215,7 +218,8 @@
         // Fail the test if the timeout was exceeded.
         if (numReplies != numPings)
         {
-            Assert.fail("The ping timed out after "+ timeout  + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
+            Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = "
+                        + numReplies);
         }
     }
 
@@ -243,6 +247,7 @@
             String selector = null;
             boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
             int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
+            int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
 
             boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
             boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
@@ -254,14 +259,14 @@
 
             // This is synchronized because there is a race condition, which causes one connection to sleep if
             // all threads try to create connection concurrently
-            synchronized(this)
+            synchronized (this)
             {
                 // Establish a client to ping a Queue and listen the reply back from same Queue
                 perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
                                                                       queueName, selector, transacted, persistent,
-                                                                      messageSize, verbose,
-                                                                      afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-                                                                      batchSize, queueCount);
+                                                                      messageSize, verbose, afterCommit, beforeCommit,
+                                                                      afterSend, beforeSend, failOnce, batchSize, queueCount,
+                                                                      rate);
             }
             // Start the client connection
             perThreadSetup._pingItselfClient.getConnection().start();

Modified: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java?view=diff&rev=499083&r1=499082&r2=499083
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java Tue Jan 23 09:08:03 2007
@@ -109,16 +109,18 @@
      */
     private static final long TIMEOUT = 15000;
 
-    // Sets up the test parameters with defaults.
-    static
-    {
-        setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
-        setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
-        setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
-        setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
-        setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
-        setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
-    }
+    /** Holds the name of the property to get the message rate from. */
+    private static final String RATE_PROPNAME = "rate";
+
+    /** Holds the default rate. A value of zero means infinity, only values of 1 or greater are meaningfull. */
+    private static final int RATE_DEFAULT = 0;
+
+    private static final String FAIL_AFTER_COMMIT = "FailAfterCommit";
+    private static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit";
+    private static final String FAIL_AFTER_SEND = "FailAfterSend";
+    private static final String FAIL_BEFORE_SEND = "FailBeforeSend";
+    private static final String BATCH_SIZE = "BatchSize";
+    private static final String FAIL_ONCE = "FailOnce";
 
     /**
      * Thread local to hold the per-thread test setup fields.
@@ -131,17 +133,18 @@
     private Properties testParameters = System.getProperties();
     //private Properties testParameters = new ContextualProperties(System.getProperties());
 
-    private static final String FAIL_AFTER_COMMIT = "FailAfterCommit";
-    private static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit";
-    private static final String FAIL_AFTER_SEND = "FailAfterSend";
-    private static final String FAIL_BEFORE_SEND = "FailBeforeSend";
-    private static final String BATCH_SIZE = "BatchSize";
-    private static final String FAIL_ONCE = "FailOnce";
-
-
     public PingPongTestPerf(String name)
     {
         super(name);
+
+        // Sets up the test parameters with defaults.
+        setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
+        setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
+        setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
+        setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
+        setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
+        setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
+        setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT));
     }
 
     /**
@@ -173,11 +176,11 @@
 
         // Generate a sample message. This message is already time stamped and has its reply-to destination set.
         ObjectMessage msg =
-                perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyQueue(),
-                                                                Integer.parseInt(testParameters.getProperty(
-                                                                        MESSAGE_SIZE_PROPNAME)),
-                                                                Boolean.parseBoolean(testParameters.getProperty(
-                                                                        PERSISTENT_MODE_PROPNAME)));
+            perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyQueue(),
+                                                            Integer.parseInt(testParameters.getProperty(
+                                                                                 MESSAGE_SIZE_PROPNAME)),
+                                                            Boolean.parseBoolean(testParameters.getProperty(
+                                                                                     PERSISTENT_MODE_PROPNAME)));
 
         // Use the test timing controller to reset the test timer now and obtain the current time.
         // This can be used to remove the message creation time from the test.
@@ -216,6 +219,7 @@
             String selector = null;
             boolean verbose = false;
             int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
+            int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
 
             boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
             boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
@@ -234,9 +238,8 @@
             // Establish a ping-pong client on the ping queue to send the pings with.
             perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualpath,
                                                                     queueName, selector, transacted, persistent, messageSize,
-                                                                    verbose,
-                                                                    afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
-                                                                    batchSize, 0);
+                                                                    verbose, afterCommit, beforeCommit, afterSend,
+                                                                    beforeSend, failOnce, batchSize, 0, rate);
 
             perThreadSetup._testPingProducer.getConnection().start();
 
@@ -253,7 +256,7 @@
              {
              _testPingBouncer.getConnection().close();
              }
-
+            
              if ((_testPingProducer != null) && (_testPingProducer.getConnection() != null))
              {
              _testPingProducer.getConnection().close();

Modified: incubator/qpid/trunk/qpid/java/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/pom.xml?view=diff&rev=499083&r1=499082&r2=499083
==============================================================================
--- incubator/qpid/trunk/qpid/java/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/pom.xml Tue Jan 23 09:08:03 2007
@@ -97,6 +97,13 @@
         <java.source.version>1.5</java.source.version>
         <compile.flags>-Xlint:fallthrough,finally</compile.flags>
 
+	<!-- 
+          This should always point to a default minimal log4j configuration that all developers are happy with as a useable default. To use your own
+          log4j preferences set up an alternative in your settings.xml and avoid corrupting the default with private preferences.
+	  -->
+	<!--<log4j.configuration>file:/${topDirectoryLocation}/etc/log4j.xml</log4j.configuration>-->
+        <amqj.logging.level>warn</amqj.logging.level> <!-- This is referenced in the default log4j.xml -->
+
         <!--Versions for various plugins and features -->
         <antrun.version>1.2-SNAPSHOT</antrun.version>
         <!--<assembly.version>2.2-SNAPSHOT</assembly.version>-->
@@ -115,8 +122,6 @@
         <surefire-report.version>2.1-SNAPSHOT</surefire-report.version>
         <surefire.version>2.2</surefire.version>
 
-        <amqj.logging.level>warn</amqj.logging.level>
-
         <eclipse.workspace.dir>${basedir}/${topDirectoryLocation}/../workspace</eclipse.workspace.dir>
         <clover.license.pathname>/set/clover/license/path/here</clover.license.pathname>
     </properties>
@@ -283,10 +288,12 @@
                         <argLine>-ea</argLine>
                         <systemproperties>
                             <property>
-                                <name>amqj.logging.level</name>
+			        <name>amqj.logging.level</name>
                                 <value>${amqj.logging.level}</value>
-				<!--<name>log4j.configuration</name>
-				<value>file:/${topDirectoryLocation}/etc/log4j.xml</value>-->
+				<!--
+				<name>log4j.configuration</name>
+				<value>${log4j.configuration}</value>
+				-->
                             </property>
                         </systemproperties>
                     </configuration>