You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/03/08 17:29:23 UTC

svn commit: r920389 - in /qpid/branches/0.5.x-dev/qpid/java: junit-toolkit/src/main/org/apache/qpid/junit/extensions/ perftests/src/main/java/org/apache/qpid/ping/ perftests/src/main/java/org/apache/qpid/requestreply/ perftests/src/main/java/org/apache...

Author: ritchiem
Date: Mon Mar  8 16:29:23 2010
New Revision: 920389

URL: http://svn.apache.org/viewvc?rev=920389&view=rev
Log:
QPID-2421 : Fixed preFill when numConsumers > 1. This requried the introduction of a new phase in TestThreadAware : postThreadSetup. This ensures that all the threads have correctly setup their clients and so registered their topic subscribers. We can then preFill in the postThreadSetup and reliably receive the data from each of the client's publishers. 
Added new sendOnly parameter to allow send only testing with topics where numConsumers must be set.

Modified:
    qpid/branches/0.5.x-dev/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/ScaledTestDecorator.java
    qpid/branches/0.5.x-dev/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/TestThreadAware.java
    qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
    qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
    qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java

Modified: qpid/branches/0.5.x-dev/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/ScaledTestDecorator.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/ScaledTestDecorator.java?rev=920389&r1=920388&r2=920389&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/ScaledTestDecorator.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/ScaledTestDecorator.java Mon Mar  8 16:29:23 2010
@@ -323,6 +323,22 @@
                 // Wait until all test threads have completed their setups.
                 barrier.await();
 
+
+                // Call setup on all underlying tests in the suite that are thread aware.
+                for (Test childTest : test.getAllUnderlyingTests())
+                {
+                    // Check that the test is concurrency aware, so provides a setup method to call.
+                    if (childTest instanceof TestThreadAware)
+                    {
+                        // Call the tests post thread setup.
+                        TestThreadAware setupTest = (TestThreadAware) childTest;
+                        setupTest.postThreadSetUp();
+                    }
+                }
+
+                // Wait until all test threads have completed their prefill.
+                barrier.await();
+
                 // Start timing the test batch, only after thread setups have completed.
                 if (testResult instanceof TKTestResult)
                 {

Modified: qpid/branches/0.5.x-dev/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/TestThreadAware.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/TestThreadAware.java?rev=920389&r1=920388&r2=920389&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/TestThreadAware.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/junit-toolkit/src/main/org/apache/qpid/junit/extensions/TestThreadAware.java Mon Mar  8 16:29:23 2010
@@ -43,6 +43,11 @@
     public void threadSetUp();
 
     /**
+     * Called after all threads have completed their setup.
+     */
+    public void postThreadSetUp();
+
+    /**
      * Called when a test thread is destroyed.
      */
     public void threadTearDown();

Modified: qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java?rev=920389&r1=920388&r2=920389&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java Mon Mar  8 16:29:23 2010
@@ -154,15 +154,25 @@
 
 
         // Initialize the count and timing controller for the new correlation id.
+        // This perCorrelationId is only used for controlling the test.
+        // The PingClient itself uses its own perCorrelationId see in PingPongProducer
         PerCorrelationId perCorrelationId = new PerCorrelationId();
         TimingController tc = getTimingController().getControllerForCurrentThread();
         perCorrelationId._tc = tc;
         perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings + preFill);
         perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId);
 
-        // Start the client that will have been paused due to preFill requirement.
-        // or if we have not yet started client because messages are sitting on broker. 
-        if (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))
+        // Must be called before pingAndWaitForReply to setup the CorrelationID.
+        // This is required because pingClient.start() will start all client threads
+        // This means that the CorrelationID must be registered before hand.
+        pingClient.setupCorrelationID(perThreadSetup._correlationId, perCorrelationId._expectedCount);
+
+        // Start the client connection if:
+        // 1) we are not in a SEND_ONLY test.
+        // 2) if we have not yet started client because messages are sitting on broker.
+        // This is either due to a preFill or a consume only test.
+        if (!testParameters.getPropertyAsBoolean(PingPongProducer.SEND_ONLY_PROPNAME) &&
+            (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME)))
         {
             pingClient.start();
         }
@@ -285,7 +295,7 @@
                     {
                         // Record the total latency for the batch.
                         // if batchSize=1 then this will just be the message latency
-                        tc.completeTest(true, receivedInBatch, null, _batchLatency);
+                        tc.completeTest(true, receivedInBatch, null, _batchSize == 1 ? latency : _batchLatency);
                         // Reset latency
                         _batchLatency = 0;
                     }

