You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/11/29 23:46:31 UTC

svn commit: r599611 - in /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf: JMSAsyncConsumer.java JMSConsumer.java JMSProducer.java JMSSyncConsumer.java MessageConsumerTest.java MessageProducerTest.java Options.java

Author: rajith
Date: Thu Nov 29 14:46:28 2007
New Revision: 599611

URL: http://svn.apache.org/viewvc?rev=599611&view=rev
Log:
Removed all the fancy threading with the test cases as these interfer with the test cases

Removed:
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSProducer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java
Modified:
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java?rev=599611&r1=599610&r2=599611&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java Thu Nov 29 14:46:28 2007
@@ -4,114 +4,96 @@
 import java.io.IOException;
 import java.sql.Date;
 import java.text.SimpleDateFormat;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
 
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.message.TestMessageFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MessageConsumerTest extends Options implements Runnable
+public class MessageConsumerTest extends Options implements MessageListener
 {
     private static final Logger _logger = LoggerFactory.getLogger(MessageConsumerTest.class);
     private SimpleDateFormat df = new SimpleDateFormat("h:mm a");
 
-    private Map<Integer, JMSConsumer> _consumers = new ConcurrentHashMap<Integer, JMSConsumer>();
-    private int _count;
     String _logFileName;
-    private long _gracePeriod = 5 * 60 * 1000;
     long _startTime;
     long _totalMsgCount;
+    long _intervalCount;
 
-    public void start() throws Exception
-    {
-        this.parseOptions();
-        boolean useSameDest = true;
-        _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis();
-
-        // use each destination with a different consumerucer
-        if (_consumerCount == destArray.length)
-        {
-            useSameDest = false;
-        }
-        for (; _count < _consumerCount; _count++)
-        {
-            createAndStartConsumer(useSameDest ? destArray[0] : destArray[_count]);
-        }
-    }
-
-    private void createAndStartConsumer(String routingKey) throws Exception
-    {
-        AMQConnection con = ConnectionUtility.getInstance().getConnection();
-        con.start();
-        Destination dest = Boolean.getBoolean("useQueue")? new AMQQueue(con,routingKey) : new AMQTopic(con,routingKey);
-        JMSConsumer consumer;
-        if (_synchronous)
-        {
-            consumer = new JMSSyncConsumer(String.valueOf(_count), con, dest, _transacted, Session.AUTO_ACKNOWLEDGE);
-            Thread t = new Thread((JMSSyncConsumer) consumer);
-            t.setName("JMSSyncConsumer-" + _count);
-            t.start();
-        }
-        else
-        {
-            consumer = new JMSAsyncConsumer(String.valueOf(_count), con, dest, _transacted, Session.AUTO_ACKNOWLEDGE);
-        }
-        _consumers.put(_count, consumer);
-    }
-
-    private void startTimerThread()
-    {
-        _startTime = System.currentTimeMillis();
-        if(Boolean.getBoolean("collect_stats"))
-        {
-            Thread t = new Thread(this);
-            t.setName("MessageConsumerTest-TimerThread");
-            t.start();
-        }
-        try
-        {
-            printSummary();
-        }
-        catch(Exception e)
-        {
-            e.printStackTrace();
-        }
+    private Connection _connection;
+    private Session _session;
+    private BytesMessage _payload;
+    private MessageConsumer _consumer;
+    private boolean _verifyOrder = false;
+
+    public void init() throws Exception
+    {
+       this.parseOptions();
+       _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis();
+
+       AMQConnection _connection = ConnectionUtility.getInstance().getConnection();
+       _connection.start();
+       Destination dest = Boolean.getBoolean("useQueue")? new AMQQueue(_connection,_destination) : new AMQTopic(_connection,_destination);
+       _session = _connection.createSession(_transacted, Session.AUTO_ACKNOWLEDGE);
+       _payload = TestMessageFactory.newBytesMessage(_session, _messageSize);
+       _consumer = _session.createConsumer(dest);
+       if(!_synchronous)
+       {
+           _consumer.setMessageListener(this);
+       }
+       _verifyOrder = Boolean.getBoolean("verifyOrder");
+
+       _startTime = System.currentTimeMillis();
+       boolean run = true;
+       if(Boolean.getBoolean("collect_stats"))
+       {
+           printHeading();
+           runReaper();
+       }
     }
 
-    public void run()
+    public void onMessage(Message message)
     {
-        boolean run = true;
-        printHeading();
-        runReaper();
         try
         {
-            while (run)
+            /* long msgId = Integer.parseInt(message.getJMSMessageID());
+            if (_verifyOrder && _totalMsgCount+1 != msgId)
             {
-                Thread.sleep(_logDuration);
-                runReaper();
-
-                if (System.currentTimeMillis() + _gracePeriod - _startTime > _expiry)
+                _logger.error("Error : Message received out of order in JMSSyncConsumer:" + _id + " message id was " + msgId + " expected: " + _currentMsgCount+1);
+            }*/
+            message = null;
+            _totalMsgCount ++;
+            _intervalCount++;
+            if(_intervalCount >= _logFrequency)
+            {
+                _intervalCount = 0;
+                if (Boolean.getBoolean("collect_stats"))
                 {
-                    // time to stop the test.
-                    for (Integer id : _consumers.keySet())
-                    {
-                        JMSConsumer consumer = _consumers.get(id);
-                        consumer.stopConsuming();
-                    }
                     runReaper();
-                    run = false;
+                }
+                if (System.currentTimeMillis() - _startTime >= _expiry)
+                {
+                    _session.close();
+                    _connection.stop();
+                    printSummary();
+                    return;
                 }
             }
         }
-        catch (InterruptedException e)
+        catch(Exception e)
         {
-            _logger.error("The timer thread exited", e);
+            e.printStackTrace();
         }
     }
 
@@ -119,16 +101,6 @@
     {
         try
         {
-            long totalMsgCountThisInterval = 0;
-
-            for (Integer id : _consumers.keySet())
-            {
-                JMSConsumer consumer = _consumers.get(id);
-                totalMsgCountThisInterval = totalMsgCountThisInterval + consumer.getCurrentMessageCount();
-
-            }
-            _totalMsgCount = _totalMsgCount + totalMsgCountThisInterval;
-
             FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true);
             StringBuffer buf = new StringBuffer();
             Date d = new Date(System.currentTimeMillis());
@@ -137,8 +109,6 @@
             buf.append(d.getTime()).append(",");
             buf.append(_totalMsgCount).append(",");
             buf.append(_totalMsgCount*1000 /totaltime).append(",");
-            buf.append(totalMsgCountThisInterval).append(",");
-            buf.append(totalMsgCountThisInterval*1000/_logDuration).append(",");
             buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n");
             buf.append("\n");
             _memoryLog.write(buf.toString());
@@ -156,7 +126,7 @@
         try
         {
             FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true);
-            String s = "Date/Time,Time (ms),total msg count,total rate (msg/sec),interval count,interval rate (msg/sec),memory";
+            String s = "Date/Time,Time (ms),total msg count,total rate (msg/sec),memory";
             _memoryLog.write(s);
             _memoryLog.close();
         }
@@ -166,33 +136,31 @@
         }
     }
 
