You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/02/05 15:43:17 UTC

svn commit: r503703 [2/2] - in /incubator/qpid/branches/perftesting/qpid/java: ./ broker/etc/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/q...

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Mon Feb  5 06:43:14 2007
@@ -50,21 +50,21 @@
 /**
  * PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
  * client (see {@link PingPongBouncer} for the bounce back client).
- *
+ * <p/>
  * <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings.
  * This means that this class has to do some work to correlate pings with pongs; it expectes the original message
  * correlation id in the ping to be bounced back in the reply correlation id.
- *
+ * <p/>
  * <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor.
  * It can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings
  * within transactions; control the number of pings to send in each transaction; limit its sending rate; and perform
  * failover testing.
- *
+ * <p/>
  * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
  * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so
  * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
  * also registered to terminate the ping-pong loop cleanly.
- *
+ * <p/>
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * <tr><td> Provide a ping and wait for all responses cycle.
@@ -72,25 +72,21 @@
  * </table>
  *
  * @todo The use of a ping rate {@link #DEFAULT_RATE} and waits between pings {@link #DEFAULT_SLEEP_TIME} are overlapping.
- *       Use the rate and throttling only. Ideally, optionally pass the rate throttle into the ping method, throttle to
- *       be created and configured by the test runner from the -f command line option and made available through
- *       the timing controller on timing aware tests or by throttling rate of calling tests methods on non-timing aware
- *       tests.
- *
+ * Use the rate and throttling only. Ideally, optionally pass the rate throttle into the ping method, throttle to
+ * be created and configured by the test runner from the -f command line option and made available through
+ * the timing controller on timing aware tests or by throttling rate of calling tests methods on non-timing aware
+ * tests.
  * @todo Make acknowledege mode a test option.
- *
  * @todo Make the message listener a static for all replies to be sent to? It won't be any more of a bottle neck than
- *       having one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process
- *       messages concurrently for different ids. Needs to be static so that when using a chained message listener and
- *       shared destinations between multiple PPPs, it gets notified about all replies, not just those that happen to
- *       be picked up by the PPP that it is atteched to.
- *
+ * having one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process
+ * messages concurrently for different ids. Needs to be static so that when using a chained message listener and
+ * shared destinations between multiple PPPs, it gets notified about all replies, not just those that happen to
+ * be picked up by the PPP that it is atteched to.
  * @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock
- *       pair. Obtian read lock on all messages, before decrementing the message count. At the end of the on message
- *       method add a block that obtains the write lock for the very last message, releases any waiting producer. Means
- *       that the last message waits until all other messages have been handled before releasing producers but allows
- *       messages to be processed concurrently, unlike the current synchronized block.
- *
+ * pair. Obtian read lock on all messages, before decrementing the message count. At the end of the on message
+ * method add a block that obtains the write lock for the very last message, releases any waiting producer. Means
+ * that the last message waits until all other messages have been handled before releasing producers but allows
+ * messages to be processed concurrently, unlike the current synchronized block.
  * @todo Need to multiply up the number of expected messages for pubsub tests as each can be received by many consumers?
  */
 public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
@@ -228,9 +224,6 @@
     /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
     private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
 
-    /** A source for providing unique ids to PingPongProducer. */
-    private static AtomicInteger _pingProducerIdGenerator;
-
     /**
      * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross
      * multiple ping producers on the same JVM.
@@ -238,7 +231,7 @@
     /*private static Map<String, CountDownLatch> trafficLights =
         Collections.synchronizedMap(new HashMap<String, CountDownLatch>());*/
     private static Map<String, PerCorrelationId> perCorrelationIds =
-        Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+            Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
 
     /** A convenient formatter to use when time stamping output. */
     protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
@@ -273,6 +266,9 @@
     /** Flag used to indicate if this is a point to point or pub/sub ping client. */
     protected boolean _isPubSub = false;
 
