You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by bh...@apache.org on 2007/01/17 12:16:43 UTC

svn commit: r496991 - /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java

Author: bhupendrab
Date: Wed Jan 17 03:16:41 2007
New Revision: 496991

URL: http://svn.apache.org/viewvc?view=rev&rev=496991
Log:
added timer for callbackHandler to wait for next message before exiting

Modified:
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java?view=diff&rev=496991&r1=496990&r2=496991
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java Wed Jan 17 03:16:41 2007
@@ -40,8 +40,11 @@
  * A client that behaves as follows:
  * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
  * <li>Creates a temporary queue</li>
- * <li>Creates messages containing a property that is the name of the temporary queue</li>
- * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li>
+ * <li>Creates messages containing a property(reply-to) that is the name of the temporary queue</li>
+ * <li>Fires off a message on the original queue and registers the callbackHandler to listen to the response on the temporary queue</li>
+ * <li>Start the loop to send all messages</li>
+ * <li>CallbackHandler keeps listening to the responses and exits if all the messages have been received back or
+ *  if the waiting time for next message is elapsed</li>
  * </ul>
  */
 public class ServiceRequestingClient implements ExceptionListener
@@ -49,6 +52,10 @@
     private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class);
 
     private long _messageIdentifier = 0;
+
+    // time for which callbackHandler should wait for a message before exiting. Default time= 60 secs
+    private static long _callbackHandlerWaitingTime = 60000;
+
     private String MESSAGE_DATA;
 
     private AMQConnection _connection;