-    private void printSummary() throws Exception
+    private void printSummary()
     {
-        if (Boolean.getBoolean("collect_stats"))
+        try
         {
-            for (Integer id : _consumers.keySet())
-            {
-                JMSConsumer consumer = _consumers.get(id);
-                _totalMsgCount = _totalMsgCount + consumer.getCurrentMessageCount();
 
-            }
+            long current = System.currentTimeMillis();
+            double time = current - _startTime;
+            double ratio = _totalMsgCount*1000/time;
+            FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true);
+
+            StringBuffer buf = new StringBuffer("MessageProducerTest \n Test started at : ");
+            buf.append(df.format(new Date(_startTime))).append("\n Test finished at : ");
+            Date d = new Date(current);
+            buf.append(df.format(d)).append("\n Total Time taken (ms):");
+            buf.append(time).append("\n Total messages sent:");
+            buf.append(_totalMsgCount).append("\n producer rate:");
+            buf.append(ratio).append("\n");
+            _summaryLog.write(buf.toString());
+            System.out.println("---------- Test Ended -------------");
+            _summaryLog.close();
+        }
+        catch(Exception e)
+        {
+            e.printStackTrace();
         }
-
-        long current = System.currentTimeMillis();
-        double time = current - _startTime;
-        double ratio = _totalMsgCount*1000/time;
-        FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true);
-
-        StringBuffer buf = new StringBuffer("MessageConsumerTest \n Test started at : ");
-        buf.append(df.format(new Date(_startTime))).append("\n Test finished at : ");
-        Date d = new Date(current);
-        buf.append(df.format(d)).append("\n Total Time taken (ms):");
-        buf.append(time).append("\n Total messages sent:");
-        buf.append(_totalMsgCount).append("\n consumer rate:");
-        buf.append(ratio).append("\n");
-        _summaryLog.write(buf.toString());
-        System.out.println("---------- Test Ended -------------");
-        _summaryLog.close();
     }
 
     public static void main(String[] args)