+    /** Flag used to indicate if the destinations should be unique client. */
+    protected static boolean _isUnique = false;
+
     /**
      * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
      * on the same JVM using this id generator will allow them to ping on the same queues.
@@ -313,6 +309,12 @@
     protected int _txBatchSize = 1;
 
     /**
+     * Holds the number of consumers that will be attached to each topic.
+     * Each pings will result in a reply from each of the attached clients
+     */
+    static int _consumersPerTopic = 1;
+
+    /**
      * Creates a ping producer with the specified parameters, of which there are many. See their individual comments
      * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on it,
      * to send and recieve its pings and replies on. The other options are kept, and control how this pinger behaves.
@@ -339,7 +341,6 @@
      *                         possible, with no rate restriction.
      * @param pubsub           True to ping topics, false to ping queues.
      * @param unique           True to use unique destinations for each ping pong producer, false to share.
-     *
      * @throws Exception Any exceptions are allowed to fall through.
      */
     public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
@@ -358,6 +359,19 @@
                       + txBatchSize + ", int noOfDestinations = " + noOfDestinations + ", int rate = " + rate
                       + ", boolean pubsub = " + pubsub + ", boolean unique = " + unique + "): called");
 
+        // Keep all the relevant options.
+        _persistent = persistent;
+        _messageSize = messageSize;
+        _verbose = verbose;
+        _failAfterCommit = afterCommit;
+        _failBeforeCommit = beforeCommit;
+        _failAfterSend = afterSend;
+        _failBeforeSend = beforeSend;
+        _failOnce = failOnce;
+        _txBatchSize = txBatchSize;
+        _isPubSub = pubsub;
+        _isUnique = unique;
+
         // Check that one or more destinations were specified.
         if (noOfDestinations < 1)
         {
@@ -388,18 +402,6 @@
         createProducer();
         createPingDestinations(noOfDestinations, selector, destinationName, unique);
         createReplyConsumers(getReplyDestinations(), selector);
-
-        // Keep all the remaining options.
-        _persistent = persistent;
-        _messageSize = messageSize;
-        _verbose = verbose;
-        _failAfterCommit = afterCommit;
-        _failBeforeCommit = beforeCommit;
-        _failAfterSend = afterSend;
-        _failBeforeSend = beforeSend;
-        _failOnce = failOnce;
-        _txBatchSize = txBatchSize;
-        _isPubSub = pubsub;
     }
 
     /**
@@ -407,6 +409,7 @@
      * to be started to bounce the pings back again.
      *
      * @param args The command line arguments.
+     * @throws Exception When something went wrong with the test
      */
     public static void main(String[] args) throws Exception
     {
@@ -421,7 +424,7 @@
         }
 
         String brokerDetails = config.getHost() + ":" + config.getPort();
-        String virtualpath = "test";
+        String virtualpath = "/test";
         String selector = (config.getSelector() == null) ? DEFAULT_SELECTOR : config.getSelector();
         boolean verbose = true;
         boolean transacted = config.isTransacted();
@@ -479,9 +482,9 @@
 
         // Create a ping producer to handle the request/wait/reply cycle.
         PingPongProducer pingProducer =
-            new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
-                                 transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
-                                 beforeSend, failOnce, batchSize, destCount, rate, pubsub, false);
+                new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
+                                     transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
+                                     beforeSend, failOnce, batchSize, destCount, rate, pubsub, false);
 
         pingProducer.getConnection().start();
 
@@ -511,7 +514,9 @@
                 Thread.sleep(sleepTime);
             }
             catch (InterruptedException ie)
-            { }
+            {
+                //ignore
+            }
         }
     }
 
@@ -555,11 +560,10 @@
      * @param rootName         The root of the name, or actual name if only one is being created.
      * @param unique           <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share
      *                         the numbering with all pingers on the same JVM.
-     *
      * @throws JMSException Any JMSExceptions are allowed to fall through.
      */
     public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
