You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2009/11/11 01:17:29 UTC

svn commit: r834722 - in /qpid/trunk/qpid/java: testkit/ tools/bin/ tools/src/main/java/org/apache/qpid/tools/

Author: rajith
Date: Wed Nov 11 00:17:29 2009
New Revision: 834722

URL: http://svn.apache.org/viewvc?rev=834722&view=rev
Log:
Added the LatencyTest and PerfTest kit under the tools modules alongside QpidBench.
Modified the testkit build.xml to add tools as build dependency as some of the classes in testkit will be using MessageFactory

Added:
    qpid/trunk/qpid/java/tools/bin/perf_report.sh   (with props)
    qpid/trunk/qpid/java/tools/bin/run_pub.sh
    qpid/trunk/qpid/java/tools/bin/run_sub.sh
    qpid/trunk/qpid/java/tools/bin/setenv.sh
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
Modified:
    qpid/trunk/qpid/java/testkit/build.xml

Modified: qpid/trunk/qpid/java/testkit/build.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/build.xml?rev=834722&r1=834721&r2=834722&view=diff
==============================================================================
--- qpid/trunk/qpid/java/testkit/build.xml (original)
+++ qpid/trunk/qpid/java/testkit/build.xml Wed Nov 11 00:17:29 2009
@@ -20,7 +20,7 @@
  -->
 <project name="Test Kit" default="build">
 
-    <property name="module.depends" value="client broker common"/>
+    <property name="module.depends" value="client common tools"/>
 
     <import file="../module.xml"/>
 

Added: qpid/trunk/qpid/java/tools/bin/perf_report.sh
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/bin/perf_report.sh?rev=834722&view=auto
==============================================================================
--- qpid/trunk/qpid/java/tools/bin/perf_report.sh (added)
+++ qpid/trunk/qpid/java/tools/bin/perf_report.sh Wed Nov 11 00:17:29 2009
@@ -0,0 +1,131 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This will run the 8 use cases defined below and produce
+# a report in tabular format. Refer to the documentation
+# for more details.
+
+SUB_MEM=-Xmx1024M
+PUB_MEM=-Xmx1024M
+LOG_CONFIG="-Damqj.logging.level=WARN"
+
+. setenv.sh
+
+waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; }
+cleanup()
+{
+  pids=`ps aux | grep java | grep Perf | awk '{print $2}'`
+  if [ "$pids" != "" ]; then
+    kill -3 $pids
+    kill -9 $pids >/dev/null 2>&1
+  fi
+}
+
+# $1 test name
+# $2 consumer options
+# $3 producer options
+run_testcase()
+{
+  sh run_sub.sh $LOG_CONFIG $SUB_MEM $2 > sub.out &
+  waitfor sub.out "Warming up"
+  sh run_pub.sh $LOG_CONFIG $PUB_MEM $3 > pub.out &
+  waitfor sub.out "Completed the test"
+  waitfor pub.out "Consumer has completed the test"
+  sleep 2 #give a grace period to shutdown
+  print_result $1  
+}
+
+print_result()
+{
+  prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'`
+  sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'`
+  cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'` 
+  avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'`
+  min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'`
+  max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'`
+
+  printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
+  echo "------------------------------------------------------------------------------------------------"
+}
+
+trap cleanup EXIT
+
+echo "Test report on " `date +%F`
+echo "================================================================================================"
+echo "|Test           |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|"
+echo "------------------------------------------------------------------------------------------------"
+
+# Test 1 Trans Queue
+run_testcase "Trans_Queue" "" "-Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 2 Dura Queue
+run_testcase "Dura_Queue" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 3 Dura Queue Sync
+run_testcase "Dura_Queue_Sync" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_persistence=true"
+
+# Test 4 Dura Queue Sync Publish and Ack
+run_testcase "Dura_SyncPubAck" "-Ddurable=true -Dsync_ack=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent"
+
+# Test 5 Topic
+run_testcase "Topic" "-DtransDest=transientTopic" "-DtransDest=transientTopic -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 6 Durable Topic
+run_testcase "Dura_Topic" "-Ddurable=true -DtransDest=durableTopic" "-Ddurable=true -DtransDest=durableTopic -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 7 Fanout
+run_testcase "Fanout" "-DtransDest=fanoutQueue" "-DtransDest=fanoutQueue -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 8 Small TX
+run_testcase "Small_Txs_2" "-Ddurable=true -Dtransacted=true -Dtrans_size=1" \
+ "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1"
+
+# Test 9 Large TX
+run_testcase "Large_Txs_1000" "-Ddurable=true -Dtransacted=true -Dtrans_size=10" \
+ "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10"
+
+# Test 10 256 MSG
+run_testcase "Msg_256b" "" "-Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 11 512 MSG
+run_testcase "Msg_512b" "" "-Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 12 2048 MSG
+run_testcase "Msg_2048b" "" "-Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 13 Random size MSG
+run_testcase "Random_Msg_Size" "" "-Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 14 Random size MSG Durable
+run_testcase "Rand_Msg_Dura" "-Ddurable=true" "-Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 15 64K MSG
+run_testcase "Msg_64K" "-Damqj.tcpNoDelay=true" "-Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 16 Durable 64K MSG
+run_testcase "Msg_Durable_64K" "-Ddurable=true -Damqj.tcpNoDelay=true" \
+ "-Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 17 500K MSG
+run_testcase "Msg_500K" "-Damqj.tcpNoDelay=true" "-Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 18 Durable 500K MSG
+run_testcase "Msg_Dura_500K" "-Damqj.tcpNoDelay=true -Ddurable=true" \
+ "-Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"

