You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/08/15 13:26:57 UTC
svn commit: r1157780 [13/13] - in
/qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./
broker-plugins/access-control/
broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/
broker-plugins/access-control/src/main/java/org...
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/cpp.cluster.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug 15 11:26:46 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.cluster.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.cluster.testprofile:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.cluster.testprofile:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.cluster.testprofile:1073294-1090000
+/qpid/trunk/qpid/java/test-profiles/cpp.cluster.testprofile:1073294-1157765
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/cpp.noprefetch.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug 15 11:26:46 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.noprefetch.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.noprefetch.testprofile:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.noprefetch.testprofile:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:1073294-1090000
+/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:1073294-1157765
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/cpp.ssl.excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug 15 11:26:46 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.excludes:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.ssl.excludes:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.ssl.excludes:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:1073294-1090000
+/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:1073294-1157765
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/cpp.ssl.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug 15 11:26:46 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.ssl.testprofile:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.ssl.testprofile:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.ssl.testprofile:1073294-1090000
+/qpid/trunk/qpid/java/test-profiles/cpp.ssl.testprofile:1073294-1157765
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/cpp.testprofile
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/cpp.testprofile?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/cpp.testprofile (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/cpp.testprofile Mon Aug 15 11:26:46 2011
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
-broker.version=0-10
+broker.version=v0_10
broker.language=cpp
broker.dir=${project.root}/../cpp/src
@@ -32,7 +32,8 @@ broker.stopped=Exception constructed
broker.modules=
broker.args=
-broker=${broker.executable} -p @PORT --data-dir ${build.data}/@PORT -t --auth no --no-module-dir ${broker.modules} ${broker.args}
+broker.type=spawned
+broker.command=${broker.executable} -p @PORT --data-dir ${build.data}/@PORT -t --auth no --no-module-dir ${broker.modules} ${broker.args}
profile.excludes=CPPPrefetchExcludes CPPTransientExcludes
test.excludes=Excludes CPPExcludes ${profile}.excludes ${profile.excludes} cpp.excludes
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/cpp.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug 15 11:26:46 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.testprofile:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.testprofile:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/cpp.testprofile:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:1073294-1090000
+/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:1073294-1157765
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Mon Aug 15 11:26:46 2011
@@ -0,0 +1,5 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:443187-726139
+/qpid/branches/java-broker-0-10/qpid/java/test-profiles/java-derby.testprofile:795950-829653
+/qpid/branches/java-network-refactor/qpid/java/test-profiles/java-derby.testprofile:805429-821809
+/qpid/branches/qpid-2935/qpid/java/test-profiles/java-derby.testprofile:1061302-1072333
+/qpid/trunk/qpid/java/test-profiles/java-dby-spawn.0-9-1.testprofile:1145001-1157765
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/java-dby.0-9-1.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Mon Aug 15 11:26:46 2011
@@ -0,0 +1,5 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:443187-726139
+/qpid/branches/java-broker-0-10/qpid/java/test-profiles/java-derby.testprofile:795950-829653
+/qpid/branches/java-network-refactor/qpid/java/test-profiles/java-derby.testprofile:805429-821809
+/qpid/branches/qpid-2935/qpid/java/test-profiles/java-derby.testprofile:1061302-1072333
+/qpid/trunk/qpid/java/test-profiles/java-dby.0-9-1.testprofile:1145001-1157765
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/java-mms-spawn.0-10.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Mon Aug 15 11:26:46 2011
@@ -0,0 +1,5 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/java.testprofile:443187-726139
+/qpid/branches/java-broker-0-10/qpid/java/test-profiles/java.testprofile:795950-829653
+/qpid/branches/java-network-refactor/qpid/java/test-profiles/java.testprofile:805429-821809
+/qpid/branches/qpid-2935/qpid/java/test-profiles/java.testprofile:1061302-1072333
+/qpid/trunk/qpid/java/test-profiles/java-mms-spawn.0-10.testprofile:1145001-1157765
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/java-mms-spawn.0-9-1.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Mon Aug 15 11:26:46 2011
@@ -0,0 +1,5 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/java.testprofile:443187-726139
+/qpid/branches/java-broker-0-10/qpid/java/test-profiles/java.testprofile:795950-829653
+/qpid/branches/java-network-refactor/qpid/java/test-profiles/java.testprofile:805429-821809
+/qpid/branches/qpid-2935/qpid/java/test-profiles/java.testprofile:1061302-1072333
+/qpid/trunk/qpid/java/test-profiles/java-mms-spawn.0-9-1.testprofile:1145001-1157765
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/java-mms.0-9-1.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Mon Aug 15 11:26:46 2011
@@ -0,0 +1,5 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/java.testprofile:443187-726139
+/qpid/branches/java-broker-0-10/qpid/java/test-profiles/java.testprofile:795950-829653
+/qpid/branches/java-network-refactor/qpid/java/test-profiles/java.testprofile:805429-821809
+/qpid/branches/qpid-2935/qpid/java/test-profiles/java.testprofile:1061302-1072333
+/qpid/trunk/qpid/java/test-profiles/java-mms.0-9-1.testprofile:1145001-1157765
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/log4j-test.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug 15 11:26:46 2011
@@ -3,4 +3,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/log4j-test.xml:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/log4j-test.xml:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/log4j-test.xml:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:1073294-1090000
+/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:1073294-1157765
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/test-provider.properties
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/test-provider.properties?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/test-provider.properties (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/test-provider.properties Mon Aug 15 11:26:46 2011
@@ -30,16 +30,12 @@ test.port.alt.ssl=25671
connectionfactory.default = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}'
connectionfactory.default.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.ssl}?ssl='true''
-connectionfactory.default.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'
connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt};tcp://localhost:${test.port}'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
connectionfactory.failover.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt.ssl}?ssl='true';tcp://localhost:${test.port.ssl}?ssl='true''&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
-connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:2;vm://:1'&failover='roundrobin?cyclecount='20''
connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}'
connectionfactory.connection2 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt}'
-connectionfactory.connection1.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'
-connectionfactory.connection2.vm = amqp://username:password@clientid/test?brokerlist='vm://:2'
queue.MyQueue = example.MyQueue
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/test-provider.properties
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug 15 11:26:46 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/test-provider.properties:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/test-provider.properties:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/test-provider.properties:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/test-provider.properties:1073294-1090000
+/qpid/trunk/qpid/java/test-profiles/test-provider.properties:1073294-1157765
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/test_resources/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Aug 15 11:26:46 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/test-profiles/test_resources:795950-829653
/qpid/branches/java-network-refactor/qpid/java/test-profiles/test_resources:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/test_resources:1061302-1072333
-/qpid/trunk/qpid/java/test-profiles/test_resources:1073294-1090000
+/qpid/trunk/qpid/java/test-profiles/test_resources:1073294-1157765
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/test-profiles/testprofile.defaults
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Mon Aug 15 11:26:46 2011
@@ -0,0 +1,5 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/default.testprofile:443187-726139
+/qpid/branches/java-broker-0-10/qpid/java/test-profiles/default.testprofile:795950-829653
+/qpid/branches/java-network-refactor/qpid/java/test-profiles/default.testprofile:805429-821809
+/qpid/branches/qpid-2935/qpid/java/test-profiles/default.testprofile:1061302-1072333
+/qpid/trunk/qpid/java/test-profiles/testprofile.defaults:1145001-1157765
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/qpid-bench
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/qpid-bench?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/qpid-bench (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/qpid-bench Mon Aug 15 11:26:46 2011
@@ -18,18 +18,6 @@
# under the License.
#
-if [ -z "$QPID_HOME" ]; then
- export QPID_HOME=$(dirname $(dirname $(readlink -f $0)))
- export PATH=${PATH}:${QPID_HOME}/bin
-fi
+. check-qpid-java-env
-# Set classpath to include Qpid jar with all required jars in manifest
-QPID_LIBS=$QPID_HOME/lib/qpid-all.jar
-
-# Set other variables used by the qpid-run script before calling
-export JAVA=java \
- JAVA_VM=-server \
- JAVA_MEM=-Xmx1024m \
- QPID_CLASSPATH=$QPID_LIBS
-
-. qpid-run org.apache.qpid.tools.QpidBench "$@"
+$JAVA -cp $CLASSPATH -server $JAVA_MEM $LOG_CONFIG org.apache.qpid.tools.QpidBench "$@"
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/qpid-bench
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/qpid-python-testkit
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/qpid-python-testkit?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/qpid-python-testkit (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/qpid-python-testkit Mon Aug 15 11:26:46 2011
@@ -22,9 +22,12 @@
# via the python test runner. The defaults are set for a running
# from an svn checkout
-. ./set-testkit-env.sh
+. check-qpid-java-env
export PYTHONPATH=./:$PYTHONPATH
-rm -rf $OUTDIR
-qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@"
-
+echo $PYTHONPATH
+if [ "$OUTDIR" = "" ] ; then
+ OUTDIR=$PWD
+fi
+testdir=$OUTDIR/testkit-out-`date +%F-%H-%M-%S`
+qpid-python-test -m testkit -DOUTDIR=$testdir"$@"
Copied: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/run-pub (from r1150000, qpid/trunk/qpid/java/tools/bin/run-pub)
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/run-pub?p2=qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/run-pub&p1=qpid/trunk/qpid/java/tools/bin/run-pub&r1=1150000&r2=1157780&rev=1157780&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/bin/run-pub (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/run-pub Mon Aug 15 11:26:46 2011
@@ -20,5 +20,9 @@
. check-qpid-java-env
-echo "$@"
-$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $@ org.apache.qpid.tools.PerfProducer
+JVM_ARGS="$1"
+PROGRAM_ARGS="$2"
+
+echo "JVM ARGS : $JAVA_MEM $JVM_ARGS"
+echo "PROGRAM ARGS : $PROGRAM_ARGS"
+$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $JVM_ARGS org.apache.qpid.tools.PerfProducer $PROGRAM_ARGS
Copied: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/run-sub (from r1150000, qpid/trunk/qpid/java/tools/bin/run-sub)
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/run-sub?p2=qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/run-sub&p1=qpid/trunk/qpid/java/tools/bin/run-sub&r1=1150000&r2=1157780&rev=1157780&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/bin/run-sub (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/bin/run-sub Mon Aug 15 11:26:46 2011
@@ -20,6 +20,13 @@
. check-qpid-java-env
-echo "$@"
-$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $@ org.apache.qpid.tools.PerfConsumer
+echo "All args $@"
+
+JVM_ARGS="$1"
+PROGRAM_ARGS="$2"
+
+echo "JVM ARGS : $JAVA_MEM $JVM_ARGS"
+echo "PROGRAM ARGS : $PROGRAM_ARGS"
+
+$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $JVM_ARGS org.apache.qpid.tools.PerfConsumer $PROGRAM_ARGS
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java Mon Aug 15 11:26:46 2011
@@ -77,7 +77,7 @@ public class LatencyTest extends PerfBas
public LatencyTest()
{
- super();
+ super("");
warmedUp = lock.newCondition();
testCompleted = lock.newCondition();
// Storing the following two for efficiency
@@ -314,7 +314,7 @@ public class LatencyTest extends PerfBas
public static void main(String[] args)
{
- final LatencyTest latencyTest = new LatencyTest();
+ final LatencyTest latencyTest = new LatencyTest();
Runnable r = new Runnable()
{
public void run()
@@ -334,16 +334,16 @@ public class LatencyTest extends PerfBas
}
}
};
-
+
Thread t;
try
{
- t = Threading.getThreadFactory().createThread(r);
+ t = Threading.getThreadFactory().createThread(r);
}
catch(Exception e)
{
throw new Error("Error creating latency test thread",e);
}
- t.start();
+ t.start();
}
-}
\ No newline at end of file
+}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java Mon Aug 15 11:26:46 2011
@@ -20,36 +20,113 @@
*/
package org.apache.qpid.tools;
+import java.net.InetAddress;
import java.text.DecimalFormat;
-import java.util.Hashtable;
+import java.util.UUID;
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
public class PerfBase
{
+ public final static String CODE = "CODE";
+ public final static String ID = "ID";
+ public final static String REPLY_ADDR = "REPLY_ADDR";
+ public final static String MAX_LATENCY = "MAX_LATENCY";
+ public final static String MIN_LATENCY = "MIN_LATENCY";
+ public final static String AVG_LATENCY = "AVG_LATENCY";
+ public final static String STD_DEV = "STD_DEV";
+ public final static String CONS_RATE = "CONS_RATE";
+ public final static String PROD_RATE = "PROD_RATE";
+ public final static String MSG_COUNT = "MSG_COUNT";
+ public final static String TIMESTAMP = "Timestamp";
+
+ String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}");
+
TestParams params;
Connection con;
Session session;
+ Session controllerSession;
Destination dest;
- Destination feedbackDest;
+ Destination myControlQueue;
+ Destination controllerQueue;
DecimalFormat df = new DecimalFormat("###.##");
+ String id;
+ String myControlQueueAddr;
+
+ MessageProducer sendToController;
+ MessageConsumer receiveFromController;
+ String prefix = "";
+
+ enum OPCode {
+ REGISTER_CONSUMER, REGISTER_PRODUCER,
+ PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP,
+ CONSUMER_READY, PRODUCER_READY,
+ PRODUCER_START,
+ RECEIVED_END_MSG, CONSUMER_STOP,
+ RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS,
+ CONTINUE_TEST, STOP_TEST
+ };
+
+ enum MessageType {
+ BYTES, TEXT, MAP, OBJECT;
+
+ public static MessageType getType(String s) throws Exception
+ {
+ if ("text".equalsIgnoreCase(s))
+ {
+ return TEXT;
+ }
+ else if ("bytes".equalsIgnoreCase(s))
+ {
+ return BYTES;
+ }
+ /*else if ("map".equalsIgnoreCase(s))
+ {
+ return MAP;
+ }
+ else if ("object".equalsIgnoreCase(s))
+ {
+ return OBJECT;
+ }*/
+ else
+ {
+ throw new Exception("Unsupported message type");
+ }
+ }
+ };
+
+ MessageType msgType = MessageType.BYTES;
- public PerfBase()
+ public PerfBase(String prefix)
{
params = new TestParams();
+ String host = "";
+ try
+ {
+ host = InetAddress.getLocalHost().getHostName();
+ }
+ catch (Exception e)
+ {
+ }
+ id = host + "-" + UUID.randomUUID().toString();
+ this.prefix = prefix;
+ this.myControlQueueAddr = id + ";{create: always}";
}
public void setUp() throws Exception
- {
-
+ {
if (params.getHost().equals("") || params.getPort() == -1)
{
con = new AMQConnection(params.getUrl());
@@ -62,7 +139,78 @@ public class PerfBase
session = con.createSession(params.isTransacted(),
params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
- dest = new AMQAnyDestination(params.getAddress());
+ controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ dest = createDestination();
+ controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR);
+ myControlQueue = session.createQueue(myControlQueueAddr);
+ msgType = MessageType.getType(params.getMessageType());
+ System.out.println("Using " + msgType + " messages");
+
+ sendToController = controllerSession.createProducer(controllerQueue);
+ receiveFromController = controllerSession.createConsumer(myControlQueue);
+ }
+
+ private Destination createDestination() throws Exception
+ {
+ if (params.isUseUniqueDests())
+ {
+ System.out.println("Prefix : " + prefix);
+ Address addr = Address.parse(params.getAddress());
+ AMQAnyDestination temp = new AMQAnyDestination(params.getAddress());
+ int type = ((AMQSession_0_10)session).resolveAddressType(temp);
+
+ if ( type == AMQDestination.TOPIC_TYPE)
+ {
+ addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions());
+ System.out.println("Setting subject : " + addr);
+ }
+ else
+ {
+ addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions());
+ System.out.println("Setting name : " + addr);
+ }
+
+ return new AMQAnyDestination(addr);
+ }
+ else
+ {
+ return new AMQAnyDestination(params.getAddress());
+ }
+ }
+
+ public synchronized void sendMessageToController(MapMessage m) throws Exception
+ {
+ m.setString(ID, id);
+ m.setString(REPLY_ADDR,myControlQueueAddr);
+ sendToController.send(m);
+ }
+
+ public void receiveFromController(OPCode expected) throws Exception
+ {
+ MapMessage m = (MapMessage)receiveFromController.receive();
+ OPCode code = OPCode.values()[m.getInt(CODE)];
+ System.out.println("Received Code : " + code);
+ if (expected != code)
+ {
+ throw new Exception("Expected OPCode : " + expected + " but received : " + code);
+ }
+
+ }
+
+ public boolean continueTest() throws Exception
+ {
+ MapMessage m = (MapMessage)receiveFromController.receive();
+ OPCode code = OPCode.values()[m.getInt(CODE)];
+ System.out.println("Received Code : " + code);
+ return (code == OPCode.CONTINUE_TEST);
+ }
+
+ public void tearDown() throws Exception
+ {
+ session.close();
+ controllerSession.close();
+ con.close();
}
public void handleError(Exception e,String msg)
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java Mon Aug 15 11:26:46 2011
@@ -20,13 +20,17 @@
*/
package org.apache.qpid.tools;
-import javax.jms.Destination;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
import javax.jms.TextMessage;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
/**
@@ -47,7 +51,7 @@ import org.apache.qpid.thread.Threading;
* b) They are on separate machines that have their time synced via a Time Server
*
* In order to calculate latency the producer inserts a timestamp
- * hen the message is sent. The consumer will note the current time the message is
+ * when the message is sent. The consumer will note the current time the message is
* received and will calculate the latency as follows
* latency = rcvdTime - msg.getJMSTimestamp()
*
@@ -55,13 +59,9 @@ import org.apache.qpid.thread.Threading;
* variance in latencies.
*
* Avg latency is measured by adding all latencies and dividing by the total msgs.
- * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
*
* Throughput
* ===========
- * System throughput is calculated as follows
- * rcvdMsgCount/(rcvdTime - testStartTime)
- *
* Consumer rate is calculated as
* rcvdMsgCount/(rcvdTime - startTime)
*
@@ -81,130 +81,160 @@ public class PerfConsumer extends PerfBa
long minLatency = Long.MAX_VALUE;
long totalLatency = 0; // to calculate avg latency.
int rcvdMsgCount = 0;
- long testStartTime = 0; // to measure system throughput
long startTime = 0; // to measure consumer throughput
long rcvdTime = 0;
boolean transacted = false;
int transSize = 0;
+ boolean printStdDev = false;
+ List<Long> sample;
+
final Object lock = new Object();
- public PerfConsumer()
+ public PerfConsumer(String prefix)
{
- super();
+ super(prefix);
+ System.out.println("Consumer ID : " + id);
}
public void setUp() throws Exception
{
super.setUp();
consumer = session.createConsumer(dest);
+ System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n");
// Storing the following two for efficiency
transacted = params.isTransacted();
transSize = params.getTransactionSize();
+ printStdDev = params.isPrintStdDev();
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
+ sendMessageToController(m);
}
public void warmup()throws Exception
{
- System.out.println("Warming up......");
-
- boolean start = false;
- while (!start)
+ receiveFromController(OPCode.CONSUMER_STARTWARMUP);
+ Message msg = consumer.receive();
+ // This is to ensure we drain the queue before we start the actual test.
+ while ( msg != null)
{
- Message msg = consumer.receive();
- if (msg instanceof TextMessage)
+ if (msg.getBooleanProperty("End") == true)
{
- if (((TextMessage)msg).getText().equals("End"))
- {
- start = true;
- MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
- temp.send(session.createMessage());
- if (params.isTransacted())
- {
- session.commit();
- }
- temp.close();
- }
+ // It's more realistic for the consumer to signal this.
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
+ sendMessageToController(m);
}
+ msg = consumer.receive(1000);
+ }
+
+ if (params.isTransacted())
+ {
+ session.commit();
}
+
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
+ sendMessageToController(m);
+ consumer.setMessageListener(this);
}
public void startTest() throws Exception
{
- System.out.println("Starting test......");
- consumer.setMessageListener(this);
+ System.out.println("Consumer: " + id + " Starting test......" + "\n");
+ resetCounters();
}
- public void printResults() throws Exception
+ public void resetCounters()
{
- synchronized (lock)
+ rcvdMsgCount = 0;
+ maxLatency = 0;
+ minLatency = Long.MAX_VALUE;
+ totalLatency = 0;
+ if (printStdDev)
{
- lock.wait();
+ sample = null;
+ sample = new ArrayList<Long>(params.getMsgCount());
}
+ }
+
+ public void sendResults() throws Exception
+ {
+ receiveFromController(OPCode.CONSUMER_STOP);
double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
- double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
- double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
+ double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime);
+ double stdDev = 0.0;
+ if (printStdDev)
+ {
+ stdDev = calculateStdDev(avgLatency);
+ }
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
+ m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs());
+ m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs());
+ m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs());
+ m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs());
+ m.setDouble(CONS_RATE, consRate);
+ m.setLong(MSG_COUNT, rcvdMsgCount);
+ sendMessageToController(m);
+
System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
System.out.println(new StringBuilder("Consumer rate : ").
append(df.format(consRate)).
append(" msg/sec").toString());
- System.out.println(new StringBuilder("System Throughput : ").
- append(df.format(throughput)).
- append(" msg/sec").toString());
System.out.println(new StringBuilder("Avg Latency : ").
- append(df.format(avgLatency)).
+ append(df.format(avgLatency/Clock.convertToMiliSecs())).
append(" ms").toString());
System.out.println(new StringBuilder("Min Latency : ").
- append(minLatency).
+ append(df.format(minLatency/Clock.convertToMiliSecs())).
append(" ms").toString());
System.out.println(new StringBuilder("Max Latency : ").
- append(maxLatency).
+ append(df.format(maxLatency/Clock.convertToMiliSecs())).
append(" ms").toString());
- System.out.println("Completed the test......\n");
- }
-
- public void notifyCompletion(Destination replyTo) throws Exception
- {
- MessageProducer tmp = session.createProducer(replyTo);
- Message endMsg = session.createMessage();
- tmp.send(endMsg);
- if (params.isTransacted())
+ if (printStdDev)
{
- session.commit();
+ System.out.println(new StringBuilder("Std Dev : ").
+ append(stdDev/Clock.convertToMiliSecs()).toString());
}
- tmp.close();
}
- public void tearDown() throws Exception
+ public double calculateStdDev(double mean)
{
- consumer.close();
- session.close();
- con.close();
+ double v = 0;
+ for (double latency: sample)
+ {
+ v = v + Math.pow((latency-mean), 2);
+ }
+ v = v/sample.size();
+ return Math.round(Math.sqrt(v));
}
public void onMessage(Message msg)
{
try
{
- if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
+ // To figure out the decoding overhead of text
+ if (msgType == MessageType.TEXT)
{
- notifyCompletion(msg.getJMSReplyTo());
+ ((TextMessage)msg).getText();
+ }
- synchronized (lock)
- {
- lock.notifyAll();
- }
+ if (msg.getBooleanProperty("End"))
+ {
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
+ sendMessageToController(m);
}
else
{
- rcvdTime = System.currentTimeMillis();
+ rcvdTime = Clock.getTime();
rcvdMsgCount ++;
if (rcvdMsgCount == 1)
{
startTime = rcvdTime;
- testStartTime = msg.getJMSTimestamp();
}
if (transacted && (rcvdMsgCount % transSize == 0))
@@ -212,10 +242,14 @@ public class PerfConsumer extends PerfBa
session.commit();
}
- long latency = rcvdTime - msg.getJMSTimestamp();
+ long latency = rcvdTime - msg.getLongProperty(TIMESTAMP);
maxLatency = Math.max(maxLatency, latency);
minLatency = Math.min(minLatency, latency);
totalLatency = totalLatency + latency;
+ if (printStdDev)
+ {
+ sample.add(latency);
+ }
}
}
@@ -226,14 +260,21 @@ public class PerfConsumer extends PerfBa
}
- public void test()
+ public void run()
{
try
{
setUp();
warmup();
- startTest();
- printResults();
+ boolean nextIteration = true;
+ while (nextIteration)
+ {
+ System.out.println("=========================================================\n");
+ System.out.println("Consumer: " + id + " starting a new iteration ......\n");
+ startTest();
+ sendResults();
+ nextIteration = continueTest();
+ }
tearDown();
}
catch(Exception e)
@@ -242,26 +283,43 @@ public class PerfConsumer extends PerfBa
}
}
- public static void main(String[] args)
+ @Override
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public static void main(String[] args) throws InterruptedException
{
- final PerfConsumer cons = new PerfConsumer();
- Runnable r = new Runnable()
+ String scriptId = (args.length == 1) ? args[0] : "";
+ int conCount = Integer.getInteger("con_count",1);
+ final CountDownLatch testCompleted = new CountDownLatch(conCount);
+ for (int i=0; i < conCount; i++)
{
- public void run()
+
+ final PerfConsumer cons = new PerfConsumer(scriptId + i);
+ Runnable r = new Runnable()
{
- cons.test();
+ public void run()
+ {
+ cons.run();
+ testCompleted.countDown();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
}
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+ t.start();
+
}
- t.start();
+ testCompleted.await();
+ System.out.println("Consumers have completed the test......\n");
}
}
\ No newline at end of file
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java Mon Aug 15 11:26:46 2011
@@ -23,13 +23,15 @@ package org.apache.qpid.tools;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
+import javax.jms.MapMessage;
import javax.jms.Message;
-import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
/**
@@ -51,38 +53,52 @@ import org.apache.qpid.thread.Threading;
* System throughput and latencies calculated by the PerfConsumer are more realistic
* numbers.
*
+ * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs
+ * I have done so far, it seems quite useful to compute the producer rate as it gives an
+ * indication of how the system behaves. For ex if there is a gap between producer and consumer rates
+ * you could clearly see the higher latencies and when producer and consumer rates are very close,
+ * latency is good.
+ *
*/
public class PerfProducer extends PerfBase
{
+ private static long SEC = 60000;
+
MessageProducer producer;
Message msg;
- byte[] payload;
- List<byte[]> payloads;
+ Object payload;
+ List<Object> payloads;
boolean cacheMsg = false;
boolean randomMsgSize = false;
boolean durable = false;
Random random;
int msgSizeRange = 1024;
-
- public PerfProducer()
+ boolean rateLimitProducer = false;
+ double rateFactor = 0.4;
+ double rate = 0.0;
+
+ public PerfProducer(String prefix)
{
- super();
+ super(prefix);
+ System.out.println("Producer ID : " + id);
}
public void setUp() throws Exception
{
super.setUp();
- feedbackDest = session.createTemporaryQueue();
-
durable = params.isDurable();
-
+ rateLimitProducer = params.getRate() > 0 ? true : false;
+ if (rateLimitProducer)
+ {
+ System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec");
+ }
+
// if message caching is enabled we pre create the message
// else we pre create the payload
if (params.isCacheMessage())
{
cacheMsg = true;
-
- msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
+ msg = createMessage(createPayload(params.getMsgSize()));
msg.setJMSDeliveryMode(durable?
DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT
@@ -93,21 +109,52 @@ public class PerfProducer extends PerfBa
random = new Random(20080921);
randomMsgSize = true;
msgSizeRange = params.getMsgSize();
- payloads = new ArrayList<byte[]>(msgSizeRange);
-
+ payloads = new ArrayList<Object>(msgSizeRange);
+
for (int i=0; i < msgSizeRange; i++)
{
- payloads.add(MessageFactory.createMessagePayload(i).getBytes());
+ payloads.add(createPayload(i));
}
- }
+ }
else
{
- payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
+ payload = createPayload(params.getMsgSize());
}
producer = session.createProducer(dest);
+ System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName());
producer.setDisableMessageID(params.isDisableMessageID());
producer.setDisableMessageTimestamp(params.isDisableTimestamp());
+
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
+ sendMessageToController(m);
+ }
+
+ Object createPayload(int size)
+ {
+ if (msgType == MessageType.TEXT)
+ {
+ return MessageFactory.createMessagePayload(size);
+ }
+ else
+ {
+ return MessageFactory.createMessagePayload(size).getBytes();
+ }
+ }
+
+ Message createMessage(Object payload) throws Exception
+ {
+ if (msgType == MessageType.TEXT)
+ {
+ return session.createTextMessage((String)payload);
+ }
+ else
+ {
+ BytesMessage m = session.createBytesMessage();
+ m.writeBytes((byte[])payload);
+ return m;
+ }
}
protected Message getNextMessage() throws Exception
@@ -117,117 +164,130 @@ public class PerfProducer extends PerfBa
return msg;
}
else
- {
- msg = session.createBytesMessage();
-
+ {
+ Message m;
+
if (!randomMsgSize)
{
- ((BytesMessage)msg).writeBytes(payload);
+ m = createMessage(payload);
}
else
{
- ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange)));
+ m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
}
- msg.setJMSDeliveryMode(durable?
+ m.setJMSDeliveryMode(durable?
DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT
);
- return msg;
+ return m;
}
}
public void warmup()throws Exception
{
- System.out.println("Warming up......");
- MessageConsumer tmp = session.createConsumer(feedbackDest);
+ receiveFromController(OPCode.PRODUCER_STARTWARMUP);
+ System.out.println("Producer: " + id + " Warming up......");
for (int i=0; i < params.getWarmupCount() -1; i++)
{
producer.send(getNextMessage());
}
- Message msg = session.createTextMessage("End");
- msg.setJMSReplyTo(feedbackDest);
- producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.receive();
+ sendEndMessage();
if (params.isTransacted())
{
session.commit();
}
-
- tmp.close();
}
public void startTest() throws Exception
{
- System.out.println("Starting test......");
+ resetCounters();
+ receiveFromController(OPCode.PRODUCER_START);
int count = params.getMsgCount();
boolean transacted = params.isTransacted();
int tranSize = params.getTransactionSize();
- long start = System.currentTimeMillis();
+ long limit = (long)(params.getRate() * rateFactor); // in msecs
+ long timeLimit = (long)(SEC * rateFactor); // in msecs
+
+ long start = Clock.getTime(); // defaults to nano secs
+ long interval = start;
for(int i=0; i < count; i++ )
{
Message msg = getNextMessage();
- msg.setJMSTimestamp(System.currentTimeMillis());
+ msg.setLongProperty(TIMESTAMP, Clock.getTime());
producer.send(msg);
if ( transacted && ((i+1) % tranSize == 0))
{
session.commit();
}
+
+ if (rateLimitProducer && i%limit == 0)
+ {
+ long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs
+ if (elapsed < timeLimit)
+ {
+ Thread.sleep(elapsed);
+ }
+ interval = Clock.getTime();
+
+ }
+ }
+ sendEndMessage();
+ if ( transacted)
+ {
+ session.commit();
}
- long time = System.currentTimeMillis() - start;
- double rate = ((double)count/(double)time)*1000;
+ long time = Clock.getTime() - start;
+ rate = (double)count*Clock.convertToSecs()/(double)time;
System.out.println(new StringBuilder("Producer rate: ").
append(df.format(rate)).
append(" msg/sec").
toString());
}
- public void waitForCompletion() throws Exception
+ public void resetCounters()
{
- MessageConsumer tmp = session.createConsumer(feedbackDest);
- Message msg = session.createTextMessage("End");
- msg.setJMSReplyTo(feedbackDest);
- producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
- tmp.receive();
+ }
- if (params.isTransacted())
- {
- session.commit();
- }
+ public void sendEndMessage() throws Exception
+ {
+ Message msg = session.createMessage();
+ msg.setBooleanProperty("End", true);
+ producer.send(msg);
+ }
- tmp.close();
- System.out.println("Consumer has completed the test......");
+ public void sendResults() throws Exception
+ {
+ MapMessage msg = controllerSession.createMapMessage();
+ msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal());
+ msg.setDouble(PROD_RATE, rate);
+ sendMessageToController(msg);
}
+ @Override
public void tearDown() throws Exception
{
- producer.close();
- session.close();
- con.close();
+ super.tearDown();
}
- public void test()
+ public void run()
{
try
{
setUp();
warmup();
- startTest();
- waitForCompletion();
+ boolean nextIteration = true;
+ while (nextIteration)
+ {
+ System.out.println("=========================================================\n");
+ System.out.println("Producer: " + id + " starting a new iteration ......\n");
+ startTest();
+ sendResults();
+ nextIteration = continueTest();
+ }
tearDown();
}
catch(Exception e)
@@ -236,27 +296,63 @@ public class PerfProducer extends PerfBa
}
}
-
- public static void main(String[] args)
+ public void startControllerIfNeeded()
{
- final PerfProducer prod = new PerfProducer();
- Runnable r = new Runnable()
+ if (!params.isExternalController())
{
- public void run()
+ final PerfTestController controller = new PerfTestController();
+ Runnable r = new Runnable()
{
- prod.test();
+ public void run()
+ {
+ controller.run();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
}
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
+ catch(Exception e)
+ {
+ throw new Error("Error creating controller thread",e);
+ }
+ t.start();
}
- catch(Exception e)
- {
- throw new Error("Error creating producer thread",e);
+ }
+
+
+ public static void main(String[] args) throws InterruptedException
+ {
+ String scriptId = (args.length == 1) ? args[0] : "";
+ int conCount = Integer.getInteger("con_count",1);
+ final CountDownLatch testCompleted = new CountDownLatch(conCount);
+ for (int i=0; i < conCount; i++)
+ {
+ final PerfProducer prod = new PerfProducer(scriptId + i);
+ prod.startControllerIfNeeded();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ prod.run();
+ testCompleted.countDown();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
+ t.start();
}
- t.start();
+ testCompleted.await();
+ System.out.println("Producers have completed the test......");
}
}
\ No newline at end of file
Copied: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java (from r1150000, qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java)
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java?p2=qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java&p1=qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java&r1=1150000&r2=1157780&rev=1157780&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java Mon Aug 15 11:26:46 2011
@@ -1,5 +1,6 @@
package org.apache.qpid.tools;
+import java.io.FileWriter;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -13,8 +14,40 @@ import javax.jms.MessageProducer;
import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+/**
+ * The Controller coordinates a test run between a number
+ * of producers and consumers, configured via -Dprod_count and -Dcons_count.
+ *
+ * It waits till all the producers and consumers have registered and then
+ * conducts a warmup run. Once all consumers and producers have completed
+ * the warmup run and is ready, it will conduct the actual test run and
+ * collect all stats from the participants and calculates the system
+ * throughput, the avg/min/max for producer rates, consumer rates and latency.
+ *
+ * These stats are then printed to std out.
+ * The Controller also prints events to std out to give a running account
+ * of the test run in progress. Ex registering of participants, starting warmup ..etc.
+ * This allows a scripting tool to monitor the progress.
+ *
+ * The Controller can be run in two modes.
+ * 1. A single test run (default) where it just runs until the message count specified
+ * for the producers via -Dmsg_count is sent and received.
+ *
+ * 2. Time based, configured via -Dduration=x, where x is in mins.
+ * In this mode, the Controller repeatedly cycles through the tests (after an initial
+ * warmup run) until the desired time is reached. If a test run is in progress
+ * and the time is up, it will allow the run the complete.
+ *
+ * After each iteration, the stats will be printed out in csv format to a separate log file.
+ * System throughput is calculated as follows
+ * totalMsgCount/(totalTestTime)
+ */
public class PerfTestController extends PerfBase implements MessageListener
{
+ enum TestMode { SINGLE_RUN, TIME_BASED };
+
+ TestMode testMode = TestMode.SINGLE_RUN;
+
long totalTestTime;
private double avgSystemLatency = 0.0;
@@ -35,6 +68,7 @@ public class PerfTestController extends
private int consumerCount = Integer.getInteger("cons_count", 1);
private int producerCount = Integer.getInteger("prod_count", 1);
+ private int duration = Integer.getInteger("duration", -1); // in mins
private Map<String,MapMessage> consumers;
private Map<String,MapMessage> producers;
@@ -48,10 +82,11 @@ public class PerfTestController extends
private MessageConsumer consumer;
private boolean printStdDev = false;
+ FileWriter writer;
public PerfTestController()
{
- super();
+ super("");
consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount);
producers = new ConcurrentHashMap<String,MapMessage>(producerCount);
@@ -59,16 +94,20 @@ public class PerfTestController extends
prodRegistered = new CountDownLatch(producerCount);
consReady = new CountDownLatch(consumerCount);
prodReady = new CountDownLatch(producerCount);
- receivedConsStats = new CountDownLatch(consumerCount);
- receivedProdStats = new CountDownLatch(producerCount);
- receivedEndMsg = new CountDownLatch(producerCount);
printStdDev = params.isPrintStdDev();
+ testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED;
}
public void setUp() throws Exception
{
super.setUp();
+ if (testMode == TestMode.TIME_BASED)
+ {
+ writer = new FileWriter("stats-csv.log");
+ }
consumer = controllerSession.createConsumer(controllerQueue);
+ System.out.println("\nController: " + producerCount + " producers are expected");
+ System.out.println("Controller: " + consumerCount + " consumers are expected \n");
consumer.setMessageListener(this);
consRegistered.await();
prodRegistered.await();
@@ -87,6 +126,7 @@ public class PerfTestController extends
public void startTest() throws Exception
{
+ resetCounters();
System.out.println("\nController Starting test......");
long start = Clock.getTime();
sendMessageToNodes(OPCode.PRODUCER_START,producers.values());
@@ -97,6 +137,22 @@ public class PerfTestController extends
receivedConsStats.await();
}
+ public void resetCounters()
+ {
+ minSystemLatency = Double.MAX_VALUE;
+ maxSystemLatency = 0;
+ maxSystemConsRate = 0.0;
+ minSystemConsRate = Double.MAX_VALUE;
+ maxSystemProdRate = 0.0;
+ minSystemProdRate = Double.MAX_VALUE;
+
+ totalMsgCount = 0;
+
+ receivedConsStats = new CountDownLatch(consumerCount);
+ receivedProdStats = new CountDownLatch(producerCount);
+ receivedEndMsg = new CountDownLatch(producerCount);
+ }
+
public void calcStats() throws Exception
{
double totLatency = 0.0;
@@ -194,7 +250,6 @@ public class PerfTestController extends
System.out.println(new StringBuilder("Avg System Std Dev : ").
append(avgSystemLatencyStdDev));
}
- System.out.println("Controller: Completed the test......\n");
}
private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception
@@ -230,11 +285,23 @@ public class PerfTestController extends
switch (code)
{
case REGISTER_CONSUMER :
+ if (consRegistered.getCount() == 0)
+ {
+ System.out.println("Warning : Expected number of consumers have already registered," +
+ "ignoring extra consumer");
+ break;
+ }
consumers.put(m.getString(ID),m);
consRegistered.countDown();
break;
case REGISTER_PRODUCER :
+ if (prodRegistered.getCount() == 0)
+ {
+ System.out.println("Warning : Expected number of producers have already registered," +
+ "ignoring extra producer");
+ break;
+ }
producers.put(m.getString(ID),m);
prodRegistered.countDown();
break;
@@ -277,10 +344,36 @@ public class PerfTestController extends
{
setUp();
warmup();
- startTest();
- calcStats();
- printResults();
+ if (testMode == TestMode.SINGLE_RUN)
+ {
+ startTest();
+ calcStats();
+ printResults();
+ }
+ else
+ {
+ long startTime = Clock.getTime();
+ long timeLimit = duration * 60 * 1000; // duration is in mins.
+ boolean nextIteration = true;
+ while (nextIteration)
+ {
+ startTest();
+ calcStats();
+ writeStatsToFile();
+ if (Clock.getTime() - startTime < timeLimit)
+ {
+ sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values());
+ sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values());
+ nextIteration = true;
+ }
+ else
+ {
+ nextIteration = false;
+ }
+ }
+ }
tearDown();
+
}
catch(Exception e)
{
@@ -288,6 +381,39 @@ public class PerfTestController extends
}
}
+ @Override
+ public void tearDown() throws Exception {
+ System.out.println("Controller: Completed the test......\n");
+ if (testMode == TestMode.TIME_BASED)
+ {
+ writer.close();
+ }
+ sendMessageToNodes(OPCode.STOP_TEST,consumers.values());
+ sendMessageToNodes(OPCode.STOP_TEST,producers.values());
+ super.tearDown();
+ }
+
+ public void writeStatsToFile() throws Exception
+ {
+ writer.append(String.valueOf(totalMsgCount)).append(",");
+ writer.append(df.format(totalSystemThroughput)).append(",");
+ writer.append(df.format(avgSystemConsRate)).append(",");
+ writer.append(df.format(minSystemConsRate)).append(",");
+ writer.append(df.format(maxSystemConsRate)).append(",");
+ writer.append(df.format(avgSystemProdRate)).append(",");
+ writer.append(df.format(minSystemProdRate)).append(",");
+ writer.append(df.format(maxSystemProdRate)).append(",");
+ writer.append(df.format(avgSystemLatency)).append(",");
+ writer.append(df.format(minSystemLatency)).append(",");
+ writer.append(df.format(maxSystemLatency));
+ if (printStdDev)
+ {
+ writer.append(",").append(String.valueOf(avgSystemLatencyStdDev));
+ }
+ writer.append("\n");
+ writer.flush();
+ }
+
public static void main(String[] args)
{
PerfTestController controller = new PerfTestController();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java?rev=1157780&r1=1157779&r2=1157780&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java Mon Aug 15 11:26:46 2011
@@ -25,25 +25,25 @@ import javax.jms.Session;
public class TestParams
{
/*
- * By default the connection URL is used.
+ * By default the connection URL is used.
* This allows a user to easily specify a fully fledged URL any given property.
* Ex. SSL parameters
- *
+ *
* By providing a host & port allows a user to simply override the URL.
* This allows to create multiple clients in test scripts easily,
- * without having to deal with the long URL format.
+ * without having to deal with the long URL format.
*/
private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
-
+
private String host = "";
-
+
private int port = -1;
private String address = "queue; {create : always}";
private int msg_size = 1024;
- private int msg_type = 1; // not used yet
+ private int random_msg_size_start_from = 1;
private boolean cacheMessage = false;
@@ -62,19 +62,28 @@ public class TestParams
private int msg_count = 10;
private int warmup_count = 1;
-
+
private boolean random_msg_size = false;
+ private String msgType = "bytes";
+
+ private boolean printStdDev = false;
+
+ private long rate = -1;
+
+ private boolean externalController = false;
+
+ private boolean useUniqueDest = false; // useful when using multiple connections.
+
public TestParams()
{
-
+
url = System.getProperty("url",url);
host = System.getProperty("host","");
port = Integer.getInteger("port", -1);
- address = System.getProperty("address","queue");
+ address = System.getProperty("address",address);
msg_size = Integer.getInteger("msg_size", 1024);
- msg_type = Integer.getInteger("msg_type",1);
cacheMessage = Boolean.getBoolean("cache_msg");
disableMessageID = Boolean.getBoolean("disableMessageID");
disableTimestamp = Boolean.getBoolean("disableTimestamp");
@@ -85,6 +94,12 @@ public class TestParams
msg_count = Integer.getInteger("msg_count",msg_count);
warmup_count = Integer.getInteger("warmup_count",warmup_count);
random_msg_size = Boolean.getBoolean("random_msg_size");
+ msgType = System.getProperty("msg_type","bytes");
+ printStdDev = Boolean.getBoolean("print_std_dev");
+ rate = Long.getLong("rate",-1);
+ externalController = Boolean.getBoolean("ext_controller");
+ useUniqueDest = Boolean.getBoolean("use_unique_dest");
+ random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1);
}
public String getUrl()
@@ -122,9 +137,9 @@ public class TestParams
return msg_size;
}
- public int getMsgType()
+ public int getRandomMsgSizeStartFrom()
{
- return msg_type;
+ return random_msg_size_start_from;
}
public boolean isDurable()
@@ -161,10 +176,39 @@ public class TestParams
{
return disableTimestamp;
}
-
+
public boolean isRandomMsgSize()
{
return random_msg_size;
}
+ public String getMessageType()
+ {
+ return msgType;
+ }
+
+ public boolean isPrintStdDev()
+ {
+ return printStdDev;
+ }
+
+ public long getRate()
+ {
+ return rate;
+ }
+
+ public boolean isExternalController()
+ {
+ return externalController;
+ }
+
+ public void setAddress(String addr)
+ {
+ address = addr;
+ }
+
+ public boolean isUseUniqueDests()
+ {
+ return useUniqueDest;
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org