-                                throws JMSException
+            throws JMSException
     {
         _logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations
                       + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = "
@@ -568,28 +572,32 @@
         // Create the desired number of ping destinations and consumers for them.
         for (int i = 0; i < noOfDestinations; i++)
         {
-            AMQDestination destination = null;
+            AMQDestination destination;
 
             int id;
 
             // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
             if (unique)
             {
+                _logger.debug("Creating unique destinations.");
                 id = _queueJVMSequenceID.incrementAndGet();
             }
             else
             {
+                _logger.debug("Creating shared destinations.");
                 id = _queueSharedId.incrementAndGet();
             }
 
             // Check if this is a pub/sub pinger, in which case create topics.
             if (_isPubSub)
             {
+                _logger.debug("Creating topics.");
                 destination = new AMQTopic(rootName + id);
             }
             // Otherwise this is a p2p pinger, in which case create queues.
             else
             {
+                _logger.debug("Creating queues.");
                 destination = new AMQQueue(rootName + id);
             }
 
@@ -697,11 +705,10 @@
      * @param message  The message to send.
      * @param numPings The number of ping messages to send.
      * @param timeout  The timeout in milliseconds.
-     *
      * @return The number of replies received. This may be less than the number sent if the timeout terminated the
      *         wait for all prematurely.
-     *
-     * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+     * @throws JMSException         All underlying JMSExceptions are allowed to fall through.
+     * @throws InterruptedException When interrupted by a timeout.
      */
     public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
     {
@@ -723,14 +730,13 @@
      * @param numPings             The number of ping messages to send.
      * @param timeout              The timeout in milliseconds.
      * @param messageCorrelationId The message correlation id.
-     *
      * @return The number of replies received. This may be less than the number sent if the timeout terminated the
      *         wait for all prematurely.
-     *
-     * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+     * @throws JMSException         All underlying JMSExceptions are allowed to fall through.
+     * @throws InterruptedException When interrupted by a timeout
      */
     public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
-                            throws JMSException, InterruptedException
+            throws JMSException, InterruptedException
     {
         _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
                       + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
@@ -743,7 +749,8 @@
             // chained message listener must be called before this sender can be unblocked, but that decrementing the
             // countdown needs to be done before the chained listener can be called.
             PerCorrelationId perCorrelationId = new PerCorrelationId();
-            perCorrelationId.trafficLight = new CountDownLatch(numPings + 1);
+
+            perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1);
             perCorrelationIds.put(messageCorrelationId, perCorrelationId);
 
             // Set up the current time as the start time for pinging on the correlation id. This is used to determine
@@ -763,11 +770,12 @@
                 perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS);
 
                 // Work out how many replies were receieved.
-                numReplies = numPings - (int) perCorrelationId.trafficLight.getCount();
-                allMessagesReceived = numReplies >= numPings;
+                numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount();
+
+                allMessagesReceived = numReplies == getExpectedNumPings(numPings);
 
-                _logger.debug("numReplies = "+ numReplies);
-                _logger.debug("allMessagesReceived = "+ allMessagesReceived);
+                _logger.debug("numReplies = " + numReplies);
+                _logger.debug("allMessagesReceived = " + allMessagesReceived);
 
                 // Recheck the timeout condition.
                 long now = System.nanoTime();
@@ -779,7 +787,7 @@
             }
             while (!timedOut && !allMessagesReceived);
 
-            if ((numReplies < numPings) && _verbose)
+            if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
             {
                 _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
             }
@@ -808,7 +816,6 @@
      * @param message              The message to send.
      * @param numPings             The number of pings to send.
      * @param messageCorrelationId A correlation id to place on all messages sent.
-     *
      * @throws JMSException All underlying JMSExceptions are allowed to fall through.
      */
     public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
@@ -864,9 +871,7 @@
         }
     }
 
-    /**
-     * The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each.
-     */
+    /** The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each. */
     public void pingLoop()
     {
         try
@@ -909,9 +914,7 @@
         _chainedMessageListener = messageListener;
     }
 