@@ -200,8 +168,7 @@
         try
         {
             MessageConsumerTest test = new MessageConsumerTest();
-            test.start();
-            test.startTimerThread();
+            test.init();
         }
         catch (Exception e)
         {

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java?rev=599611&r1=599610&r2=599611&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java Thu Nov 29 14:46:28 2007
@@ -7,119 +7,97 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
 
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.message.TestMessageFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MessageProducerTest extends Options implements Runnable
+public class MessageProducerTest extends Options
 {
     private static final Logger _logger = LoggerFactory.getLogger(MessageProducerTest.class);
     private SimpleDateFormat df = new SimpleDateFormat("h:mm a");
 
-    private Map<Integer,JMSProducer> _producers = new ConcurrentHashMap<Integer,JMSProducer>();
-    private int _count;
     String _logFileName;
     long _startTime;
     long _totalMsgCount;
+    long _intervalCount;
 
-    public void start() throws Exception
+    private Connection _connection;
+    private Session _session;
+    private BytesMessage _payload;
+    private MessageProducer _producer;
+
+    public void init() throws Exception
     {
        this.parseOptions();
-       boolean useSameDest = true;
        _logFileName = _logFilePath + "/MessageProducerTest_" + System.currentTimeMillis();
 
-       // use each destination with a different producer
-       if (_producerCount == destArray.length)
-       {
-           useSameDest = false;
-       }
-       for (;_count < _producerCount;_count++)
-       {
-           createAndStartProducer(useSameDest?destArray[0]:destArray[_count]);
-       }
+       AMQConnection _connection = ConnectionUtility.getInstance().getConnection();
+       _connection.start();
+       Destination dest = Boolean.getBoolean("useQueue")? new AMQQueue(_connection,_destination) : new AMQTopic(_connection,_destination);
+       _session = _connection.createSession(_transacted, Session.AUTO_ACKNOWLEDGE);
+       _payload = TestMessageFactory.newBytesMessage(_session, _messageSize);
+       _producer = _session.createProducer(dest);
+       // this should speedup the message producer
+       _producer.setDisableMessageTimestamp(true);
     }
 
-    private void createAndStartProducer(String routingKey)throws Exception
-    {
-        AMQConnection con = ConnectionUtility.getInstance().getConnection();
-        con.start();
-        Destination dest = Boolean.getBoolean("useQueue")? new AMQQueue(con,routingKey) : new AMQTopic(con,routingKey);
-        JMSProducer prod = new JMSProducer(String.valueOf(_count),(Connection)con, dest,_messageSize, _transacted);
-        Thread t = new Thread(prod);
-        t.setName("JMSProducer-"+_count);
-        t.start();
-        _producers.put(_count, prod);
-    }
-
-    private void startTimerThread()
+    public void run()
     {
         _startTime = System.currentTimeMillis();
+        boolean run = true;
         if(Boolean.getBoolean("collect_stats"))
         {
-            Thread t = new Thread(this);
-            t.setName("MessageProducerTest-TimerThread");
-            t.start();
-        }
-        try
-        {
-            printSummary();
+            printHeading();
+            runReaper();
         }
-        catch(Exception e)
-        {
-            e.printStackTrace();
-        }
-    }
 
-    public void run()
-    {
-        boolean run = true;
-        printHeading();
-        runReaper();
         try
         {
             while (run)
             {
-                Thread.sleep(_logDuration);
-                runReaper();
+                _payload.setJMSMessageID(String.valueOf(_totalMsgCount+1));
+                _producer.send(_payload);
+                _totalMsgCount ++;
+                _intervalCount ++;
 
-                if (System.currentTimeMillis() - _startTime > _expiry)
+                // check every x messages to see if times up
+                if(_intervalCount >= _logFrequency)
                 {
-                    // time to stop the test.
-                    for (Integer id : _producers.keySet())
+                    _intervalCount = 0;
+                    if (Boolean.getBoolean("collect_stats"))
                     {
-                        JMSProducer producer = _producers.get(id);
-                        producer.stopProducing();
+                        runReaper();
+                    }
+                    if (System.currentTimeMillis() - _startTime >= _expiry)
+                    {
+                        // time to stop the test.
+                        _session.close();
+                        _connection.stop();
+                        run = false;
                     }
-                    runReaper();
-                    run = false;
                 }
             }
         }
-        catch (InterruptedException e)
+        catch (Exception e)
         {
             _logger.error("The timer thread exited", e);
         }
+        printSummary();
     }
 
     public void runReaper()
     {
         try
         {
-            long totalMsgCountThisInterval = 0;
-
-            for (Integer id : _producers.keySet())
-            {
-                JMSProducer producer = _producers.get(id);
-                totalMsgCountThisInterval = totalMsgCountThisInterval + producer.getCurrentMessageCount();
-
-            }
-            _totalMsgCount = _totalMsgCount + totalMsgCountThisInterval;
-
             FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true);
             StringBuffer buf = new StringBuffer();
             Date d = new Date(System.currentTimeMillis());
@@ -128,8 +106,6 @@
             buf.append(d.getTime()).append(",");
             buf.append(_totalMsgCount).append(",");
             buf.append(_totalMsgCount*1000 /totaltime).append(",");
-            buf.append(totalMsgCountThisInterval).append(",");
-            buf.append(totalMsgCountThisInterval*1000/_logDuration).append(",");
             buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n");
             buf.append("\n");
             _memoryLog.write(buf.toString());
@@ -147,7 +123,7 @@
         try
         {
             FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true);
-            String s = "Date/Time,Time (ms),total msg count,total rate (msg/sec),interval count,interval rate (msg/sec),memory";
+            String s = "Date/Time,Time (ms),total msg count,total rate (msg/sec),memory";
             _memoryLog.write(s);
             _memoryLog.close();
         }
@@ -157,33 +133,31 @@
         }
     }
 
