You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/11/12 19:08:17 UTC

svn commit: r594239 - in /incubator/qpid/trunk/qpid/java: client/src/main/java/ perftests/src/main/java/org/apache/qpid/client/perf/

Author: rajith
Date: Mon Nov 12 10:08:15 2007
New Revision: 594239

URL: http://svn.apache.org/viewvc?rev=594239&view=rev
Log:
Added a new set of performance tests which is more focused on getting the throughput.
The producers and consumers can be run from separate machines.
There is no pass/fail; it merely records message count, memory etc. at periodic intervals and prints a summary at the end of the test.
Since there are periodic entries, you can identify roughly when it crashed, the memory consumption at that point, etc.

Added:
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/ConnectionUtility.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSProducer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java
Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/client.log4j

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/client.log4j
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/client.log4j?rev=594239&r1=594238&r2=594239&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/client.log4j (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/client.log4j Mon Nov 12 10:08:15 2007
@@ -23,11 +23,10 @@
 #log4j.additivity.org.apache.qpid=false
 #log4j.logger.org.apache.qpidity.transport=TRACE, console
 
-log4j.logger.org.apache.qpid=MAJOR, console
+log4j.logger.org.apache.qpid=ERROR, console
 log4j.additivity.org.apache.qpid=false
-log4j.logger.org.apache.qpidity.transport=MAJOR, console
-log4j.logger.org.apache.qpid.testutil.QpidTestCase=MAJOR, console
-
+#log4j.logger.org.apache.qpidity.transport=DEBUG, console
+#log4j.logger.org.apache.qpid.client.message.AbstractBytesTypedMessage=DEBUG, console
 
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.Threshold=all

Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/ConnectionUtility.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/ConnectionUtility.java?rev=594239&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/ConnectionUtility.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/ConnectionUtility.java Mon Nov 12 10:08:15 2007
@@ -0,0 +1,50 @@
+package org.apache.qpid.client.perf;
+
+import javax.naming.InitialContext;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Shared helper that hands out broker connections to the perf tests.
+ * The JNDI context and the connection factory are created on first use and
+ * cached afterwards; that lazy creation is not synchronized.
+ */
+public class ConnectionUtility
+{
+    private static final Logger _logger = LoggerFactory.getLogger(ConnectionUtility.class);
+
+    private InitialContext _initialContext;
+    private AMQConnectionFactory _connectionFactory;
+
+    private static ConnectionUtility _instance = new ConnectionUtility();
+
+    /** Returns the single shared instance of this utility. */
+    public static ConnectionUtility getInstance()
+    {
+        return _instance;
+    }
+
+    /* Returns the cached InitialContext, creating the default one on the first call. */
+    private InitialContext getInitialContext() throws Exception
+    {
+        _logger.info("get InitialContext");
+        if (_initialContext != null)
+        {
+            return _initialContext;
+        }
+        _initialContext = new InitialContext();
+        return _initialContext;
+    }
+
+    /* Returns the cached factory, looking it up under the JNDI name "local" on the first call. */
+    private AMQConnectionFactory getConnectionFactory() throws Exception
+    {
+        _logger.info("get ConnectionFactory");
+        if (_connectionFactory != null)
+        {
+            return _connectionFactory;
+        }
+        Object lookedUp = getInitialContext().lookup("local");
+        _connectionFactory = (AMQConnectionFactory) lookedUp;
+        return _connectionFactory;
+    }
+
+    /**
+     * Creates a new connection from the cached connection factory.
+     *
+     * @return a freshly created connection; this method does not start it
+     * @throws Exception if the JNDI lookup or the connection creation fails
+     */
+    public AMQConnection getConnection() throws Exception
+    {
+        _logger.info("get Connection");
+        AMQConnectionFactory factory = getConnectionFactory();
+        return (AMQConnection) factory.createConnection();
+    }
+
+}

Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java?rev=594239&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java Mon Nov 12 10:08:15 2007
@@ -0,0 +1,101 @@
+package org.apache.qpid.client.perf;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JMSConsumer implements Runnable
+{
+    private static final Logger _logger = LoggerFactory.getLogger(JMSConsumer.class);
+
+    private String _id;
+    private Connection _connection;
+    private Session _session;
+    private MessageConsumer _consumer;
+    private Destination _destination;
+    private boolean _transacted;
+    private int _ackMode = Session.AUTO_ACKNOWLEDGE;
+    private AtomicBoolean _run = new AtomicBoolean(true);
+    private long _currentMsgCount;
+
+    /* Not implementing transactions for first phase */
+    public JMSConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception
+    {
+        _id = id;
+        _connection = connection;
+        _destination = destination;
+        _transacted = transacted;
+        _ackMode = ackMode;
+    }
+
+    public void run()
+    {
+        _run.set(true);
+
+        try
+        {
+            _session = _connection.createSession(_transacted, _ackMode);
+            _consumer = _session.createConsumer(_destination);
+        }
+        catch(Exception e)
+        {
+            _logger.error("Error Setting up JMSProducer:"+ _id, e);
+        }
+
+        while (_run.get())
+        {
+            try
+            {
+                BytesMessage msg = (BytesMessage)_consumer.receive();
+                if (msg != null)
+                {
+                    long msgId = Integer.parseInt(msg.getJMSCorrelationID());
+                    if (_currentMsgCount+1 != msgId)
+                    {
+                        _logger.error("Error : Message received out of order in JMSConsumer:" + _id + " message id was " + msgId);
+                    }
+                    _currentMsgCount ++;
+                }
+            }
+            catch(Exception e)
+            {
+                _logger.error("Error Receiving message from JMSConsumer:" + _id, e);
+            }
+        }
+        try
+        {
+            _session.close();
+            _connection.close();
+        }
+        catch(Exception e)
+        {
+            _logger.error("Error Closing JMSConsumer:"+ _id, e);
+        }
+    }
+
+    public void stopConsuming()
+    {
+        _run.set(false);
+    }
+
+    public String getId()
+    {
+        return _id;
+    }
+
+    /* Not worried about synchronizing as accuracy here is not that important.
+     * So if this method is invoked the count maybe off by a few digits.
+     * But when the test stops, this will always return the proper count.
+     */
+    public long getCurrentMessageCount()
+    {
+        return _currentMsgCount;
+    }
+}

Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSProducer.java?rev=594239&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSProducer.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSProducer.java Mon Nov 12 10:08:15 2007
@@ -0,0 +1,96 @@
+package org.apache.qpid.client.perf;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.qpid.client.message.TestMessageFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runnable that repeatedly sends one pre-built BytesMessage to a destination.
+ * Before each send the JMS correlation id is set to the next sequence number
+ * (starting at 1). The loop runs until {@link #stopProducing()} is called,
+ * after which the session and the connection are closed.
+ */
+public class JMSProducer implements Runnable
+{
+    private static final Logger _logger = LoggerFactory.getLogger(JMSProducer.class);
+
+    private final String _id;
+    private final int _messageSize;
+    private final Connection _connection;
+    private Session _session;
+    private MessageProducer _producer;
+    private final Destination _destination;
+    private BytesMessage _payload;
+    private final boolean _transacted;
+    private final int _ackMode = Session.AUTO_ACKNOWLEDGE;
+    private final AtomicBoolean _run = new AtomicBoolean(true);
+    /* Written only by the producing thread; volatile so the reporting thread reads a current, untorn value. */
+    private volatile long _currentMsgCount;
+
+    /* Not implementing transactions for first phase */
+    /**
+     * @param id          identifier used in log messages and reports
+     * @param connection  connection on which the session is created; closed when run() ends
+     * @param destination destination to send to
+     * @param messageSize size passed to TestMessageFactory when building the payload
+     * @param transacted  passed to createSession
+     */
+    public JMSProducer(String id,Connection connection, Destination destination,int messageSize, boolean transacted) throws Exception
+    {
+        _id = id;
+        _connection = connection;
+        _destination = destination;
+        _messageSize = messageSize;
+        _transacted = transacted;
+    }
+
+    /** Creates the session, payload and producer, then sends until stopped. */
+    public void run()
+    {
+        try
+        {
+            _session = _connection.createSession(_transacted, _ackMode);
+            _payload = TestMessageFactory.newBytesMessage(_session, _messageSize);
+            _producer = _session.createProducer(_destination);
+        }
+        catch(Exception e)
+        {
+            _logger.error("Error Setting up JMSProducer:"+ _id, e);
+            // Without a producer the send loop could only spin on NullPointerExceptions.
+            close();
+            return;
+        }
+
+        while (_run.get())
+        {
+            try
+            {
+                _payload.setJMSCorrelationID(String.valueOf(_currentMsgCount+1));
+                _producer.send(_payload);
+                // Counted only after a successful send; single writer, so the increment is safe.
+                _currentMsgCount ++;
+            }
+            catch(Exception e)
+            {
+                _logger.error("Error Sending message from JMSProducer:" + _id, e);
+            }
+        }
+        close();
+    }
+
+    /* Closes the session and then the connection; a failure on the first does not skip the second. */
+    private void close()
+    {
+        try
+        {
+            if (_session != null)
+            {
+                _session.close();
+            }
+        }
+        catch(Exception e)
+        {
+            _logger.error("Error Closing JMSProducer:"+ _id, e);
+        }
+        try
+        {
+            _connection.close();
+        }
+        catch(Exception e)
+        {
+            _logger.error("Error Closing JMSProducer:"+ _id, e);
+        }
+    }
+
+    /** Asks the send loop to finish after the send currently in progress. */
+    public void stopProducing()
+    {
+        _run.set(false);
+    }
+
+    public String getId()
+    {
+        return _id;
+    }
+
+    /**
+     * Returns the number of messages sent so far. May be called from any
+     * thread; the value can lag slightly behind while the producer is running.
+     */
+    public long getCurrentMessageCount()
+    {
+        return _currentMsgCount;
+    }
+}

Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java?rev=594239&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java Mon Nov 12 10:08:15 2007
@@ -0,0 +1,162 @@
+package org.apache.qpid.client.perf;
+
+import java.io.FileWriter;
+import java.io.RandomAccessFile;
+import java.sql.Date;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entry point of the consumer side of the throughput test.
+ * Starts one JMSConsumer thread per configured consumer, then a timer thread
+ * that appends per-consumer counts and memory usage to CSV files every
+ * _logDuration ms and writes a summary file when the test ends.
+ */
+public class MessageConsumerTest extends Options implements Runnable
+{
+    private static final Logger _logger = LoggerFactory.getLogger(MessageConsumerTest.class);
+    /* Only used from the timer thread (SimpleDateFormat is not thread-safe). */
+    private SimpleDateFormat df = new SimpleDateFormat("h:mm a");
+
+    private Map<Integer,JMSConsumer> _consumers = new ConcurrentHashMap<Integer,JMSConsumer>();
+    private int _count;
+    String _logFileName;
+    /* Extra time the consumers keep running after the configured test duration. */
+    private long _gracePeriod = 5 * 60 * 1000;
+    long _startTime;
+    /* Sum of all consumers' counts as of the most recent reaper pass. */
+    long totalMsgCount;
+
+    /**
+     * Parses the system-property options and starts the consumer threads.
+     *
+     * @throws Exception if option parsing or connection setup fails
+     */
+    public void start() throws Exception
+    {
+       this.parseOptions();
+       boolean useSameDest = true;
+       _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis();
+
+       // NOTE(review): the number of consumers is driven by _producerCount, not
+       // _consumerCount; left as-is because existing launch scripts may rely on it - confirm.
+       // use each destination with a different consumer
+       if (_producerCount == destArray.length)
+       {
+           useSameDest = false;
+       }
+       for (;_count < _producerCount;_count++)
+       {
+           createAndStartConsumer(useSameDest?destArray[0]:destArray[_count]);
+       }
+    }
+
+    /* Opens a dedicated connection, wraps it in a JMSConsumer on a topic and starts its thread. */
+    private void createAndStartConsumer(String routingKey)throws Exception
+    {
+        AMQConnection con = ConnectionUtility.getInstance().getConnection();
+        con.start();
+        Destination dest = new AMQTopic(con,routingKey);
+        JMSConsumer consumer = new JMSConsumer(String.valueOf(_count),(Connection)con, dest, _transacted,Session.AUTO_ACKNOWLEDGE);
+        Thread t = new Thread(consumer);
+        t.setName("JMSConsumer-"+_count);
+        t.start();
+        _consumers.put(_count, consumer);
+    }
+
+    private void startTimerThread()
+    {
+        Thread t = new Thread(this);
+        t.setName("MessageConsumerTest-TimerThread");
+        t.start();
+    }
+
+    /**
+     * Timer loop: logs every _logDuration ms and, once the test duration plus
+     * the grace period has elapsed, stops all consumers and writes the summary.
+     */
+    public void run()
+    {
+        boolean run = true;
+        _startTime = System.currentTimeMillis();
+        try
+        {
+            while(run)
+            {
+                Thread.sleep(_logDuration);
+                runReaper(false);
+
+                // The grace period extends the run; it must not shorten it.
+                if(System.currentTimeMillis() - _startTime > _expiry + _gracePeriod)
+                {
+                    // time to stop the test.
+                    for (JMSConsumer consumer : _consumers.values())
+                    {
+                        consumer.stopConsuming();
+                    }
+                    runReaper(true);
+                    run = false;
+                }
+            }
+        }
+        catch (InterruptedException e)
+        {
+            _logger.error("The timer thread exited",e);
+        }
+    }
+
+    /**
+     * Appends one row per consumer to the count CSV and one row to the memory CSV.
+     *
+     * @param printSummary when true, also appends the end-of-test summary file
+     */
+    public void runReaper(boolean printSummary)
+    {
+        try
+        {
+            long now = System.currentTimeMillis();
+            String timestamp = df.format(new Date(now)) + "," + now + ",";
+
+            // Each consumer reports a cumulative count, so the total is rebuilt on
+            // every pass instead of being added to the previous pass's total.
+            long total = 0;
+            StringBuffer rows = new StringBuffer();
+            for (JMSConsumer consumer : _consumers.values())
+            {
+                long count = consumer.getCurrentMessageCount();
+                rows.append("JMSConsumer(").append(consumer.getId()).append("),");
+                rows.append(timestamp).append(count).append("\n");
+                total += count;
+            }
+            totalMsgCount = total;
+            appendToFile(_logFileName + ".csv", rows.toString());
+
+            StringBuffer buf = new StringBuffer("JMSConsumer,");
+            buf.append(timestamp);
+            buf.append(total).append(",");
+            buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n");
+            appendToFile(_logFileName + "_memory.csv", buf.toString());
+
+            if (printSummary)
+            {
+                long elapsedMs = now - _startTime;
+                // messages per second: count / (ms / 1000)
+                double ratio = elapsedMs > 0 ? (total * 1000.0) / elapsedMs : 0;
+                buf = new StringBuffer("MessageConsumerTest \n Test started at : ");
+                buf.append(df.format(new Date(_startTime))).append("\n Test finished at : ");
+                buf.append(df.format(new Date(now))).append("\n Total Time taken (ms):");
+                buf.append(elapsedMs).append("\n Total messages received:");
+                buf.append(total).append("\n Consumer rate:");
+                buf.append(ratio).append("\n");
+                appendToFile(_logFileName + "_Summary", buf.toString());
+                System.out.println("---------- Test Ended -------------");
+            }
+        }
+        catch(Exception e)
+        {
+            _logger.error("Error printing info to the log file",e);
+        }
+    }
+
+    /* Appends text to the named file, closing the writer even when the write fails. */
+    private void appendToFile(String fileName, String text) throws java.io.IOException
+    {
+        FileWriter writer = new FileWriter(fileName, true);
+        try
+        {
+            writer.write(text);
+        }
+        finally
+        {
+            writer.close();
+        }
+    }
+
+    public static void main(String[] args)
+    {
+        try
+        {
+            MessageConsumerTest test = new MessageConsumerTest();
+            test.start();
+            test.startTimerThread();
+        }
+        catch(Exception e)
+        {
+            _logger.error("Error starting MessageConsumerTest", e);
+        }
+    }
+
+}

Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java?rev=594239&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java Mon Nov 12 10:08:15 2007
@@ -0,0 +1,159 @@
+package org.apache.qpid.client.perf;
+
+import java.io.FileWriter;
+import java.sql.Date;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entry point of the producer side of the throughput test.
+ * Starts one JMSProducer thread per configured producer, then a timer thread
+ * that appends per-producer counts and memory usage to CSV files every
+ * _logDuration ms and writes a summary file when the test duration expires.
+ */
+public class MessageProducerTest extends Options implements Runnable
+{
+    private static final Logger _logger = LoggerFactory.getLogger(MessageProducerTest.class);
+    /* Only used from the timer thread (SimpleDateFormat is not thread-safe). */
+    private SimpleDateFormat df = new SimpleDateFormat("h:mm a");
+
+    private Map<Integer,JMSProducer> _producers = new ConcurrentHashMap<Integer,JMSProducer>();
+    private int _count;
+    String _logFileName;
+    long _startTime;
+    /* Sum of all producers' counts as of the most recent reaper pass. */
+    long totalMsgCount;
+
+    /**
+     * Parses the system-property options and starts the producer threads.
+     *
+     * @throws Exception if option parsing or connection setup fails
+     */
+    public void start() throws Exception
+    {
+       this.parseOptions();
+       boolean useSameDest = true;
+       _logFileName = _logFilePath + "/MessageProducerTest_" + System.currentTimeMillis();
+
+       // use each destination with a different producer
+       if (_producerCount == destArray.length)
+       {
+           useSameDest = false;
+       }
+       for (;_count < _producerCount;_count++)
+       {
+           createAndStartProducer(useSameDest?destArray[0]:destArray[_count]);
+       }
+    }
+
+    /* Opens a dedicated connection, wraps it in a JMSProducer on a topic and starts its thread. */
+    private void createAndStartProducer(String routingKey)throws Exception
+    {
+        AMQConnection con = ConnectionUtility.getInstance().getConnection();
+        con.start();
+        Destination dest = new AMQTopic(con,routingKey);
+        JMSProducer prod = new JMSProducer(String.valueOf(_count),(Connection)con, dest,_messageSize, _transacted);
+        Thread t = new Thread(prod);
+        t.setName("JMSProducer-"+_count);
+        t.start();
+        _producers.put(_count, prod);
+    }
+
+    private void startTimerThread()
+    {
+        Thread t = new Thread(this);
+        t.setName("MessageProducerTest-TimerThread");
+        t.start();
+    }
+
+    /**
+     * Timer loop: logs every _logDuration ms and, once the test duration has
+     * elapsed, stops all producers and writes the summary.
+     */
+    public void run()
+    {
+        boolean run = true;
+        _startTime = System.currentTimeMillis();
+        try
+        {
+            while(run)
+            {
+                Thread.sleep(_logDuration);
+                runReaper(false);
+
+                if(System.currentTimeMillis() - _startTime > _expiry )
+                {
+                    // time to stop the test.
+                    for (JMSProducer prod : _producers.values())
+                    {
+                        prod.stopProducing();
+                    }
+                    runReaper(true);
+                    run = false;
+                }
+            }
+        }
+        catch (InterruptedException e)
+        {
+            _logger.error("The timer thread exited",e);
+        }
+    }
+
+    /**
+     * Appends one row per producer to the count CSV and one row to the memory CSV.
+     *
+     * @param printSummary when true, also appends the end-of-test summary file
+     */
+    public void runReaper(boolean printSummary)
+    {
+        try
+        {
+            long now = System.currentTimeMillis();
+            String timestamp = df.format(new Date(now)) + "," + now + ",";
+
+            // Each producer reports a cumulative count, so the total is rebuilt on
+            // every pass instead of being added to the previous pass's total.
+            long total = 0;
+            StringBuffer rows = new StringBuffer();
+            for (JMSProducer prod : _producers.values())
+            {
+                long count = prod.getCurrentMessageCount();
+                rows.append("JMSProducer(").append(prod.getId()).append("),");
+                rows.append(timestamp).append(count).append("\n");
+                total += count;
+            }
+            totalMsgCount = total;
+            appendToFile(_logFileName + ".csv", rows.toString());
+
+            StringBuffer buf = new StringBuffer("JMSProducer,");
+            buf.append(timestamp);
+            buf.append(total).append(",");
+            buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n");
+            appendToFile(_logFileName + "_memory.csv", buf.toString());
+
+            if (printSummary)
+            {
+                long elapsedMs = now - _startTime;
+                // messages per second: count / (ms / 1000)
+                double ratio = elapsedMs > 0 ? (total * 1000.0) / elapsedMs : 0;
+                buf = new StringBuffer("MessageProducerTest \n Test started at : ");
+                buf.append(df.format(new Date(_startTime))).append("\n Test finished at : ");
+                buf.append(df.format(new Date(now))).append("\n Total Time taken (ms):");
+                buf.append(elapsedMs).append("\n Total messages sent:");
+                buf.append(total).append("\n Producer rate:");
+                buf.append(ratio).append("\n");
+                appendToFile(_logFileName + "_Summary", buf.toString());
+                System.out.println("---------- Test Ended -------------");
+            }
+        }
+        catch(Exception e)
+        {
+            _logger.error("Error printing info to the log file",e);
+        }
+    }
+
+    /* Appends text to the named file, closing the writer even when the write fails. */
+    private void appendToFile(String fileName, String text) throws java.io.IOException
+    {
+        FileWriter writer = new FileWriter(fileName, true);
+        try
+        {
+            writer.write(text);
+        }
+        finally
+        {
+            writer.close();
+        }
+    }
+
+    public static void main(String[] args)
+    {
+        try
+        {
+            MessageProducerTest test = new MessageProducerTest();
+            test.start();
+            test.startTimerThread();
+        }
+        catch(Exception e)
+        {
+            _logger.error("Error starting MessageProducerTest", e);
+        }
+    }
+
+}

Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java?rev=594239&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java Mon Nov 12 10:08:15 2007
@@ -0,0 +1,105 @@
+package org.apache.qpid.client.perf;
+
+/**
+ * Holds the perf-test settings and populates them from JVM system properties.
+ */
+public class Options
+{
+    private static final long MILLIS_PER_SEC  = 1000L;
+    private static final long MILLIS_PER_MIN  = 60L * MILLIS_PER_SEC;
+    private static final long MILLIS_PER_HOUR = 60L * MILLIS_PER_MIN;
+
+    int _messageSize;
+    boolean _transacted;
+    String[] destArray;
+    int _producerCount;
+    int _consumerCount;
+    /* Test duration in milliseconds. */
+    long _expiry;
+    /* Interval between log entries in milliseconds. */
+    long _logDuration;
+    String _logFilePath;
+
+    /**
+     * Reads the settings from system properties and prints them to stdout.
+     *
+     * System props
+     * -DmessageSize                    (default 100)
+     * -Dtransacted                     (currently ignored, always false)
+     * -DproducerCount                  (default 1)
+     * -DconsumerCount                  (default 1)
+     * -Ddestinations                   comma separated list, required
+     * -DlogFilePath                    (default: current directory)
+     * -Dduration=1H,30M,10S            (default 30 mins)
+     * -DlogDuration=10  in mins        (default 10)
+     *
+     * @throws IllegalArgumentException if -Ddestinations is missing or blank
+     * @throws NumberFormatException if a numeric property cannot be parsed
+     */
+    public void parseOptions()
+    {
+        _messageSize = Integer.parseInt(System.getProperty("messageSize","100"));
+        _transacted = false;
+        String destinations = System.getProperty("destinations");
+        if (destinations == null || destinations.trim().length() == 0)
+        {
+            // Fail with a usable message rather than a bare NullPointerException.
+            throw new IllegalArgumentException("The destinations system property is required, e.g. -Ddestinations=a,b");
+        }
+        destArray = destinations.split(",");
+        _producerCount = Integer.parseInt(System.getProperty("producerCount","1"));
+        _consumerCount = Integer.parseInt(System.getProperty("consumerCount","1"));
+        _logDuration = Long.parseLong(System.getProperty("logDuration","10"));
+        _logDuration = _logDuration * MILLIS_PER_MIN;
+        // Defaulting avoids building file names that start with the literal "null/".
+        _logFilePath = System.getProperty("logFilePath", ".");
+        _expiry = getExpiry();
+
+        System.out.println("============= Test Data ===================");
+        System.out.println("Total no of producers  : " + _producerCount);
+        System.out.println("Total no of consumer   : " + _consumerCount);
+        System.out.println("Log Frequency in mins  : " + _logDuration / MILLIS_PER_MIN);
+        System.out.println("Log file path          : " + _logFilePath);
+        System.out.println("Test Duration          : " + printTestDuration());
+        System.out.println("============= /Test Data ===================");
+    }
+
+    /* Formats _expiry as "h hours, m mins, s secs", omitting zero-valued parts. */
+    private String printTestDuration()
+    {
+        long totalSecs = _expiry / MILLIS_PER_SEC;
+        long hours = totalSecs / 3600;
+        long mins  = (totalSecs % 3600) / 60;   // remainder after whole hours
+        long secs  = totalSecs % 60;            // remainder after whole minutes
+
+        StringBuffer buf = new StringBuffer();
+        appendUnit(buf, hours, " hours");
+        appendUnit(buf, mins, " mins");
+        appendUnit(buf, secs, " secs");
+        return buf.toString();
+    }
+
+    /* Appends "value unit" (comma separated from earlier parts) when value is positive. */
+    private static void appendUnit(StringBuffer buf, long value, String unit)
+    {
+        if (value <= 0)
+        {
+            return;
+        }
+        if (buf.length() > 0)
+        {
+            buf.append(", ");
+        }
+        buf.append(value).append(unit);
+    }
+
+    /*
+     * Parses the "duration" property: comma separated parts such as 1H, 30M, 10S
+     * are summed up. Parts without one of those suffixes are ignored. Returns
+     * 30 minutes when the property is absent or adds up to zero.
+     */
+    private long getExpiry()
+    {
+        long time = 0;
+        String s = System.getProperty("duration");
+        if(s != null)
+        {
+            for (String part : s.split(","))
+            {
+                String st = part.trim();
+                // All arithmetic is done in long so large values cannot overflow int.
+                if(st.indexOf("H")>0)
+                {
+                    time += Long.parseLong(st.substring(0,st.indexOf("H")).trim()) * MILLIS_PER_HOUR;
+                }
+                else if(st.indexOf("M")>0)
+                {
+                    time += Long.parseLong(st.substring(0,st.indexOf("M")).trim()) * MILLIS_PER_MIN;
+                }
+                else if(st.indexOf("S")>0)
+                {
+                    time += Long.parseLong(st.substring(0,st.indexOf("S")).trim()) * MILLIS_PER_SEC;
+                }
+            }
+        }
+        if (time == 0)
+        {
+            // default is 30 mins
+            time = 30 * MILLIS_PER_MIN;
+        }
+
+        return time;
+    }
+
+}