-    /**
-     * Removes any chained message listeners from this pinger.
-     */
+    /** Removes any chained message listeners from this pinger. */
     public void removeChainedMessageListener()
     {
         _chainedMessageListener = null;
@@ -923,9 +926,7 @@
      * @param replyQueue  The reply-to destination for the message.
      * @param messageSize The desired size of the message in bytes.
      * @param persistent  <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
-     *
      * @return A freshly generated test message.
-     *
      * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
      */
     public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
@@ -947,9 +948,7 @@
         _publish = false;
     }
 
-    /**
-     * Implements a ping loop that repeatedly pings until the publish flag becomes false.
-     */
+    /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */
     public void run()
     {
         // Keep running until the publish flag is cleared.
@@ -980,12 +979,12 @@
     public Thread getShutdownHook()
     {
         return new Thread(new Runnable()
+        {
+            public void run()
             {
-                public void run()
-                {
-                    stop();
-                }
-            });
+                stop();
+            }
+        });
     }
 
     /**
@@ -1003,7 +1002,6 @@
      *
      * @param destinations The destinations to listen to.
      * @param selector     A selector to filter the messages with.
-     *
      * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
      */
     public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
@@ -1015,8 +1013,8 @@
         {
             // Create a consumer for the destination and set this pinger to listen to its messages.
             MessageConsumer consumer =
-                _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE,
-                                                selector);
+                    _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE,
+                                                    selector);
             consumer.setMessageListener(this);
         }
     }
@@ -1039,19 +1037,20 @@
     /**
      * Convenience method to commit the transaction on the specified session. If the session to commit on is not
      * a transactional session, this method does nothing (unless the failover after send flag is set).
-     *
+     * <p/>
      * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit
      * is applied. This flag applies whether the pinger is transactional or not.
-     *
+     * <p/>
      * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
      * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
      * after the commit is applied. These flags will only apply if using a transactional pinger.
      *
+     * @param session The session to commit
      * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
-     *
-     * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
-     *       method, because commits only apply to transactional pingers, but fail after send applied to transactional
-     *       and non-transactional alike.
+     *                                <p/>
+     *                                //todo @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
+     *                                method, because commits only apply to transactional pingers, but fail after send applied to transactional
+     *                                and non-transactional alike.
      */
     protected void commitTx(Session session) throws JMSException
     {
@@ -1132,7 +1131,6 @@
      *
      * @param destination The destination to send to.
      * @param message     The message to send.
-     *
      * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
      */
     protected void sendMessage(Destination destination, Message message) throws JMSException
@@ -1170,17 +1168,35 @@
             System.in.read();
         }
         catch (IOException e)
-        { }
+        {
+            //ignore
+        }
 
         System.out.println("Continuing.");
     }
 
     /**
+     * This value will be changed by PingClient to represent the number of clients connected to each topic.
+     *
+     * @return int The number of consumers subscribing to each topic.
+     */
+    public int getConsumersPerTopic()
+    {
+        return _consumersPerTopic;
+    }
+
+    public int getExpectedNumPings(int numpings)
+    {
+        return numpings * getConsumersPerTopic();
+    }
+
+
+    /**
      * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's
      * {@link PingPongProducer#onMessage} method is called, the chained listener set through the
      * {@link PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected
      * count of messages with that correlation id.
-     *
+     * <p/>
      * Provided only one pinger is producing messages with that correlation id, the chained listener will always be
      * given unique message counts. It will always be called while the producer waiting for all messages to arrive is
      * still blocked.

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Mon Feb  5 06:43:14 2007
@@ -70,7 +70,7 @@
 
     /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */
     private Map<String, PerCorrelationId> perCorrelationIds =
-        Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+            Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
 
     /** Holds the batched results listener, that does logging on batch boundaries. */
     private BatchedResultsListener batchedResultsListener = null;