-    private void printSummary() throws Exception
+    private void printSummary()
     {
-        if (Boolean.getBoolean("collect_stats"))
+        try
         {
-            for (Integer id : _producers.keySet())
-            {
-                JMSProducer producer = _producers.get(id);
-                _totalMsgCount = _totalMsgCount + producer.getCurrentMessageCount();
 
-            }
+            long current = System.currentTimeMillis();
+            double time = current - _startTime;
+            double ratio = _totalMsgCount*1000/time;
+            FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true);
+
+            StringBuffer buf = new StringBuffer("MessageProducerTest \n Test started at : ");
+            buf.append(df.format(new Date(_startTime))).append("\n Test finished at : ");
+            Date d = new Date(current);
+            buf.append(df.format(d)).append("\n Total Time taken (ms):");
+            buf.append(time).append("\n Total messages sent:");
+            buf.append(_totalMsgCount).append("\n producer rate:");
+            buf.append(ratio).append("\n");
+            _summaryLog.write(buf.toString());
+            System.out.println("---------- Test Ended -------------");
+            _summaryLog.close();
+        }
+        catch(Exception e)
+        {
+            e.printStackTrace();
         }
-
-        long current = System.currentTimeMillis();
-        double time = current - _startTime;
-        double ratio = _totalMsgCount*1000/time;
-        FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true);
-
-        StringBuffer buf = new StringBuffer("MessageProducerTest \n Test started at : ");
-        buf.append(df.format(new Date(_startTime))).append("\n Test finished at : ");
-        Date d = new Date(current);
-        buf.append(df.format(d)).append("\n Total Time taken (ms):");
-        buf.append(time).append("\n Total messages sent:");
-        buf.append(_totalMsgCount).append("\n producer rate:");
-        buf.append(ratio).append("\n");
-        _summaryLog.write(buf.toString());
-        System.out.println("---------- Test Ended -------------");
-        _summaryLog.close();
     }
 
     public static void main(String[] args)
