You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/10/22 15:33:42 UTC
[4/5] qpid-broker-j git commit: QPID-7984: [Qpid Broker-J] [Tools]
Remove unused tools.
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/bin/testkit.py
----------------------------------------------------------------------
diff --git a/tools/bin/testkit.py b/tools/bin/testkit.py
deleted file mode 100755
index 1095f78..0000000
--- a/tools/bin/testkit.py
+++ /dev/null
@@ -1,278 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import time, string, traceback
-from brokertest import *
-from qpid.messaging import *
-
-
-try:
- import java.lang.System
- _cp = java.lang.System.getProperty("java.class.path");
-except ImportError:
- _cp = checkenv("QP_CP")
-
-class Formatter:
-
- def __init__(self, message):
- self.message = message
- self.environ = {"M": self.message,
- "P": self.message.properties,
- "C": self.message.content}
-
- def __getitem__(self, st):
- return eval(st, self.environ)
-
-# The base test case has support for launching the generic
-# receiver and sender through the TestLauncher with all the options.
-#
-class JavaClientTest(BrokerTest):
- """Base Case for Java Test cases"""
-
- client_class = "org.apache.qpid.testkit.TestLauncher"
-
- # currently there is no transparent reconnection.
- # temp hack: just creating the queue here and closing it.
- def start_error_watcher(self,broker=None):
- ssn = broker.connect().session()
- err_watcher = ssn.receiver("control; {create:always}", capacity=1)
- ssn.close()
-
- def store_module_args(self):
- if BrokerTest.store_lib:
- return ["--load-module", BrokerTest.store_lib]
- else:
- print "Store module not present."
- return [""]
-
- def client(self,**options):
- cmd = ["java","-cp",_cp]
-
- cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")]
- cmd += ["-Dhost=" + options.get("host","127.0.0.1")]
- cmd += ["-Dport=" + str(options.get("port",5672))]
- cmd += ["-Dcon_count=" + str(options.get("con_count",1))]
- cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))]
- cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))]
- cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",False))]
- cmd += ["-Ddurable=" + str(options.get("durable",False))]
- cmd += ["-Dtransacted=" + str(options.get("transacted",False))]
- cmd += ["-Dreceiver=" + str(options.get("receiver",False))]
- cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))]
- cmd += ["-Dsender=" + str(options.get("sender",False))]
- cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))]
- cmd += ["-Dtx_size=" + str(options.get("tx_size",10))]
- cmd += ["-Dmsg_count=" + str(options.get("msg_count",1000))]
- cmd += ["-Dmax_prefetch=" + str(options.get("max_prefetch",500))]
- cmd += ["-Dsync_ack=" + str(options.get("sync_ack",False))]
- cmd += ["-Dsync_persistence=" + str(options.get("sync_pub",False))]
- cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))]
- cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")]
- cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))]
- cmd += ["-Dlog.level=" + options.get("log.level", "warn")]
- cmd += [self.client_class]
- cmd += [options.get("address", "my_queue; {create: always}")]
-
- #print str(options.get("port",5672))
- return cmd
-
- # currently there is no transparent reconnection.
- # temp hack: just creating a receiver and closing session soon after.
- def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60):
- ssn = broker.connect().session()
- err_watcher = ssn.receiver("control; {create:always}", capacity=1)
- i = run_time/error_ck_freq
- is_error = False
- for j in range(i):
- not_empty = True
- while not_empty:
- try:
- m = err_watcher.fetch(timeout=error_ck_freq)
- ssn.acknowledge()
- print "Java process notified of an error"
- self.print_error(m)
- is_error = True
- except messaging.Empty, e:
- not_empty = False
-
- ssn.close()
- return is_error
-
- def print_error(self,msg):
- print msg.properties.get("exception-trace")
-
- def verify(self, receiver,sender):
- sender_running = receiver.is_running()
- receiver_running = sender.is_running()
-
- self.assertTrue(receiver_running,"Receiver has exited prematually")
- self.assertTrue(sender_running,"Sender has exited prematually")
-
- def start_sender_and_receiver(self,**options):
-
- receiver_opts = options
- receiver_opts["receiver"]=True
- receiver = self.popen(self.client(**receiver_opts),
- expect=EXPECT_RUNNING)
-
- sender_opts = options
- sender_opts["sender"]=True
- sender = self.popen(self.client(**sender_opts),
- expect=EXPECT_RUNNING)
-
- return receiver, sender
-
- def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options):
- if options.get("durable",False)==True:
- cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args())
- else:
- cluster = Cluster(self, count=count)
- return cluster
-
-class ConcurrencyTest(JavaClientTest):
- """A concurrency test suite for the JMS client"""
- skip = False
-
- def base_case(self,**options):
- if self.skip :
- print "Skipping test"
- return
-
- cluster = self.start_cluster(count=2,**options)
- self.start_error_watcher(broker=cluster[0])
- options["port"] = port=cluster[0].port()
-
- options["use_unique_dests"]=True
- options["address"]="amq.topic"
- receiver, sender = self.start_sender_and_receiver(**options)
- self.monitor_clients(broker=cluster[0],run_time=180)
- self.verify(receiver,sender)
-
- def test_multiplexing_con(self):
- """Tests multiple sessions on a single connection"""
-
- self.base_case(ssn_per_con=25,test_name=self.id())
-
- def test_multiplexing_con_with_tx(self):
- """Tests multiple transacted sessions on a single connection"""
-
- self.base_case(ssn_per_con=25,transacted=True,test_name=self.id())
-
- def test_multiplexing_con_with_sync_rcv(self):
- """Tests multiple sessions with sync receive"""
-
- self.base_case(ssn_per_con=25,sync_rcv=True,test_name=self.id())
-
- def test_multiplexing_con_with_durable_sub(self):
- """Tests multiple sessions with durable subs"""
-
- self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id())
-
- def test_multiplexing_con_with_sync_ack(self):
- """Tests multiple sessions with sync ack"""
-
- self.base_case(ssn_per_con=25,sync_ack=True,test_name=self.id())
-
- def test_multiplexing_con_with_sync_pub(self):
- """Tests multiple sessions with sync pub"""
-
- self.base_case(ssn_per_con=25,sync_pub=True,durable=True,test_name=self.id())
-
- def test_multiple_cons_and_ssns(self):
- """Tests multiple connections and sessions"""
-
- self.base_case(con_count=10,ssn_per_con=25,test_name=self.id())
-
-
-class SoakTest(JavaClientTest):
- """A soak test suite for the JMS client"""
-
- def base_case(self,**options):
- cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options)
- options["port"] = port=cluster[0].port()
- self.start_error_watcher(broker=cluster[0])
- options["use_unique_dests"]=True
- options["address"]="amq.topic"
- receiver,sender = self.start_sender_and_receiver(**options)
- is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30)
-
- if (is_error):
- print "The sender or receiver didn't start properly. Exiting test."
- return
- else:
- "Print no error !"
-
- # grace period for clients to get the failover properly setup.
- time.sleep(30)
- error_msg= None
- # Kill original brokers, start new ones.
- try:
- for i in range(8):
- cluster[i].kill()
- b=cluster.start()
- self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
- print "iteration : " + str(i)
- except ConnectError, e1:
- error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
-
- except SessionError, e2:
- error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2)
-
- self.verify(receiver,sender)
- if error_msg:
- raise Exception(error_msg)
-
-
- def test_failover(self) :
- """Test basic failover"""
-
- self.base_case(test_name=self.id())
-
-
- def test_failover_with_durablesub(self):
- """Test failover with durable subscriber"""
-
- self.base_case(durable=True,jms_durable_sub=True,test_name=self.id())
-
-
- def test_failover_with_sync_rcv(self):
- """Test failover with sync receive"""
-
- self.base_case(sync_rcv=True,test_name=self.id())
-
-
- def test_failover_with_sync_ack(self):
- """Test failover with sync ack"""
-
- self.base_case(sync_ack=True,test_name=self.id())
-
-
- def test_failover_with_noprefetch(self):
- """Test failover with no prefetch"""
-
- self.base_case(max_prefetch=1,test_name=self.id())
-
-
- def test_failover_with_multiple_cons_and_ssns(self):
- """Test failover with multiple connections and sessions"""
-
- self.base_case(use_unique_dests=True,address="amq.topic",
- con_count=10,ssn_per_con=25,test_name=self.id())
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/etc/perf-report.gnu
----------------------------------------------------------------------
diff --git a/tools/etc/perf-report.gnu b/tools/etc/perf-report.gnu
deleted file mode 100644
index b7662b0..0000000
--- a/tools/etc/perf-report.gnu
+++ /dev/null
@@ -1,61 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-set terminal png
-set datafile separator ","
-
-set title "Variation of avg latency between iterations"
-set yrange [10:20]
-set xlabel "Iterations"
-set ylabel "Latency (ms)"
-set output "avg_latency.png"
-plot "stats-csv.log" using 9 title "avg latency" with lines, 14 title "target latency" with lines
-
-
-set title "Variation of max latency between iterations"
-set yrange [0:1000]
-set xlabel "Iterations"
-set ylabel "Latency (ms)"
-set output "max_latency.png"
-plot "stats-csv.log" using 11 title "max latency" with lines,14 title "target latency" with lines,100 title "100 ms" with lines
-
-
-set title "Variation of standard deviation of latency between iterations"
-set yrange [0:20]
-set xlabel "Iterations"
-set ylabel "Standard Deviation"
-set output "std_dev_latency.png"
-plot "stats-csv.log" using 12 title "standard deviation" with lines
-
-
-set title "Variation of system throughput between iterations"
-set yrange [400000:450000]
-set xlabel "Iterations"
-set ylabel "System Throuhgput (msg/sec)"
-set output "system_rate.png"
-plot "stats-csv.log" using 2 title "system throughput" with lines
-
-
-set title "Variation of avg producer & consumer rates between iterations"
-set yrange [6500:7500]
-set xlabel "Iterations"
-set ylabel "Avg Rates (msg/sec)"
-set output "prod_cons_rate.png"
-plot "stats-csv.log" using 6 title "producer rate" with lines,"stats-csv.log" using 3 title "consumer rate" with lines
-
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/pom.xml
----------------------------------------------------------------------
diff --git a/tools/pom.xml b/tools/pom.xml
index 998e450..a6c92cb 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -30,12 +30,6 @@
<dependencies>
<dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-client</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/testkit/Client.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/testkit/Client.java b/tools/src/main/java/org/apache/qpid/testkit/Client.java
deleted file mode 100644
index 22dfa14..0000000
--- a/tools/src/main/java/org/apache/qpid/testkit/Client.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-public abstract class Client implements ExceptionListener
-{
- private Connection con;
- private Session ssn;
- private boolean durable = false;
- private boolean transacted = false;
- private int txSize = 10;
- private int ack_mode = Session.AUTO_ACKNOWLEDGE;
- private String contentType = "application/octet-stream";
-
- private long reportFrequency = 60000; // every min
-
- private DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- private NumberFormat nf = new DecimalFormat("##.00");
-
- private long startTime = System.currentTimeMillis();
- private ErrorHandler errorHandler = null;
-
- public Client(Connection con) throws Exception
- {
- this.con = con;
- this.con.setExceptionListener(this);
- durable = Boolean.getBoolean("durable");
- transacted = Boolean.getBoolean("transacted");
- txSize = Integer.getInteger("tx_size",10);
- contentType = System.getProperty("content_type","application/octet-stream");
- reportFrequency = Long.getLong("report_frequency", 60000);
- }
-
- public void close()
- {
- try
- {
- con.close();
- }
- catch (Exception e)
- {
- handleError("Error closing connection",e);
- }
- }
-
- @Override
- public void onException(JMSException e)
- {
- handleError("Connection error",e);
- }
-
- public void setErrorHandler(ErrorHandler h)
- {
- this.errorHandler = h;
- }
-
- public void handleError(String msg,Exception e)
- {
- if (errorHandler != null)
- {
- errorHandler.handleError(msg, e);
- }
- else
- {
- System.err.println(msg);
- e.printStackTrace();
- }
- }
-
- protected Session getSsn()
- {
- return ssn;
- }
-
- protected void setSsn(Session ssn)
- {
- this.ssn = ssn;
- }
-
- protected boolean isDurable()
- {
- return durable;
- }
-
- protected boolean isTransacted()
- {
- return transacted;
- }
-
- protected int getTxSize()
- {
- return txSize;
- }
-
- protected int getAck_mode()
- {
- return ack_mode;
- }
-
- protected String getContentType()
- {
- return contentType;
- }
-
- protected long getReportFrequency()
- {
- return reportFrequency;
- }
-
- protected long getStartTime()
- {
- return startTime;
- }
-
- protected void setStartTime(long startTime)
- {
- this.startTime = startTime;
- }
-
- public DateFormat getDf()
- {
- return df;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
deleted file mode 100644
index de7748a..0000000
--- a/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-public interface ErrorHandler {
-
- public void handleError(String msg,Exception e);
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/testkit/Receiver.java b/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
deleted file mode 100644
index 5773ab8..0000000
--- a/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-
-/**
- * A generic receiver which consumes messages
- * from a given address in a broker (host/port)
- * until told to stop by killing it.
- *
- * It participates in a feedback loop to ensure the producer
- * doesn't fill up the queue. If it receives an "End" msg
- * it sends a reply to the replyTo address in that msg.
- *
- * It doesn't check for correctness or measure anything
- * leaving those concerns to another entity.
- * However it prints a timestamp every x ms (-Dreport_frequency,
- * default 60000) as a checkpoint to figure out how far the test
- * has progressed if a failure occurred.
- *
- * It also takes in an optional Error handler to
- * pass out any error in addition to writing them to std err.
- *
- * This is intended more as a building block to create
- * more complex test cases. However there is a main method
- * provided to use this standalone.
- *
- * The following options are available and configurable
- * via jvm args.
- *
- * sync_rcv - Whether to consume sync (instead of using a listener).
- * report_frequency - how often a timestamp is printed
- * durable
- * transacted
- * tx_size - size of transaction batch in # msgs. *
- * check_for_dups - check for duplicate messages and out of order messages.
- * jms_durable_sub - create a durable subscription instead of a regular subscription.
- */
-public class Receiver extends Client implements MessageListener
-{
- private long msg_count = 0;
- private int sequence = 0;
- private boolean syncRcv = Boolean.getBoolean("sync_rcv");
- private boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub");
- private boolean checkForDups = Boolean.getBoolean("check_for_dups");
- private MessageConsumer consumer;
- private List<Integer> duplicateMessages = new ArrayList<Integer>();
-
- public Receiver(Connection con,String addr) throws Exception
- {
- super(con);
- setSsn(con.createSession(isTransacted(), getAck_mode()));
- consumer = getSsn().createConsumer(new AMQAnyDestination(addr));
- if (!syncRcv)
- {
- consumer.setMessageListener(this);
- }
-
- System.out.println("Receiving messages from : " + addr);
- }
-
- @Override
- public void onMessage(Message msg)
- {
- handleMessage(msg);
- }
-
- public void run() throws Exception
- {
- long sleepTime = getReportFrequency();
- while(true)
- {
- if(syncRcv)
- {
- long t = sleepTime;
- while (t > 0)
- {
- long start = System.currentTimeMillis();
- Message msg = consumer.receive(t);
- t = t - (System.currentTimeMillis() - start);
- handleMessage(msg);
- }
- }
- Thread.sleep(sleepTime);
- System.out.println(getDf().format(System.currentTimeMillis())
- + " - messages received : " + msg_count);
- }
- }
-
- private void handleMessage(Message m)
- {
- if (m == null) { return; }
-
- try
- {
- if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
- {
- MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo());
- Message controlMsg = getSsn().createTextMessage();
- temp.send(controlMsg);
- if (isTransacted())
- {
- getSsn().commit();
- }
- temp.close();
- }
- else
- {
-
- int seq = m.getIntProperty("sequence");
- if (checkForDups)
- {
- if (seq == 0)
- {
- sequence = 0; // wrap around for each iteration
- System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration");
- duplicateMessages.clear();
- }
-
- if (seq < sequence)
- {
- duplicateMessages.add(seq);
- }
- else if (seq == sequence)
- {
- sequence++;
- msg_count ++;
- }
- else
- {
- // Multiple publishers are not allowed in this test case.
- // So out of order messages are not allowed.
- throw new Exception(": Received an out of order message (expected="
- + sequence + ",received=" + seq + ")" );
- }
- }
- else
- {
- msg_count ++;
- }
-
- // Please note that this test case doesn't expect duplicates
- // when testing with transactions.
- if (isTransacted() && msg_count % getTxSize() == 0)
- {
- getSsn().commit();
- }
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- handleError("Exception receiving messages",e);
- }
- }
-
- // Receiver host port address
- public static void main(String[] args) throws Exception
- {
- String host = "127.0.0.1";
- int port = 5672;
- String addr = "message_queue";
-
- if (args.length > 0)
- {
- host = args[0];
- }
- if (args.length > 1)
- {
- port = Integer.parseInt(args[1]);
- }
- if (args.length > 2)
- {
- addr = args[2];
- }
-
- AMQConnection con = new AMQConnection(
- "amqp://username:password@topicClientid/test?brokerlist='tcp://"
- + host + ":" + port + "'");
-
- Receiver rcv = new Receiver(con,addr);
- rcv.run();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/testkit/Sender.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/testkit/Sender.java b/tools/src/main/java/org/apache/qpid/testkit/Sender.java
deleted file mode 100644
index 14b9b73..0000000
--- a/tools/src/main/java/org/apache/qpid/testkit/Sender.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.Random;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.tools.MessageFactory;
-
-/**
- * A generic sender which sends a stream of messages
- * to a given address in a broker (host/port)
- * until told to stop by killing it.
- *
- * It has a feedback loop to ensure it doesn't fill
- * up queues due to a slow consumer.
- *
- * It doesn't check for correctness or measure anything
- * leaving those concerns to another entity.
- * However it prints a timestamp every x ms (-Dreport_frequency,
- * default 60000) as a checkpoint to figure out how far the test
- * has progressed if a failure occurred.
- *
- * It also takes in an optional Error handler to
- * pass out any error in addition to writing them to std err.
- *
- * This is intended more as a building block to create
- * more complex test cases. However there is a main method
- * provided to use this standalone.
- *
- * The following options are available and configurable
- * via jvm args.
- *
- * msg_size (100; -1 selects a random size of 500-1499 bytes)
- * msg_count (10) - # messages before waiting for feedback
- * sleep_time (1000 ms) - sleep time between iterations
- * report_frequency - how often a timestamp is printed
- * durable
- * transacted
- * tx_size - size of transaction batch in # msgs.
- */
-public class Sender extends Client
-{
- protected int msg_size = 256;
- protected int msg_count = 10;
- protected int iterations = -1;
- protected long sleep_time = 1000;
-
- protected Destination dest = null;
- protected Destination replyTo = null;
- protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- protected NumberFormat nf = new DecimalFormat("##.00");
-
- protected MessageProducer producer;
- Random gen = new Random(19770905);
-
- public Sender(Connection con,String addr) throws Exception
- {
- super(con);
- this.msg_size = Integer.getInteger("msg_size", 100);
- this.msg_count = Integer.getInteger("msg_count", 10);
- this.iterations = Integer.getInteger("iterations", -1);
- this.sleep_time = Long.getLong("sleep_time", 1000);
- this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE));
- this.dest = new AMQAnyDestination(addr);
- this.producer = getSsn().createProducer(dest);
- this.replyTo = getSsn().createTemporaryQueue();
-
- System.out.println("Sending messages to : " + addr);
- }
-
- /*
- * If msg_size is set to -1 it generates a message
- * of a random size between 500 and 1499 bytes.
- */
- protected Message getNextMessage() throws Exception
- {
- int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size;
- Message msg = (getContentType().equals("text/plain")) ?
- MessageFactory.createTextMessage(getSsn(), s):
- MessageFactory.createBytesMessage(getSsn(), s);
-
- msg.setJMSDeliveryMode((isDurable()) ? DeliveryMode.PERSISTENT
- : DeliveryMode.NON_PERSISTENT);
- return msg;
- }
-
- public void run()
- {
- try
- {
- boolean infinite = (iterations == -1);
- for (int x=0; infinite || x < iterations; x++)
- {
- long now = System.currentTimeMillis();
- if (now - getStartTime() >= getReportFrequency())
- {
- System.out.println(df.format(now) + " - iterations : " + x);
- setStartTime(now);
- }
-
- for (int i = 0; i < msg_count; i++)
- {
- Message msg = getNextMessage();
- msg.setIntProperty("sequence",i);
- producer.send(msg);
- if (isTransacted() && msg_count % getTxSize() == 0)
- {
- getSsn().commit();
- }
- }
- TextMessage m = getSsn().createTextMessage("End");
- m.setJMSReplyTo(replyTo);
- producer.send(m);
-
- if (isTransacted())
- {
- getSsn().commit();
- }
-
- MessageConsumer feedbackConsumer = getSsn().createConsumer(replyTo);
- feedbackConsumer.receive();
- feedbackConsumer.close();
- if (isTransacted())
- {
- getSsn().commit();
- }
- Thread.sleep(sleep_time);
- }
- }
- catch (Exception e)
- {
- handleError("Exception sending messages",e);
- }
- }
-
- // Sender host port address
- public static void main(String[] args) throws Exception
- {
- String host = "127.0.0.1";
- int port = 5672;
- String addr = "message_queue";
-
- if (args.length > 0)
- {
- host = args[0];
- }
- if (args.length > 1)
- {
- port = Integer.parseInt(args[1]);
- }
- if (args.length > 2)
- {
- addr = args[2];
- }
-
- AMQConnection con = new AMQConnection(
- "amqp://username:password@topicClientid/test?brokerlist='tcp://"
- + host + ":" + port + "'");
-
- Sender sender = new Sender(con,addr);
- sender.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
deleted file mode 100644
index 3d0da07..0000000
--- a/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.thread.Threading;
-
-/**
- * A basic test case class that can launch a Sender, a Receiver,
- * or both, each on its own separate thread.
- *
- * {@code con_count} connections are opened and {@code ssn_per_con}
- * entities of each enabled kind are created on every connection.
- *
- * If both the sender and receiver options are set, the sender and
- * receiver created for the same index share a connection.
- *
- * The following options are read from JVM system properties:
- * test_name
- * host, port
- * con_count, ssn_per_con
- * heartbeat - placed in the connection URL as the heartbeat option
- * sender, receiver - booleans which indicate which entity to create.
- * Setting them both is also a valid option.
- * use_unique_dests - give every entity its own indexed subject
- * failover - "failover_exchange" adds that failover option to the URL
- * durable
- */
-public class TestLauncher implements ErrorHandler
-{
-    protected String host = "127.0.0.1";
-    protected int port = 5672;
-    protected int sessions_per_con = 1;
-    protected int connection_count = 1;
-    protected long heartbeat = 5000;
-    protected boolean sender = false;
-    protected boolean receiver = false;
-    protected boolean useUniqueDests = false;
-    protected String url;
-
-    protected String address = "my_queue; {create: always}";
-    protected boolean durable = false;
-    protected String failover = "";
-    protected AMQConnection controlCon;
-    protected Destination controlDest = null;
-    protected Session controlSession = null;
-    protected MessageProducer statusSender;
-    protected List<AMQConnection> clients = new ArrayList<AMQConnection>();
-    protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
-    protected NumberFormat nf = new DecimalFormat("##.00");
-    protected String testName;
-
-    /**
-     * Reads the configuration from system properties (overriding the field
-     * defaults above) and builds the connection URL.
-     */
-    public TestLauncher()
-    {
-        testName = System.getProperty("test_name","UNKNOWN");
-        host = System.getProperty("host", "127.0.0.1");
-        port = Integer.getInteger("port", 5672);
-        sessions_per_con = Integer.getInteger("ssn_per_con", 1);
-        connection_count = Integer.getInteger("con_count", 1);
-        heartbeat = Long.getLong("heartbeat", 5);
-        sender = Boolean.getBoolean("sender");
-        receiver = Boolean.getBoolean("receiver");
-        useUniqueDests = Boolean.getBoolean("use_unique_dests");
-
-        failover = System.getProperty("failover", "");
-        durable = Boolean.getBoolean("durable");
-
-        url = "amqp://username:password@topicClientid/test?brokerlist='tcp://"
-              + host + ":" + port + "?heartbeat='" + heartbeat+ "''";
-
-        if (failover.equalsIgnoreCase("failover_exchange"))
-        {
-            url += "&failover='failover_exchange'";
-
-            System.out.println("Failover exchange " + url );
-        }
-    }
-
-    /**
-     * Opens the control connection and creates the producer used by
-     * {@link #handleError} to publish status messages to the "control" address.
-     * Failures are reported through {@link #handleError}.
-     */
-    public void setUpControlChannel()
-    {
-        try
-        {
-            controlCon = new AMQConnection(url);
-            controlCon.start();
-
-            controlDest = new AMQAnyDestination("control; {create: always}"); // durable
-
-            // Create the session to setup the messages
-            controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            statusSender = controlSession.createProducer(controlDest);
-
-        }
-        catch (Exception e)
-        {
-            handleError("Error while setting up the test",e);
-        }
-    }
-
-    /**
-     * Closes the client connections and then the control channel.
-     * Every resource is closed independently so that one failure does not
-     * leave the remaining ones open; the control session and connection may
-     * be null when {@link #setUpControlChannel} failed or was never called.
-     */
-    public void cleanup()
-    {
-        // Clients first, so their close errors can still be reported on the control channel.
-        for (AMQConnection con : clients)
-        {
-            try
-            {
-                con.close();
-            }
-            catch (Exception e)
-            {
-                handleError("Error while tearing down the test",e);
-            }
-        }
-
-        try
-        {
-            if (controlSession != null)
-            {
-                controlSession.close();
-            }
-        }
-        catch (Exception e)
-        {
-            handleError("Error while tearing down the test",e);
-        }
-
-        try
-        {
-            if (controlCon != null)
-            {
-                controlCon.close();
-            }
-        }
-        catch (Exception e)
-        {
-            handleError("Error while tearing down the test",e);
-        }
-    }
-
-    /**
-     * Opens {@code connection_count} connections and, on each of them,
-     * creates {@code sessions_per_con} senders and/or receivers.
-     *
-     * @param addr destination address; the default address is used when null
-     */
-    public void start(String addr)
-    {
-        try
-        {
-            if (addr == null)
-            {
-                addr = address;
-            }
-
-            String addrTemp = addr;
-            for (int i = 0; i < connection_count; i++)
-            {
-                AMQConnection con = new AMQConnection(url);
-                con.start();
-                clients.add(con);
-                for (int j = 0; j < sessions_per_con; j++)
-                {
-                    String index = createPrefix(i,j);
-                    if (useUniqueDests)
-                    {
-                        addrTemp = modifySubject(index,addr);
-                    }
-
-                    if (sender)
-                    {
-                        createSender(index,con,addrTemp,this);
-                    }
-
-                    if (receiver)
-                    {
-                        System.out.println("########## Creating receiver ##################");
-
-                        createReceiver(index,con,addrTemp,this);
-                    }
-                }
-            }
-        }
-        catch (Exception e)
-        {
-            handleError("Exception while setting up the test",e);
-        }
-
-    }
-
-    /**
-     * Starts a Receiver for {@code addr} on its own thread named
-     * "ReceiverThread-" + index; errors are routed to {@code h}.
-     */
-    protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h)
-    {
-        Runnable r = new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                try
-                {
-                    Receiver rcv = new Receiver(con,addr);
-                    rcv.setErrorHandler(h);
-                    rcv.run();
-                }
-                catch (Exception e)
-                {
-                    h.handleError("Error Starting Receiver", e);
-                }
-            }
-        };
-
-        startThread(r, "ReceiverThread-" + index, "Error creating Receive thread");
-    }
-
-    /**
-     * Starts a Sender for {@code addr} on its own thread named
-     * "SenderThread-" + index; errors are routed to {@code h}.
-     */
-    protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h)
-    {
-        Runnable r = new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                try
-                {
-                    Sender sender = new Sender(con, addr);
-                    sender.setErrorHandler(h);
-                    sender.run();
-                }
-                catch (Exception e)
-                {
-                    h.handleError("Error Starting Sender", e);
-                }
-            }
-        };
-
-        startThread(r, "SenderThread-" + index, "Error creating Sender thread");
-    }
-
-    /**
-     * Creates, names and starts a thread for {@code r}. When the thread cannot
-     * be created the error is reported and nothing is started (previously the
-     * null thread was dereferenced).
-     */
-    private void startThread(Runnable r, String name, String failureMsg)
-    {
-        Thread t;
-        try
-        {
-            t = Threading.getThreadFactory().createThread(r);
-        }
-        catch(Exception e)
-        {
-            handleError(failureMsg,e);
-            return;
-        }
-
-        t.setName(name);
-        t.start();
-    }
-
-    /**
-     * Prints the error to stderr and, when the control channel is available,
-     * publishes it as a text message with status, desc, time and
-     * exception-trace properties.
-     */
-    @Override
-    public synchronized void handleError(String msg, Exception e)
-    {
-        // In case sending the message fails
-        StringBuilder sb = new StringBuilder();
-        sb.append(msg);
-        sb.append(" @ ");
-        sb.append(df.format(new Date(System.currentTimeMillis())));
-        sb.append(" ");
-        sb.append(e.getMessage());
-        System.err.println(sb.toString());
-        e.printStackTrace();
-
-        // The control channel is absent when setUpControlChannel() itself failed
-        // (it reports through this method) or was never called.
-        if (controlSession == null || statusSender == null)
-        {
-            return;
-        }
-
-        try
-        {
-            TextMessage errorMsg = controlSession.createTextMessage();
-            errorMsg.setStringProperty("status", "error");
-            errorMsg.setStringProperty("desc", msg);
-            errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis())));
-            errorMsg.setStringProperty("exception-trace", serializeStackTrace(e));
-
-            System.out.println("Msg " + errorMsg);
-
-            statusSender.send(errorMsg);
-        }
-        catch (JMSException e1)
-        {
-            e1.printStackTrace();
-        }
-    }
-
-    /** Returns the stack trace of {@code e} as a string. */
-    private String serializeStackTrace(Exception e)
-    {
-        ByteArrayOutputStream bOut = new ByteArrayOutputStream();
-        PrintStream printStream = new PrintStream(bOut);
-        e.printStackTrace(printStream);
-        printStream.close();
-        return bOut.toString();
-    }
-
-    /** Builds the entity index by concatenating the connection and session numbers. */
-    private String createPrefix(int i, int j)
-    {
-        return String.valueOf(i).concat(String.valueOf(j));
-    }
-
-    /**
-     * A basic helper function to modify the subjects by
-     * appending an index: it is inserted right after the first "/" when
-     * one exists, otherwise "/index" goes before the first ";" or, failing
-     * that, at the end of the address.
-     */
-    private String modifySubject(String index,String addr)
-    {
-        if (addr.indexOf("/") > 0)
-        {
-            addr = addr.substring(0,addr.indexOf("/")+1) +
-                   index +
-                   addr.substring(addr.indexOf("/")+1,addr.length());
-        }
-        else if (addr.indexOf(";") > 0)
-        {
-            addr = addr.substring(0,addr.indexOf(";")) +
-                   "/" + index +
-                   addr.substring(addr.indexOf(";"),addr.length());
-        }
-        else
-        {
-            addr = addr + "/" + index;
-        }
-
-        return addr;
-    }
-
-    /**
-     * Entry point. The optional first argument is the destination address;
-     * without it the default address is used.
-     */
-    public static void main(String[] args)
-    {
-        final TestLauncher test = new TestLauncher();
-        test.setUpControlChannel();
-        System.out.println("args.length " + args.length);
-        // Only touch args[0] when it exists; it used to be printed unconditionally,
-        // which failed with ArrayIndexOutOfBoundsException when no argument was given.
-        final String addr = args.length > 0 ? args [0] : null;
-        if (addr != null)
-        {
-            System.out.println("args [0] " + addr);
-        }
-        test.start(addr);
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() { test.cleanup(); }
-        });
-
-    }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/Clock.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/Clock.java b/tools/src/main/java/org/apache/qpid/tools/Clock.java
deleted file mode 100644
index 7eb83a5..0000000
--- a/tools/src/main/java/org/apache/qpid/tools/Clock.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Static time source whose unit is chosen once, at class load, from the
- * "precision" system property: "mili" (the default, case-insensitive)
- * selects System.currentTimeMillis(), any other value selects System.nanoTime().
- *
- * In the future this will be replaced by a Clock abstraction
- * that can utilize a realtime clock when running in RT Java.
- */
-
-public class Clock
-{
-    private static final Logger _logger = LoggerFactory.getLogger(Clock.class);
-
-    // NOTE(review): the name suggests one second, but 60000 ms is one minute - confirm against callers.
-    public final static long SEC = 60000;
-
-    private static final long NANOS_PER_MILLI = 1000000L;
-
-    private static Precision precision;
-    private static long offset = -1; // in nano secs; -1 means "no offset" (never set at present)
-
-    public enum Precision
-    {
-        NANO_SECS, MILI_SECS;
-
-        /** Maps "mili" (any case) to MILI_SECS and everything else, including null, to NANO_SECS. */
-        static Precision getPrecision(String str)
-        {
-            if ("mili".equalsIgnoreCase(str))
-            {
-                return MILI_SECS;
-            }
-            else
-            {
-                return NANO_SECS;
-            }
-        }
-    };
-
-    static
-    {
-        precision = Precision.getPrecision(System.getProperty("precision","mili"));
-        //offset = Long.getLong("offset",-1);
-
-        // Report through the logger that guards the message, not stdout.
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Using precision : {}", precision);
-        }
-    }
-
-    /** Returns the precision selected at class load. */
-    public static Precision getPrecision()
-    {
-        return precision;
-    }
-
-    /**
-     * Returns the current time in the configured unit: System.nanoTime()
-     * for NANO_SECS, System.currentTimeMillis() for MILI_SECS, plus the
-     * offset when one is set.
-     */
-    public static long getTime()
-    {
-        if (precision == Precision.NANO_SECS)
-        {
-            if (offset == -1)
-            {
-                return System.nanoTime();
-            }
-            else
-            {
-                return System.nanoTime() + offset;
-            }
-        }
-        else
-        {
-            if (offset == -1)
-            {
-                return System.currentTimeMillis();
-            }
-            else
-            {
-                // offset is in nanoseconds; convertToMiliSecs() is 1 in this mode,
-                // so it cannot be used to scale the offset down to milliseconds.
-                return System.currentTimeMillis() + offset / NANOS_PER_MILLI;
-            }
-        }
-    }
-
-    /** Returns how many units of the configured precision make one second. */
-    public static long convertToSecs()
-    {
-        if (precision == Precision.NANO_SECS)
-        {
-            return 1000000000;
-        }
-        else
-        {
-            return 1000;
-        }
-    }
-
-    /** Returns how many units of the configured precision make one millisecond. */
-    public static long convertToMiliSecs()
-    {
-        if (precision == Precision.NANO_SECS)
-        {
-            return NANOS_PER_MILLI;
-        }
-        else
-        {
-            return 1;
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java b/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java
deleted file mode 100644
index bd6cfd4..0000000
--- a/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tools;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Enumeration;
-import java.util.Hashtable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.jms.FailoverPolicy;
-
-public class JNDICheck
-{
- private static final String QUEUE = "queue.";
- private static final String TOPIC = "topic.";
- private static final String DESTINATION = "destination.";
- private static final String CONNECTION_FACTORY = "connectionfactory.";
-
- public static void main(String[] args)
- {
-
- if (args.length != 1)
- {
- usage();
- }
-
- String propertyFile = args[0];
-
- new JNDICheck(propertyFile);
- }
-
- private static void usage()
- {
- exit("Usage: JNDICheck <JNDI Config file>", 0);
- }
-
- private static void exit(String message, int exitCode)
- {
- System.err.println(message);
- System.exit(exitCode);
- }
-
- private static String JAVA_NAMING = "java.naming.factory.initial";
-
- private Context _context = null;
- private Hashtable _environment = null;
-
- public JNDICheck(String propertyFile)
- {
-
- // Load JNDI properties
- Properties properties = new Properties();
-
- try(FileInputStream propertiesStream = new FileInputStream(new File(propertyFile)))
- {
- properties.load(propertiesStream);
- }
- catch (IOException e)
- {
- exit("Unable to open property file:" + propertyFile + ". Due to:" + e.getMessage(), 1);
- }
-
- //Create the initial context
- try
- {
-
- System.setProperty(JAVA_NAMING, properties.getProperty(JAVA_NAMING));
-
- _context = new InitialContext(properties);
-
- _environment = _context.getEnvironment();
-
- Enumeration keys = _environment.keys();
-
- List<String> queues = new LinkedList<String>();
- List<String> topics = new LinkedList<String>();
- List<String> destinations = new LinkedList<String>();
- List<String> connectionFactories = new LinkedList<String>();
-
- while (keys.hasMoreElements())
- {
- String key = keys.nextElement().toString();
-
- if (key.startsWith(QUEUE))
- {
- queues.add(key);
- }
- else if (key.startsWith(TOPIC))
- {
- topics.add(key);
- }
- else if (key.startsWith(DESTINATION))
- {
- destinations.add(key);
- }
- else if (key.startsWith(CONNECTION_FACTORY))
- {
- connectionFactories.add(key);
- }
- }
-
- printHeader(propertyFile);
- printEntries(QUEUE, queues);
- printEntries(TOPIC, topics);
- printEntries(DESTINATION, destinations);
- printEntries(CONNECTION_FACTORY, connectionFactories);
-
- }
- catch (NamingException e)
- {
- exit("Unable to load JNDI Context due to:" + e.getMessage(), 1);
- }
-
- }
-
- private void printHeader(String file)
- {
- print("JNDI file :" + file);
- }
-
- private void printEntries(String type, List<String> list)
- {
- if (list.size() > 0)
- {
- String name = type.substring(0, 1).toUpperCase() + type.substring(1, type.length() - 1);
- print(name + " elements in file:");
- printList(list);
- print("");
- }
- }
-
- private void printList(List<String> list)
- {
- for (String item : list)
- {
- String key = item.substring(item.indexOf('.') + 1);
-
- try
- {
- print(key, _context.lookup(key));
- }
- catch (NamingException e)
- {
- exit("Error: item " + key + " no longer in context.", 1);
- }
- }
- }
-
- private void print(String key, Object object)
- {
- if (object instanceof AMQDestination)
- {
- print(key + ":" + object);
- }
- else if (object instanceof AMQConnectionFactory)
- {
- AMQConnectionFactory factory = (AMQConnectionFactory) object;
- print(key + ":Connection");
- print("ConnectionURL:");
- print(factory.getConnectionURL().toString());
- print("FailoverPolicy");
- print(new FailoverPolicy(factory.getConnectionURL(),null).toString());
- print("");
- }
- }
-
- private void print(String msg)
- {
- System.out.println(msg);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java b/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java
deleted file mode 100644
index e0e4851..0000000
--- a/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.text.DecimalFormat;
-
-import javax.jms.Connection;
-import javax.jms.Session;
-
-import org.apache.qpid.client.AMQConnection;
-
-/**
- * TestConfiguration whose values are all taken from JVM system properties
- * when an instance is created.
- */
-public class JVMArgConfiguration implements TestConfiguration
-{
-    /*
-     * By default the connection URL is used.
-     * This allows a user to easily specify a fully fledged URL any given property.
-     * Ex. SSL parameters
-     *
-     * By providing a host & port allows a user to simply override the URL.
-     * This allows to create multiple clients in test scripts easily,
-     * without having to deal with the long URL format.
-     */
-    private final String url = System.getProperty("url",
-            "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'");
-    private final String host = System.getProperty("host","");
-    private final int port = Integer.getInteger("port", -1);
-
-    // Not final: setAddress() may replace it.
-    private String address = System.getProperty("address", "queue; {create : always}");
-
-    private final long timeout = Long.getLong("timeout",0);
-    private final int msg_size = Integer.getInteger("msg-size", 0);
-    private final int random_msg_size_start_from = Integer.getInteger("random-msg-size-start-from", 1);
-    private final boolean cacheMessage = true; //Boolean.getBoolean("cache-msg");
-    private final boolean disableMessageID = Boolean.getBoolean("disable-message-id");
-    private final boolean disableTimestamp = Boolean.getBoolean("disable-timestamp");
-    private final boolean durable = Boolean.getBoolean("durable");
-    private final int transaction_size = Integer.getInteger("tx",1000);
-    private final int ack_mode = Integer.getInteger("ack-mode",Session.AUTO_ACKNOWLEDGE);
-    private final int msg_count = Integer.getInteger("msg-count",10);
-    private final int warmup_count = Integer.getInteger("warmup-count",1);
-    private final boolean random_msg_size = Boolean.getBoolean("random-msg-size");
-    private final String msgType = System.getProperty("msg-type","bytes");
-    private final boolean printStdDev = Boolean.getBoolean("print-std-dev");
-    private final int sendRate = Integer.getInteger("rate",0);
-    private final boolean externalController = Boolean.getBoolean("ext-controller");
-    private final boolean useUniqueDest = Boolean.getBoolean("use-unique-dest"); // useful when using multiple connections.
-    private final int ackFrequency = Integer.getInteger("ack-frequency",100);
-    private final DecimalFormat df = new DecimalFormat("###.##");
-    private final int reportEvery = Integer.getInteger("report-every",0);
-    private final boolean isReportTotal = Boolean.getBoolean("report-total");
-    // Defaults to true when the property is absent.
-    private final boolean isReportHeader =
-            System.getProperty("report-header") == null || Boolean.getBoolean("report-header");
-    private final int sendEOS = Integer.getInteger("send-eos",1);
-    private final int connectionCount = Integer.getInteger("con_count",1);
-    private final int rollbackFrequency = Integer.getInteger("rollback-frequency",0);
-    private final boolean printHeaders = Boolean.getBoolean("print-headers");
-    private final boolean printContent = Boolean.getBoolean("print-content");
-    private final long ttl = Long.getLong("ttl", 0);
-    private final int priority = Integer.getInteger("priority", 0);
-    private final String readyAddress = System.getProperty("ready-address");
-
-    /** All values are captured from the system properties by the field initialisers. */
-    public JVMArgConfiguration()
-    {
-    }
-
-    @Override
-    public String getUrl() { return url; }
-
-    @Override
-    public String getHost() { return host; }
-
-    @Override
-    public int getPort() { return port; }
-
-    @Override
-    public String getAddress() { return address; }
-
-    @Override
-    public long getTimeout() { return timeout; }
-
-    @Override
-    public int getAckMode() { return ack_mode; }
-
-    @Override
-    public int getMsgCount() { return msg_count; }
-
-    @Override
-    public int getMsgSize() { return msg_size; }
-
-    @Override
-    public int getRandomMsgSizeStartFrom() { return random_msg_size_start_from; }
-
-    @Override
-    public boolean isDurable() { return durable; }
-
-    /** Transacted whenever the transaction size is positive. */
-    @Override
-    public boolean isTransacted() { return transaction_size > 0; }
-
-    @Override
-    public int getTransactionSize() { return transaction_size; }
-
-    @Override
-    public int getWarmupCount() { return warmup_count; }
-
-    @Override
-    public boolean isCacheMessage() { return cacheMessage; }
-
-    @Override
-    public boolean isDisableMessageID() { return disableMessageID; }
-
-    @Override
-    public boolean isDisableTimestamp() { return disableTimestamp; }
-
-    @Override
-    public boolean isRandomMsgSize() { return random_msg_size; }
-
-    @Override
-    public String getMessageType() { return msgType; }
-
-    @Override
-    public boolean isPrintStdDev() { return printStdDev; }
-
-    @Override
-    public int getSendRate() { return sendRate; }
-
-    @Override
-    public boolean isExternalController() { return externalController; }
-
-    public void setAddress(String addr) { address = addr; }
-
-    @Override
-    public boolean isUseUniqueDests() { return useUniqueDest; }
-
-    @Override
-    public int getAckFrequency() { return ackFrequency; }
-
-    /**
-     * Connects with the URL unless both a host and a port were supplied,
-     * in which case those are used with the guest/guest credentials.
-     */
-    @Override
-    public Connection createConnection() throws Exception
-    {
-        final boolean useUrl = getHost().isEmpty() || getPort() == -1;
-        return useUrl
-                ? new AMQConnection(getUrl())
-                : new AMQConnection(getHost(),getPort(),"guest","guest","test","test");
-    }
-
-    @Override
-    public DecimalFormat getDecimalFormat() { return df; }
-
-    @Override
-    public int reportEvery() { return reportEvery; }
-
-    @Override
-    public boolean isReportTotal() { return isReportTotal; }
-
-    @Override
-    public boolean isReportHeader() { return isReportHeader; }
-
-    @Override
-    public int getSendEOS() { return sendEOS; }
-
-    @Override
-    public int getConnectionCount() { return connectionCount; }
-
-    @Override
-    public int getRollbackFrequency() { return rollbackFrequency; }
-
-    @Override
-    public boolean isPrintHeaders() { return printHeaders; }
-
-    @Override
-    public boolean isPrintContent() { return printContent; }
-
-    @Override
-    public long getTTL() { return ttl; }
-
-    @Override
-    public int getPriority() { return priority; }
-
-    @Override
-    public String getReadyAddress() { return readyAddress; }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/MemoryConsumptionTestClient.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/MemoryConsumptionTestClient.java b/tools/src/main/java/org/apache/qpid/tools/MemoryConsumptionTestClient.java
index 5e6cae0..2ab04a2 100644
--- a/tools/src/main/java/org/apache/qpid/tools/MemoryConsumptionTestClient.java
+++ b/tools/src/main/java/org/apache/qpid/tools/MemoryConsumptionTestClient.java
@@ -60,15 +60,11 @@ import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import org.apache.qpid.client.AMQConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-
+/* TODO this program assumes addresses understood by the Qpid JMS Client 0-x. */
public class MemoryConsumptionTestClient
{
private static final Logger LOGGER = LoggerFactory.getLogger(MemoryConsumptionTestClient.class);
@@ -292,12 +288,12 @@ public class MemoryConsumptionTestClient
}
}
- private void purgeQueue(ConnectionFactory connectionFactory, String queueString, long receiveTimeout) throws JMSException, QpidException
+ private void purgeQueue(ConnectionFactory connectionFactory, String queueString, long receiveTimeout) throws JMSException
{
LOGGER.debug("Consuming left over messages, using receive timeout:" + receiveTimeout);
Connection connection = connectionFactory.createConnection();
- Session session = ((AMQConnection)connection).createSession(false, Session.AUTO_ACKNOWLEDGE, 10);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueString);
MessageConsumer consumer = session.createConsumer(destination);
connection.start();
@@ -309,12 +305,8 @@ public class MemoryConsumptionTestClient
if(msg == null)
{
- long queueDepth = ((AMQSession<?,?>)session).getQueueDepth((AMQDestination)destination);
- if (queueDepth == 0)
- {
- LOGGER.debug("Received " + count + " messages");
- break;
- }
+ LOGGER.debug("Received {} message(s)", count);
+ break;
}
else
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java b/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
deleted file mode 100644
index 7ceef47..0000000
--- a/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.net.InetAddress;
-import java.util.UUID;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession_0_10;
-import org.apache.qpid.messaging.Address;
-import org.apache.qpid.tools.TestConfiguration.MessageType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MercuryBase
-{
- private static final Logger _logger = LoggerFactory.getLogger(MercuryBase.class);
-
- public final static String CODE = "CODE";
- public final static String ID = "ID";
- public final static String REPLY_ADDR = "REPLY_ADDR";
- public final static String MAX_LATENCY = "MAX_LATENCY";
- public final static String MIN_LATENCY = "MIN_LATENCY";
- public final static String AVG_LATENCY = "AVG_LATENCY";
- public final static String STD_DEV = "STD_DEV";
- public final static String CONS_RATE = "CONS_RATE";
- public final static String PROD_RATE = "PROD_RATE";
- public final static String MSG_COUNT = "MSG_COUNT";
- public final static String TIMESTAMP = "Timestamp";
-
- String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}");
-
- TestConfiguration config;
- Connection con;
- Session session;
- Session controllerSession;
- Destination dest;
- Destination myControlQueue;
- Destination controllerQueue;
- String id;
- String myControlQueueAddr;
-
- MessageProducer sendToController;
- MessageConsumer receiveFromController;
- String prefix = "";
-
- enum OPCode
- {
- REGISTER_CONSUMER, REGISTER_PRODUCER,
- PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP,
- CONSUMER_READY, PRODUCER_READY,
- PRODUCER_START,
- RECEIVED_END_MSG, CONSUMER_STOP,
- RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS,
- CONTINUE_TEST, STOP_TEST
- };
-
- MessageType msgType = MessageType.BYTES;
-
- public MercuryBase(TestConfiguration config,String prefix)
- {
- this.config = config;
- String host = "";
- try
- {
- host = InetAddress.getLocalHost().getHostName();
- }
- catch (Exception e)
- {
- }
- id = host + "-" + UUID.randomUUID().toString();
- this.prefix = prefix;
- this.myControlQueueAddr = id + ";{create: always}";
- }
-
- public void setUp() throws Exception
- {
- con = config.createConnection();
- con.start();
-
- controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- dest = createDestination();
- controllerQueue = AMQDestination.createDestination(CONTROLLER_ADDR, false);
- myControlQueue = session.createQueue(myControlQueueAddr);
- msgType = MessageType.getType(config.getMessageType());
- _logger.debug("Using " + msgType + " messages");
-
- sendToController = controllerSession.createProducer(controllerQueue);
- receiveFromController = controllerSession.createConsumer(myControlQueue);
- }
-
- private Destination createDestination() throws Exception
- {
- if (config.isUseUniqueDests())
- {
- _logger.debug("Prefix : " + prefix);
- Address addr = Address.parse(config.getAddress());
- AMQDestination temp = (AMQDestination) AMQDestination.createDestination(config.getAddress(), false);
- int type = ((AMQSession_0_10)session).resolveAddressType(temp);
-
- if ( type == AMQDestination.TOPIC_TYPE)
- {
- addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions());
- System.out.println("Setting subject : " + addr);
- }
- else
- {
- addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions());
- System.out.println("Setting name : " + addr);
- }
-
- return AMQDestination.createDestination(addr.toString(), false);
- }
- else
- {
- return AMQDestination.createDestination(config.getAddress(), false);
- }
- }
-
- public synchronized void sendMessageToController(MapMessage m) throws Exception
- {
- m.setString(ID, id);
- m.setString(REPLY_ADDR,myControlQueueAddr);
- sendToController.send(m);
- }
-
- public void receiveFromController(OPCode expected) throws Exception
- {
- MapMessage m = (MapMessage)receiveFromController.receive();
- OPCode code = OPCode.values()[m.getInt(CODE)];
- _logger.debug("Received Code : " + code);
- if (expected != code)
- {
- throw new Exception("Expected OPCode : " + expected + " but received : " + code);
- }
-
- }
-
- public boolean continueTest() throws Exception
- {
- MapMessage m = (MapMessage)receiveFromController.receive();
- OPCode code = OPCode.values()[m.getInt(CODE)];
- _logger.debug("Received Code : " + code);
- return (code == OPCode.CONTINUE_TEST);
- }
-
- public void tearDown() throws Exception
- {
- session.close();
- controllerSession.close();
- con.close();
- }
-
- public void handleError(Exception e,String msg)
- {
- StringBuilder sb = new StringBuilder();
- sb.append(msg);
- sb.append(" ");
- sb.append(e.getMessage());
- System.err.println(sb.toString());
- e.printStackTrace();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java b/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java
deleted file mode 100644
index 77c9e52..0000000
--- a/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.MapMessage;
-
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.tools.report.MercuryReporter;
-import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughputAndLatency;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * PerfConsumer will receive x no of messages in warmup mode.
- * Once it receives the Start message it will then signal the PerfProducer.
- * It will start recording stats from the first message it receives after
- * the warmup mode is done.
- *
- * The following calculations are done.
- * The important numbers to look at is
- * a) Avg Latency
- * b) System throughput.
- *
- * Latency.
- * =========
- * Currently this test is written with the assumption that either
- * a) The Perf Producer and Consumer are on the same machine
- * b) They are on separate machines that have their time synced via a Time Server
- *
- * In order to calculate latency the producer inserts a timestamp
- * when the message is sent. The consumer will note the current time the message is
- * received and will calculate the latency as follows
- * latency = rcvdTime - msg.getJMSTimestamp()
- *
- * Through out the test it will keep track of the max and min latency to show the
- * variance in latencies.
- *
- * Avg latency is measured by adding all latencies and dividing by the total msgs.
- *
- * Throughput
- * ===========
- * Consumer rate is calculated as
- * rcvdMsgCount/(rcvdTime - startTime)
- *
- * Note that the testStartTime referes to when the producer sent the first message
- * and startTime is when the consumer first received a message.
- *
- * rcvdTime keeps track of when the last message is received.
- *
- * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
- *
- */
-
-public class MercuryConsumerController extends MercuryBase
-{
- private static final Logger _logger = LoggerFactory.getLogger(MercuryConsumerController.class);
- MercuryReporter reporter;
- TestConfiguration config;
- QpidReceive receiver;
-
- public MercuryConsumerController(TestConfiguration config, MercuryReporter reporter, String prefix)
- {
- super(config,prefix);
- this.reporter = reporter;
- if (_logger.isInfoEnabled())
- {
- _logger.info("Consumer ID : " + id);
- }
- }
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- receiver = new QpidReceive(reporter,config, con,dest);
- receiver.setUp();
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
- sendMessageToController(m);
- }
-
- public void warmup()throws Exception
- {
- receiveFromController(OPCode.CONSUMER_STARTWARMUP);
- receiver.waitforCompletion(config.getWarmupCount());
-
- // It's more realistic for the consumer to signal this.
- MapMessage m1 = controllerSession.createMapMessage();
- m1.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
- sendMessageToController(m1);
-
- MapMessage m2 = controllerSession.createMapMessage();
- m2.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
- sendMessageToController(m2);
- }
-
- public void runReceiver() throws Exception
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Consumer: " + id + " Starting iteration......" + "\n");
- }
- resetCounters();
- receiver.waitforCompletion(config.getMsgCount());
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
- sendMessageToController(m);
- }
-
- public void resetCounters()
- {
- reporter.clear();
- }
-
- public void sendResults() throws Exception
- {
- receiveFromController(OPCode.CONSUMER_STOP);
- reporter.report();
-
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
- m.setDouble(AVG_LATENCY, reporter.getAvgLatency());
- m.setDouble(MIN_LATENCY, reporter.getMinLatency());
- m.setDouble(MAX_LATENCY, reporter.getMaxLatency());
- m.setDouble(STD_DEV, reporter.getStdDev());
- m.setDouble(CONS_RATE, reporter.getRate());
- m.setLong(MSG_COUNT, reporter.getSampleSize());
- sendMessageToController(m);
-
- reporter.log(new StringBuilder("Total Msgs Received : ").append(reporter.getSampleSize()).toString());
- reporter.log(new StringBuilder("Consumer rate : ").
- append(config.getDecimalFormat().format(reporter.getRate())).
- append(" msg/sec").toString());
- reporter.log(new StringBuilder("Avg Latency : ").
- append(config.getDecimalFormat().format(reporter.getAvgLatency())).
- append(" ms").toString());
- reporter.log(new StringBuilder("Min Latency : ").
- append(config.getDecimalFormat().format(reporter.getMinLatency())).
- append(" ms").toString());
- reporter.log(new StringBuilder("Max Latency : ").
- append(config.getDecimalFormat().format(reporter.getMaxLatency())).
- append(" ms").toString());
- if (config.isPrintStdDev())
- {
- reporter.log(new StringBuilder("Std Dev : ").
- append(reporter.getStdDev()).toString());
- }
- }
-
- public void run()
- {
- try
- {
- setUp();
- warmup();
- boolean nextIteration = true;
- while (nextIteration)
- {
- System.out.println("=========================================================\n");
- System.out.println("Consumer: " + id + " starting a new iteration ......\n");
- runReceiver();
- sendResults();
- nextIteration = continueTest();
- }
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
- @Override
- public void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- public static void main(String[] args) throws Exception
- {
- TestConfiguration config = new JVMArgConfiguration();
- MercuryReporter reporter= new MercuryReporter(MercuryThroughputAndLatency.class,System.out,10,true);
- String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = config.getConnectionCount();
- final CountDownLatch testCompleted = new CountDownLatch(conCount);
- for (int i=0; i < conCount; i++)
- {
- final MercuryConsumerController cons = new MercuryConsumerController(config, reporter, scriptId + i);
- Runnable r = new Runnable()
- {
- @Override
- public void run()
- {
- cons.run();
- testCompleted.countDown();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
- }
- t.start();
- }
- testCompleted.await();
- reporter.log("Consumers have completed the test......\n");
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org