You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by bh...@apache.org on 2007/03/01 18:57:25 UTC

svn commit: r513419 - in /incubator/qpid/branches/perftesting/qpid/java: broker/etc/ broker/src/main/java/org/apache/qpid/server/queue/ perftests/src/main/java/org/apache/qpid/ping/ perftests/src/main/java/org/apache/qpid/requestreply/ perftests/src/te...

Author: bhupendrab
Date: Thu Mar  1 09:57:24 2007
New Revision: 513419

URL: http://svn.apache.org/viewvc?view=rev&rev=513419
Log:
QPID-395 : pause between batches of messages
QPID-367 : config updation for message alerts

Modified:
    incubator/qpid/branches/perftesting/qpid/java/broker/etc/virtualhosts.xml
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
    incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
    incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
    incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/etc/virtualhosts.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/etc/virtualhosts.xml?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/etc/virtualhosts.xml (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/etc/virtualhosts.xml Thu Mar  1 09:57:24 2007
@@ -22,10 +22,12 @@
 <virtualhosts>
     <virtualhost>
         <name>localhost</name>
-
         <localhost>
             <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
             <maximumMessageCount>5000</maximumMessageCount>
+            <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
+            <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+            <maximumMessageAge>600000</maximumMessageAge>
             <queue>
                 <name>queue</name>
                 <queue>
@@ -47,11 +49,13 @@
         </localhost>
     </virtualhost>
     <virtualhost>
-        <name>development</name>
-        
+        <name>development</name>        
         <development>
-        <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
-        <maximumMessageCount>5000</maximumMessageCount>
+            <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+            <maximumMessageCount>5000</maximumMessageCount>
+            <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
+            <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+            <maximumMessageAge>600000</maximumMessageAge>
             <queue>
                 <name>queue</name>
                 <queue>
@@ -72,30 +76,32 @@
             </queue>
         </development>
     </virtualhost>
-        <virtualhost>
-            <name>test</name>
-            
-            <test>
+    <virtualhost>
+        <name>test</name>
+        <test>
             <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
             <maximumMessageCount>5000</maximumMessageCount>
+            <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
+            <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+            <maximumMessageAge>600000</maximumMessageAge>
+            <queue>
+                <name>queue</name>
                 <queue>
-                    <name>queue</name>
-                    <queue>
-                        <exchange>amq.direct</exchange>
-                        <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
-                        <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
-                        <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins -->
-                    </queue>
-                </queue>
-                <queue>
-                    <name>ping</name>
-                    <ping>
-                        <exchange>amq.direct</exchange>
-                        <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
-                        <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
-                        <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins -->
-                    </ping>
+                    <exchange>amq.direct</exchange>
+                    <maximumQueueDepth>4235264</maximumQueueDepth>  <!-- 4Mb -->
+                    <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+                    <maximumMessageAge>600000</maximumMessageAge>  <!-- 10 mins -->
                 </queue>
-            </test>
+            </queue>
+            <queue>
+                <name>ping</name>
+                <ping>
+                    <exchange>amq.direct</exchange>
+                    <maximumQueueDepth>22352</maximumQueueDepth>  <!-- 4Mb -->
+                    <maximumMessageSize>11176</maximumMessageSize> <!-- 2Mb -->
+                    <maximumMessageAge>6000</maximumMessageAge>  <!-- 10 mins -->
+                </ping>
+            </queue>
+        </test>
     </virtualhost>
 </virtualhosts>

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Mar  1 09:57:24 2007
@@ -79,7 +79,7 @@
 
     /** max allowed number of messages on a queue. */
     @Configured(path = "maximumMessageCount", defaultValue = "0")
-    public int _maximumMessageCount;
+    public int _maximumMessageCount = 5000;
 
     /** max queue depth for the queue */
     @Configured(path = "maximumQueueDepth", defaultValue = "0")
@@ -89,7 +89,7 @@
     * maximum message age before alerts occur
     */
     @Configured(path = "maximumMessageAge", defaultValue = "0")
-    public long _maximumMessageAge = 30000; //0
+    public long _maximumMessageAge = 30000;
 
     /*
      * the minimum interval between sending out consequetive alerts of the same type
@@ -506,10 +506,10 @@
 
     protected void updateReceivedMessageCount(AMQMessage msg)
     {
-        if (msg.isRedelivered())
-            return;
-
-        _totalMessagesReceived++;
+        if (!msg.isRedelivered())
+        {
+            _totalMessagesReceived++;
+        }
         _managedObject.checkForNotification(msg);
     }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Mar  1 09:57:24 2007
@@ -177,7 +177,8 @@
 
     public Long getMaximumQueueDepth()
     {
-        return _queue.getMaximumQueueDepth();
+        long queueDepthInBytes = _queue.getMaximumQueueDepth();
+        return queueDepthInBytes >> 10;
     }
 
     public void setMaximumQueueDepth(Long value)

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java Thu Mar  1 09:57:24 2007
@@ -74,11 +74,11 @@
                       String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
                       boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
                       int txBatchSize, int noOfDestinations, int rate, boolean pubsub, boolean unique,
-                      int ackMode) throws Exception
+                      int ackMode, long pausetime) throws Exception
     {
         super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize,
               verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate,
-              pubsub, unique, ackMode);
+              pubsub, unique, ackMode, pausetime);
 
         _pingClientCount++;
     }

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Thu Mar  1 09:57:24 2007
@@ -146,6 +146,8 @@
 
     public static final String ACK_MODE_PROPNAME = "ackMode";
 
+    public static final String PAUSE_AFTER_BATCH_PROPNAME = "pausetimeAfterEachBatch";
+
     /** Used to set up a default message size. */
     public static final int DEFAULT_MESSAGE_SIZE = 0;
 
@@ -186,7 +188,7 @@
     public static final String DEFAULT_BROKER = "tcp://localhost:5672";
 
     /** Holds the default virtual path for the test. */
-    public static final String DEFAULT_VIRTUAL_PATH = "test";
+    public static final String DEFAULT_VIRTUAL_PATH = "/test";
 
     /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */
     public static final boolean DEFAULT_PUBSUB = false;
@@ -314,6 +316,8 @@
     /** Holds the number of sends that should be performed in every transaction when using transactions. */
     protected int _txBatchSize = 1;
 
+    private static long _pausetimeAfterEachBatch = 0;
+
     /**
      * Holds the number of consumers that will be attached to each topic.
      * Each pings will result in a reply from each of the attached clients
@@ -353,7 +357,7 @@
                             String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
                             boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
                             boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate,
-                            boolean pubsub, boolean unique, int ackMode) throws Exception
+                            boolean pubsub, boolean unique, int ackMode, long pause) throws Exception
     {
         _logger.debug("public PingPongProducer(String brokerDetails = " + brokerDetails + ", String username = " + username
                       + ", String password = " + password + ", String virtualpath = " + virtualpath
@@ -378,6 +382,7 @@
         _txBatchSize = txBatchSize;
         _isPubSub = pubsub;
         _isUnique = unique;
+        _pausetimeAfterEachBatch = pause;
         if (ackMode != 0)
         {
             _ackMode = ackMode;
@@ -435,7 +440,7 @@
         }
 
         String brokerDetails = config.getHost() + ":" + config.getPort();
-        String virtualpath = "/test";
+        String virtualpath = DEFAULT_VIRTUAL_PATH;
         String selector = (config.getSelector() == null) ? DEFAULT_SELECTOR : config.getSelector();
         boolean verbose = true;
         boolean transacted = config.isTransacted();
@@ -495,7 +500,7 @@
         PingPongProducer pingProducer =
                 new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
                                      transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
-                                     beforeSend, failOnce, batchSize, destCount, rate, pubsub, false, 0);
+                                     beforeSend, failOnce, batchSize, destCount, rate, pubsub, false, 0, 0);
 
         pingProducer.getConnection().start();
 
@@ -732,6 +737,12 @@
         return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId);
     }
 
+    public int pingAndWaitForReply(int numPings, long timeout, String messageCorrelationId)
+            throws JMSException, InterruptedException
+    {
+        return pingAndWaitForReply(null, numPings, timeout, messageCorrelationId);   
+    }
+
     /**
      * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
      * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
@@ -834,6 +845,11 @@
         _logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
                       + ", String messageCorrelationId = " + messageCorrelationId + "): called");
 
+        if (message == null)
+        {
+            message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
+        }
+
         message.setJMSCorrelationID(messageCorrelationId);
 
         // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the
@@ -865,6 +881,12 @@
             {
                 commitTx(_producerSession);
                 committed = true;
+                /* This pause is required for some cases. eg in load testing when sessions are non-transacted the
+                   Mina IO layer can't clear the cache in time. So this pause gives enough time for mina to clear
+                   the cache (without this mina throws OutOfMemoryError). pause() will check if time is != 0
+                */
+                pause(_pausetimeAfterEachBatch);
+                //_logger.info("committed " + _txBatchSize + " " + i);
             }
 
             // Spew out per message timings on every message sonly in verbose mode.