@@ -191,8 +165,8 @@
         try
         {
             MessageProducerTest test = new MessageProducerTest();
-            test.start();
-            test.startTimerThread();
+            test.init();
+            test.run();
         }
         catch(Exception e)
         {

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java?rev=599611&r1=599610&r2=599611&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java Thu Nov 29 14:46:28 2007
@@ -5,11 +5,9 @@
     int _messageSize;
     boolean _transacted;
     boolean _synchronous;
-    String[] destArray;
-    int _producerCount;
-    int _consumerCount;
+    String _destination;
     long _expiry;
-    long _logDuration;
+    long _logFrequency;
     String _logFilePath;
 
     /**
@@ -17,8 +15,6 @@
      * -DmessageSize
      * -DuseQueue
      * -Dtransacted
-     * -DproducerCount
-     * -DconsumerCount
      * -Ddestinations
      * -DlogFilePath
      * -Duration=1H,30M,10S
@@ -27,21 +23,16 @@
     public void parseOptions()
     {
         _messageSize = Integer.parseInt(System.getProperty("messageSize","100"));
-        _synchronous = Boolean.parseBoolean( System.getProperty("synchronous", "false"));
         _transacted = false;
-        String destinations = System.getProperty("destinations");
-        destArray = destinations.split(",");
-        _producerCount = Integer.parseInt(System.getProperty("producerCount","1"));
-        _consumerCount = Integer.parseInt(System.getProperty("consumerCount","1"));
-        _logDuration = Long.parseLong(System.getProperty("logDuration","10"));
-        _logDuration = _logDuration*1000*60;
+        _destination = System.getProperty("destinations");
+        _logFrequency = Long.parseLong(System.getProperty("logDuration","10"));
         _logFilePath = System.getProperty("logFilePath");
         _expiry = getExpiry();
 
         System.out.println("============= Test Data ===================");
-        System.out.println("Total no of producers  : " + _producerCount);
-        System.out.println(_synchronous? "Total no of synchronous consumers   : " : "Total no of asynchronous consumers   :" + _consumerCount);
-        System.out.println("Log Frequency in mins  : " + _logDuration/(1000*60));
+        System.out.println("Destination            : " + _destination);
+        System.out.println("Collect stats          : " + Boolean.getBoolean("collect_stats"));
+        System.out.println("Log Frequency in msgs  : " + _logFrequency);
         System.out.println("Log file path          : " + _logFilePath);
         System.out.println("Test Duration          : " + printTestDuration());
         System.out.println("============= /Test Data ===================");