Modified: qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java?rev=920389&r1=920388&r2=920389&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java Mon Mar  8 16:29:23 2010
@@ -122,9 +122,7 @@
         }
     }
 
-    /**
-     * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
-     */
+    /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */
     public void threadSetUp()
     {
         _logger.debug("public void threadSetUp(): called");
@@ -142,9 +140,28 @@
                 perThreadSetup._pingClient.establishConnection(true, true);
             }
 
-            // Prefill the broker unless we are in consume only mode. 
-            int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME);
-            if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0)
+            // Attach the per-thread set to the thread.
+            threadSetup.set(perThreadSetup);
+        }
+        catch (Exception e)
+        {
+            _logger.warn("There was an exception during per thread setup.", e);
+        }
+    }
+
+    /**
+     * Called after all threads have completed their setup.
+     */    
+    public void postThreadSetUp()
+    {
+        _logger.debug("public void postThreadSetUp(): called");
+
+        PerThreadSetup perThreadSetup = threadSetup.get();
+        // Prefill the broker unless we are in consume only mode.
+        int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME);
+        if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0)
+        {
+            try
             {
                 // Manually set the correlation ID to 1. This is not ideal but it is the
                 // value that the main test loop will use.
@@ -156,8 +173,12 @@
 
                 long delayBeforeConsume = testParameters.getPropertyAsLong(PingPongProducer.DELAY_BEFORE_CONSUME_PROPNAME);
 
-                //  Only delay if we have consumers and a delayBeforeConsume
-                if ((testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0)
+                // Only delay if we are
+                //  not doing send only
+                //  and we have consumers
+                //  and a delayBeforeConsume
+                if (!(testParameters.getPropertyAsBoolean(PingPongProducer.SEND_ONLY_PROPNAME))
+                    && (testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0)
                     && delayBeforeConsume > 0)
                 {
 
@@ -170,11 +191,11 @@
                             long minutes = delayBeforeConsume / 60000;
                             long seconds = (delayBeforeConsume - (minutes * 60000)) / 1000;
                             long ms = delayBeforeConsume - (minutes * 60000) - (seconds * 1000);
-                                _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test.");
+                            _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test.");
                         }
                         else
                         {
-                                _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test.");
+                            _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test.");
                         }
                     }
 
@@ -190,22 +211,30 @@
                 // only when the test method is executed will the correlationID map be set up and ready to consume
                 // the messages we have sent here.
             }
-            else //Only start the consumer if we are not preFilling.
+            catch (Exception e)
             {
-                // Only start the consumer if we don't have messages waiting to be received.
-                // we need to set up the correlationID mapping first
-                if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))
+                _logger.warn("There was an exception during per thread setup.", e);
+            }
+        }
+        else //Only start the consumer if we are not preFilling.
+        {
+            // Start the consumers, unless we have data on the broker
+            // already this is signified by being in consume_only, we will
+            //  start the clients after setting up the correlation IDs.
+            // We should also not start the clients if we are in Send only
+            if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) &&
+                !(testParameters.getPropertyAsBoolean(PingPongProducer.SEND_ONLY_PROPNAME)))
+            {
+                // Start the client connection
+                try
                 {
-                    // Start the client connection
                     perThreadSetup._pingClient.start();
                 }
+                catch (JMSException e)
+                {
+                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                }
             }
-            // Attach the per-thread set to the thread.
-            threadSetup.set(perThreadSetup);
-        }
-        catch (Exception e)
-        {
-            _logger.warn("There was an exception during per thread setup.", e);
         }
     }
 

Modified: qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=920389&r1=920388&r2=920389&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Mon Mar  8 16:29:23 2010
@@ -342,6 +342,11 @@
     /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */
     public static final boolean CONSUME_ONLY_DEFAULT = false;
 
+    /** Holds the name of the property to get when no messasges should be sent. */
+    public static final String SEND_ONLY_PROPNAME = "sendOnly";
+
+    /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */
+    public static final boolean SEND_ONLY_DEFAULT = false;
 
     /** Holds the default configuration properties. */
     public static ParsedProperties defaults = new ParsedProperties();
@@ -381,7 +386,8 @@
         defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
         defaults.setPropertyIfNull(PREFILL_PROPNAME, PREFILL_DEFAULT);
         defaults.setPropertyIfNull(DELAY_BEFORE_CONSUME_PROPNAME, DELAY_BEFORE_CONSUME);
-        defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT);        
+        defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT);
+        defaults.setPropertyIfNull(SEND_ONLY_PROPNAME, SEND_ONLY_DEFAULT);
     }
 
     /** Allows setting of client ID on the connection, rather than through the connection URL. */