@@ -909,10 +931,10 @@
         }
     }
 
-    /*public Destination getReplyDestination()
+    public Destination getReplyDestination()
     {
-        return _replyDestination;
-    }*/
+        return getReplyDestinations().get(0);
+    }
 
     /**
      * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set
@@ -1095,7 +1117,9 @@
                     doFailover();
                 }
 
+                long l = System.currentTimeMillis();
                 session.commit();
+                _logger.debug("Time taken to commit :" + (System.currentTimeMillis() - l) + " ms" );
 
                 if (_failAfterCommit)
                 {

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Thu Mar  1 09:57:24 2007
@@ -158,30 +158,16 @@
         perCorrelationId._tc = tc;
         perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings);
         perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId);
-        
-        // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
-        // messages.
-        //pingClient.setChainedMessageListener(batchedResultsListener);
-
-        // Generate a sample message of the specified size.
-        perThreadSetup._message =
-                pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
-                                          testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
-                                          testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
-
 
         // Send the requested number of messages, and wait until they have all been received.
         long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
-        int numReplies = pingClient.pingAndWaitForReply(perThreadSetup._message, numPings, timeout, perThreadSetup._correlationId);
+        int numReplies = pingClient.pingAndWaitForReply(numPings, timeout, perThreadSetup._correlationId);
 
         // Check that all the replies were received and log a fail if they were not.
         if (numReplies < perCorrelationId._expectedCount)
         {
             perCorrelationId._tc.completeTest(false, numPings - perCorrelationId._expectedCount);
         }
-
-        // Remove the chained message listener from the ping producer.
-        //pingClient.removeChainedMessageListener();
 
         // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up.
         perCorrelationIds.remove(perThreadSetup._correlationId);

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Thu Mar  1 09:57:24 2007
@@ -108,6 +108,7 @@
         testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_PROPNAME, PingPongProducer.DEFAULT_UNIQUE);
         testParameters.setSysPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME,
                                               Integer.toString(PingPongProducer.DEFAULT_ACK_MODE));
+        testParameters.setSysPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, 0l);
     }
 
     /**
@@ -192,6 +193,7 @@
             Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
             boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_PROPNAME);
             int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME);
+            int pausetime = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME);
 
             // Extract the test set up paramaeters.
             int destinationscount =
@@ -206,7 +208,7 @@
                                                             selector, transacted, persistent, messageSize, verbose,
                                                             failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend,
                                                             failOnce, batchSize, destinationscount, rate, pubsub,
-                                                            unique, ackMode);
+                                                            unique, ackMode, pausetime);
             }
             // Start the client connection
             perThreadSetup._pingClient.getConnection().start();
@@ -256,7 +258,6 @@
          * Holds the test ping client.
          */
         protected PingClient _pingClient;
-        protected Message _message;
         protected String _correlationId;
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java Thu Mar  1 09:57:24 2007
@@ -113,6 +113,7 @@
         ParsedProperties.setSysPropertyIfNull(PingPongProducer.UNIQUE_PROPNAME, Boolean.toString(PingPongProducer.DEFAULT_UNIQUE));
         ParsedProperties.setSysPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME,
                                               Integer.toString(PingPongProducer.DEFAULT_ACK_MODE));
+        ParsedProperties.setSysPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, 0l);
     }
 
     /**
@@ -191,6 +192,7 @@
             Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
             boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_PROPNAME);
             int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME);
+            long pause = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME);
 
             synchronized (this)
             {
@@ -209,7 +211,7 @@
                                                                         messageSize, verbose, failAfterCommit,
                                                                         failBeforeCommit, failAfterSend, failBeforeSend,
                                                                         failOnce, batchSize, 0, rate, pubsub,
-                                                                        unique, ackMode);
+                                                                        unique, ackMode, pause);
                 perThreadSetup._testPingProducer.getConnection().start();
             }