Propchange: qpid/trunk/qpid/java/tools/bin/perf_report.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/trunk/qpid/java/tools/bin/run_pub.sh
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/bin/run_pub.sh?rev=834722&view=auto
==============================================================================
--- qpid/trunk/qpid/java/tools/bin/run_pub.sh (added)
+++ qpid/trunk/qpid/java/tools/bin/run_pub.sh Wed Nov 11 00:17:29 2009
@@ -0,0 +1,24 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+. $QPID_TEST_HOME/bin/setenv.sh
+
+echo "$@"
+$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.testkit.perf.PerfProducer

Added: qpid/trunk/qpid/java/tools/bin/run_sub.sh
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/bin/run_sub.sh?rev=834722&view=auto
==============================================================================
--- qpid/trunk/qpid/java/tools/bin/run_sub.sh (added)
+++ qpid/trunk/qpid/java/tools/bin/run_sub.sh Wed Nov 11 00:17:29 2009
@@ -0,0 +1,25 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+. $QPID_TEST_HOME/bin/setenv.sh
+
+echo "$@"
+$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.testkit.perf.PerfConsumer
+

Added: qpid/trunk/qpid/java/tools/bin/setenv.sh
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/bin/setenv.sh?rev=834722&view=auto
==============================================================================
--- qpid/trunk/qpid/java/tools/bin/setenv.sh (added)
+++ qpid/trunk/qpid/java/tools/bin/setenv.sh Wed Nov 11 00:17:29 2009
@@ -0,0 +1,49 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Compiles the test classes and sets the CLASSPATH
+
+# check for QPID_TEST_HOME
+if [ "$QPID_TEST_HOME" = "" ] ; then
+    echo "ERROR: Please set QPID_TEST_HOME ...."
+    exit 1
+fi
+
+# check for JAVA_HOME
+if [ "$JAVA_HOME" = "" ] ; then
+    echo "ERROR: Please set JAVA_HOME ...."
+    exit 1
+fi
+
+# VENDOR_LIB path needs to be set
+# for Qpid set this to {qpid_checkout}/java/build/lib
+if [ "$VENDOR_LIB" = "" ] ; then
+    echo "ERROR: Please set VENDOR_LIB path in the script ...."
+    exit 1
+fi
+
+
+[ -d $QPID_TEST_HOME/classes ] || mkdir $QPID_TEST_HOME/classes
+
+CLASSPATH=`find $VENDOR_LIB -name *.jar* | tr '\n' ":"`
+$JAVA_HOME/bin/javac -cp $CLASSPATH -d $QPID_TEST_HOME/classes -sourcepath $QPID_TEST_HOME/src `find $QPID_TEST_HOME/src -name '*.java'`
+
+export CLASSPATH=$QPID_TEST_HOME/classes:$CLASSPATH
+