@@ -490,10 +496,15 @@
     /**
      * Holds a boolean value of wither this test should just consume, i.e. skips
      * sending messages, but still expects to receive the specified number.
-     * Use in conjuction with numConsumers=0 to fill the broker.
      */
     private boolean _consumeOnly;
 
+    /**
+     * Holds a boolean value of wither this test should just send, i.e. skips
+     * consuming messages, but still creates clients just doesn't start them.
+     */
+    private boolean _sendOnly;
+
 
     /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
     private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
@@ -631,6 +642,7 @@
         _preFill = properties.getPropertyAsInteger(PREFILL_PROPNAME);
         _delayBeforeConsume = properties.getPropertyAsLong(DELAY_BEFORE_CONSUME_PROPNAME);
         _consumeOnly = properties.getPropertyAsBoolean(CONSUME_ONLY_PROPNAME);
+        _sendOnly = properties.getPropertyAsBoolean(SEND_ONLY_PROPNAME);
 
         // Check that one or more destinations were specified.
         if (_noOfDestinations < 1)
@@ -1072,8 +1084,8 @@
                 }
                 else
                 {
-                    log.warn("Got unexpected message with correlationId: " + correlationID);
-                    log.warn("Map contains:" + perCorrelationIds.entrySet());
+                    log.warn(consumerNo + " Got unexpected message with correlationId: " + correlationID);
+                    log.warn(consumerNo + " Map contains:" + perCorrelationIds.entrySet());
                 }
             }
             else
@@ -1092,6 +1104,21 @@
         }
     }
 
+    public void setupCorrelationID(String correlationId, int expectedCount)
+    {
+        PerCorrelationId perCorrelationId = new PerCorrelationId();
+
+        // Create a count down latch to count the number of replies with. This is created before the messages are
+        // sent so that the replies cannot be received before the count down is created.
+        // One is added to this, so that the last reply becomes a special case. The special case is that the
+        // chained message listener must be called before this sender can be unblocked, but that decrementing the
+        // countdown needs to be done before the chained listener can be called.
+        perCorrelationId.trafficLight = new CountDownLatch(expectedCount + 1);
+
+        perCorrelationIds.put(correlationId, perCorrelationId);
+    }
+
+
     /**
      * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
      * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
@@ -1122,33 +1149,24 @@
     public int pingAndWaitForReply(Message message, int numPings, int preFill, long timeout, String messageCorrelationId)
         throws JMSException, InterruptedException
     {
-
-        // If we are runnning a consumeOnly test then don't send any messages
-
-
         /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
             + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
 
+        int totalPingsRequested = numPings + preFill;
+
         // Generate a unique correlation id to put on the messages before sending them, if one was not specified.
         if (messageCorrelationId == null)
         {
             messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet());
+
+            setupCorrelationID(messageCorrelationId, getExpectedNumPings(totalPingsRequested));
         }
 
         try
         {
             // NDC.push("prod");
 
-            // Create a count down latch to count the number of replies with. This is created before the messages are
-            // sent so that the replies cannot be received before the count down is created.
-            // One is added to this, so that the last reply becomes a special case. The special case is that the
-            // chained message listener must be called before this sender can be unblocked, but that decrementing the
-            // countdown needs to be done before the chained listener can be called.
-            PerCorrelationId perCorrelationId = new PerCorrelationId();
-
-            int totalPingsRequested = numPings + preFill;
-            perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(totalPingsRequested) + 1);
-            perCorrelationIds.put(messageCorrelationId, perCorrelationId);
+            PerCorrelationId perCorrelationId = perCorrelationIds.get(messageCorrelationId);
 
             // Set up the current time as the start time for pinging on the correlation id. This is used to determine
             // timeouts.
@@ -1167,9 +1185,9 @@
             //
             // Return the number of requested messages, this will let the test
             // report a pass.
-            if (_noOfConsumers == 0)
+            if (_noOfConsumers == 0 || _sendOnly)
             {
-                return totalPingsRequested;
+                return getExpectedNumPings(totalPingsRequested);
             }
 
             do

Modified: qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java?rev=920389&r1=920388&r2=920389&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java Mon Mar  8 16:29:23 2010
@@ -165,6 +165,14 @@
     }
 
     /**
+     * Called after all threads have completed their setup.
+     */
+    public void postThreadSetUp()
+    {
+        //Nothing to do here, potentially implement preFill as per PingTestPerf.
+    }
+
+    /**
      * Called when a test thread is destroyed.
      */
     public void threadTearDown()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org