@@ -71,20 +78,23 @@
 
     private class CallbackHandler implements MessageListener
     {
-        private int _expectedMessageCount;
-
         private int _actualMessageCount;
 
         private long _startTime;
+        // The time when the last message was received by the callbackHandler
+        private long _messageReceivedTime = 0;
+        private Object _timerCallbackHandler = new Object();
 
-        public CallbackHandler(int expectedMessageCount, long startTime)
+        public CallbackHandler(long startTime)
         {
-            _expectedMessageCount = expectedMessageCount;
             _startTime = startTime;
+            // Start the timer thread, which will keep checking if test should exit because the waiting time has elapsed
+            (new Thread(new TimerThread())).start();
         }
 
         public void onMessage(Message m)
         {
+            _messageReceivedTime = System.currentTimeMillis();
             if (_log.isDebugEnabled())
             {
                 _log.debug("Message received: " + m);
@@ -95,16 +105,15 @@
                 if (m.propertyExists("timeSent"))
                 {
                     long timeSent = Long.parseLong(m.getStringProperty("timeSent"));
-                    long now = System.currentTimeMillis();
                     if (_averageLatency == 0)
                     {
-                        _averageLatency = now - timeSent;
+                        _averageLatency = _messageReceivedTime - timeSent;
                         _log.info("Latency " + _averageLatency);
                     }
                     else
                     {
-                        _log.info("Individual latency: " + (now - timeSent));
-                        _averageLatency = (_averageLatency + (now - timeSent)) / 2;
+                        _log.info("Individual latency: " + (_messageReceivedTime - timeSent));
+                        _averageLatency = (_averageLatency + (_messageReceivedTime - timeSent)) / 2;
                         _log.info("Average latency now: " + _averageLatency);
                     }
                 }
@@ -124,27 +133,91 @@
             }
 
             checkForMessageID(m);
-            if (_actualMessageCount == _expectedMessageCount)
+
+            if (_actualMessageCount == _messageCount)
+            {
+                finishTesting(_actualMessageCount);
+            }
+        }
+
+        /**
+         * sets completed flag to true, closes the callbackHandler connection and notifies the waiter thread,
+         * so that the callbackHandler can finish listening for messages. This causes the test to finish.
+         * @param receivedMessageCount
+         */
+        private void finishTesting(int receivedMessageCount)
+        {
+            _completed = true;
+            notifyWaiter();
+            notifyTimerThread();
+
+            long timeTaken = System.currentTimeMillis() - _startTime;
+            _log.info("***** Result *****");
+            _log.info("Total messages received = " + receivedMessageCount);
+            _log.info("Total time taken to receive " + receivedMessageCount + " messages was " +
+                      timeTaken + "ms, equivalent to " +
+                      (receivedMessageCount / (timeTaken / 1000.0)) + " messages per second");
+
+            try
             {
-                _completed = true;
-                notifyWaiter();
-                long timeTaken = System.currentTimeMillis() - _startTime;
-                _log.info("Total time taken to receive " + _expectedMessageCount + " messages was " +
-                          timeTaken + "ms, equivalent to " +
-                          (_expectedMessageCount / (timeTaken / 1000.0)) + " messages per second");
+                _connection.close();
+                _log.info("Connection closed");
+            }
+            catch (JMSException e)
+            {
+                _log.error("Error closing connection");
+            }
+        }
 
-                try
+        private void notifyTimerThread()
+        {
+            if (_timerCallbackHandler != null)
+            {
+                synchronized (_timerCallbackHandler)
                 {
-                    _connection.close();
-                    _log.info("Connection closed");
+                    _timerCallbackHandler.notify();
                 }
-                catch (JMSException e)
+            }
+        }
+
+        /**
+         * Thread class implementing the timer for callbackHandler. The thread will exit the test if the waiting time
+         * has elapsed before next message is received.
+         */
+        private class TimerThread implements Runnable
+        {
+            public void run()
+            {
+                do
                 {
-                    _log.error("Error closing connection");
+                    try
+                    {
+                        synchronized(_timerCallbackHandler)
+                        {
+                            _timerCallbackHandler.wait(_callbackHandlerWaitingTime);
+                        }
+                    }
+                    catch (InterruptedException ignore)
+                    {
+
+                    }
+
+                    // exit if callbackHandler has received all messages
+                    if (_completed)
+                    {
+                        _log.info("timer " + new java.util.Date());
+                        return;
+                    }
                 }
+                while ((System.currentTimeMillis() - _messageReceivedTime) < _callbackHandlerWaitingTime);
+
+                // waiting time has elapsed, so exit the test
+                _log.info("");
+                _log.info("Exited after waiting for " + _callbackHandlerWaitingTime/1000 + " secs");
+                finishTesting(_actualMessageCount);
             }
         }
-    }
+    } // end of CallbackHandler class
 
     /**
      * Checks if the received AMQ Message ID(delivery tag) is in sequence, by comparing it with the AMQ MessageID
@@ -230,7 +303,7 @@
             //now start the clock and the test...
             final long startTime = System.currentTimeMillis();
 
-            messageConsumer.setMessageListener(new CallbackHandler(messageCount, startTime));
+            messageConsumer.setMessageListener(new CallbackHandler(startTime));
         }
         catch (JMSException e)
         {
@@ -284,10 +357,12 @@
      */
     public static void main(String[] args)
     {
-        if (args.length < 6)
+        if ((args.length < 6) || (args.length == 8))
         {
-            System.err.println(
-                    "Usage: ServiceRequestingClient <brokerDetails> <username> <password> <vpath> <command queue name> <number of messages> [<message size>] [<P[ersistent]|N[onPersistent] (Default N)>  <T[ransacted]|N[onTransacted] (Default N)>]");
+            System.err.println("Usage: ServiceRequestingClient <brokerDetails> <username> <password> <vpath> " +
+                    "<command queue name> <number of messages> [<message size>] " +
+                    "[<P[ersistent]|N[onPersistent] (Default N)>  <T[ransacted]|N[onTransacted] (Default N)>] " +
+                    "[<waiting time for response in sec (default 60 sec)>]");
             System.exit(1);
         }
         try
@@ -296,18 +371,24 @@
             boolean transactedMode = false;
             int deliveryMode = DeliveryMode.NON_PERSISTENT;
 
+            if (args.length > 6)
+            {
+                messageSize = Integer.parseInt(args[6]);
+            }
             if (args.length > 7)
             {
-                deliveryMode = args[args.length - 2].toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT
+                deliveryMode = args[7].toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT
                                : DeliveryMode.NON_PERSISTENT;
 
-                transactedMode = args[args.length - 1].toUpperCase().charAt(0) == 'T' ? true : false;
+                transactedMode = args[8].toUpperCase().charAt(0) == 'T' ? true : false;
             }
 
-            if ((args.length == 9) ||(args.length == 7))
+            if (args.length > 9)
             {
-                messageSize = Integer.parseInt(args[6]);
-            }          
+                _callbackHandlerWaitingTime = Long.parseLong(args[9]) * 1000;
+            }
+
+            _log.info("Each message size = " + messageSize + " bytes");
 
             InetAddress address = InetAddress.getLocalHost();
             String clientID = address.getHostName() + System.currentTimeMillis();
@@ -316,6 +397,8 @@
                                                                          messageSize);
             Object waiter = new Object();
             client.run(waiter);
+
+            // Start a thread to
             synchronized (waiter)
             {
                 while (!client.isCompleted())