@@ -91,6 +91,7 @@
 
     /**
      * Compile all the tests into a test suite.
+     * @return The test suite to run. Should only contain testAsyncPingOk method. 
      */
     public static Test suite()
     {
@@ -128,6 +129,7 @@
      * all replies have been received or a time out occurs before exiting this method.
      *
      * @param numPings The number of pings to send.
+     * @throws Exception pass all errors out to the test harness  
      */
     public void testAsyncPingOk(int numPings) throws Exception
     {
@@ -151,7 +153,7 @@
         PerCorrelationId perCorrelationId = new PerCorrelationId();
         TimingController tc = getTimingController().getControllerForCurrentThread();
         perCorrelationId._tc = tc;
-        perCorrelationId._expectedCount = numPings;
+        perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings);
         perCorrelationIds.put(messageCorrelationId, perCorrelationId);
 
         // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
@@ -160,18 +162,18 @@
 
         // Generate a sample message of the specified size.
         ObjectMessage msg =
-            pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
-                                      testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
-                                      testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+                pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+                                          testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+                                          testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
 
         // Send the requested number of messages, and wait until they have all been received.
         long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
         int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, messageCorrelationId);
 
         // Check that all the replies were received and log a fail if they were not.
-        if (numReplies < numPings)
+        if (numReplies < perCorrelationId._expectedCount)
         {
-            tc.completeTest(false, numPings - numReplies);
+            tc.completeTest(false, numPings - perCorrelationId._expectedCount);
         }
 
         // Remove the chained message listener from the ping producer.

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Mon Feb  5 06:43:14 2007
@@ -110,6 +110,7 @@
 
     /**
      * Compile all the tests into a test suite.
+     * @return The test method testPingOk.
      */
     public static Test suite()
     {
@@ -139,18 +140,18 @@
 
         // Generate a sample message. This message is already time stamped and has its reply-to destination set.
         ObjectMessage msg =
-            perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
-                                                      testParameters.getPropertyAsInteger(
-                                                          PingPongProducer.MESSAGE_SIZE_PROPNAME),
-                                                      testParameters.getPropertyAsBoolean(
-                                                          PingPongProducer.PERSISTENT_MODE_PROPNAME));
+                perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+                                                          testParameters.getPropertyAsInteger(
+                                                                  PingPongProducer.MESSAGE_SIZE_PROPNAME),
+                                                          testParameters.getPropertyAsBoolean(
+                                                                  PingPongProducer.PERSISTENT_MODE_PROPNAME));
 
         // start the test
         long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
         int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout);
 
         // Fail the test if the timeout was exceeded.
-        if (numReplies != numPings)
+        if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings))
         {
             Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = "
                         + numReplies);
@@ -191,7 +192,7 @@
 
             // Extract the test set up paramaeters.
             int destinationscount =
-                Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
+                    Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
 
             // This is synchronized because there is a race condition, which causes one connection to sleep if
             // all threads try to create connection concurrently.

Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java Mon Feb  5 06:43:14 2007
@@ -52,7 +52,7 @@
 
     public ConcurrencyTest() throws Exception
     {
-        _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false,
+        _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false,
                                                                           new DefaultQueueRegistry()));
     }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java Mon Feb  5 06:43:14 2007
@@ -21,10 +21,6 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.ConcurrentDeliveryManager;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.DeliveryManagerTest;
 
 public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest
 {
@@ -33,7 +29,7 @@
         try
         {
             System.setProperty("concurrentdeliverymanager","true");
-            _mgr = new ConcurrentDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
+            _mgr = new ConcurrentSelectorDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
                                                                               new DefaultQueueRegistry()));
         }
         catch (Throwable t)

Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java Mon Feb  5 06:43:14 2007
@@ -169,7 +169,6 @@
     {
         TestSuite suite = new TestSuite();
         suite.addTestSuite(ConcurrentDeliveryManagerTest.class);
-        suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
         return suite;
     }
 }