Added: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java?rev=834722&view=auto
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java (added)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java Wed Nov 11 00:17:29 2009
@@ -0,0 +1,349 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.thread.Threading;
+
+/**
+ * Latency test sends an x number of messages in warmup mode and wait for a confirmation
+ * from the consumer that it has successfully consumed them and ready to start the
+ * test. It will start sending y number of messages and each message will contain a time
+ * stamp. This will be used at the receiving end to measure the latency.
+ *
+ * It is important to have a sufficiently large number for the warmup count to
+ * ensure the system is in steady state before the test is started.
+ *
+ * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000)
+ * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1
+ *
+ * The idea is to get a latency sample for the system once it achieves steady state.
+ *
+ */
+
+public class LatencyTest extends PerfBase implements MessageListener
+{
+    MessageProducer producer;
+    MessageConsumer consumer;
+    Message msg;
+    byte[] payload;
+    long maxLatency = 0;
+    long minLatency = Long.MAX_VALUE;
+    long totalLatency = 0;  // to calculate avg latency.
+    int rcvdMsgCount = 0;
+    double stdDev = 0;
+    double avgLatency = 0;
+    boolean warmup_mode = true;
+    boolean transacted = false;
+    int transSize = 0;
+
+    final List<Long> latencies;
+    final Lock lock = new ReentrantLock();
+    final Condition warmedUp;
+    final Condition testCompleted;
+
+    public LatencyTest()
+    {
+        super();
+        warmedUp = lock.newCondition();
+        testCompleted = lock.newCondition();
+        // Storing the following two for efficiency
+        transacted = params.isTransacted();
+        transSize = params.getTransactionSize();
+        latencies = new ArrayList <Long>(params.getMsgCount());
+    }
+
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        consumer = session.createConsumer(dest);
+        consumer.setMessageListener(this);
+
+        // if message caching is enabled we pre create the message
+        // else we pre create the payload
+        if (params.isCacheMessage())
+        {
+            msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
+            msg.setJMSDeliveryMode(params.isDurable()?
+                                   DeliveryMode.PERSISTENT :
+                                   DeliveryMode.NON_PERSISTENT
+                                  );
+        }
+        else
+        {
+            payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
+        }
+
+        producer = session.createProducer(dest);
+        producer.setDisableMessageID(params.isDisableMessageID());
+        producer.setDisableMessageTimestamp(params.isDisableTimestamp());
+    }
+
+    protected Message getNextMessage() throws Exception
+    {
+        if (params.isCacheMessage())
+        {
+            return msg;
+        }
+        else
+        {
+            msg = session.createBytesMessage();
+            ((BytesMessage)msg).writeBytes(payload);
+            return msg;
+        }
+    }
+
+    public void warmup()throws Exception
+    {
+        System.out.println("Warming up......");
+        int count = params.getWarmupCount();
+        for (int i=0; i < count; i++)
+        {
+            producer.send(getNextMessage());
+        }
+        Message msg = session.createTextMessage("End");
+        producer.send(msg);
+
+        if (params.isTransacted())
+        {
+            session.commit();
+        }
+
+        try
+        {
+            lock.lock();
+            warmedUp.await();
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    public void onMessage(Message msg)
+    {
+        try
+        {
+            if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
+            {
+                if (warmup_mode)
+                {
+                    warmup_mode = false;
+                    try
+                    {
+                        lock.lock();
+                        warmedUp.signal();
+                    }
+                    finally
+                    {
+                        lock.unlock();
+                    }
+                }
+                else
+                {
+                    computeStats();
+                }
+            }
+            else if (!warmup_mode)
+            {
+                long time = System.currentTimeMillis();
+                rcvdMsgCount ++;
+
+                if (transacted && (rcvdMsgCount % transSize == 0))
+                {
+                    session.commit();
+                }
+
+                long latency = time - msg.getJMSTimestamp();
+                latencies.add(latency);
+                totalLatency = totalLatency + latency;
+            }
+
+        }
+        catch(Exception e)
+        {
+            handleError(e,"Error when receiving messages");
+        }
+
+    }
+
+    private void computeStats()
+    {
+        avgLatency = (double)totalLatency/(double)rcvdMsgCount;
+        double sigma = 0;
+
+        for (long latency: latencies)
+        {
+            maxLatency = Math.max(maxLatency, latency);
+            minLatency = Math.min(minLatency, latency);
+            sigma = sigma + Math.pow(latency - avgLatency,2);
+        }
+
+        stdDev = Math.sqrt(sigma/(rcvdMsgCount -1));
+
+        try
+        {
+            lock.lock();
+            testCompleted.signal();
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    public void writeToFile() throws Exception
+    {
+        String fileName = System.getProperty("file");
+        PrintWriter writer = new PrintWriter(new FileOutputStream(fileName));
+        for (long latency: latencies)
+        {
+            writer.println(String.valueOf(latency));
+        }
+        writer.flush();
+        writer.close();
+    }
+
+    public void printToConsole()
+    {
+        System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
+        System.out.println(new StringBuilder("Standard Deviation  : ").
+                           append(df.format(stdDev)).
+                           append(" ms").toString());
+        System.out.println(new StringBuilder("Avg Latency         : ").
+                           append(df.format(avgLatency)).
+                           append(" ms").toString());
+        System.out.println(new StringBuilder("Min Latency         : ").
+                           append(minLatency).
+                           append(" ms").toString());
+        System.out.println(new StringBuilder("Max Latency         : ").
+                           append(maxLatency).
+                           append(" ms").toString());
+        System.out.println("Completed the test......\n");
+    }
+
+    public void startTest() throws Exception
+    {
+        System.out.println("Starting test......");
+        int count = params.getMsgCount();
+
+        for(int i=0; i < count; i++ )
+        {
+            Message msg = getNextMessage();
+            msg.setJMSTimestamp(System.currentTimeMillis());
+            producer.send(msg);
+            if ( transacted && ((i+1) % transSize == 0))
+            {
+                session.commit();
+            }
+        }
+        Message msg = session.createTextMessage("End");
+        producer.send(msg);
+        if (params.isTransacted())
+        {
+            session.commit();
+        }
+    }
+
+    public void tearDown() throws Exception
+    {
+        try
+        {
+            lock.lock();
+            testCompleted.await();
+        }
+        finally
+        {
+            lock.unlock();
+        }
+
+        producer.close();
+        consumer.close();
+        session.close();
+        con.close();
+    }
+
+    public void test()
+    {
+        try
+        {
+            setUp();
+            warmup();
+            startTest();
+            tearDown();
+        }
+        catch(Exception e)
+        {
+            handleError(e,"Error when running test");
+        }
+    }
+
+
+    public static void main(String[] args)
+    {
+        final LatencyTest latencyTest = new LatencyTest();        
+        Runnable r = new Runnable()
+        {
+            public void run()
+            {
+                latencyTest.test();
+                latencyTest.printToConsole();
+                if (System.getProperty("file") != null)
+                {
+                    try
+                    {
+                        latencyTest.writeToFile();
+                    }
+                    catch(Exception e)
+                    {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        };
+        
+        Thread t;
+        try
+        {
+            t = Threading.getThreadFactory().createThread(r);                      
+        }
+        catch(Exception e)
+        {
+            throw new Error("Error creating latency test thread",e);
+        }
+        t.start(); 
+    }
+}
\ No newline at end of file

Added: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java?rev=834722&view=auto
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java (added)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java Wed Nov 11 00:17:29 2009
@@ -0,0 +1,64 @@
+package org.apache.qpid.tools;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+public class MessageFactory
+{
+    public static Message createBytesMessage(Session ssn, int size) throws JMSException
+    {
+        BytesMessage msg = ssn.createBytesMessage();
+        msg.writeBytes(createMessagePayload(size).getBytes());
+        return msg;
+    }
+
+    public static Message createTextMessage(Session ssn, int size) throws JMSException
+    {
+        TextMessage msg = ssn.createTextMessage();
+        msg.setText(createMessagePayload(size));
+        return msg;
+    }
+
+    public static String createMessagePayload(int size)
+    {
+        String msgData = "Qpid Test Message";
+
+        StringBuffer buf = new StringBuffer(size);
+        int count = 0;
+        while (count <= (size - msgData.length()))
+        {
+            buf.append(msgData);
+            count += msgData.length();
+        }
+        if (count < size)
+        {
+            buf.append(msgData, 0, size - count);
+        }
+
+        return buf.toString();
+    }
+}

Added: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java?rev=834722&view=auto
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java (added)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java Wed Nov 11 00:17:29 2009
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.text.DecimalFormat;
+import java.util.Hashtable;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+public class PerfBase
+{
+    TestParams params;
+    Connection con;
+    Session session;
+    Destination dest;
+    Destination feedbackDest;
+    DecimalFormat df = new DecimalFormat("###.##");
+
+    public PerfBase()
+    {
+        params = new TestParams();
+    }
+
+    public void setUp() throws Exception
+    {
+        Hashtable<String,String> env = new Hashtable<String,String>();
+        env.put(Context.INITIAL_CONTEXT_FACTORY, params.getInitialContextFactory());
+        env.put(Context.PROVIDER_URL, params.getProviderURL());
+
+        Context ctx = null;
+        try
+        {
+            ctx = new InitialContext(env);
+        }
+        catch(Exception e)
+        {
+            throw new Exception("Error initializing JNDI",e);
+
+        }
+
+        ConnectionFactory conFac = null;
+        try
+        {
+            conFac = (ConnectionFactory)ctx.lookup(params.getConnectionFactory());
+        }
+        catch(Exception e)
+        {
+            throw new Exception("Error looking up connection factory",e);
+        }
+
+        con = conFac.createConnection();
+        con.start();
+        session = con.createSession(params.isTransacted(),
+                                    params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
+
+        try
+        {
+            dest = (Destination)ctx.lookup( params.isDurable()?
+                                            params.getDurableDestination():
+                                            params.getTransientDestination()
+                                           );
+        }
+        catch(Exception e)
+        {
+            throw new Exception("Error looking up destination",e);
+        }
+    }
+
+    public void handleError(Exception e,String msg)
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append(msg);
+        sb.append(" ");
+        sb.append(e.getMessage());
+        System.err.println(sb.toString());
+        e.printStackTrace();
+    }
+}
+

Added: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java?rev=834722&view=auto
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java (added)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java Wed Nov 11 00:17:29 2009
@@ -0,0 +1,267 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.thread.Threading;
+
+/**
+ * PerfConsumer will receive x no of messages in warmup mode.
+ * Once it receives the Start message it will then signal the PerfProducer.
+ * It will start recording stats from the first message it receives after
+ * the warmup mode is done.
+ *
+ * The following calculations are done.
+ * The important numbers to look at is
+ * a) Avg Latency
+ * b) System throughput.
+ *
+ * Latency.
+ * =========
+ * Currently this test is written with the assumption that either
+ * a) The Perf Producer and Consumer are on the same machine
+ * b) They are on separate machines that have their time synced via a Time Server
+ *
+ * In order to calculate latency the producer inserts a timestamp
+ * hen the message is sent. The consumer will note the current time the message is
+ * received and will calculate the latency as follows
+ * latency = rcvdTime - msg.getJMSTimestamp()
+ *
+ * Through out the test it will keep track of the max and min latency to show the
+ * variance in latencies.
+ *
+ * Avg latency is measured by adding all latencies and dividing by the total msgs.
+ * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
+ *
+ * Throughput
+ * ===========
+ * System throughput is calculated as follows
+ * rcvdMsgCount/(rcvdTime - testStartTime)
+ *
+ * Consumer rate is calculated as
+ * rcvdMsgCount/(rcvdTime - startTime)
+ *
+ * Note that the testStartTime referes to when the producer sent the first message
+ * and startTime is when the consumer first received a message.
+ *
+ * rcvdTime keeps track of when the last message is received.
+ *
+ * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
+ *
+ */
+
+public class PerfConsumer extends PerfBase implements MessageListener
+{
+    MessageConsumer consumer;
+    long maxLatency = 0;
+    long minLatency = Long.MAX_VALUE;
+    long totalLatency = 0;  // to calculate avg latency.
+    int rcvdMsgCount = 0;
+    long testStartTime = 0; // to measure system throughput
+    long startTime = 0;     // to measure consumer throughput
+    long rcvdTime = 0;
+    boolean transacted = false;
+    int transSize = 0;
+
+    final Object lock = new Object();
+
+    public PerfConsumer()
+    {
+        super();
+    }
+
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        consumer = session.createConsumer(dest);
+
+        // Storing the following two for efficiency
+        transacted = params.isTransacted();
+        transSize = params.getTransactionSize();
+    }
+
+    public void warmup()throws Exception
+    {
+        System.out.println("Warming up......");
+
+        boolean start = false;
+        while (!start)
+        {
+            Message msg = consumer.receive();
+            if (msg instanceof TextMessage)
+            {
+                if (((TextMessage)msg).getText().equals("End"))
+                {
+                    start = true;
+                    MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
+                    temp.send(session.createMessage());
+                    if (params.isTransacted())
+                    {
+                        session.commit();
+                    }
+                    temp.close();
+                }
+            }
+        }
+    }
+
+    public void startTest() throws Exception
+    {
+        System.out.println("Starting test......");
+        consumer.setMessageListener(this);
+    }
+
+    public void printResults() throws Exception
+    {
+        synchronized (lock)
+        {
+            lock.wait();
+        }
+
+        double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
+        double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
+        double consRate   = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
+        System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
+        System.out.println(new StringBuilder("Consumer rate       : ").
+                           append(df.format(consRate)).
+                           append(" msg/sec").toString());
+        System.out.println(new StringBuilder("System Throughput   : ").
+                           append(df.format(throughput)).
+                           append(" msg/sec").toString());
+        System.out.println(new StringBuilder("Avg Latency         : ").
+                           append(df.format(avgLatency)).
+                           append(" ms").toString());
+        System.out.println(new StringBuilder("Min Latency         : ").
+                           append(minLatency).
+                           append(" ms").toString());
+        System.out.println(new StringBuilder("Max Latency         : ").
+                           append(maxLatency).
+                           append(" ms").toString());
+        System.out.println("Completed the test......\n");
+    }
+
+    public void notifyCompletion(Destination replyTo) throws Exception
+    {
+        MessageProducer tmp = session.createProducer(replyTo);
+        Message endMsg = session.createMessage();
+        tmp.send(endMsg);
+        if (params.isTransacted())
+        {
+            session.commit();
+        }
+        tmp.close();
+    }
+
+    public void tearDown() throws Exception
+    {
+        consumer.close();
+        session.close();
+        con.close();
+    }
+
+    public void onMessage(Message msg)
+    {
+        try
+        {
+            if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
+            {
+                notifyCompletion(msg.getJMSReplyTo());
+
+                synchronized (lock)
+                {
+                   lock.notifyAll();
+                }
+            }
+            else
+            {
+                rcvdTime = System.currentTimeMillis();
+                rcvdMsgCount ++;
+
+                if (rcvdMsgCount == 1)
+                {
+                    startTime = rcvdTime;
+                    testStartTime = msg.getJMSTimestamp();
+                }
+
+                if (transacted && (rcvdMsgCount % transSize == 0))
+                {
+                    session.commit();
+                }
+
+                long latency = rcvdTime - msg.getJMSTimestamp();
+                maxLatency = Math.max(maxLatency, latency);
+                minLatency = Math.min(minLatency, latency);
+                totalLatency = totalLatency + latency;
+            }
+
+        }
+        catch(Exception e)
+        {
+            handleError(e,"Error when receiving messages");
+        }
+
+    }
+
+    public void test()
+    {
+        try
+        {
+            setUp();
+            warmup();
+            startTest();
+            printResults();
+            tearDown();
+        }
+        catch(Exception e)
+        {
+            handleError(e,"Error when running test");
+        }
+    }
+
+    public static void main(String[] args)
+    {
+        final PerfConsumer cons = new PerfConsumer();
+        Runnable r = new Runnable()
+        {
+            public void run()
+            {
+                cons.test();
+            }
+        };
+        
+        Thread t;
+        try
+        {
+            t = Threading.getThreadFactory().createThread(r);                      
+        }
+        catch(Exception e)
+        {
+            throw new Error("Error creating consumer thread",e);
+        }
+        t.start(); 
+    }
+}
\ No newline at end of file

Added: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java?rev=834722&view=auto
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java (added)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java Wed Nov 11 00:17:29 2009
@@ -0,0 +1,262 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+
+import org.apache.qpid.thread.Threading;
+
+/**
+ * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
+ * from the consumer that it has successfully consumed them and ready to start the
+ * test. It will start sending y no of messages and each message will contain a time
+ * stamp. This will be used at the receiving end to measure the latency.
+ *
+ * This is done with the assumption that both consumer and producer are running on
+ * the same machine or different machines which have time synced using a time server.
+ *
+ * This test also calculates the producer rate as follows.
+ * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs)
+ *
+ * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
+ *
+ * Rajith - Producer rate is not an accurate perf metric IMO.
+ * It is heavily inlfuenced by any in memory buffering.
+ * System throughput and latencies calculated by the PerfConsumer are more realistic
+ * numbers.
+ *
+ */
+public class PerfProducer extends PerfBase
+{
+    MessageProducer producer;
+    Message msg;
+    byte[] payload;
+    List<byte[]> payloads;
+    boolean cacheMsg = false;
+    boolean randomMsgSize = false;
+    boolean durable = false;
+    Random random;
+    int msgSizeRange = 1024;
+    
+    public PerfProducer()
+    {
+        super();
+    }
+
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        feedbackDest = session.createTemporaryQueue();
+
+        durable = params.isDurable();
+        
+        // if message caching is enabled we pre create the message
+        // else we pre create the payload
+        if (params.isCacheMessage())
+        {
+            cacheMsg = true;
+            
+            msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
+            msg.setJMSDeliveryMode(durable?
+                                   DeliveryMode.PERSISTENT :
+                                   DeliveryMode.NON_PERSISTENT
+                                  );
+        }
+        else if (params.isRandomMsgSize())
+        {
+            random = new Random(20080921);
+            randomMsgSize = true;
+            msgSizeRange = params.getMsgSize();
+            payloads = new ArrayList<byte[]>(msgSizeRange);
+            
+            for (int i=0; i < msgSizeRange; i++)
+            {
+                payloads.add(MessageFactory.createMessagePayload(i).getBytes());
+            }
+        }        
+        else
+        {
+            payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
+        }
+
+        producer = session.createProducer(dest);
+        producer.setDisableMessageID(params.isDisableMessageID());
+        producer.setDisableMessageTimestamp(params.isDisableTimestamp());
+    }
+
+    protected Message getNextMessage() throws Exception
+    {
+        if (cacheMsg)
+        {
+            return msg;
+        }
+        else
+        {            
+            msg = session.createBytesMessage();
+            
+            if (!randomMsgSize)
+            {
+                ((BytesMessage)msg).writeBytes(payload);
+            }
+            else
+            {
+                ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange)));
+            }
+            msg.setJMSDeliveryMode(durable?
+                    DeliveryMode.PERSISTENT :
+                    DeliveryMode.NON_PERSISTENT
+                   );
+            return msg;
+        }
+    }
+
+    public void warmup()throws Exception
+    {
+        System.out.println("Warming up......");
+        MessageConsumer tmp = session.createConsumer(feedbackDest);
+
+        for (int i=0; i < params.getWarmupCount() -1; i++)
+        {
+            producer.send(getNextMessage());
+        }
+        Message msg = session.createTextMessage("End");
+        msg.setJMSReplyTo(feedbackDest);
+        producer.send(msg);
+
+        if (params.isTransacted())
+        {
+            session.commit();
+        }
+
+        tmp.receive();
+
+        if (params.isTransacted())
+        {
+            session.commit();
+        }
+
+        tmp.close();
+    }
+
+    public void startTest() throws Exception
+    {
+        System.out.println("Starting test......");
+        int count = params.getMsgCount();
+        boolean transacted = params.isTransacted();
+        int tranSize =  params.getTransactionSize();
+
+        long start = System.currentTimeMillis();
+        for(int i=0; i < count; i++ )
+        {
+            Message msg = getNextMessage();
+            msg.setJMSTimestamp(System.currentTimeMillis());
+            producer.send(msg);
+            if ( transacted && ((i+1) % tranSize == 0))
+            {
+                session.commit();
+            }
+        }
+        long time = System.currentTimeMillis() - start;
+        double rate = ((double)count/(double)time)*1000;
+        System.out.println(new StringBuilder("Producer rate: ").
+                               append(df.format(rate)).
+                               append(" msg/sec").
+                               toString());
+    }
+
+    public void waitForCompletion() throws Exception
+    {
+        MessageConsumer tmp = session.createConsumer(feedbackDest);
+        Message msg = session.createTextMessage("End");
+        msg.setJMSReplyTo(feedbackDest);
+        producer.send(msg);
+
+        if (params.isTransacted())
+        {
+            session.commit();
+        }
+
+        tmp.receive();
+
+        if (params.isTransacted())
+        {
+            session.commit();
+        }
+
+        tmp.close();
+        System.out.println("Consumer has completed the test......");
+    }
+
+    public void tearDown() throws Exception
+    {
+        producer.close();
+        session.close();
+        con.close();
+    }
+
+    public void test()
+    {
+        try
+        {
+            setUp();
+            warmup();
+            startTest();
+            waitForCompletion();
+            tearDown();
+        }
+        catch(Exception e)
+        {
+            handleError(e,"Error when running test");
+        }
+    }
+
+
+    public static void main(String[] args)
+    {
+        final PerfProducer prod = new PerfProducer();
+        Runnable r = new Runnable()
+        {
+            public void run()
+            {
+                prod.test();
+            }
+        };
+        
+        Thread t;
+        try
+        {
+            t = Threading.getThreadFactory().createThread(r);                      
+        }
+        catch(Exception e)
+        {
+            throw new Error("Error creating producer thread",e);
+        }
+        t.start();            
+    }
+}
\ No newline at end of file

Added: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java?rev=834722&view=auto
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java (added)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java Wed Nov 11 00:17:29 2009
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import javax.jms.Session;
+
+public class TestParams
+{
+    private String initialContextFactory = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+
+    private String providerURL = System.getenv("QPID_TEST_HOME") + "/etc/jndi.properties";
+
+    private String connectionFactory = "connectionFactory";
+
+    private String transientDest = "transientQueue";
+
+    private String durableDest = "durableQueue";
+
+    private int msg_size = 1024;
+
+    private int msg_type = 1;   // not used yet
+
+    private boolean cacheMessage = false;
+
+    private boolean disableMessageID = false;
+
+    private boolean disableTimestamp = false;
+
+    private boolean durable = false;
+
+    private boolean transacted = false;
+
+    private int transaction_size = 1000;
+
+    private int ack_mode = Session.AUTO_ACKNOWLEDGE;
+
+    private int msg_count = 10;
+
+    private int warmup_count = 1;
+    
+    private boolean random_msg_size = false;
+
+    public TestParams()
+    {
+        initialContextFactory = System.getProperty("java.naming.factory.initial",initialContextFactory);
+        providerURL = System.getProperty("java.naming.provider.url",providerURL);
+
+        transientDest = System.getProperty("transDest",transientDest);
+        durableDest = System.getProperty("durableDest",durableDest);
+
+        msg_size  = Integer.getInteger("msg_size", 1024);
+        msg_type = Integer.getInteger("msg_type",1);
+        cacheMessage = Boolean.getBoolean("cache_msg");
+        disableMessageID = Boolean.getBoolean("disableMessageID");
+        disableTimestamp = Boolean.getBoolean("disableTimestamp");
+        durable = Boolean.getBoolean("durable");
+        transacted = Boolean.getBoolean("transacted");
+        transaction_size = Integer.getInteger("trans_size",1000);
+        ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE);
+        msg_count = Integer.getInteger("msg_count",msg_count);
+        warmup_count = Integer.getInteger("warmup_count",warmup_count);
+        random_msg_size = Boolean.getBoolean("random_msg_size");
+    }
+
+    public int getAckMode()
+    {
+        return ack_mode;
+    }
+
+    public String getConnectionFactory()
+    {
+        return connectionFactory;
+    }
+
+    public String getTransientDestination()
+    {
+        return transientDest;
+    }
+
+    public String getDurableDestination()
+    {
+        return durableDest;
+    }
+
+    public String getInitialContextFactory()
+    {
+        return initialContextFactory;
+    }
+
+    public int getMsgCount()
+    {
+        return msg_count;
+    }
+
+    public int getMsgSize()
+    {
+        return msg_size;
+    }
+
+    public int getMsgType()
+    {
+        return msg_type;
+    }
+
+    public boolean isDurable()
+    {
+        return durable;
+    }
+
+    public String getProviderURL()
+    {
+        return providerURL;
+    }
+
+    public boolean isTransacted()
+    {
+        return transacted;
+    }
+
+    public int getTransactionSize()
+    {
+        return transaction_size;
+    }
+
+    public int getWarmupCount()
+    {
+        return warmup_count;
+    }
+
+    public boolean isCacheMessage()
+    {
+        return cacheMessage;
+    }
+
+    public boolean isDisableMessageID()
+    {
+        return disableMessageID;
+    }
+
+    public boolean isDisableTimestamp()
+    {
+        return disableTimestamp;
+    }
+    
+    public boolean isRandomMsgSize()
+    {
+        return random_msg_size;
+    }
+
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org