You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/10/22 15:33:41 UTC
[3/5] qpid-broker-j git commit: QPID-7984: [Qpid Broker-J] [Tools]
Remove unused tools.
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java b/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java
deleted file mode 100644
index 93f2ec1..0000000
--- a/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.MapMessage;
-
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.tools.report.MercuryReporter;
-import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
- * from the consumer that it has successfully consumed them and ready to start the
- * test. It will start sending y no of messages and each message will contain a time
- * stamp. This will be used at the receiving end to measure the latency.
- *
- * This is done with the assumption that both consumer and producer are running on
- * the same machine or different machines which have time synced using a time server.
- *
- * This test also calculates the producer rate as follows.
- * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs)
- *
- * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
- *
- * Rajith - Producer rate is not an accurate perf metric IMO.
- * It is heavily inlfuenced by any in memory buffering.
- * System throughput and latencies calculated by the PerfConsumer are more realistic
- * numbers.
- *
- * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs
- * I have done so far, it seems quite useful to compute the producer rate as it gives an
- * indication of how the system behaves. For ex if there is a gap between producer and consumer rates
- * you could clearly see the higher latencies and when producer and consumer rates are very close,
- * latency is good.
- *
- */
-public class MercuryProducerController extends MercuryBase
-{
- private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class);
- MercuryReporter reporter;
- QpidSend sender;
-
- public MercuryProducerController(TestConfiguration config, MercuryReporter reporter, String prefix)
- {
- super(config,prefix);
- this.reporter = reporter;
- System.out.println("Producer ID : " + id);
- }
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- sender = new QpidSend(reporter,config, con,dest);
- sender.setUp();
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
- sendMessageToController(m);
- }
-
- public void warmup()throws Exception
- {
- receiveFromController(OPCode.PRODUCER_STARTWARMUP);
- if (_logger.isInfoEnabled())
- {
- _logger.info("Producer: " + id + " Warming up......");
- }
- sender.send(config.getWarmupCount());
- sender.sendEndMessage();
- }
-
- public void runSender() throws Exception
- {
- resetCounters();
- receiveFromController(OPCode.PRODUCER_START);
- sender.send(config.getMsgCount());
- }
-
- public void resetCounters()
- {
- sender.resetCounters();
- }
-
- public void sendResults() throws Exception
- {
- MapMessage msg = controllerSession.createMapMessage();
- msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal());
- msg.setDouble(PROD_RATE, reporter.getRate());
- sendMessageToController(msg);
- reporter.log(new StringBuilder("Producer rate: ").
- append(config.getDecimalFormat().format(reporter.getRate())).
- append(" msg/sec").
- toString());
- }
-
- @Override
- public void tearDown() throws Exception
- {
- sender.tearDown();
- super.tearDown();
- }
-
- public void run()
- {
- try
- {
- setUp();
- warmup();
- boolean nextIteration = true;
- while (nextIteration)
- {
- if(_logger.isInfoEnabled())
- {
- _logger.info("=========================================================\n");
- _logger.info("Producer: " + id + " starting a new iteration ......\n");
- }
- runSender();
- sendResults();
- nextIteration = continueTest();
- }
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
- public void startControllerIfNeeded()
- {
- if (!config.isExternalController())
- {
- final MercuryTestController controller = new MercuryTestController(config);
- Runnable r = new Runnable()
- {
- @Override
- public void run()
- {
- controller.run();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating controller thread",e);
- }
- t.start();
- }
- }
-
- public static void main(String[] args) throws Exception
- {
- TestConfiguration config = new JVMArgConfiguration();
- MercuryReporter reporter= new MercuryReporter(MercuryThroughput.class,System.out,10,true);
- String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = config.getConnectionCount();
- final CountDownLatch testCompleted = new CountDownLatch(conCount);
- for (int i=0; i < conCount; i++)
- {
- final MercuryProducerController prod = new MercuryProducerController(config, reporter, scriptId + i);
- prod.startControllerIfNeeded();
- Runnable r = new Runnable()
- {
- @Override
- public void run()
- {
- prod.run();
- testCompleted.countDown();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating producer thread",e);
- }
- t.start();
- }
- testCompleted.await();
- reporter.log("Producers have completed the test......");
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java b/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java
deleted file mode 100644
index 4f3e162..0000000
--- a/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java
+++ /dev/null
@@ -1,452 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.io.FileWriter;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-
-import org.apache.qpid.client.message.AMQPEncodedMapMessage;
-import org.apache.qpid.tools.report.Reporter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The Controller coordinates a test run between a number
- * of producers and consumers, configured via -Dprod_count and -Dcons_count.
- *
- * It waits till all the producers and consumers have registered and then
- * conducts a warmup run. Once all consumers and producers have completed
- * the warmup run and are ready, it will conduct the actual test run,
- * collect all stats from the participants and calculate the system
- * throughput and the avg/min/max for producer rates, consumer rates and latency.
- *
- * These stats are then printed to std out.
- * The Controller also prints events to std out to give a running account
- * of the test run in progress, e.g. registering of participants, starting warmup etc.
- * This allows a scripting tool to monitor the progress.
- *
- * The Controller can be run in two modes.
- * 1. A single test run (default) where it just runs until the message count specified
- *    for the producers via -Dmsg_count is sent and received.
- *
- * 2. Time based, configured via -Dduration=x, where x is in mins.
- *    In this mode, the Controller repeatedly cycles through the tests (after an initial
- *    warmup run) until the desired time is reached. If a test run is in progress
- *    and the time is up, it will allow the run to complete.
- *    After each iteration, the stats are written in csv format to a separate log file.
- *
- * System throughput is calculated as follows
- * totalMsgCount/(totalTestTime)
- */
-public class MercuryTestController extends MercuryBase implements MessageListener
-{
-    private static final Logger _logger = LoggerFactory.getLogger(MercuryTestController.class);
-
-    enum TestMode { SINGLE_RUN, TIME_BASED }
-
-    TestMode testMode = TestMode.SINGLE_RUN;
-
-    long totalTestTime;
-
-    private double avgSystemLatency = 0.0;
-    private double minSystemLatency = Double.MAX_VALUE;
-    private double maxSystemLatency = 0;
-    private double avgSystemLatencyStdDev = 0.0;
-
-    private double avgSystemConsRate = 0.0;
-    private double maxSystemConsRate = 0.0;
-    private double minSystemConsRate = Double.MAX_VALUE;
-
-    private double avgSystemProdRate = 0.0;
-    private double maxSystemProdRate = 0.0;
-    private double minSystemProdRate = Double.MAX_VALUE;
-
-    private long totalMsgCount = 0;
-    private double totalSystemThroughput = 0.0;
-
-    private int consumerCount = Integer.getInteger("cons_count", 1);
-    private int producerCount = Integer.getInteger("prod_count", 1);
-    private int duration = Integer.getInteger("duration", -1); // in mins
-
-    // Latest message received from each participant, keyed by the participant id.
-    private Map<String,MapMessage> consumers;
-    private Map<String,MapMessage> producers;
-
-    private CountDownLatch consRegistered;
-    private CountDownLatch prodRegistered;
-    private CountDownLatch consReady;
-    private CountDownLatch prodReady;
-    private CountDownLatch receivedEndMsg;
-    private CountDownLatch receivedConsStats;
-    private CountDownLatch receivedProdStats;
-
-    private MessageConsumer consumer;
-    private boolean printStdDev = false;
-    private FileWriter writer;
-    // Never assigned anywhere in this class; see reportLine(String).
-    private Reporter report;
-
-    /**
-     * @param config test configuration; the test mode is time based when the
-     *               {@code duration} system property is set, single run otherwise
-     */
-    public MercuryTestController(TestConfiguration config)
-    {
-        super(config, "");
-
-        consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount);
-        producers = new ConcurrentHashMap<String,MapMessage>(producerCount);
-
-        consRegistered = new CountDownLatch(consumerCount);
-        prodRegistered = new CountDownLatch(producerCount);
-        consReady = new CountDownLatch(consumerCount);
-        prodReady = new CountDownLatch(producerCount);
-        printStdDev = config.isPrintStdDev();
-        testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED;
-    }
-
-    /**
-     * Writes a progress or result line. The reporter is used when one is present;
-     * because this class never assigns {@code report}, the line otherwise goes to
-     * standard out (as the class documentation promises) instead of failing with a
-     * NullPointerException.
-     */
-    private void reportLine(String message)
-    {
-        if (report != null)
-        {
-            report.log(message);
-        }
-        else
-        {
-            System.out.println(message);
-        }
-    }
-
-    /** Formats a value with the decimal format of the test configuration. */
-    private String formatDecimal(double value)
-    {
-        return config.getDecimalFormat().format(value);
-    }
-
-    /**
-     * Opens the csv stats file (time based mode only), starts listening on the controller
-     * queue and blocks until the expected number of consumers and producers have registered.
-     */
-    @Override
-    public void setUp() throws Exception
-    {
-        super.setUp();
-        if (testMode == TestMode.TIME_BASED)
-        {
-            writer = new FileWriter("stats-csv.log");
-        }
-        consumer = controllerSession.createConsumer(controllerQueue);
-        reportLine("\nController: " + producerCount + " producers are expected");
-        reportLine("Controller: " + consumerCount + " consumers are expected \n");
-        consumer.setMessageListener(this);
-        consRegistered.await();
-        prodRegistered.await();
-        reportLine("\nController: All producers and consumers have registered......\n");
-    }
-
-    /**
-     * Tells consumers and then producers to start the warmup and blocks until all of them
-     * have reported that they are ready.
-     */
-    public void warmup() throws Exception
-    {
-        reportLine("Controller initiating warm up sequence......");
-        sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP, consumers.values());
-        sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP, producers.values());
-        prodReady.await();
-        consReady.await();
-        reportLine("\nController : All producers and consumers are ready to start the test......\n");
-    }
-
-    /**
-     * Runs one test iteration: starts the producers, measures the time until all end
-     * messages have been reported, stops the consumers and waits for all stats.
-     */
-    public void startTest() throws Exception
-    {
-        resetCounters();
-        reportLine("\nController Starting test......");
-        long start = Clock.getTime();
-        sendMessageToNodes(OPCode.PRODUCER_START, producers.values());
-        receivedEndMsg.await();
-        totalTestTime = Clock.getTime() - start;
-        sendMessageToNodes(OPCode.CONSUMER_STOP, consumers.values());
-        receivedProdStats.await();
-        receivedConsStats.await();
-    }
-
-    /**
-     * Resets the min/max values and the message count and creates fresh latches for the
-     * next iteration. The averages are not reset here; calcStats() overwrites them.
-     */
-    public void resetCounters()
-    {
-        minSystemLatency = Double.MAX_VALUE;
-        maxSystemLatency = 0;
-        maxSystemConsRate = 0.0;
-        minSystemConsRate = Double.MAX_VALUE;
-        maxSystemProdRate = 0.0;
-        minSystemProdRate = Double.MAX_VALUE;
-
-        totalMsgCount = 0;
-
-        receivedConsStats = new CountDownLatch(consumerCount);
-        receivedProdStats = new CountDownLatch(producerCount);
-        receivedEndMsg = new CountDownLatch(producerCount);
-    }
-
-    /**
-     * Aggregates the stats messages of all consumers and producers into the system wide
-     * min/max/avg values and the system throughput. A stats message that cannot be read
-     * is reported on standard error and ends the aggregation of that participant group.
-     */
-    public void calcStats() throws Exception
-    {
-        double totLatency = 0.0;
-        double totStdDev = 0.0;
-        double totalConsRate = 0.0;
-        double totalProdRate = 0.0;
-
-        MapMessage conStat = null; // for error handling
-        try
-        {
-            for (MapMessage m : consumers.values())
-            {
-                conStat = m;
-                minSystemLatency = Math.min(minSystemLatency, m.getDouble(MIN_LATENCY));
-                maxSystemLatency = Math.max(maxSystemLatency, m.getDouble(MAX_LATENCY));
-                totLatency += m.getDouble(AVG_LATENCY);
-                totStdDev += m.getDouble(STD_DEV);
-
-                double consRate = m.getDouble(CONS_RATE);
-                minSystemConsRate = Math.min(minSystemConsRate, consRate);
-                maxSystemConsRate = Math.max(maxSystemConsRate, consRate);
-                totalConsRate += consRate;
-
-                totalMsgCount += m.getLong(MSG_COUNT);
-            }
-        }
-        catch (Exception e)
-        {
-            System.err.println("Error calculating stats from Consumer : " + conStat + " : " + e);
-        }
-
-        MapMessage prodStat = null; // for error handling
-        try
-        {
-            for (MapMessage m : producers.values())
-            {
-                prodStat = m;
-                double prodRate = m.getDouble(PROD_RATE);
-                minSystemProdRate = Math.min(minSystemProdRate, prodRate);
-                maxSystemProdRate = Math.max(maxSystemProdRate, prodRate);
-                totalProdRate += prodRate;
-            }
-        }
-        catch (Exception e)
-        {
-            // Report the producer message that failed, not the last consumer message.
-            System.err.println("Error calculating stats from Producer : " + prodStat + " : " + e);
-        }
-
-        avgSystemLatency = totLatency / consumers.size();
-        avgSystemLatencyStdDev = totStdDev / consumers.size();
-        avgSystemConsRate = totalConsRate / consumers.size();
-        avgSystemProdRate = totalProdRate / producers.size();
-
-        reportLine("Total test time : " + totalTestTime + " in " + Clock.getPrecision());
-
-        totalSystemThroughput = (totalMsgCount * Clock.convertToSecs() / totalTestTime);
-    }
-
-    /** Prints the aggregated stats of the last iteration. */
-    public void printResults() throws Exception
-    {
-        reportLine("Total Msgs Received : " + totalMsgCount);
-        printStat("System Throughput : ", totalSystemThroughput, " msg/sec");
-        printStat("Avg Consumer rate : ", avgSystemConsRate, " msg/sec");
-        printStat("Min Consumer rate : ", minSystemConsRate, " msg/sec");
-        printStat("Max Consumer rate : ", maxSystemConsRate, " msg/sec");
-
-        printStat("Avg Producer rate : ", avgSystemProdRate, " msg/sec");
-        printStat("Min Producer rate : ", minSystemProdRate, " msg/sec");
-        printStat("Max Producer rate : ", maxSystemProdRate, " msg/sec");
-
-        printStat("Avg System Latency : ", avgSystemLatency, " ms");
-        printStat("Min System Latency : ", minSystemLatency, " ms");
-        printStat("Max System Latency : ", maxSystemLatency, " ms");
-        if (printStdDev)
-        {
-            // The standard deviation is deliberately printed unformatted.
-            reportLine("Avg System Std Dev : " + avgSystemLatencyStdDev);
-        }
-    }
-
-    /** Prints one result line consisting of label, formatted value and unit. */
-    private void printStat(String label, double value, String unit)
-    {
-        reportLine(label + formatDecimal(value) + unit);
-    }
-
-    /**
-     * Sends the given op code to the reply address of every node.
-     *
-     * @param code  op code to send
-     * @param nodes last messages received from the target participants; the reply address
-     *              is read from each of them
-     */
-    private synchronized void sendMessageToNodes(OPCode code, Collection<MapMessage> nodes) throws Exception
-    {
-        reportLine("\nController: Sending code " + code);
-        MessageProducer tmpProd = controllerSession.createProducer(null);
-        try
-        {
-            MapMessage msg = controllerSession.createMapMessage();
-            msg.setInt(CODE, code.ordinal());
-            for (MapMessage node : nodes)
-            {
-                String replyAddr = node.getString(REPLY_ADDR);
-                if (replyAddr == null)
-                {
-                    reportLine("REPLY_ADDR is null " + node);
-                }
-                else
-                {
-                    reportLine("Controller: Sending " + code + " to " + replyAddr);
-                }
-                // The send is attempted even for a null address, as before, so that a
-                // broken registration surfaces as an error instead of a silent hang.
-                tmpProd.send(controllerSession.createQueue(replyAddr), msg);
-            }
-        }
-        finally
-        {
-            // The temporary producer is only needed for this batch of sends.
-            tmpProd.close();
-        }
-    }
-
-    /**
-     * Handles a message from a participant: stores registration and stats messages by
-     * participant id and counts down the latch that belongs to the received op code.
-     * Any failure is passed to {@code handleError}.
-     */
-    @Override
-    public void onMessage(Message msg)
-    {
-        try
-        {
-            MapMessage m = (MapMessage) msg;
-            OPCode code = OPCode.values()[m.getInt(CODE)];
-
-            reportLine("\n---------Controller Received Code : " + code);
-            reportLine("---------Data : " + ((AMQPEncodedMapMessage) m).getMap());
-
-            switch (code)
-            {
-                case REGISTER_CONSUMER :
-                    if (consRegistered.getCount() == 0)
-                    {
-                        reportLine("Warning : Expected number of consumers have already registered," +
-                                   "ignoring extra consumer");
-                        break;
-                    }
-                    consumers.put(m.getString(ID), m);
-                    consRegistered.countDown();
-                    break;
-
-                case REGISTER_PRODUCER :
-                    if (prodRegistered.getCount() == 0)
-                    {
-                        reportLine("Warning : Expected number of producers have already registered," +
-                                   "ignoring extra producer");
-                        break;
-                    }
-                    producers.put(m.getString(ID), m);
-                    prodRegistered.countDown();
-                    break;
-
-                case CONSUMER_READY :
-                    consReady.countDown();
-                    break;
-
-                case PRODUCER_READY :
-                    prodReady.countDown();
-                    break;
-
-                case RECEIVED_END_MSG :
-                    receivedEndMsg.countDown();
-                    break;
-
-                case RECEIVED_CONSUMER_STATS :
-                    consumers.put(m.getString(ID), m);
-                    receivedConsStats.countDown();
-                    break;
-
-                case RECEIVED_PRODUCER_STATS :
-                    producers.put(m.getString(ID), m);
-                    receivedProdStats.countDown();
-                    break;
-
-                default:
-                    throw new Exception("Invalid OPCode " + code);
-            }
-        }
-        catch (Exception e)
-        {
-            handleError(e, "Error when receiving messages " + msg);
-        }
-    }
-
-    /**
-     * Runs the controller: set up, warmup, then either a single test run whose results
-     * are printed, or repeated runs whose results are written to the csv file until the
-     * configured duration has elapsed. Any exception is passed to {@code handleError}.
-     */
-    public void run()
-    {
-        try
-        {
-            setUp();
-            warmup();
-            if (testMode == TestMode.SINGLE_RUN)
-            {
-                startTest();
-                calcStats();
-                printResults();
-            }
-            else
-            {
-                long startTime = Clock.getTime();
-                // duration is in mins; computed in long arithmetic to avoid int overflow.
-                // NOTE(review): assumes Clock.getTime() reports milliseconds - confirm
-                // against Clock.getPrecision().
-                long timeLimit = duration * 60L * 1000L;
-                boolean nextIteration = true;
-                while (nextIteration)
-                {
-                    startTest();
-                    calcStats();
-                    writeStatsToFile();
-                    nextIteration = (Clock.getTime() - startTime) < timeLimit;
-                    if (nextIteration)
-                    {
-                        sendMessageToNodes(OPCode.CONTINUE_TEST, consumers.values());
-                        sendMessageToNodes(OPCode.CONTINUE_TEST, producers.values());
-                    }
-                }
-            }
-            tearDown();
-        }
-        catch (Exception e)
-        {
-            handleError(e, "Error when running test");
-        }
-    }
-
-    /**
-     * Closes the csv file (time based mode only), tells all participants to stop and
-     * tears down the base class resources.
-     */
-    @Override
-    public void tearDown() throws Exception
-    {
-        reportLine("Controller: Completed the test......\n");
-        if (testMode == TestMode.TIME_BASED && writer != null)
-        {
-            writer.close();
-        }
-        sendMessageToNodes(OPCode.STOP_TEST, consumers.values());
-        sendMessageToNodes(OPCode.STOP_TEST, producers.values());
-        super.tearDown();
-    }
-
-    /**
-     * Appends the stats of the last iteration as one csv line to the stats file and
-     * flushes it. The standard deviation column is only written when enabled.
-     */
-    public void writeStatsToFile() throws Exception
-    {
-        StringBuilder line = new StringBuilder();
-        line.append(totalMsgCount).append(",");
-        line.append(formatDecimal(totalSystemThroughput)).append(",");
-        line.append(formatDecimal(avgSystemConsRate)).append(",");
-        line.append(formatDecimal(minSystemConsRate)).append(",");
-        line.append(formatDecimal(maxSystemConsRate)).append(",");
-        line.append(formatDecimal(avgSystemProdRate)).append(",");
-        line.append(formatDecimal(minSystemProdRate)).append(",");
-        line.append(formatDecimal(maxSystemProdRate)).append(",");
-        line.append(formatDecimal(avgSystemLatency)).append(",");
-        line.append(formatDecimal(minSystemLatency)).append(",");
-        line.append(formatDecimal(maxSystemLatency));
-        if (printStdDev)
-        {
-            line.append(",").append(avgSystemLatencyStdDev);
-        }
-        line.append("\n");
-
-        writer.append(line);
-        writer.flush();
-    }
-
-    /** Runs a stand-alone controller configured through JVM arguments. */
-    public static void main(String[] args)
-    {
-        TestConfiguration config = new JVMArgConfiguration();
-        MercuryTestController controller = new MercuryTestController(config);
-        controller.run();
-    }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java b/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
deleted file mode 100644
index 2833ea7..0000000
--- a/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-/**
- * Creates JMS test messages whose payload has an exact, caller supplied length.
- * The payload is the text "Qpid Test Message" repeated and cut off at the requested size.
- */
-public class MessageFactory
-{
-    /**
-     * Creates a bytes message containing the UTF-8 encoding of a payload of the given size.
-     *
-     * @param ssn  session used to create the message
-     * @param size payload length in characters
-     */
-    public static Message createBytesMessage(Session ssn, int size) throws JMSException
-    {
-        final BytesMessage message = ssn.createBytesMessage();
-        final String payload = createMessagePayload(size);
-        message.writeBytes(payload.getBytes(UTF_8));
-        return message;
-    }
-
-    /**
-     * Creates a text message whose text is a payload of the given size.
-     *
-     * @param ssn  session used to create the message
-     * @param size payload length in characters
-     */
-    public static Message createTextMessage(Session ssn, int size) throws JMSException
-    {
-        final TextMessage message = ssn.createTextMessage();
-        final String payload = createMessagePayload(size);
-        message.setText(payload);
-        return message;
-    }
-
-    /**
-     * Builds a string of exactly {@code size} characters by repeating the test text and
-     * truncating the last repetition where necessary.
-     *
-     * @param size requested length; 0 yields the empty string
-     * @return the payload text
-     */
-    public static String createMessagePayload(int size)
-    {
-        final String unit = "Qpid Test Message";
-        final StringBuilder payload = new StringBuilder(size);
-
-        // Append whole units while they fit, then the leading part of one more unit.
-        for (int remaining = size; remaining > 0; remaining -= unit.length())
-        {
-            payload.append(unit, 0, Math.min(remaining, unit.length()));
-        }
-
-        return payload.toString();
-    }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
deleted file mode 100644
index 3645f95..0000000
--- a/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
+++ /dev/null
@@ -1,914 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import static org.apache.qpid.tools.QpidBench.Mode.BOTH;
-import static org.apache.qpid.tools.QpidBench.Mode.CONSUME;
-import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.ExchangeBind;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageDeliveryMode;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageSubscribe;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.QueueDeclare;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.SessionListener;
-import org.apache.qpid.util.UUIDGen;
-import org.apache.qpid.util.UUIDs;
-
-/**
- * QpidBench
- *
- */
-
-public class QpidBench
-{
-
- /** Role of a benchmark run: publish only, consume only, or both. */
- enum Mode
- {
-     PUBLISH,
-     CONSUME,
-     BOTH
- }
-
- /**
-  * Command line options of qpid-bench.
-  *
-  * Options are dispatched reflectively: an option such as {@code --message-id} is mapped to
-  * the public method {@code parse__message_id} (every '-' becomes '_'). A parse method
-  * without parameters defines a flag, a parse method with one String parameter defines an
-  * option that consumes the next command line argument as its value. The public fields hold
-  * the parsed values and are listed by {@link #toString()}.
-  */
- private static class Options
- {
-     private StringBuilder usage = new StringBuilder("qpid-bench <options>");
-
-     public String broker = "localhost";
-     public int port = 5672;
-     public long count = 1000000;
-     public long window = 100000;
-     public long sample = window;
-     public int size = 1024;
-     public Mode mode = BOTH;
-     public boolean timestamp = false;
-     public boolean message_id = false;
-     public boolean message_cache = false;
-     public boolean persistent = false;
-     public boolean jms_publish = false;
-     public boolean jms_consume = false;
-     public boolean help = false;
-
-     // Builds the usage text; runs after the field initializers above so that every
-     // line shows the default value of its option.
-     {
-         usage("-b, --broker", "the broker hostname", broker);
-         usage("-p, --port", "the broker port", port);
-         usage("-c, --count", "the number of messages to send/receive, 0 means no limit", count);
-         usage("-w, --window", "the number of messages to send before blocking, 0 disables", window);
-         usage("--sample", "print stats after this many messages, 0 disables", sample);
-         usage("-i, --interval", "sets both --window and --sample", window);
-         usage("-s, --size", "the message size", size);
-         usage("-m, --mode", "one of publish, consume, or both", mode);
-         usage("--timestamp", "set timestamps on each message if true", timestamp);
-         // The option name must match parse__message_id, i.e. "--message-id".
-         usage("--message-id", "set the message-id on each message if true", message_id);
-         usage("--message-cache", "reuse the same message for each send if true", message_cache);
-         usage("--persistent", "set the delivery-mode to persistent if true", persistent);
-         usage("--jms-publish", "use the jms client for publish", jms_publish);
-         usage("--jms-consume", "use the jms client for consume", jms_consume);
-         usage("--jms", "sets both --jms-publish and --jms-consume", false);
-         usage("-h, --help", "prints this message", null);
-     }
-
-     /**
-      * Appends one line to the usage text.
-      *
-      * @param name        option name(s) as typed on the command line
-      * @param description what the option does
-      * @param def         default value shown in parentheses, or null to show none
-      */
-     void usage(String name, String description, Object def)
-     {
-         String defval = (def == null) ? "" : String.format(" (%s)", def);
-         usage.append(String.format("%n %-15s%-14s %s", name, defval, description));
-     }
-
-     // ---- option handlers, looked up by name from parse(String...) ----
-
-     public void parse__broker(String b)
-     {
-         this.broker = b;
-     }
-
-     public void parse_b(String b)
-     {
-         parse__broker(b);
-     }
-
-     public void parse__port(String p)
-     {
-         this.port = Integer.parseInt(p);
-     }
-
-     public void parse_p(String p)
-     {
-         parse__port(p);
-     }
-
-     public void parse__count(String c)
-     {
-         this.count = Long.parseLong(c);
-     }
-
-     public void parse_c(String c)
-     {
-         parse__count(c);
-     }
-
-     public void parse__window(String w)
-     {
-         this.window = Long.parseLong(w);
-     }
-
-     public void parse_w(String w)
-     {
-         parse__window(w);
-     }
-
-     public void parse__sample(String s)
-     {
-         this.sample = Long.parseLong(s);
-     }
-
-     public void parse__interval(String i)
-     {
-         this.window = Long.parseLong(i);
-         this.sample = window;
-     }
-
-     public void parse_i(String i)
-     {
-         parse__interval(i);
-     }
-
-     public void parse__size(String s)
-     {
-         this.size = Integer.parseInt(s);
-     }
-
-     public void parse_s(String s)
-     {
-         parse__size(s);
-     }
-
-     /**
-      * @throws IllegalArgumentException if the value is none of publish, consume or both
-      *                                  (compared case-insensitively)
-      */
-     public void parse__mode(String m)
-     {
-         if (m.equalsIgnoreCase("publish"))
-         {
-             this.mode = PUBLISH;
-         }
-         else if (m.equalsIgnoreCase("consume"))
-         {
-             this.mode = CONSUME;
-         }
-         else if (m.equalsIgnoreCase("both"))
-         {
-             this.mode = BOTH;
-         }
-         else
-         {
-             throw new IllegalArgumentException
-                 ("must be one of 'publish', 'consume', or 'both'");
-         }
-     }
-
-     public void parse_m(String m)
-     {
-         parse__mode(m);
-     }
-
-     public void parse__timestamp(String t)
-     {
-         this.timestamp = Boolean.parseBoolean(t);
-     }
-
-     public void parse__message_id(String m)
-     {
-         this.message_id = Boolean.parseBoolean(m);
-     }
-
-     public void parse__message_cache(String c)
-     {
-         this.message_cache = Boolean.parseBoolean(c);
-     }
-
-     public void parse__persistent(String p)
-     {
-         this.persistent = Boolean.parseBoolean(p);
-     }
-
-     public void parse__jms_publish(String jp)
-     {
-         this.jms_publish = Boolean.parseBoolean(jp);
-     }
-
-     public void parse__jms_consume(String jc)
-     {
-         this.jms_consume = Boolean.parseBoolean(jc);
-     }
-
-     public void parse__jms(String j)
-     {
-         this.jms_publish = this.jms_consume = Boolean.parseBoolean(j);
-     }
-
-     public void parse__help()
-     {
-         this.help = true;
-     }
-
-     public void parse_h()
-     {
-         parse__help();
-     }
-
-     /**
-      * Looks up a public parse method by name and parameter types.
-      *
-      * @return the method, or null if this class has no such public method
-      */
-     private Method findParser(String name, Class<?>... parameterTypes)
-     {
-         try
-         {
-             return getClass().getMethod(name, parameterTypes);
-         }
-         catch (NoSuchMethodException e)
-         {
-             return null;
-         }
-     }
-
-     /**
-      * Parses the command line.
-      *
-      * @param args command line arguments; everything starting with '-' is treated as an
-      *             option, everything else is collected as a plain argument
-      * @return null on success, otherwise a message describing the first problem found
-      * @throws RuntimeException if a parse method cannot be accessed
-      */
-     public String parse(String ... args)
-     {
-         List<String> arguments = new ArrayList<String>();
-         int next = 0;
-         while (next < args.length)
-         {
-             String option = args[next++];
-
-             if (!option.startsWith("-"))
-             {
-                 arguments.add(option);
-                 continue;
-             }
-
-             String method = "parse" + option.replace('-', '_');
-             try
-             {
-                 // A parameterless parse method (a flag) takes precedence.
-                 Method flag = findParser(method);
-                 if (flag != null)
-                 {
-                     flag.invoke(this);
-                     continue;
-                 }
-
-                 Method valued = findParser(method, String.class);
-                 if (valued == null)
-                 {
-                     return "no such option: " + option;
-                 }
-                 if (next >= args.length)
-                 {
-                     return option + " requires a value";
-                 }
-                 valued.invoke(this, args[next++]);
-             }
-             catch (InvocationTargetException e)
-             {
-                 // The parse method itself rejected the value.
-                 Throwable t = e.getCause();
-                 return String.format
-                     ("error parsing %s: %s: %s", option, t.getClass().getName(),
-                      t.getMessage());
-             }
-             catch (IllegalAccessException e)
-             {
-                 throw new RuntimeException
-                     ("unable to access parse method: " + option, e);
-             }
-         }
-
-         return parseArguments(arguments);
-     }
-
-     /**
-      * Validates the plain (non option) arguments; none are accepted.
-      *
-      * @return null if the list is empty, otherwise an error message listing the arguments
-      */
-     public String parseArguments(List<String> arguments)
-     {
-         if (arguments.isEmpty())
-         {
-             return null;
-         }
-
-         // Strip the surrounding brackets of the list's string form.
-         String args = arguments.toString();
-         return "unrecognized arguments: " + args.substring(1, args.length() - 1);
-     }
-
-     /**
-      * Lists every public field as "name = value", one field per line.
-      *
-      * @throws RuntimeException if a field cannot be read
-      */
-     @Override
-     public String toString()
-     {
-         StringBuilder str = new StringBuilder();
-         for (Field field : getClass().getFields())
-         {
-             if (str.length() > 0)
-             {
-                 str.append("\n");
-             }
-
-             String name = field.getName();
-             Object value;
-             try
-             {
-                 value = field.get(this);
-             }
-             catch (IllegalAccessException e)
-             {
-                 throw new RuntimeException
-                     ("unable to access field: " + name, e);
-             }
-             str.append(name).append(" = ").append(value);
-         }
-
-         return str.toString();
-     }
- }
-
- public static final void main(String[] args) throws Exception
- {
- final Options opts = new Options();
- String error = opts.parse(args);
- if (error != null)
- {
- System.err.println(error);
- System.exit(-1);
- return;
- }
-
- if (opts.help)
- {
- System.out.println(opts.usage);
- return;
- }
-
- System.out.println(opts);
-
- switch (opts.mode)
- {
- case CONSUME:
- case BOTH:
- Runnable r = new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- if (opts.jms_consume)
- {
- jms_consumer(opts);
- }
- else
- {
- native_consumer(opts);
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- System.out.println("Consumer Completed");
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
- }
- t.start();
- break;
- }
-
- switch (opts.mode)
- {
- case PUBLISH:
- case BOTH:
- Runnable r = new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- if (opts.jms_publish)
- {
- jms_publisher(opts);
- }
- else
- {
- native_publisher(opts);
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- System.out.println("Producer Completed");
- }
- };
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating publisher thread",e);
- }
- t.start();
- break;
- }
- }
-
- private static enum Column
- {
- LEFT, RIGHT
- }
-
- private static final void sample(Options opts, Column col, String name, long count,
- long start, long time, long lastTime)
- {
- String pfx = "";
- String sfx = "";
- if (opts.mode == BOTH)
- {
- if (col == Column.RIGHT)
- {
- pfx = " -- ";
- }
- else
- {
- sfx = " --";
- }
- }
-
- if (count == 0)
- {
- String stats = String.format("%s: %tc", name, start);
- System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
- return;
- }
-
- double cumulative = 1000 * (double) count / (double) (time - start);
- double interval = 1000 * ((double) opts.sample / (double) (time - lastTime));
-
- String stats = String.format
- ("%s: %d %.2f %.2f", name, count, cumulative, interval);
- System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
- }
-
- private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception
- {
- String url = String.format
- ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'",
- opts.broker, opts.port);
- return new AMQConnection(url);
- }
-
- private static final void jms_publisher(Options opts) throws Exception
- {
- javax.jms.Connection conn = getJMSConnection(opts);
-
- javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
- Destination dest = ssn.createQueue("test-queue");
- Destination echo_dest = ssn.createQueue("echo-queue");
- MessageProducer prod = ssn.createProducer(dest);
- MessageConsumer cons = ssn.createConsumer(echo_dest);
- prod.setDisableMessageID(!opts.message_id);
- prod.setDisableMessageTimestamp(!opts.timestamp);
- prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- StringBuilder str = new StringBuilder();
- for (int i = 0; i < opts.size; i++)
- {
- str.append((char) (i % 128));
- }
-
- String body = str.toString();
-
- TextMessage cached = ssn.createTextMessage();
- cached.setText(body);
-
- conn.start();
-
- long count = 0;
- long lastTime = 0;
- long start = System.currentTimeMillis();
- while (opts.count == 0 || count < opts.count)
- {
- if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
- {
- Message echo = cons.receive();
- }
-
- if (opts.sample > 0 && (count % opts.sample) == 0)
- {
- long time = System.currentTimeMillis();
- sample(opts, Column.LEFT, "JP", count, start, time, lastTime);
- lastTime = time;
- }
-
- TextMessage m;
- if (opts.message_cache)
- {
- m = cached;
- }
- else
- {
- m = ssn.createTextMessage();
- m.setText(body);
- }
-
- prod.send(m);
- count++;
- }
-
- conn.close();
- }
-
- private static final void jms_consumer(final Options opts) throws Exception
- {
- final javax.jms.Connection conn = getJMSConnection(opts);
- javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
- Destination dest = ssn.createQueue("test-queue");
- Destination echo_dest = ssn.createQueue("echo-queue");
- MessageConsumer cons = ssn.createConsumer(dest);
- final MessageProducer prod = ssn.createProducer(echo_dest);
- prod.setDisableMessageID(true);
- prod.setDisableMessageTimestamp(true);
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- final TextMessage echo = ssn.createTextMessage();
- echo.setText("ECHO");
-
- final Object done = new Object();
- cons.setMessageListener(new MessageListener()
- {
- private long count = 0;
- private long lastTime = 0;
- private long start;
-
- @Override
- public void onMessage(Message m)
- {
- if (count == 0)
- {
- start = System.currentTimeMillis();
- }
-
- try
- {
- boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
- long time = sample ? System.currentTimeMillis() : 0;
-
- if (opts.window > 0 && (count % opts.window) == 0)
- {
- prod.send(echo);
- }
-
- if (sample)
- {
- sample(opts, Column.RIGHT, "JC", count, start, time, lastTime);
- lastTime = time;
- }
- }
- catch (JMSException e)
- {
- throw new RuntimeException(e);
- }
- count++;
-
- if (opts.count > 0 && count >= opts.count)
- {
- synchronized (done)
- {
- done.notify();
- }
- }
- }
- });
-
- conn.start();
- synchronized (done)
- {
- done.wait();
- }
- conn.close();
- }
-
- private static final org.apache.qpid.transport.Connection getConnection
- (Options opts)
- {
- org.apache.qpid.transport.Connection conn =
- new org.apache.qpid.transport.Connection();
- conn.connect(opts.broker, opts.port, null, "guest", "guest", false, null);
- return conn;
- }
-
- private static abstract class NativeListener implements SessionListener
- {
-
- @Override
- public void opened(org.apache.qpid.transport.Session ssn) {}
-
- @Override
- public void resumed(org.apache.qpid.transport.Session ssn) {}
-
- @Override
- public void exception(org.apache.qpid.transport.Session ssn,
- SessionException exc)
- {
- exc.printStackTrace();
- }
-
- @Override
- public void closed(org.apache.qpid.transport.Session ssn) {}
-
- }
-
- private static final void native_publisher(Options opts) throws Exception
- {
- final long[] echos = { 0 };
- org.apache.qpid.transport.Connection conn = getConnection(opts);
- org.apache.qpid.transport.Session ssn = conn.createSession();
- ssn.setSessionListener(new NativeListener()
- {
- @Override
- public void message(org.apache.qpid.transport.Session ssn,
- MessageTransfer xfr)
- {
- synchronized (echos)
- {
- echos[0]++;
- echos.notify();
- }
- ssn.processed(xfr);
- }
- });
-
- ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
- ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
-
- MessageProperties cached_mp = new MessageProperties();
- DeliveryProperties cached_dp = new DeliveryProperties();
- cached_dp.setRoutingKey("test-queue");
- cached_dp.setDeliveryMode
- (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
-
- int size = opts.size;
- ByteBuffer body = ByteBuffer.allocate(size);
- for (int i = 0; i < size; i++)
- {
- body.put((byte) i);
- }
- body.flip();
-
- ssn.invoke(new MessageSubscribe()
- .queue("echo-queue")
- .destination("echo-queue")
- .acceptMode(MessageAcceptMode.NONE)
- .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
- ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW);
- ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
- ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
-
- UUIDGen gen = UUIDs.newGenerator();
-
- long count = 0;
- long lastTime = 0;
- long start = System.currentTimeMillis();
- while (opts.count == 0 || count < opts.count)
- {
- if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
- {
- synchronized (echos)
- {
- while (echos[0] < (count/opts.window))
- {
- echos.wait();
- }
- }
- }
-
- if (opts.sample > 0 && (count % opts.sample) == 0)
- {
- long time = System.currentTimeMillis();
- sample(opts, Column.LEFT, "NP", count, start, time, lastTime);
- lastTime = time;
- }
-
- MessageProperties mp;
- DeliveryProperties dp;
- if (opts.message_cache)
- {
- mp = cached_mp;
- dp = cached_dp;
- }
- else
- {
- mp = new MessageProperties();
- dp = new DeliveryProperties();
- dp.setRoutingKey("test-queue");
- dp.setDeliveryMode
- (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
-
- }
-
- if (opts.message_id)
- {
- mp.setMessageId(gen.generate());
- }
-
- if (opts.timestamp)
- {
- dp.setTimestamp(System.currentTimeMillis());
- }
-
- ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
- new Header(dp, mp), body.slice());
- count++;
- }
-
- ssn.messageCancel("echo-queue");
-
- ssn.sync();
- ssn.close();
- conn.close();
- }
-
- private static final void native_consumer(final Options opts) throws Exception
- {
- final DeliveryProperties dp = new DeliveryProperties();
- final byte[] echo = new byte[0];
- dp.setRoutingKey("echo-queue");
- dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
- final MessageProperties mp = new MessageProperties();
- final Object done = new Object();
- org.apache.qpid.transport.Connection conn = getConnection(opts);
- org.apache.qpid.transport.Session ssn = conn.createSession();
- ssn.setSessionListener(new NativeListener()
- {
- private long count = 0;
- private long lastTime = 0;
- private long start;
-
- @Override
- public void message(org.apache.qpid.transport.Session ssn,
- MessageTransfer xfr)
- {
- if (count == 0)
- {
- start = System.currentTimeMillis();
- }
-
- boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
- long time = sample ? System.currentTimeMillis() : 0;
-
- if (opts.window > 0 && (count % opts.window) == 0)
- {
- ssn.messageTransfer("amq.direct",
- MessageAcceptMode.NONE,
- MessageAcquireMode.PRE_ACQUIRED,
- new Header(dp, mp),
- echo);
- }
-
- if (sample)
- {
- sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
- lastTime = time;
- }
- ssn.processed(xfr);
- count++;
-
- if (opts.count > 0 && count >= opts.count)
- {
- synchronized (done)
- {
- done.notify();
- }
- }
- }
- });
-
- ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
- ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
-
- ssn.invoke(new MessageSubscribe()
- .queue("test-queue")
- .destination("test-queue")
- .acceptMode(MessageAcceptMode.NONE)
- .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
- ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW);
- ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
- ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
-
- synchronized (done)
- {
- done.wait();
- }
-
- ssn.messageCancel("test-queue");
-
- ssn.sync();
- ssn.close();
- conn.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
deleted file mode 100644
index c563e6d..0000000
--- a/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.tools.report.BasicReporter;
-import org.apache.qpid.tools.report.Reporter;
-import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class QpidReceive implements MessageListener
-{
- private static final Logger _logger = LoggerFactory.getLogger(QpidReceive.class);
- private final CountDownLatch testCompleted = new CountDownLatch(1);
-
- private Connection con;
- private Session session;
- private Destination dest;
- private MessageConsumer consumer;
- private boolean transacted = false;
- private boolean isRollback = false;
- private int txSize = 0;
- private int rollbackFrequency = 0;
- private int ackFrequency = 0;
- private int expected = 0;
- private int received = 0;
- private Reporter report;
- private TestConfiguration config;
-
- public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest)
- {
- this(report,config, con, dest, UUID.randomUUID().toString());
- }
-
- public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
- {
- //System.out.println("Producer ID : " + id);
- this.report = report;
- this.config = config;
- this.con = con;
- this.dest = dest;
- }
-
- public void setUp() throws Exception
- {
- con.start();
- if (config.isTransacted())
- {
- session = con.createSession(true, Session.SESSION_TRANSACTED);
- }
- else if (config.getAckFrequency() > 0)
- {
- session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- }
- else
- {
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- consumer = session.createConsumer(dest);
- consumer.setMessageListener(this);
- if (_logger.isDebugEnabled())
- {
- System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n");
- }
-
- transacted = config.isTransacted();
- txSize = config.getTransactionSize();
- isRollback = config.getRollbackFrequency() > 0;
- rollbackFrequency = config.getRollbackFrequency();
- ackFrequency = config.getAckFrequency();
-
- _logger.debug("Ready address : " + config.getReadyAddress());
- if (config.getReadyAddress() != null)
- {
- MessageProducer prod = session.createProducer(AMQDestination
- .createDestination(config.getReadyAddress(), false));
- prod.send(session.createMessage());
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending message to ready address " + prod.getDestination());
- }
- }
- }
-
- public void resetCounters()
- {
- received = 0;
- expected = 0;
- report.clear();
- }
-
- @Override
- public void onMessage(Message msg)
- {
- try
- {
- if (msg instanceof TextMessage &&
- TestConfiguration.EOS.equals(((TextMessage)msg).getText()))
- {
- testCompleted.countDown();
- return;
- }
-
- received++;
- report.message(msg);
-
- if (config.isPrintHeaders())
- {
- System.out.println(((AbstractJMSMessage)msg).toHeaderString());
- }
-
- if (config.isPrintContent())
- {
- System.out.println(((AbstractJMSMessage)msg).toBodyString());
- }
-
- if (transacted && (received % txSize == 0))
- {
- if (isRollback && (received % rollbackFrequency == 0))
- {
- session.rollback();
- }
- else
- {
- session.commit();
- }
- }
- else if (ackFrequency > 0)
- {
- msg.acknowledge();
- }
-
- if (received >= expected)
- {
- testCompleted.countDown();
- }
-
- }
- catch(Exception e)
- {
- _logger.error("Error when receiving messages",e);
- }
- }
-
- public void waitforCompletion(int expected) throws Exception
- {
- this.expected = expected;
- testCompleted.await();
- }
-
- public void tearDown() throws Exception
- {
- session.close();
- }
-
- public static void main(String[] args) throws Exception
- {
- TestConfiguration config = new JVMArgConfiguration();
- Reporter reporter = new BasicReporter(ThroughputAndLatency.class,
- System.out,
- config.reportEvery(),
- config.isReportHeader());
- Destination dest = AMQDestination.createDestination(config.getAddress(), false);
- QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest);
- receiver.setUp();
- receiver.waitforCompletion(config.getMsgCount() + config.getSendEOS());
- if (config.isReportTotal())
- {
- reporter.report();
- }
- receiver.tearDown();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/QpidSend.java b/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
deleted file mode 100644
index fd7503a..0000000
--- a/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.tools.TestConfiguration.MessageType;
-import org.apache.qpid.tools.report.BasicReporter;
-import org.apache.qpid.tools.report.Reporter;
-import org.apache.qpid.tools.report.Statistics.Throughput;
-
-public class QpidSend
-{
- private Connection con;
- private Session session;
- private Destination dest;
- private MessageProducer producer;
- private MessageType msgType;
- private Message msg;
- private Object payload;
- private List<Object> payloads;
- private boolean cacheMsg = false;
- private boolean randomMsgSize = false;
- private boolean durable = false;
- private Random random;
- private int msgSizeRange = 1024;
- private int totalMsgCount = 0;
- private boolean rateLimitProducer = false;
- private boolean transacted = false;
- private int txSize = 0;
-
- private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class);
- Reporter report;
- TestConfiguration config;
-
- public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest)
- {
- this(report,config, con, dest, UUID.randomUUID().toString());
- }
-
- public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
- {
- //System.out.println("Producer ID : " + id);
- this.report = report;
- this.config = config;
- this.con = con;
- this.dest = dest;
- }
-
- public void setUp() throws Exception
- {
- con.start();
- if (config.isTransacted())
- {
- session = con.createSession(true, Session.SESSION_TRANSACTED);
- }
- else
- {
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- durable = config.isDurable();
- rateLimitProducer = config.getSendRate() > 0 ? true : false;
- if (_logger.isDebugEnabled() && rateLimitProducer)
- {
- _logger.debug("The test will attempt to limit the producer to " + config.getSendRate() + " msg/sec");
- }
-
- transacted = config.isTransacted();
- txSize = config.getTransactionSize();
-
- msgType = MessageType.getType(config.getMessageType());
- // if message caching is enabled we pre create the message
- // else we pre create the payload
- if (config.isCacheMessage())
- {
- cacheMsg = true;
- msg = createMessage(createPayload(config.getMsgSize()));
- msg.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- }
- else if (config.isRandomMsgSize())
- {
- random = new Random(20080921);
- randomMsgSize = true;
- msgSizeRange = config.getMsgSize();
- payloads = new ArrayList<Object>(msgSizeRange);
-
- for (int i=0; i < msgSizeRange; i++)
- {
- payloads.add(createPayload(i));
- }
- }
- else
- {
- payload = createPayload(config.getMsgSize());
- }
-
- producer = session.createProducer(dest);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Producer: " + /*id +*/ " Sending messages to: " + ((AMQDestination)dest).getAddressName());
- }
- producer.setDisableMessageID(config.isDisableMessageID());
- //we add a separate timestamp to allow interoperability with other clients.
- producer.setDisableMessageTimestamp(true);
- if (config.getTTL() > 0)
- {
- producer.setTimeToLive(config.getTTL());
- }
- if (config.getPriority() > 0)
- {
- producer.setPriority(config.getPriority());
- }
- }
-
- Object createPayload(int size)
- {
- if (msgType == MessageType.TEXT)
- {
- return MessageFactory.createMessagePayload(size);
- }
- else
- {
- return MessageFactory.createMessagePayload(size).getBytes(UTF_8);
- }
- }
-
- Message createMessage(Object payload) throws Exception
- {
- if (msgType == MessageType.TEXT)
- {
- return session.createTextMessage((String)payload);
- }
- else
- {
- BytesMessage m = session.createBytesMessage();
- m.writeBytes((byte[])payload);
- return m;
- }
- }
-
- protected Message getNextMessage() throws Exception
- {
- if (cacheMsg)
- {
- return msg;
- }
- else
- {
- Message m;
-
- if (!randomMsgSize)
- {
- m = createMessage(payload);
- }
- else
- {
- m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
- }
- m.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- return m;
- }
- }
-
- public void commit() throws Exception
- {
- session.commit();
- }
-
- public void send() throws Exception
- {
- send(config.getMsgCount());
- }
-
- public void send(int count) throws Exception
- {
- int sendRate = config.getSendRate();
- if (rateLimitProducer)
- {
- int iterations = count/sendRate;
- int remainder = count%sendRate;
- for (int i=0; i < iterations; i++)
- {
- long iterationStart = System.currentTimeMillis();
- sendMessages(sendRate);
- long elapsed = System.currentTimeMillis() - iterationStart;
- long diff = Clock.SEC - elapsed;
- if (diff > 0)
- {
- // We have sent more messages in a sec than specified by the rate.
- Thread.sleep(diff);
- }
- }
- sendMessages(remainder);
- }
- else
- {
- sendMessages(count);
- }
- }
-
- private void sendMessages(int count) throws Exception
- {
- boolean isTimestamp = !config.isDisableTimestamp();
- long s = System.currentTimeMillis();
- for(int i=0; i < count; i++ )
- {
- Message msg = getNextMessage();
- if (isTimestamp)
- {
- msg.setLongProperty(TestConfiguration.TIMESTAMP, System.currentTimeMillis());
- }
- producer.send(msg);
- //report.message(msg);
- totalMsgCount++;
-
- if ( transacted && ((totalMsgCount) % txSize == 0))
- {
- session.commit();
- }
- }
- long e = System.currentTimeMillis() - s;
- //System.out.println("Rate : " + totalMsgCount/e);
- }
-
- public void resetCounters()
- {
- totalMsgCount = 0;
- report.clear();
- }
-
- public void sendEndMessage() throws Exception
- {
- Message msg = session.createTextMessage(TestConfiguration.EOS);
- producer.send(msg);
- }
-
- public void tearDown() throws Exception
- {
- session.close();
- }
-
- public static void main(String[] args) throws Exception
- {
- TestConfiguration config = new JVMArgConfiguration();
- Reporter reporter = new BasicReporter(Throughput.class,
- System.out,
- config.reportEvery(),
- config.isReportHeader()
- );
- Destination dest = AMQDestination.createDestination(config.getAddress(), false);
- QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest);
- sender.setUp();
- sender.send();
- if (config.getSendEOS() > 0)
- {
- sender.sendEndMessage();
- }
- if (config.isReportTotal())
- {
- reporter.report();
- }
- sender.tearDown();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java b/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java
index fc0b616..4beac5f 100644
--- a/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java
+++ b/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java
@@ -55,7 +55,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.qpid.tools.util.ArgumentsParser;
-import org.apache.qpid.util.Strings;
public class RestStressTestClient
{
@@ -523,7 +522,7 @@ public class RestStressTestClient
{
try
{
- byte[] challengeBytes = Strings.decodeBase64(challenge);
+ byte[] challengeBytes = decodeBase64(challenge);
String macAlgorithm = "HmacMD5";
Mac mac = Mac.getInstance(macAlgorithm);
@@ -539,6 +538,17 @@ public class RestStressTestClient
}
}
+ public static byte[] decodeBase64(String base64String)
+ {
+ base64String = base64String.replaceAll("\\s","");
+ if(!base64String.matches("^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$"))
+ {
+ throw new IllegalArgumentException("Cannot convert string '"+ base64String+ "'to a byte[] - it does not appear to be base64 data");
+ }
+
+ return DatatypeConverter.parseBase64Binary(base64String);
+ }
+
private String toHex(byte[] data)
{
StringBuilder hash = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java b/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java
deleted file mode 100644
index 18870ba..0000000
--- a/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.text.DecimalFormat;
-
-import javax.jms.Connection;
-
-public interface TestConfiguration
-{
- enum MessageType {
- BYTES, TEXT, MAP, OBJECT;
-
- public static MessageType getType(String s) throws Exception
- {
- if ("text".equalsIgnoreCase(s))
- {
- return TEXT;
- }
- else if ("bytes".equalsIgnoreCase(s))
- {
- return BYTES;
- }
- /*else if ("map".equalsIgnoreCase(s))
- {
- return MAP;
- }
- else if ("object".equalsIgnoreCase(s))
- {
- return OBJECT;
- }*/
- else
- {
- throw new Exception("Unsupported message type");
- }
- }
- };
-
- public final static String TIMESTAMP = "ts";
-
- public final static String EOS = "eos";
-
- public final static String SEQUENCE_NUMBER = "sn";
-
- public String getUrl();
-
- public String getHost();
-
- public int getPort();
-
- public String getAddress();
-
- public long getTimeout();
-
- public int getAckMode();
-
- public int getMsgCount();
-
- public int getMsgSize();
-
- public int getRandomMsgSizeStartFrom();
-
- public boolean isDurable();
-
- public boolean isTransacted();
-
- public int getTransactionSize();
-
- public int getWarmupCount();
-
- public boolean isCacheMessage();
-
- public boolean isDisableMessageID();
-
- public boolean isDisableTimestamp();
-
- public boolean isRandomMsgSize();
-
- public String getMessageType();
-
- public boolean isPrintStdDev();
-
- public int getSendRate();
-
- public boolean isExternalController();
-
- public boolean isUseUniqueDests();
-
- public int getAckFrequency();
-
- public Connection createConnection() throws Exception;
-
- public DecimalFormat getDecimalFormat();
-
- public int reportEvery();
-
- public boolean isReportTotal();
-
- public boolean isReportHeader();
-
- public int getSendEOS();
-
- public int getConnectionCount();
-
- public int getRollbackFrequency();
-
- public boolean isPrintHeaders();
-
- public boolean isPrintContent();
-
- public long getTTL();
-
- public int getPriority();
-
- public String getReadyAddress();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java b/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java
deleted file mode 100644
index a9896c1..0000000
--- a/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools.report;
-
-import java.io.PrintStream;
-import java.lang.reflect.Constructor;
-
-import javax.jms.Message;
-
-/**
- * {@link Reporter} that feeds every message into an overall
- * {@link Statistics} instance and, when a positive batch size is configured,
- * into a second per-batch instance that is reported and cleared every
- * {@code batchSize} messages.
- */
-public class BasicReporter implements Reporter
-{
-    PrintStream out;
-    int batchSize = 0;
-    int batchCount = 0;
-    boolean headerPrinted = false;
-    protected Statistics overall;
-    // Only instantiated when batchSize > 0; stays null otherwise.
-    Statistics batch;
-    Constructor<? extends Statistics> statCtor;
-
-    /**
-     * Creates the reporter and instantiates its statistics objects through
-     * the public no-argument constructor of {@code clazz}.
-     *
-     * @param clazz       statistics implementation to instantiate
-     * @param out         stream handed to the statistics when they report
-     * @param batchSize   number of messages per batch report; no batch
-     *                    statistics are kept unless this is positive
-     * @param printHeader {@code true} to have {@link #message(Message)} and
-     *                    {@link #report()} emit the header before their
-     *                    first output, {@code false} to skip that
-     * @throws Exception if the no-argument constructor cannot be looked up
-     *                   or invoked
-     */
-    public BasicReporter(Class<? extends Statistics> clazz, PrintStream out,
-                         int batchSize, boolean printHeader) throws Exception
-    {
-        this.out = out;
-        this.batchSize = batchSize;
-        // Marking the header as already printed is how it gets skipped.
-        this.headerPrinted = !printHeader;
-        statCtor = clazz.getConstructor();
-        overall = statCtor.newInstance();
-        if (batchSize > 0)
-        {
-            batch = statCtor.newInstance();
-        }
-    }
-
-    /**
-     * Records the message in the overall statistics and, when batching is
-     * enabled, in the batch statistics. Once {@code batchSize} messages have
-     * accumulated, the batch is reported to {@code out} and then reset.
-     *
-     * @param msg the message to record
-     */
-    @Override
-    public void message(Message msg)
-    {
-        overall.message(msg);
-        if (batchSize > 0)
-        {
-            batch.message(msg);
-            if (++batchCount == batchSize)
-            {
-                if (!headerPrinted)
-                {
-                    header();
-                }
-                batch.report(out);
-                batch.clear();
-                batchCount = 0;
-            }
-        }
-    }
-
-    /**
-     * Reports the overall statistics to {@code out}, emitting the header
-     * first if it has not been printed yet.
-     */
-    @Override
-    public void report()
-    {
-        if (!headerPrinted)
-        {
-            header();
-        }
-        overall.report(out);
-    }
-
-    /**
-     * Asks the overall statistics to write their header to {@code out} and
-     * remembers that this has happened.
-     */
-    @Override
-    public void header()
-    {
-        headerPrinted = true;
-        overall.header(out);
-    }
-
-    /**
-     * Does nothing in this implementation.
-     *
-     * @param s ignored
-     */
-    @Override
-    public void log(String s)
-    {
-        // NOOP
-    }
-
-    /**
-     * Resets the overall statistics and, when batching is enabled, the batch
-     * statistics together with the count of messages in the current batch.
-     */
-    @Override
-    public void clear()
-    {
-        // batch is null when batchSize <= 0, so it must not be dereferenced
-        // unconditionally.
-        if (batch != null)
-        {
-            batch.clear();
-            // Keep the counter in step with the now-empty batch statistics.
-            batchCount = 0;
-        }
-        overall.clear();
-    }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java b/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java
deleted file mode 100644
index 8af4da5..0000000
--- a/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools.report;
-
-import java.io.PrintStream;
-
-import org.apache.qpid.tools.report.Statistics.Throughput;
-import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency;
-
-/**
- * {@link BasicReporter} whose overall statistics retain the figures they
- * compute, so that they can be read back through getters.
- * <p>
- * The getters return whatever the statistics computed during their most
- * recent {@code report(PrintStream)} call; before the first report, or after
- * {@link #clear()}, rate and average latency are 0.
- */
-public class MercuryReporter extends BasicReporter
-{
-    MercuryStatistics stats;
-
-    /**
-     * @param clazz       statistics implementation, instantiated by the
-     *                    superclass constructor
-     * @param out         stream handed to the statistics when they report
-     * @param batchSize   number of messages per batch report; batching is
-     *                    disabled unless this is positive
-     * @param printHeader whether the header is emitted before the first
-     *                    report
-     * @throws Exception if the statistics class cannot be instantiated
-     */
-    public MercuryReporter(Class<? extends MercuryStatistics> clazz, PrintStream out,
-                           int batchSize, boolean printHeader) throws Exception
-    {
-        super(clazz, out, batchSize, printHeader);
-        // overall was created from clazz, which is bounded by MercuryStatistics.
-        stats = (MercuryStatistics) overall;
-    }
-
-    /** @return the rate held by the overall statistics */
-    public double getRate()
-    {
-        return stats.getRate();
-    }
-
-    /** @return the average latency held by the overall statistics */
-    public double getAvgLatency()
-    {
-        return stats.getAvgLatency();
-    }
-
-    /** @return the standard deviation held by the overall statistics */
-    public double getStdDev()
-    {
-        return stats.getStdDev();
-    }
-
-    /** @return the minimum latency held by the overall statistics */
-    public double getMinLatency()
-    {
-        return stats.getMinLatency();
-    }
-
-    /** @return the maximum latency held by the overall statistics */
-    public double getMaxLatency()
-    {
-        return stats.getMaxLatency();
-    }
-
-    /** @return the sample size held by the overall statistics */
-    public int getSampleSize()
-    {
-        return stats.getSampleSize();
-    }
-
-    // Messages per elapsed millisecond. Yields 0 when no time has elapsed so
-    // that the division can never produce Infinity or NaN.
-    private static double ratePerMillisecond(int messageCount, long elapsedMillis)
-    {
-        return elapsedMillis > 0 ? (double) messageCount / (double) elapsedMillis : 0.0;
-    }
-
-    /** {@link Statistics} that additionally expose their computed figures. */
-    public interface MercuryStatistics extends Statistics
-    {
-        double getRate();
-        long getMinLatency();
-        long getMaxLatency();
-        double getAvgLatency();
-        double getStdDev();
-        int getSampleSize();
-    }
-
-    /**
-     * Throughput-only statistics: every latency figure is reported as 0.
-     */
-    public static class MercuryThroughput extends Throughput implements MercuryStatistics
-    {
-        double rate = 0;
-
-        /**
-         * Recomputes the rate from the message count and the time elapsed
-         * since {@code start}; nothing is written to {@code out}.
-         *
-         * @param out unused
-         */
-        @Override
-        public void report(PrintStream out)
-        {
-            // NOTE(review): assumes start is a System.currentTimeMillis() value.
-            long elapsed = System.currentTimeMillis() - start;
-            rate = ratePerMillisecond(messages, elapsed);
-        }
-
-        /** Clears the inherited state and resets the rate to 0. */
-        @Override
-        public void clear()
-        {
-            super.clear();
-            rate = 0;
-        }
-
-        @Override
-        public double getRate()
-        {
-            return rate;
-        }
-
-        @Override
-        public int getSampleSize()
-        {
-            return messages;
-        }
-
-        @Override
-        public long getMinLatency()
-        {
-            return 0;
-        }
-
-        @Override
-        public long getMaxLatency()
-        {
-            return 0;
-        }
-
-        @Override
-        public double getAvgLatency()
-        {
-            return 0;
-        }
-
-        @Override
-        public double getStdDev()
-        {
-            return 0;
-        }
-    }
-
-    /**
-     * Throughput and latency statistics that retain the computed rate and
-     * average latency.
-     */
-    public static class MercuryThroughputAndLatency extends ThroughputAndLatency implements MercuryStatistics
-    {
-        double rate = 0;
-        double avgLatency = 0;
-        // NOTE(review): never assigned in this class, so getStdDev() returns
-        // 0 unless something else in the package sets it.
-        double stdDev;
-
-        /**
-         * Recomputes the rate and the average latency; nothing is written
-         * to {@code out}.
-         *
-         * @param out unused
-         */
-        @Override
-        public void report(PrintStream out)
-        {
-            // NOTE(review): assumes start is a System.currentTimeMillis() value.
-            long elapsed = System.currentTimeMillis() - start;
-            rate = ratePerMillisecond(messages, elapsed);
-            // With no samples the average is reported as 0 instead of NaN.
-            avgLatency = sampleCount > 0 ? totalLatency / (double) sampleCount : 0.0;
-        }
-
-        /** Clears the inherited state and resets rate and average latency to 0. */
-        @Override
-        public void clear()
-        {
-            super.clear();
-            rate = 0;
-            avgLatency = 0;
-        }
-
-        @Override
-        public double getRate()
-        {
-            return rate;
-        }
-
-        @Override
-        public long getMinLatency()
-        {
-            return minLatency;
-        }
-
-        @Override
-        public long getMaxLatency()
-        {
-            return maxLatency;
-        }
-
-        @Override
-        public double getAvgLatency()
-        {
-            return avgLatency;
-        }
-
-        @Override
-        public double getStdDev()
-        {
-            return stdDev;
-        }
-
-        @Override
-        public int getSampleSize()
-        {
-            return messages;
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a48234d2/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java b/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java
deleted file mode 100644
index 5e48145..0000000
--- a/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools.report;
-
-import javax.jms.Message;
-
-/**
- * Contract implemented by the reporters in this package, for example
- * {@link BasicReporter}.
- */
-public interface Reporter
-{
-    /**
-     * Hands one message to the reporter.
-     *
-     * @param msg the message to take into account
-     */
-    void message(Message msg);
-
-    /** Emits the report. */
-    void report();
-
-    /** Emits the report header. */
-    void header();
-
-    /**
-     * Will be used by some reporters to print statements which are greped by
-     * scripts. Example see java/tools/bin/perf-report
-     *
-     * @param s the statement to print
-     */
-    void log(String s);
-
-    /** Resets the reporter's accumulated state. */
-    void clear();
-}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org