You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/01/26 09:52:25 UTC
svn commit: r500188 [4/5] - in
/incubator/qpid/branches/perftesting/qpid/java: ./ broker/
broker/distribution/ broker/distribution/src/ broker/distribution/src/main/
broker/distribution/src/main/assembly/ client/ client/distribution/
client/distributio...
Added: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java?view=auto&rev=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java Fri Jan 26 00:52:20 2007
@@ -0,0 +1,134 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.pingpong;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.Level;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.Session;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Topic;
+import javax.jms.JMSException;
+import java.net.InetAddress;
+
+public class TestPingSubscriber
+{
+ private static final Logger _logger = Logger.getLogger(TestPingSubscriber.class);
+
+ private static class TestPingMessageListener implements MessageListener
+ {
+ public TestPingMessageListener()
+ {
+ }
+
+ long _lastTimestamp = 0L;
+ long _lastTimestampString = 0L;
+
+ public void onMessage(javax.jms.Message message)
+ {
+ Long time = System.nanoTime();
+
+ if (_logger.isInfoEnabled())
+ {
+ long timestampString = 0L;
+
+ try
+ {
+ long timestamp = message.getLongProperty("timestamp");
+ timestampString = Long.parseLong(message.getStringProperty("timestampString"));
+
+ if (timestampString != timestamp)
+ {
+ _logger.info("Timetamps differ!:\n" +
+ "timestamp:" + timestamp + "\n" +
+ "timestampString:" + timestampString);
+ }
+
+ }
+ catch (JMSException jmse)
+ {
+ _logger.error("JMSException caught:" + jmse.getMessage(), jmse);
+ }
+
+
+ long stringDiff = time - timestampString;
+
+ _logger.info("Ping: TS:" + stringDiff / 1000 + "us");
+
+ // _logger.info(_name + " got message '" + message + "\n");
+ }
+ }
+ }
+
+ public static void main(String[] args)
+ {
+ _logger.info("Starting...");
+
+ if (args.length < 4)
+ {
+ System.out.println("Usage: brokerdetails username password virtual-path [selector] ");
+ System.exit(1);
+ }
+ try
+ {
+ InetAddress address = InetAddress.getLocalHost();
+ AMQConnection con1 = new AMQConnection(args[0], args[1], args[2],
+ address.getHostName(), args[3]);
+
+ _logger.info("Connected with URL:" + con1.toURL());
+
+ final org.apache.qpid.jms.Session session1 = (org.apache.qpid.jms.Session)
+ con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ String selector = null;
+
+ if (args.length == 5)
+ {
+ selector = args[4];
+ _logger.info("Message selector is <" + selector + ">...");
+ }
+ else
+ {
+ _logger.info("Not using message selector ");
+ }
+
+ Topic t = new AMQTopic("ping");
+
+ MessageConsumer consumer1 = session1.createConsumer(t,
+ 1, false, false, selector);
+
+ consumer1.setMessageListener(new TestPingMessageListener());
+ con1.start();
+ }
+ catch (Throwable t)
+ {
+ System.err.println("Fatal error: " + t);
+ t.printStackTrace();
+ }
+
+ System.out.println("Waiting...");
+ }
+}
+
Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java?view=auto&rev=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java Fri Jan 26 00:52:20 2007
@@ -0,0 +1,67 @@
+package org.apache.qpid.ping;
+
+/**
+ * Throttle is a helper class used in situations where a controlled rate of processing is desired. It allows a certain
+ * number of operations-per-second to be defined and supplies a {@link #throttle} method that can only be called at
+ * most at that rate. The first call to the throttle method will return immediately, subsequent calls will introduce
+ * a short pause to fill out the remainder of the current cycle to attain the desired rate. If there is no remainder
+ * left then it will return immediately.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public class Throttle
+{
+ /** Holds the length of the cycle in nano seconds. */
+ long cycleLengthNanos = 0L;
+
+ /** Records the nano time of the last call to the throttle method. */
+ long lastTimeNanos = 0L;
+
+ /**
+ * Sets up the desired rate of operation per second that the throttle method should restrict to.
+ *
+ * @param opsPerSecond The maximum number of calls per second that the throttle method will take.
+ */
+ public void setRate(int opsPerSecond)
+ {
+ // Calculate the length of a cycle.
+ cycleLengthNanos = 1000000000 / opsPerSecond;
+ }
+
+ /**
+ * Introduces a short pause to fill out any time left in the cycle since this method was last called, of length
+ * defined by a call to the {@link #setRate} method.
+ */
+ public void throttle()
+ {
+ // Record the time now.
+ long currentTimeNanos = System.nanoTime();
+
+ // Check if there is any time remaining in the current cycle and introduce a short wait to fill out the
+ // remainder of the cycle if needed.
+ long remainingTimeNanos = cycleLengthNanos - (currentTimeNanos - lastTimeNanos);
+
+ if (remainingTimeNanos > 0)
+ {
+ long milliWait = remainingTimeNanos / 1000000;
+ int nanoWait = (int) (remainingTimeNanos % 1000000);
+
+ try
+ {
+ Thread.currentThread().sleep(milliWait, nanoWait);
+ }
+ catch (InterruptedException e)
+ {
+ // Just ignore this?
+ }
+ }
+
+ // Keep the time of the last call to this method to calculate the next cycle.
+ //lastTimeNanos = currentTimeNanos;
+ lastTimeNanos = System.nanoTime();
+ }
+}
Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java?view=auto&rev=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java Fri Jan 26 00:52:20 2007
@@ -0,0 +1,300 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import java.net.InetAddress;
+import java.util.Date;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.ping.AbstractPingClient;
+import org.apache.qpid.topic.Config;
+
+/**
+ * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return
+ * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes
+ * too.
+ *
+ * <p/>The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages
+ * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique
+ * temporary queue or the correlation id to correlate the original message to the reply.
+ *
+ * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
+ * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
+ * be disabled for real timing tests as writing to the console will slow things down.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Bounce back messages to their reply to destination.
+ * <tr><td> Provide command line invocation to start the bounce back on a configurable broker url.
+ * </table>
+ *
+ * @todo Replace the command line parsing with a neater tool.
+ *
+ * @todo Make verbose accept a number of messages, only prints to console every X messages.
+ */
+public class PingPongBouncer extends AbstractPingClient implements MessageListener
+{
+ private static final Logger _logger = Logger.getLogger(PingPongBouncer.class);
+
+ /** The default prefetch size for the message consumer. */
+ private static final int PREFETCH = 1;
+
+ /** The default no local flag for the message consumer. */
+ private static final boolean NO_LOCAL = true;
+
+ private static final String DEFAULT_DESTINATION_NAME = "ping";
+
+ /** The default exclusive flag for the message consumer. */
+ private static final boolean EXCLUSIVE = false;
+
+ /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
+ private boolean _verbose = false;
+
+ /** Determines whether this bounce back client bounces back messages persistently. */
+ private boolean _persistent = false;
+
+ private Destination _consumerDestination;
+
+ /** Keeps track of the response destination of the previous message for the last reply to producer cache. */
+ private Destination _lastResponseDest;
+
+ /** The producer for sending replies with. */
+ private MessageProducer _replyProducer;
+
+ /** The consumer session. */
+ private Session _consumerSession;
+
+ /** The producer session. */
+ private Session _producerSession;
+
+ /**
+ * Creates a PingPongBouncer on the specified producer and consumer sessions.
+ *
+ * @param brokerDetails The addresses of the brokers to connect to.
+ * @param username The broker username.
+ * @param password The broker password.
+ * @param virtualpath The virtual host name within the broker.
+ * @param destinationName The name of the queue to receive pings on
+ * (or root of the queue name where many queues are generated).
+ * @param persistent A flag to indicate that persistent message should be used.
+ * @param transacted A flag to indicate that pings should be sent within transactions.
+ * @param selector A message selector to filter received pings with.
+ * @param verbose A flag to indicate that message timings should be sent to the console.
+ *
+ * @throws Exception All underlying exceptions allowed to fall through. This is only test code...
+ */
+ public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath,
+ String destinationName, boolean persistent, boolean transacted, String selector,
+ boolean verbose, boolean pubsub) throws Exception
+ {
+ // Create a client id to uniquely identify this client.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientId = address.getHostName() + System.currentTimeMillis();
+ _verbose = verbose;
+ _persistent = persistent;
+ setPubSub(pubsub);
+ // Connect to the broker.
+ setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath));
+ _logger.info("Connected with URL:" + getConnection().toURL());
+
+ // Set up the failover notifier.
+ getConnection().setConnectionListener(new FailoverNotifier());
+
+ // Create a session to listen for messages on and one to send replies on, transactional depending on the
+ // command line option.
+ _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+ // Create the queue to listen for message on.
+ createConsumerDestination(destinationName);
+ MessageConsumer consumer = _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
+
+ // Create a producer for the replies, without a default destination.
+ _replyProducer = _producerSession.createProducer(null);
+ _replyProducer.setDisableMessageTimestamp(true);
+ _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ // Set this up to listen for messages on the queue.
+ consumer.setMessageListener(this);
+ }
+
+ private void createConsumerDestination(String name)
+ {
+ if (isPubSub())
+ {
+ _consumerDestination = new AMQTopic(name);
+ }
+ else
+ {
+ _consumerDestination = new AMQQueue(name);
+ }
+ }
+
+ /**
+ * Starts a stand alone ping-pong client running in verbose mode.
+ *
+ * @param args
+ */
+ public static void main(String[] args) throws Exception
+ {
+ System.out.println("Starting...");
+
+ // Display help on the command line.
+ if (args.length == 0)
+ {
+ _logger.info("Running test with default values...");
+ //usage();
+ //System.exit(0);
+ }
+
+ // Extract all command line parameters.
+ Config config = new Config();
+ config.setOptions(args);
+ String brokerDetails = config.getHost() + ":" + config.getPort();
+ String virtualpath = "/test";
+ String destinationName = config.getDestination();
+ if (destinationName == null)
+ {
+ destinationName = DEFAULT_DESTINATION_NAME;
+ }
+ String selector = config.getSelector();
+ boolean transacted = config.isTransacted();
+ boolean persistent = config.usePersistentMessages();
+ boolean pubsub = config.isPubSub();
+ boolean verbose = true;
+
+ //String selector = null;
+
+ // Instantiate the ping pong client with the command line options and start it running.
+ PingPongBouncer pingBouncer = new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath,
+ destinationName, persistent, transacted, selector, verbose, pubsub);
+ pingBouncer.getConnection().start();
+
+ System.out.println("Waiting...");
+ }
+
+ /**
+ * This is a callback method that is notified of all messages for which this has been registered as a message
+ * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to
+ * destination of the message.
+ *
+ * @param message The message that triggered this callback.
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ String messageCorrelationId = message.getJMSCorrelationID();
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, "
+ + messageCorrelationId);
+ }
+
+ // Get the reply to destination from the message and check it is set.
+ Destination responseDest = message.getJMSReplyTo();
+
+ if (responseDest == null)
+ {
+ _logger.debug("Cannot send reply because reply-to destination is null.");
+
+ return;
+ }
+
+ // Spew out some timing information if verbose mode is on.
+ if (_verbose)
+ {
+ Long timestamp = message.getLongProperty("timestamp");
+
+ if (timestamp != null)
+ {
+ long diff = System.currentTimeMillis() - timestamp;
+ _logger.info("Time to bounce point: " + diff);
+ }
+ }
+
+ // Correlate the reply to the original.
+ message.setJMSCorrelationID(messageCorrelationId);
+
+ // Send the receieved message as the pong reply.
+ _replyProducer.send(responseDest, message);
+
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, "
+ + messageCorrelationId);
+ }
+
+ // Commit the transaction if running in transactional mode.
+ commitTx(_producerSession);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+ }
+
+ private static void usage()
+ {
+ System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" +
+ "-destinationname : queue/topic name\n" +
+ "-transacted : (true/false). Default is false\n" +
+ "-persistent : (true/false). Default is false\n" +
+ "-pubsub : (true/false). Default is false\n" +
+ "-selector : selector string\n");
+ }
+
+ /**
+ * A connection listener that logs out any failover complete events. Could do more interesting things with this
+ * at some point...
+ */
+ public static class FailoverNotifier implements ConnectionListener
+ {
+ public void bytesSent(long count)
+ { }
+
+ public void bytesReceived(long count)
+ { }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _logger.info("App got failover complete callback.");
+ }
+ }
+}
Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=auto&rev=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Fri Jan 26 00:52:20 2007
@@ -0,0 +1,806 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import java.net.InetAddress;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.ping.AbstractPingProducer;
+import org.apache.qpid.ping.Throttle;
+import org.apache.qpid.topic.Config;
+
+/**
+ * PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
+ * client (see {@link PingPongBouncer} for the bounce back client). It is designed to be run from the command line
+ * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session,
+ * message producer and message consumer to run the ping-pong cycle on.
+ * <p/>
+ * <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings.
+ * This means that this class has to do some work to correlate pings with pongs; it expectes the original message
+ * id in the ping to be bounced back in the correlation id. If a new temporary queue per ping were used, then
+ * this correlation would not need to be done.
+ * <p/>
+ * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
+ * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so
+ * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
+ * also registered to terminate the ping-pong loop cleanly.
+ * <p/>
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide a ping and wait for response cycle.
+ * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
+ * </table>
+ *
+ * @todo Make temp queue per ping a command line option.
+ * @todo Make the queue name a command line option.
+ */
+public class PingPongProducer extends AbstractPingProducer implements Runnable, MessageListener, ExceptionListener
+{
+ private static final Logger _logger = Logger.getLogger(PingPongProducer.class);
+
+ /**
+ * Used to set up a default message size.
+ */
+ protected static final int DEFAULT_MESSAGE_SIZE = 0;
+
+ /**
+ * This is set and used when the test is for multiple-destinations
+ */
+ protected static final int DEFAULT_DESTINATION_COUNT = 0;
+
+ protected static final int DEFAULT_RATE = 0;
+
+ /**
+ * Used to define how long to wait between pings.
+ */
+ protected static final long SLEEP_TIME = 250;
+
+ /**
+ * Used to define how long to wait before assuming that a ping has timed out.
+ */
+ protected static final long TIMEOUT = 9000;
+
+ /**
+ * Holds the name of the destination to send pings on.
+ */
+ protected static final String PING_DESTINATION_NAME = "ping";
+
+ /**
+ * The batch size.
+ */
+ protected static final int DEFAULT_BATCH_SIZE = 100;
+
+ protected static final int PREFETCH = 100;
+ protected static final boolean NO_LOCAL = true;
+ protected static final boolean EXCLUSIVE = false;
+
+ /**
+ * The number of priming loops to run.
+ */
+ protected static final int PRIMING_LOOPS = 3;
+
+ /**
+ * A source for providing sequential unique correlation ids.
+ */
+ private static AtomicLong idGenerator = new AtomicLong(0L);
+
+ /**
+ * Holds a map from message ids to latches on which threads wait for replies.
+ */
+ private static Map<String, CountDownLatch> trafficLights = new HashMap<String, CountDownLatch>();
+
+ /**
+ * Destination where the responses messages will arrive
+ */
+ private Destination _replyDestination;
+
+ /**
+ * Destination where the producer will be sending message to
+ */
+ private Destination _pingDestination;
+
+ /**
+ * Determines whether this producer sends persistent messages from the run method.
+ */
+ protected boolean _persistent;
+
+ /**
+ * Holds the message size to send, from the run method.
+ */
+ protected int _messageSize;
+
+ /**
+ * Used to indicate that the ping loop should print out whenever it pings.
+ */
+ protected boolean _verbose = false;
+
+ protected Session _consumerSession;
+
+ /**
+ * Used to restrict the sending rate to a specified limit.
+ */
+ private Throttle rateLimiter = null;
+
+ /**
+ * The throttler can only reliably restrict to a few hundred cycles per second, so a throttling batch size is used
+ * to group sends together into batches large enough that the throttler runs slower than that.
+ */
+ int _throttleBatchSize;
+
+ private MessageListener _messageListener = null;
+
+ private PingPongProducer(String brokerDetails, String username, String password, String virtualpath, boolean transacted,
+ boolean persistent, int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
+ boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize, int rate)
+ throws Exception
+ {
+ // Create a connection to the broker.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
+
+ setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath));
+
+ // Create transactional or non-transactional sessions, based on the command line arguments.
+ setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE));
+ _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+ _persistent = persistent;
+ _messageSize = messageSize;
+ _verbose = verbose;
+
+ // Set failover interrupts
+ _failAfterCommit = afterCommit;
+ _failBeforeCommit = beforeCommit;
+ _failAfterSend = afterSend;
+ _failBeforeSend = beforeSend;
+ _failOnce = failOnce;
+ _txBatchSize = batchSize;
+
+ // Calculate a throttling batch size and rate such that the throttle runs slower than 100 cycles per second
+ // and batched sends within each cycle multiply up to give the desired rate.
+ //
+ // total rate = throttle rate * batch size.
+ // 1 < throttle rate < 100
+ // 1 < total rate < 20000
+ if (rate > DEFAULT_RATE)
+ {
+ // Log base 10 over 2 is used here to get a feel for what power of 100 the total rate is.
+ // As the total rate goes up the powers of 100 the batch size goes up by powers of 100 to keep the
+ // throttle rate back into the range 1 to 100.
+ int x = (int) (Math.log10(rate) / 2);
+ _throttleBatchSize = (int) Math.pow(100, x);
+ int throttleRate = rate / _throttleBatchSize;
+
+ _logger.debug("rate = " + rate);
+ _logger.debug("x = " + x);
+ _logger.debug("_throttleBatchSize = " + _throttleBatchSize);
+ _logger.debug("throttleRate = " + throttleRate);
+
+ rateLimiter = new Throttle();
+ rateLimiter.setRate(throttleRate);
+ }
+ }
+
+ /**
+ * Creates a ping pong producer with the specified connection details and type.
+ *
+ * @param brokerDetails
+ * @param username
+ * @param password
+ * @param virtualpath
+ * @param transacted
+ * @throws Exception All allowed to fall through. This is only test code...
+ */
+ public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
+ String destinationName, String selector, boolean transacted, boolean persistent,
+ int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
+ boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize,
+ int noOfDestinations, int rate, boolean pubsub) throws Exception
+ {
+ this(brokerDetails, username, password, virtualpath, transacted, persistent, messageSize, verbose, afterCommit,
+ beforeCommit, afterSend, beforeSend, failOnce, batchSize, rate);
+
+ _destinationCount = noOfDestinations;
+ setPubSub(pubsub);
+
+ if (noOfDestinations == DEFAULT_DESTINATION_COUNT)
+ {
+ if (destinationName != null)
+ {
+ createPingDestination(destinationName);
+ // Create producer and the consumer
+ createProducer();
+ createConsumer(selector);
+ }
+ else
+ {
+ _logger.error("Destination is not specified");
+ throw new IllegalArgumentException("Destination is not specified");
+ }
+ }
+ }
+
+ private void createPingDestination(String name)
+ {
+ if (isPubSub())
+ {
+ _pingDestination = new AMQTopic(name);
+ }
+ else
+ {
+ _pingDestination = new AMQQueue(name);
+ }
+ }
+
+ /**
+ * Creates the producer to send the pings on. If the tests are with nultiple-destinations, then producer
+ * is created with null destination, so that any destination can be specified while sending
+ *
+ * @throws JMSException
+ */
+ public void createProducer() throws JMSException
+ {
+ if (getDestinationsCount() > DEFAULT_DESTINATION_COUNT)
+ {
+ // create producer with initial destination as null for test with multiple-destinations
+ // In this case, a different destination will be used while sending the message
+ _producer = (MessageProducer) getProducerSession().createProducer(null);
+ }
+ else
+ {
+ // Create a producer with known destination to send the pings on.
+ _producer = (MessageProducer) getProducerSession().createProducer(_pingDestination);
+
+ }
+
+ _producer.setDisableMessageTimestamp(true);
+ _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ }
+
+ /**
+ * Creates the temporary destination to listen to the responses
+ *
+ * @param selector
+ * @throws JMSException
+ */
+ public void createConsumer(String selector) throws JMSException
+ {
+ // Create a temporary destination to get the pongs on.
+ if (isPubSub())
+ {
+ _replyDestination = _consumerSession.createTemporaryTopic();
+ }
+ else
+ {
+ _replyDestination = _consumerSession.createTemporaryQueue();
+ }
+
+ // Create a message consumer to get the replies with and register this to be called back by it.
+ MessageConsumer consumer = _consumerSession.createConsumer(_replyDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
+ consumer.setMessageListener(this);
+ }
+
+ /**
+ * Creates consumer instances for each destination. This is used when test is being done with multiple destinations.
+ *
+ * @param selector
+ * @throws JMSException
+ */
+ public void createConsumers(String selector) throws JMSException
+ {
+ for (int i = 0; i < getDestinationsCount(); i++)
+ {
+ MessageConsumer consumer =
+ getConsumerSession().createConsumer(getDestination(i), PREFETCH, false, EXCLUSIVE, selector);
+ consumer.setMessageListener(this);
+ }
+ }
+
+
+ public Session getConsumerSession()
+ {
+ return _consumerSession;
+ }
+
+ public Destination getPingDestination()
+ {
+ return _pingDestination;
+ }
+
+ protected void setPingDestination(Destination destination)
+ {
+ _pingDestination = destination;
+ }
+
+ /**
+ * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs
+ * to be started to bounce the pings back again.
+ * <p/>
+ * <p/>The command line takes from 2 to 4 arguments:
+ * <p/><table>
+ * <tr><td>brokerDetails <td> The broker connection string.
+ * <tr><td>virtualPath <td> The virtual path.
+ * <tr><td>transacted <td> A boolean flag, telling this client whether or not to use transactions.
+ * <tr><td>size <td> The size of ping messages to use, in bytes.
+ * </table>
+ *
+ * @param args The command line arguments as defined above.
+ */
+ public static void main(String[] args) throws Exception
+ {
+ // Extract the command line.
+ Config config = new Config();
+ config.setOptions(args);
+ if (args.length == 0)
+ {
+ _logger.info("Running test with default values...");
+ //usage();
+ //System.exit(0);
+ }
+
+ String brokerDetails = config.getHost() + ":" + config.getPort();
+ String virtualpath = "/test";
+ String selector = config.getSelector();
+ boolean verbose = true;
+ boolean transacted = config.isTransacted();
+ boolean persistent = config.usePersistentMessages();
+ int messageSize = (config.getPayload() != 0) ? config.getPayload() : DEFAULT_MESSAGE_SIZE;
+ //int messageCount = config.getMessages();
+ int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DEFAULT_DESTINATION_COUNT;
+ int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : DEFAULT_BATCH_SIZE;
+ int rate = (config.getRate() != 0) ? config.getRate() : DEFAULT_RATE;
+ boolean pubsub = config.isPubSub();
+
+ String destName = config.getDestination();
+ if (destName == null)
+ {
+ destName = PING_DESTINATION_NAME;
+ }
+
+ boolean afterCommit = false;
+ boolean beforeCommit = false;
+ boolean afterSend = false;
+ boolean beforeSend = false;
+ boolean failOnce = false;
+
+ for (String arg : args)
+ {
+ if (arg.startsWith("failover:"))
+ {
+ //failover:<before|after>:<send:commit> | failover:once
+ String[] parts = arg.split(":");
+ if (parts.length == 3)
+ {
+ if (parts[2].equals("commit"))
+ {
+ afterCommit = parts[1].equals("after");
+ beforeCommit = parts[1].equals("before");
+ }
+
+ if (parts[2].equals("send"))
+ {
+ afterSend = parts[1].equals("after");
+ beforeSend = parts[1].equals("before");
+ }
+
+ if (parts[1].equals("once"))
+ {
+ failOnce = true;
+ }
+ }
+ else
+ {
+ System.out.println("Unrecognized failover request:" + arg);
+ }
+ }
+ }
+
+ // Create a ping producer to handle the request/wait/reply cycle.
+ PingPongProducer pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath,
+ destName, selector, transacted, persistent, messageSize, verbose,
+ afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
+ destCount, rate, pubsub);
+
+ pingProducer.getConnection().start();
+
+ // Run a few priming pings to remove warm up time from test results.
+ //pingProducer.prime(PRIMING_LOOPS);
+ // Create a shutdown hook to terminate the ping-pong producer.
+ Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
+
+ // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
+ pingProducer.getConnection().setExceptionListener(pingProducer);
+
+ // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
+ Thread pingThread = new Thread(pingProducer);
+ pingThread.run();
+ pingThread.join();
+ }
+
+ private static void usage()
+ {
+ System.err.println("Usage: TestPingPublisher \n" + "-host : broker host" + "-port : broker port" +
+ "-destinationname : queue/topic name\n" +
+ "-transacted : (true/false). Default is false\n" +
+ "-persistent : (true/false). Default is false\n" +
+ "-pubsub : (true/false). Default is false\n" +
+ "-selector : selector string\n" +
+ "-payload : paylaod size. Default is 0\n" +
+ //"-messages : no of messages to be sent (if 0, the ping loop will run indefinitely)\n" +
+ "-destinationscount : no of destinations for multi-destinations test\n" +
+ "-batchsize : batch size\n" +
+ "-rate : thruput rate\n");
+ }
+
+ /**
+ * Primes the test loop by sending a few messages, then introduces a short wait. This allows the bounce back client
+ * on the other end a chance to configure its reply producer on the reply to destination. It is also worth calling
+ * this a few times, in order to prime the JVMs JIT compilation.
+ *
+ * @param x The number of priming loops to run.
+ * @throws JMSException All underlying exceptions are allowed to fall through.
+ */
+ public void prime(int x) throws JMSException
+ {
+ for (int i = 0; i < x; i++)
+ {
+ // Create and send a small message.
+ Message first = getTestMessage(_replyDestination, 0, false);
+ sendMessage(first);
+
+ commitTx();
+
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException ignore)
+ {
+
+ }
+ }
+
+
+ }
+
+ /**
+ * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a
+ * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected
+ * in the replies map.
+ *
+ * @param message The received message.
+ */
+ public void onMessage(Message message)
+ {
+
+ try
+ {
+
+ // Store the reply, if it has a correlation id that is expected.
+ String correlationID = message.getJMSCorrelationID();
+
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Got reply with correlation id, " + correlationID);
+ //_logger.debug("Received from : " + message.getJMSDestination());
+ }
+
+ // Turn the traffic light to green.
+ CountDownLatch trafficLight = trafficLights.get(correlationID);
+
+ if (trafficLight != null)
+ {
+ if (_messageListener != null)
+ {
+ synchronized (trafficLight)
+ {
+ _messageListener.onMessage(message);
+ trafficLight.countDown();
+ }
+ }
+ else
+ {
+ trafficLight.countDown();
+ }
+
+ _logger.trace("Reply was expected, decrementing the latch for the id.");
+
+ long remainingCount = trafficLight.getCount();
+
+ if ((remainingCount % _txBatchSize) == 0)
+ {
+ commitTx(getConsumerSession());
+ }
+
+ }
+ else
+ {
+ _logger.trace("There was no thread waiting for reply: " + correlationID);
+ }
+
+ if (_verbose)
+ {
+ Long timestamp = message.getLongProperty("timestamp");
+
+ if (timestamp != null)
+ {
+ long diff = System.currentTimeMillis() - timestamp;
+ _logger.trace("Time for round trip: " + diff);
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("There was a JMSException: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
+ * before a reply arrives, then a null reply is returned from this method.
+ *
+ * @param message The message to send.
+ * @param numPings The number of ping messages to send.
+ * @param timeout The timeout in milliseconds.
+ * @return The number of replies received. This may be less than the number sent if the timeout terminated the
+ * wait for all prematurely.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
+ {
+ String messageCorrelationId = null;
+
+ try
+ {
+ // Put a unique correlation id on the message before sending it.
+ messageCorrelationId = Long.toString(getNewID());
+
+ pingNoWaitForReply(message, numPings, messageCorrelationId);
+
+ CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
+ // Block the current thread until a reply to the message is received, or it times out.
+ trafficLight.await(timeout, TimeUnit.MILLISECONDS);
+
+ // Work out how many replies were receieved.
+ int numReplies = numPings - (int) trafficLight.getCount();
+
+ if ((numReplies < numPings) && _verbose)
+ {
+ _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+ }
+ else if (_verbose)
+ {
+ _logger.info("Got all replies on id, " + messageCorrelationId);
+ }
+
+ commitTx(getConsumerSession());
+
+ return numReplies;
+ }
+ finally
+ {
+ removeLock(messageCorrelationId);
+ }
+ }
+
+ public long getNewID()
+ {
+ return idGenerator.incrementAndGet();
+ }
+
+ public CountDownLatch removeLock(String correlationID)
+ {
+ return trafficLights.remove(correlationID);
+ }
+
+
+ /*
+ * Sends the specified ping message but does not wait for a correlating reply.
+ *
+ * @param message The message to send.
+ * @param numPings The number of pings to send.
+ * @return The reply, or null if no reply arrives before the timeout.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException, InterruptedException
+ {
+ // Create a count down latch to count the number of replies with. This is created before the message is sent
+ // so that the message is not received before the count down is created.
+ CountDownLatch trafficLight = new CountDownLatch(numPings);
+ trafficLights.put(messageCorrelationId, trafficLight);
+
+ message.setJMSCorrelationID(messageCorrelationId);
+
+ // Set up a committed flag to detect uncommitted message at the end of the send loop. This may occurr if the
+ // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is
+ // needed.
+ boolean committed = false;
+
+ // Send all of the ping messages.
+ for (int i = 0; i < numPings; i++)
+ {
+ // Reset the committed flag to indicate that there are uncommitted message.
+ committed = false;
+
+ // Re-timestamp the message.
+ message.setLongProperty("timestamp", System.currentTimeMillis());
+
+ // Check if the test is with multiple-destinations, in which case round robin the destinations
+ // as the messages are sent.
+ if (getDestinationsCount() > DEFAULT_DESTINATION_COUNT)
+ {
+ sendMessage(getDestination(i % getDestinationsCount()), message);
+ }
+ else
+ {
+ sendMessage(message);
+ }
+
+ // Apply message rate throttling if a rate limit has been set up and the throttling batch limit has been
+ // reached. See the comment on the throttle batch size for information about the use of batches here.
+ if ((rateLimiter != null) && ((i % _throttleBatchSize) == 0))
+ {
+ rateLimiter.throttle();
+ }
+
+ // Call commit every time the commit batch size is reached.
+ if ((i % _txBatchSize) == 0)
+ {
+ commitTx();
+ committed = true;
+ }
+ }
+
+ // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages.
+ if (!committed)
+ {
+ commitTx();
+ }
+
+ // Spew out per message timings only in verbose mode.
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
+ }
+
+ }
+
+ /**
+ * The ping loop implementation. This send out pings of the configured size, persistence and transactionality, and
+ * waits for replies and inserts short pauses in between each.
+ */
+ public void pingLoop()
+ {
+ try
+ {
+ // Generate a sample message and time stamp it.
+ ObjectMessage msg = getTestMessage(_replyDestination, _messageSize, _persistent);
+ msg.setLongProperty("timestamp", System.currentTimeMillis());
+
+ // Send the message and wait for a reply.
+ pingAndWaitForReply(msg, DEFAULT_BATCH_SIZE, TIMEOUT);
+
+ // Introduce a short pause if desired.
+ pause(SLEEP_TIME);
+ }
+ catch (JMSException e)
+ {
+ _publish = false;
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+ catch (InterruptedException e)
+ {
+ _publish = false;
+ _logger.debug("There was an interruption: " + e.getMessage(), e);
+ }
+ }
+
+ public Destination getReplyDestination()
+ {
+ return _replyDestination;
+ }
+
+ protected void setReplyDestination(Destination destination)
+ {
+ _replyDestination = destination;
+ }
+
+ public void setMessageListener(MessageListener messageListener)
+ {
+ _messageListener = messageListener;
+ }
+
+ public CountDownLatch getEndLock(String correlationID)
+ {
+ return trafficLights.get(correlationID);
+ }
+
+ /*
+ * When the test is being performed with multiple queues, then this method will be used, which has a loop to
+ * pick up the next queue from the queues list and sends message to it.
+ *
+ * @param message
+ * @param numPings
+ * @throws JMSException
+ */
+ /*private void pingMultipleQueues(Message message, int numPings) throws JMSException
+ {
+ int queueIndex = 0;
+ for (int i = 0; i < numPings; i++)
+ {
+ // Re-timestamp the message.
+ message.setLongProperty("timestamp", System.currentTimeMillis());
+
+ sendMessage(getDestination(queueIndex++), message);
+
+ // reset the counter to get the first queue
+ if (queueIndex == (getDestinationsCount() - 1))
+ {
+ queueIndex = 0;
+ }
+ }
+ }*/
+
+ /**
+ * A connection listener that logs out any failover complete events. Could do more interesting things with this
+ * at some point...
+ */
+ public static class FailoverNotifier implements ConnectionListener
+ {
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true; //Allow failover
+ }
+
+ public boolean preResubscribe()
+ {
+ return true; // Allow resubscription
+ }
+
+ public void failoverComplete()
+ {
+ _logger.info("App got failover complete callback.");
+ }
+ }
+}
Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java?view=diff&rev=500188&r1=500187&r2=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java Fri Jan 26 00:52:20 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,11 +21,11 @@
package org.apache.qpid.requestreply;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.jms.Session;
import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.jms.Session;
import org.apache.qpid.url.URLSyntaxException;
import javax.jms.*;
@@ -42,12 +42,22 @@
private AMQConnection _connection;
+ private Session _session;
+ private Session _producerSession;
+
+ private boolean _isTransactional;
+
public ServiceProvidingClient(String brokerDetails, String username, String password,
- String clientName, String virtualPath, String serviceName)
+ String clientName, String virtualPath, String serviceName,
+ final int deliveryMode, boolean transactedMode, String selector)
throws AMQException, JMSException, URLSyntaxException
{
- _connection = new AMQConnection(brokerDetails, username, password,
- clientName, virtualPath);
+ _isTransactional = transactedMode;
+
+ _logger.info("Delivery Mode: " + (deliveryMode == DeliveryMode.NON_PERSISTENT ? "Non Persistent" : "Persistent")
+ + "\t isTransactional: " + _isTransactional);
+
+ _connection = new AMQConnection(brokerDetails, username, password, clientName, virtualPath);
_connection.setConnectionListener(new ConnectionListener()
{
@@ -74,14 +84,15 @@
_logger.info("App got failover complete callback");
}
});
- final Session session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _session = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
+ _producerSession = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
_logger.info("Service (queue) name is '" + serviceName + "'...");
AMQQueue destination = new AMQQueue(serviceName);
- MessageConsumer consumer = session.createConsumer(destination,
- 100, true, false, null);
+ MessageConsumer consumer = _session.createConsumer(destination,
+ 100, true, false, selector);
consumer.setMessageListener(new MessageListener()
{
@@ -90,9 +101,7 @@
public void onMessage(Message message)
{
//_logger.info("Got message '" + message + "'");
-
TextMessage tm = (TextMessage) message;
-
try
{
Destination responseDest = tm.getJMSReplyTo();
@@ -107,9 +116,9 @@
_responseDest = responseDest;
_logger.info("About to create a producer");
- _destinationProducer = session.createProducer(responseDest);
+ _destinationProducer = _producerSession.createProducer(responseDest);
_destinationProducer.setDisableMessageTimestamp(true);
- _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ _destinationProducer.setDeliveryMode(deliveryMode);
_logger.info("After create a producer");
}
}
@@ -127,14 +136,25 @@
try
{
String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText();
- TextMessage msg = session.createTextMessage(payload);
+ TextMessage msg = _producerSession.createTextMessage(payload);
if (tm.propertyExists("timeSent"))
{
_logger.info("timeSent property set on message");
- _logger.info("timeSent value is: " + tm.getLongProperty("timeSent"));
- msg.setStringProperty("timeSent", tm.getStringProperty("timeSent"));
+ long timesent = tm.getLongProperty("timeSent");
+ _logger.info("timeSent value is: " + timesent);
+ msg.setLongProperty("timeSent", timesent);
}
+
_destinationProducer.send(msg);
+
+ if (_isTransactional)
+ {
+ _producerSession.commit();
+ }
+ if (_isTransactional)
+ {
+ _session.commit();
+ }
if (_messageCount % 1000 == 0)
{
_logger.info("Sent response to '" + _responseDest + "'");
@@ -160,7 +180,7 @@
if (args.length < 5)
{
- System.out.println("Usage: brokerDetails username password virtual-path serviceQueue [selector]");
+ System.out.println("Usage: serviceProvidingClient <brokerDetails> <username> <password> <virtual-path> <serviceQueue> [<P[ersistent]|N[onPersistent]> <T[ransacted]|N[onTransacted]>] [selector]");
System.exit(1);
}
String clientId = null;
@@ -174,10 +194,28 @@
_logger.error("Error: " + e, e);
}
+ int deliveryMode = DeliveryMode.NON_PERSISTENT;
+ boolean transactedMode = false;
+
+ if (args.length > 7)
+ {
+ deliveryMode = args[args.length - 2].toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT
+ : DeliveryMode.NON_PERSISTENT;
+
+ transactedMode = args[args.length - 1].toUpperCase().charAt(0) == 'T' ? true : false;
+ }
+
+ String selector = null;
+ if ((args.length == 8) || (args.length == 7))
+ {
+ selector = args[args.length - 1];
+ }
+
try
{
ServiceProvidingClient client = new ServiceProvidingClient(args[0], args[1], args[2],
- clientId, args[3], args[4]);
+ clientId, args[3], args[4],
+ deliveryMode, transactedMode, selector);
client.run();
}
catch (JMSException e)
@@ -192,10 +230,6 @@
{
_logger.error("Error: " + e, e);
}
-
-
-
}
-
}
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java?view=diff&rev=500188&r1=500187&r2=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java Fri Jan 26 00:52:20 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,13 +22,15 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.message.TestMessageFactory;
+import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.url.URLSyntaxException;
import javax.jms.*;
import java.net.InetAddress;
@@ -38,26 +40,33 @@
* A client that behaves as follows:
* <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
* <li>Creates a temporary queue</li>
- * <li>Creates messages containing a property that is the name of the temporary queue</li>
- * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li>
+ * <li>Creates messages containing a property(reply-to) that is the name of the temporary queue</li>
+ * <li>Fires off a message on the original queue and registers the callbackHandler to listen to the response on the temporary queue</li>
+ * <li>Start the loop to send all messages</li>
+ * <li>CallbackHandler keeps listening to the responses and exits if all the messages have been received back or
+ * if the waiting time for next message is elapsed</li>
* </ul>
- *
*/
public class ServiceRequestingClient implements ExceptionListener
{
private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class);
- private static final String MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk ";
+ private long _messageIdentifier = 0;
+
+ // time for which callbackHandler should wait for a message before exiting. Default time= 60 secs
+ private static long _callbackHandlerWaitingTime = 60000;
private String MESSAGE_DATA;
private AMQConnection _connection;
private Session _session;
+ private Session _producerSession;
private long _averageLatency;
private int _messageCount;
+ private boolean _isTransactional;
private volatile boolean _completed;
@@ -67,40 +76,25 @@
private Object _waiter;
- private static String createMessagePayload(int size)
- {
- _log.info("Message size set to " + size + " bytes");
- StringBuffer buf = new StringBuffer(size);
- int count = 0;
- while (count < size + MESSAGE_DATA_BYTES.length())
- {
- buf.append(MESSAGE_DATA_BYTES);
- count += MESSAGE_DATA_BYTES.length();
- }
- if (count < size)
- {
- buf.append(MESSAGE_DATA_BYTES, 0, size - count);
- }
-
- return buf.toString();
- }
-
private class CallbackHandler implements MessageListener
{
- private int _expectedMessageCount;
-
private int _actualMessageCount;
private long _startTime;
+ // The time when the last message was received by the callbackHandler
+ private long _messageReceivedTime = 0;
+ private Object _timerCallbackHandler = new Object();
- public CallbackHandler(int expectedMessageCount, long startTime)
+ public CallbackHandler(long startTime)
{
- _expectedMessageCount = expectedMessageCount;
_startTime = startTime;
+ // Start the timer thread, which will keep checking if test should exit because the waiting time has elapsed
+ (new Thread(new TimerThread())).start();
}
public void onMessage(Message m)
{
+ _messageReceivedTime = System.currentTimeMillis();
if (_log.isDebugEnabled())
{
_log.debug("Message received: " + m);
@@ -110,20 +104,23 @@
m.getPropertyNames();
if (m.propertyExists("timeSent"))
{
- long timeSent = Long.parseLong(m.getStringProperty("timeSent"));
- long now = System.currentTimeMillis();
+ long timeSent = m.getLongProperty("timeSent");
if (_averageLatency == 0)
{
- _averageLatency = now - timeSent;
+ _averageLatency = _messageReceivedTime - timeSent;
_log.info("Latency " + _averageLatency);
}
else
{
- _log.info("Individual latency: " + (now - timeSent));
- _averageLatency = (_averageLatency + (now - timeSent)) / 2;
+ _log.info("Individual latency: " + (_messageReceivedTime - timeSent));
+ _averageLatency = (_averageLatency + (_messageReceivedTime - timeSent)) / 2;
_log.info("Average latency now: " + _averageLatency);
}
}
+ if(_isTransactional)
+ {
+ _session.commit();
+ }
}
catch (JMSException e)
{
@@ -135,26 +132,114 @@
_log.info("Received message count: " + _actualMessageCount);
}
- if (_actualMessageCount == _expectedMessageCount)
+ checkForMessageID(m);
+
+ if (_actualMessageCount == _messageCount)
{
- _completed = true;
- notifyWaiter();
- long timeTaken = System.currentTimeMillis() - _startTime;
- _log.info("Total time taken to receive " + _expectedMessageCount + " messages was " +
- timeTaken + "ms, equivalent to " +
- (_expectedMessageCount / (timeTaken / 1000.0)) + " messages per second");
+ finishTesting(_actualMessageCount);
+ }
+ }
+
+ /**
+ * sets completed flag to true, closes the callbackHandler connection and notifies the waiter thread,
+ * so that the callbackHandler can finish listening for messages. This causes the test to finish.
+ * @param receivedMessageCount
+ */
+ private void finishTesting(int receivedMessageCount)
+ {
+ _completed = true;
+ notifyWaiter();
+ notifyTimerThread();
+
+ long timeTaken = System.currentTimeMillis() - _startTime;
+ _log.info("***** Result *****");
+ _log.info("Total messages received = " + receivedMessageCount);
+ _log.info("Total time taken to receive " + receivedMessageCount + " messages was " +
+ timeTaken + "ms, equivalent to " +
+ (receivedMessageCount / (timeTaken / 1000.0)) + " messages per second");
+
+ try
+ {
+ _connection.close();
+ _log.info("Connection closed");
+ }
+ catch (JMSException e)
+ {
+ _log.error("Error closing connection");
+ }
+ }
- try
+ private void notifyTimerThread()
+ {
+ if (_timerCallbackHandler != null)
+ {
+ synchronized (_timerCallbackHandler)
{
- _connection.close();
- _log.info("Connection closed");
+ _timerCallbackHandler.notify();
}
- catch (JMSException e)
+ }
+ }
+
+ /**
+ * Thread class implementing the timer for callbackHandler. The thread will exit the test if the waiting time
+ * has elapsed before next message is received.
+ */
+ private class TimerThread implements Runnable
+ {
+ public void run()
+ {
+ do
{
- _log.error("Error closing connection");
+ try
+ {
+ synchronized(_timerCallbackHandler)
+ {
+ _timerCallbackHandler.wait(_callbackHandlerWaitingTime);
+ }
+ }
+ catch (InterruptedException ignore)
+ {
+
+ }
+
+ // exit if callbackHandler has received all messages
+ if (_completed)
+ {
+ return;
+ }
}
+ while ((System.currentTimeMillis() - _messageReceivedTime) < _callbackHandlerWaitingTime);
+
+ // waiting time has elapsed, so exit the test
+ _log.info("");
+ _log.info("Exited after waiting for " + _callbackHandlerWaitingTime/1000 + " secs");
+ finishTesting(_actualMessageCount);
}
}
+ } // end of CallbackHandler class
+
+ /**
+ * Checks if the received AMQ Message ID(delivery tag) is in sequence, by comparing it with the AMQ MessageID
+ * of previous message.
+ * @param receivedMsg
+ */
+ private void checkForMessageID(Message receivedMsg)
+ {
+ try
+ {
+ JMSTextMessage msg = (JMSTextMessage)receivedMsg;
+ if (! (msg.getDeliveryTag() == _messageIdentifier + 1))
+ {
+ _log.info("Out of sequence message received. Previous AMQ MessageID= " + _messageIdentifier +
+ ", Received AMQ messageID= " + receivedMsg.getJMSMessageID());
+ }
+ _messageIdentifier = msg.getDeliveryTag();
+ }
+ catch (Exception ex)
+ {
+ _log.error("Error in checking messageID ", ex);
+ }
+
}
private void notifyWaiter()
@@ -167,25 +252,31 @@
}
}
}
+
public ServiceRequestingClient(String brokerHosts, String clientID, String username, String password,
String vpath, String commandQueueName,
+ int deliveryMode, boolean transactedMode,
final int messageCount, final int messageDataLength) throws AMQException, URLSyntaxException
{
+ _isTransactional = transactedMode;
+
+ _log.info("Delivery Mode: " + (deliveryMode == DeliveryMode.NON_PERSISTENT ? "Non Persistent" : "Persistent"));
+ _log.info("isTransactional: " + _isTransactional);
+
_messageCount = messageCount;
- MESSAGE_DATA = createMessagePayload(messageDataLength);
+ MESSAGE_DATA = TestMessageFactory.createMessagePayload(messageDataLength);
try
{
createConnection(brokerHosts, clientID, username, password, vpath);
- _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+ _session = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
+ _producerSession = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
_connection.setExceptionListener(this);
-
AMQQueue destination = new AMQQueue(commandQueueName);
- _producer = (MessageProducer) _session.createProducer(destination);
+ _producer = (MessageProducer) _producerSession.createProducer(destination);
_producer.setDisableMessageTimestamp(true);
- _producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ _producer.setDeliveryMode(deliveryMode);
_tempDestination = new AMQQueue("TempResponse" +
Long.toString(System.currentTimeMillis()), true);
@@ -195,7 +286,11 @@
//Send first message, then wait a bit to allow the provider to get initialised
TextMessage first = _session.createTextMessage(MESSAGE_DATA);
first.setJMSReplyTo(_tempDestination);
- _producer.send(first);
+ _producer.send(first);
+ if (_isTransactional)
+ {
+ _producerSession.commit();
+ }
try
{
Thread.sleep(1000);
@@ -207,7 +302,7 @@
//now start the clock and the test...
final long startTime = System.currentTimeMillis();
- messageConsumer.setMessageListener(new CallbackHandler(messageCount, startTime));
+ messageConsumer.setMessageListener(new CallbackHandler(startTime));
}
catch (JMSException e)
{
@@ -217,6 +312,7 @@
/**
* Run the test and notify an object upon receipt of all responses.
+ *
* @param waiter the object that will be notified
* @throws JMSException
*/
@@ -226,14 +322,19 @@
_connection.start();
for (int i = 1; i < _messageCount; i++)
{
- TextMessage msg = _session.createTextMessage(MESSAGE_DATA + i);
+ TextMessage msg = _producerSession.createTextMessage(MESSAGE_DATA + i);
msg.setJMSReplyTo(_tempDestination);
if (i % 1000 == 0)
{
long timeNow = System.currentTimeMillis();
- msg.setStringProperty("timeSent", String.valueOf(timeNow));
+ msg.setLongProperty("timeSent", timeNow);
+ }
+ _producer.send(msg);
+ if (_isTransactional)
+ {
+ _producerSession.commit();
}
- _producer.send(msg);
+
}
_log.info("Finished sending " + _messageCount + " messages");
}
@@ -246,8 +347,7 @@
private void createConnection(String brokerHosts, String clientID, String username, String password,
String vpath) throws AMQException, URLSyntaxException
{
- _connection = new AMQConnection(brokerHosts, username, password,
- clientID, vpath);
+ _connection = new AMQConnection(brokerHosts, username, password, clientID, vpath);
}
/**
@@ -256,22 +356,48 @@
*/
public static void main(String[] args)
{
- if (args.length < 6)
+ if ((args.length < 6) || (args.length == 8))
{
- System.err.println(
- "Usage: ServiceRequestingClient <brokerDetails - semicolon separated host:port list> <username> <password> <vpath> <command queue name> <number of messages> <message size>");
+ System.err.println("Usage: ServiceRequestingClient <brokerDetails> <username> <password> <vpath> " +
+ "<command queue name> <number of messages> [<message size>] " +
+ "[<P[ersistent]|N[onPersistent] (Default N)> <T[ransacted]|N[onTransacted] (Default N)>] " +
+ "[<waiting time for response in sec (default 60 sec)>]");
+ System.exit(1);
}
try
{
- int messageDataLength = args.length > 6 ? Integer.parseInt(args[6]) : 4096;
+ int messageSize = 4096;
+ boolean transactedMode = false;
+ int deliveryMode = DeliveryMode.NON_PERSISTENT;
+
+ if (args.length > 6)
+ {
+ messageSize = Integer.parseInt(args[6]);
+ }
+ if (args.length > 7)
+ {
+ deliveryMode = args[7].toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT
+ : DeliveryMode.NON_PERSISTENT;
+
+ transactedMode = args[8].toUpperCase().charAt(0) == 'T' ? true : false;
+ }
+
+ if (args.length > 9)
+ {
+ _callbackHandlerWaitingTime = Long.parseLong(args[9]) * 1000;
+ }
+
+ _log.info("Each message size = " + messageSize + " bytes");
InetAddress address = InetAddress.getLocalHost();
String clientID = address.getHostName() + System.currentTimeMillis();
ServiceRequestingClient client = new ServiceRequestingClient(args[0], clientID, args[1], args[2], args[3],
- args[4], Integer.parseInt(args[5]),
- messageDataLength);
+ args[4], deliveryMode, transactedMode, Integer.parseInt(args[5]),
+ messageSize);
Object waiter = new Object();
client.run(waiter);
+
+ // Start a thread to
synchronized (waiter)
{
while (!client.isCompleted())
@@ -279,7 +405,6 @@
waiter.wait();
}
}
-
}
catch (UnknownHostException e)
{
@@ -292,7 +417,7 @@
}
}
- /**
+ /**
* @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
*/
public void onException(JMSException e)
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java?view=diff&rev=500188&r1=500187&r2=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java Fri Jan 26 00:52:20 2007
@@ -22,14 +22,12 @@
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.config.ConnectorConfig;
-import org.apache.qpid.config.ConnectionFactoryInitialiser;
import org.apache.qpid.config.Connector;
import org.apache.qpid.config.AbstractConfig;
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-class Config extends AbstractConfig implements ConnectorConfig
+public class Config extends AbstractConfig implements ConnectorConfig
{
private String host = "localhost";
@@ -45,23 +43,30 @@
private int ackMode= AMQSession.NO_ACKNOWLEDGE;
private String clientId;
private String subscriptionId;
+ private String selector;
+ private String destinationName;
private boolean persistent;
+ private boolean transacted;
+ private int destinationsCount;
+ private int batchSize;
+ private int rate;
+ private boolean ispubsub;
public Config()
{
}
- int getAckMode()
+ public int getAckMode()
{
return ackMode;
}
- void setPayload(int payload)
+ public void setPayload(int payload)
{
this.payload = payload;
}
- int getPayload()
+ public int getPayload()
{
return payload;
}
@@ -81,11 +86,26 @@
this.messages = messages;
}
- int getMessages()
+ public int getMessages()
{
return messages;
}
+ public int getBatchSize()
+ {
+ return batchSize;
+ }
+
+ public int getRate()
+ {
+ return rate;
+ }
+
+ public int getDestinationsCount()
+ {
+ return destinationsCount;
+ }
+
public String getHost()
{
return host;
@@ -141,21 +161,41 @@
this.delay = delay;
}
- String getClientId()
+ public String getClientId()
{
return clientId;
}
- String getSubscriptionId()
+ public String getSubscriptionId()
{
return subscriptionId;
}
- boolean usePersistentMessages()
+ public String getSelector()
+ {
+ return selector;
+ }
+
+ public String getDestination()
+ {
+ return destinationName;
+ }
+
+ public boolean usePersistentMessages()
{
return persistent;
}
+ public boolean isTransacted()
+ {
+ return transacted;
+ }
+
+ public boolean isPubSub()
+ {
+ return ispubsub;
+ }
+
public void setOption(String key, String value)
{
if("-host".equalsIgnoreCase(key))
@@ -216,6 +256,34 @@
else if("-persistent".equalsIgnoreCase(key))
{
persistent = "true".equalsIgnoreCase(value);
+ }
+ else if("-transacted".equalsIgnoreCase(key))
+ {
+ transacted = "true".equalsIgnoreCase(value);
+ }
+ else if ("-destinationscount".equalsIgnoreCase(key))
+ {
+ destinationsCount = parseInt("Bad destinations count", value);
+ }
+ else if ("-batchsize".equalsIgnoreCase(key))
+ {
+ batchSize = parseInt("Bad batch size", value);
+ }
+ else if ("-rate".equalsIgnoreCase(key))
+ {
+ rate = parseInt("MEssage rate", value);
+ }
+ else if("-pubsub".equalsIgnoreCase(key))
+ {
+ ispubsub = "true".equalsIgnoreCase(value);
+ }
+ else if("-selector".equalsIgnoreCase(key))
+ {
+ selector = value;
+ }
+ else if("-destinationname".equalsIgnoreCase(key))
+ {
+ destinationName = value;
}
else
{
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java?view=diff&rev=500188&r1=500187&r2=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java Fri Jan 26 00:52:20 2007
@@ -51,7 +51,7 @@
_factory.createControlConsumer().setMessageListener(this);
_connection.start();
- if(warmup > 0)
+ if (warmup > 0)
{
System.out.println("Runing warmup (" + warmup + " msgs)");
long time = batch(warmup, consumerCount);
@@ -59,11 +59,14 @@
}
long[] times = new long[batches];
- for(int i = 0; i < batches; i++)
+ for (int i = 0; i < batches; i++)
{
- if(i > 0) Thread.sleep(delay*1000);
+ if (i > 0)
+ {
+ Thread.sleep(delay * 1000);
+ }
times[i] = batch(msgCount, consumerCount);
- System.out.println("Batch " + (i+1) + " of " + batches + " completed in " + times[i] + " ms.");
+ System.out.println("Batch " + (i + 1) + " of " + batches + " completed in " + times[i] + " ms.");
}
long min = min(times);
@@ -131,7 +134,7 @@
static long min(long[] times)
{
long min = times.length > 0 ? times[0] : 0;
- for(int i = 0; i < times.length; i++)
+ for (int i = 0; i < times.length; i++)
{
min = Math.min(min, times[i]);
}
@@ -141,7 +144,7 @@
static long max(long[] times)
{
long max = times.length > 0 ? times[0] : 0;
- for(int i = 0; i < times.length; i++)
+ for (int i = 0; i < times.length; i++)
{
max = Math.max(max, times[i]);
}
@@ -151,14 +154,22 @@
static long avg(long[] times, long min, long max)
{
long sum = 0;
- for(int i = 0; i < times.length; i++)
+ for (int i = 0; i < times.length; i++)
{
sum += times[i];
}
- sum -= min;
- sum -= max;
- return (sum / (times.length - 2));
+ int adjustment = 0;
+
+ // Remove min and max if we have run enough batches.
+ if (times.length > 2)
+ {
+ sum -= min;
+ sum -= max;
+ adjustment = 2;
+ }
+
+ return (sum / (times.length - adjustment));
}
public static void main(String[] argv) throws Exception
Added: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j?view=auto&rev=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j (added)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j Fri Jan 26 00:52:20 2007
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=${root.logging.level}
+
+
+log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.logger.org.apache.qpid.requestreply=${amqj.test.logging.level}
+log4j.logger.org.apache.qpid.pingpong=${amqj.test.logging.level}
+log4j.logger.org.apache.qpid.topic=${amqj.test.logging.level}
+
+
+log4j.logger.uk.co.thebadgerset.junit.extensions=info, console
+log4j.additivity.uk.co.thebadgerset.junit.extensions=false
+log4j.logger.uk.co.thebadgerset.junit.extensions=info
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=all
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+
+#log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
+log4j.appender.console.layout.ConversionPattern=%p [%c] %m%n
+
+log4j.appender.fileApp=org.apache.log4j.FileAppender
+log4j.appender.fileApp.file=${log.dir}/perftests.volumetest.log
+log4j.appender.fileApp.Threshold=info
+log4j.appender.fileApp.append=false
+log4j.appender.fileApp.layout=org.apache.log4j.PatternLayout