Posted to commits@qpid.apache.org by ri...@apache.org on 2007/02/02 16:28:13 UTC
svn commit: r502624 [4/5] - in
/incubator/qpid/branches/perftesting/qpid/java: ./
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/protocol/
client/src/main/java/o...
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=502624&r1=502623&r2=502624
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Fri Feb 2 07:28:08 2007
@@ -20,344 +20,382 @@
*/
package org.apache.qpid.requestreply;
+import java.io.IOException;
import java.net.InetAddress;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.*;
import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.ping.AbstractPingProducer;
-import org.apache.qpid.ping.Throttle;
import org.apache.qpid.topic.Config;
+import uk.co.thebadgerset.junit.extensions.BatchedThrottle;
+import uk.co.thebadgerset.junit.extensions.Throttle;
+
/**
* PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
- * client (see {@link PingPongBouncer} for the bounce back client). It is designed to be run from the command line
- * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session,
- * message producer and message consumer to run the ping-pong cycle on.
- * <p/>
+ * client (see {@link PingPongBouncer} for the bounce back client).
+ *
* <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings.
* This means that this class has to do some work to correlate pings with pongs; it expects the original message
- * id in the ping to be bounced back in the correlation id. If a new temporary queue per ping were used, then
- * this correlation would not need to be done.
- * <p/>
+ * correlation id in the ping to be bounced back in the reply correlation id.
+ *
+ * <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor.
+ * It can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings
+ * within transactions; control the number of pings to send in each transaction; limit its sending rate; and perform
+ * failover testing.
+ *
* <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
* does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so
* by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
* also registered to terminate the ping-pong loop cleanly.
- * <p/>
+ *
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Provide a ping and wait for response cycle.
+ * <tr><td> Provide a ping and wait for all responses cycle.
* <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
* </table>
*
- * @todo Make temp queue per ping a command line option.
- * @todo Make the queue name a command line option.
+ * @todo The use of a ping rate {@link #DEFAULT_RATE} and waits between pings {@link #DEFAULT_SLEEP_TIME} are overlapping.
+ * Use the rate and throttling only. Ideally, optionally pass the rate throttle into the ping method, throttle to
+ * be created and configured by the test runner from the -f command line option and made available through
+ * the timing controller on timing aware tests or by throttling rate of calling tests methods on non-timing aware
+ * tests.
+ *
+ * @todo Make shared or unique destinations a configurable option, hard coded to false.
+ *
+ * @todo Make acknowledge mode a test option.
+ *
+ * @todo Make the message listener a static for all replies to be sent to. It won't be any more of a bottleneck than
+ * having one per PingPongProducer, as it will synchronize on message correlation id, allowing threads to process
+ * messages concurrently for different ids. Needs to be static so that when using a chained message listener and
+ * shared destinations between multiple PPPs, it gets notified about all replies, not just those that happen to
+ * be picked up by the PPP that it is attached to.
+ *
+ * @todo Use a read/write lock in onMessage, not for reading and writing but to make use of a shared and exclusive lock
+ * pair. Obtain the read lock on all messages, before decrementing the message count. At the end of the on message
+ * method add a block that obtains the write lock for the very last message, releases any waiting producer. Means
+ * that the last message waits until all other messages have been handled before releasing producers but allows
+ * messages to be processed concurrently, unlike the current synchronized block.
+ *
+ * @todo Set the timeout to be per message correlation id. Restart it every time a message is received (with matching id).
+ * Means that the timeout measures whether a particular ping stream has paused for too long, rather than
+ * the time to send an entire block of messages. This will be better because the timeout won't need to be adjusted
+ * depending on the total number of messages being sent. Logic to be added to sendAndWait to recheck the timeout
+ * whenever its wait expires.
+ *
+ * @todo Need to multiply up the number of expected messages for pubsub tests as each can be received by many consumers?
*/
-public class PingPongProducer extends AbstractPingProducer implements Runnable, MessageListener, ExceptionListener
+public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
{
private static final Logger _logger = Logger.getLogger(PingPongProducer.class);
- /**
- * Used to set up a default message size.
- */
- protected static final int DEFAULT_MESSAGE_SIZE = 0;
+ /** Holds the name of the property to get the test message size from. */
+ public static final String MESSAGE_SIZE_PROPNAME = "messagesize";
- /**
- * This is set and used when the test is for multiple-destinations
- */
- protected static final int DEFAULT_DESTINATION_COUNT = 0;
+ /** Holds the name of the property to get the ping queue name from. */
+ public static final String PING_QUEUE_NAME_PROPNAME = "destinationname";
- protected static final int DEFAULT_RATE = 0;
+ /** Holds the name of the property to get the test delivery mode from. */
+ public static final String PERSISTENT_MODE_PROPNAME = "persistent";
- /**
- * Used to define how long to wait between pings.
- */
- protected static final long SLEEP_TIME = 250;
+ /** Holds the name of the property to get the test transactional mode from. */
+ public static final String TRANSACTED_PROPNAME = "transacted";
- /**
- * Used to define how long to wait before assuming that a ping has timed out.
- */
- protected static final long TIMEOUT = 9000;
+ /** Holds the name of the property to get the test broker url from. */
+ public static final String BROKER_PROPNAME = "broker";
- /**
- * Holds the name of the destination to send pings on.
- */
- protected static final String PING_DESTINATION_NAME = "ping";
+ /** Holds the name of the property to get the test broker virtual path. */
+ public static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
- /**
- * The batch size.
- */
- protected static final int DEFAULT_BATCH_SIZE = 100;
+ /** Holds the name of the property to get the message rate from. */
+ public static final String RATE_PROPNAME = "rate";
- protected static final int PREFETCH = 100;
- protected static final boolean NO_LOCAL = true;
- protected static final boolean EXCLUSIVE = false;
+ public static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
- /**
- * The number of priming loops to run.
- */
- protected static final int PRIMING_LOOPS = 3;
+ /** Holds the name of the property that selects whether this is a P2P or a pub/sub test. */
+ public static final String IS_PUBSUB_PROPNAME = "pubsub";
- /**
- * A source for providing sequential unique correlation ids.
- */
+ public static final String FAIL_AFTER_COMMIT_PROPNAME = "FailAfterCommit";
+
+ public static final String FAIL_BEFORE_COMMIT_PROPNAME = "FailBeforeCommit";
+
+ public static final String FAIL_AFTER_SEND_PROPNAME = "FailAfterSend";
+
+ public static final String FAIL_BEFORE_SEND_PROPNAME = "FailBeforeSend";
+
+ public static final String FAIL_ONCE_PROPNAME = "FailOnce";
+
+ public static final String USERNAME_PROPNAME = "username";
+
+ public static final String PASSWORD_PROPNAME = "password";
+
+ public static final String SELECTOR_PROPNAME = "selector";
+
+ public static final String PING_DESTINATION_COUNT_PROPNAME = "destinationscount";
+
+ /** Holds the name of the property to get the waiting timeout for response messages. */
+ public static final String TIMEOUT_PROPNAME = "timeout";
+
+ public static final String COMMIT_BATCH_SIZE_PROPNAME = "CommitBatchSize";
+
+ /** Used to set up a default message size. */
+ public static final int DEFAULT_MESSAGE_SIZE = 0;
+
+ /** Holds the name of the default destination to send pings on. */
+ public static final String DEFAULT_PING_DESTINATION_NAME = "ping";
+
+ /** Defines the default number of destinations to ping. */
+ public static final int DEFAULT_DESTINATION_COUNT = 1;
+
+ /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */
+ public static final int DEFAULT_RATE = 0;
+
+ /** Defines the default wait between pings. */
+ public static final long DEFAULT_SLEEP_TIME = 250;
+
+ /** Default time to wait before assuming that a ping has timed out. */
+ public static final long DEFAULT_TIMEOUT = 9000;
+
+ /** Defines the default number of pings to send in each transaction when running transactionally. */
+ public static final int DEFAULT_TX_BATCH_SIZE = 100;
+
+ /** Defines the default prefetch size to use when consuming messages. */
+ public static final int DEFAULT_PREFETCH = 100;
+
+ /** Defines the default value of the no local flag to use when consuming messages. */
+ public static final boolean DEFAULT_NO_LOCAL = false;
+
+ /** Defines the default value of the exclusive flag to use when consuming messages. */
+ public static final boolean DEFAULT_EXCLUSIVE = false;
+
+ /** Holds the message delivery mode to use for the test. */
+ public static final boolean DEFAULT_PERSISTENT_MODE = false;
+
+ /** Holds the transactional mode to use for the test. */
+ public static final boolean DEFAULT_TRANSACTED = false;
+
+ /** Holds the default broker url for the test. */
+ public static final String DEFAULT_BROKER = "tcp://localhost:5672";
+
+ /** Holds the default virtual path for the test. */
+ public static final String DEFAULT_VIRTUAL_PATH = "test";
+
+ /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */
+ public static final boolean DEFAULT_PUBSUB = false;
+
+ /** Holds the default broker log on username. */
+ public static final String DEFAULT_USERNAME = "guest";
+
+ /** Holds the default broker log on password. */
+ public static final String DEFAULT_PASSWORD = "guest";
+
+ /** Holds the default message selector. */
+ public static final String DEFAULT_SELECTOR = null;
+
+ /** Holds the default failover after commit test flag. */
+ public static final String DEFAULT_FAIL_AFTER_COMMIT = "false";
+
+ /** Holds the default failover before commit test flag. */
+ public static final String DEFAULT_FAIL_BEFORE_COMMIT = "false";
+
+ /** Holds the default failover after send test flag. */
+ public static final String DEFAULT_FAIL_AFTER_SEND = "false";
+
+ /** Holds the default failover before send test flag. */
+ public static final String DEFAULT_FAIL_BEFORE_SEND = "false";
+
+ /** Holds the default failover only once flag, true means only do one failover, false means failover on every commit cycle. */
+ public static final String DEFAULT_FAIL_ONCE = "true";
+
+ /** Holds the default verbose mode. */
+ public static final boolean DEFAULT_VERBOSE = false;
+
+ /** Holds the name of the property to store nanosecond timestamps in ping messages with. */
+ public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
+
+ /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
private static AtomicLong idGenerator = new AtomicLong(0L);
/**
- * Holds a map from message ids to latches on which threads wait for replies.
+ * Holds a map from message ids to latches on which threads wait for replies. This map is shared across
+ * multiple ping producers on the same JVM.
*/
- private static Map<String, CountDownLatch> trafficLights = new HashMap<String, CountDownLatch>();
+ private static Map<String, CountDownLatch> trafficLights =
+ Collections.synchronizedMap(new HashMap<String, CountDownLatch>());
- /**
- * Destination where the responses messages will arrive
- */
- private Destination _replyDestination;
+ /** A convenient formatter to use when time stamping output. */
+ protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
/**
- * Destination where the producer will be sending message to
+ * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
+ * creating multiple ping producers in the same JVM.
*/
- private Destination _pingDestination;
+ protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
- /**
- * Determines whether this producer sends persistent messages from the run method.
- */
+ /** Destination where the response messages will arrive. */
+ private Destination _replyDestination;
+
+ /** Determines whether this producer sends persistent messages. */
protected boolean _persistent;
- /**
- * Holds the message size to send, from the run method.
- */
+ /** Determines what size of messages this producer sends. */
protected int _messageSize;
- /**
- * Used to indicate that the ping loop should print out whenever it pings.
- */
+ /** Used to indicate that the ping loop should print out whenever it pings. */
protected boolean _verbose = false;
+ /** Holds the session on which ping replies are received. */
protected Session _consumerSession;
- /**
- * Used to restrict the sending rate to a specified limit.
- */
- private Throttle rateLimiter = null;
+ /** Used to restrict the sending rate to a specified limit. */
+ private Throttle _rateLimiter = null;
+
+ /** Holds a message listener that this message listener chains all its messages to. */
+ private ChainedMessageListener _chainedMessageListener = null;
+
+ /** Flag used to indicate if this is a point to point or pub/sub ping client. */
+ protected boolean _isPubSub = false;
/**
- * The throttler can only reliably restrict to a few hundred cycles per second, so a throttling batch size is used
- * to group sends together into batches large enough that the throttler runs slower than that.
+ * This id generator is used to generate ids that are only unique within this pinger. Creating multiple pingers
+ * on the same JVM using this id generator will allow them to ping on the same queues.
*/
- int _throttleBatchSize;
+ protected AtomicInteger _queueSharedId = new AtomicInteger();
- private MessageListener _messageListener = null;
+ /** Used to tell the ping loop when to terminate, it only runs while this is true. */
+ protected boolean _publish = true;
- private PingPongProducer(String brokerDetails, String username, String password, String virtualpath, boolean transacted,
- boolean persistent, int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
- boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize, int rate)
- throws Exception
- {
- // Create a connection to the broker.
- InetAddress address = InetAddress.getLocalHost();
- String clientID = address.getHostName() + System.currentTimeMillis();
+ /** Holds the connection to the broker. */
+ private Connection _connection;
- setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath));
+ /** Holds the producer session, needed to create ping messages. */
+ private Session _producerSession;
- // Create transactional or non-transactional sessions, based on the command line arguments.
- setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE));
- _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ /** Holds the set of destinations that this ping producer pings. */
+ protected List<Destination> _pingDestinations = new ArrayList<Destination>();
- _persistent = persistent;
- _messageSize = messageSize;
- _verbose = verbose;
+ /** Holds the message producer to send the pings through. */
+ protected MessageProducer _producer;
- // Set failover interrupts
- _failAfterCommit = afterCommit;
- _failBeforeCommit = beforeCommit;
- _failAfterSend = afterSend;
- _failBeforeSend = beforeSend;
- _failOnce = failOnce;
- _txBatchSize = batchSize;
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
+ protected boolean _failBeforeCommit = false;
- // Calculate a throttling batch size and rate such that the throttle runs slower than 100 cycles per second
- // and batched sends within each cycle multiply up to give the desired rate.
- //
- // total rate = throttle rate * batch size.
- // 1 < throttle rate < 100
- // 1 < total rate < 20000
- if (rate > DEFAULT_RATE)
- {
- // Log base 10 over 2 is used here to get a feel for what power of 100 the total rate is.
- // As the total rate goes up the powers of 100 the batch size goes up by powers of 100 to keep the
- // throttle rate back into the range 1 to 100.
- int x = (int) (Math.log10(rate) / 2);
- _throttleBatchSize = (int) Math.pow(100, x);
- int throttleRate = rate / _throttleBatchSize;
-
- _logger.debug("rate = " + rate);
- _logger.debug("x = " + x);
- _logger.debug("_throttleBatchSize = " + _throttleBatchSize);
- _logger.debug("throttleRate = " + throttleRate);
-
- rateLimiter = new Throttle();
- rateLimiter.setRate(throttleRate);
- }
- }
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
+ protected boolean _failAfterCommit = false;
- /**
- * Creates a ping pong producer with the specified connection details and type.
- *
- * @param brokerDetails
- * @param username
- * @param password
- * @param virtualpath
- * @param transacted
- * @throws Exception All allowed to fall through. This is only test code...
- */
- public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
- String destinationName, String selector, boolean transacted, boolean persistent,
- int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
- boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize,
- int noOfDestinations, int rate, boolean pubsub) throws Exception
- {
- this(brokerDetails, username, password, virtualpath, transacted, persistent, messageSize, verbose, afterCommit,
- beforeCommit, afterSend, beforeSend, failOnce, batchSize, rate);
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
+ protected boolean _failBeforeSend = false;
- _destinationCount = noOfDestinations;
- setPubSub(pubsub);
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
+ protected boolean _failAfterSend = false;
- if (noOfDestinations == DEFAULT_DESTINATION_COUNT)
- {
- if (destinationName != null)
- {
- createPingDestination(destinationName);
- // Create producer and the consumer
- createProducer();
- createConsumer(selector);
- }
- else
- {
- _logger.error("Destination is not specified");
- throw new IllegalArgumentException("Destination is not specified");
- }
- }
- }
+ /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
+ protected boolean _failOnce = true;
- private void createPingDestination(String name)
- {
- if (isPubSub())
- {
- _pingDestination = new AMQTopic(name);
- }
- else
- {
- _pingDestination = new AMQQueue(name);
- }
- }
+ /** Holds the number of sends that should be performed in every transaction when using transactions. */
+ protected int _txBatchSize = 1;
/**
- * Creates the producer to send the pings on. If the tests are with nultiple-destinations, then producer
- * is created with null destination, so that any destination can be specified while sending
+ * Creates a ping producer with the specified parameters, of which there are many. See their individual comments
+ * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on it,
+ * to send and receive its pings and replies on. The other options are kept, and control how this pinger behaves.
*
- * @throws JMSException
+ * @param brokerDetails The URL of the broker to send pings to.
+ * @param username The username to log onto the broker with.
+ * @param password The password to log onto the broker with.
+ * @param virtualpath The virtual host name to use on the broker.
+ * @param destinationName The name (or root where multiple destinations are used) of the destination to send
+ * pings to.
+ * @param selector The selector to filter replies with.
+ * @param transacted Indicates whether or not pings are sent and received in transactions.
+ * @param persistent Indicates whether pings are sent using persistent delivery.
+ * @param messageSize Specifies the size of ping messages to send.
+ * @param verbose Indicates that information should be printed to the console on every ping.
+ * @param afterCommit Indicates that the user should be prompted to terminate a broker after commits to test failover.
+ * @param beforeCommit Indicates that the user should be prompted to terminate a broker before commits to test failover.
+ * @param afterSend Indicates that the user should be prompted to terminate a broker after sends to test failover.
+ * @param beforeSend Indicates that the user should be prompted to terminate a broker before sends to test failover.
+ * @param failOnce Indicates that the failover testing behaviour should only happen on the first commit, not all.
+ * @param txBatchSize Specifies the number of pings to send in each transaction.
+ * @param noOfDestinations The number of destinations to ping. Must be 1 or more.
+ * @param rate Specifies the number of pings per second to send. Setting this to 0 means send as fast as
+ * possible, with no rate restriction.
+ * @param pubsub <tt>true</tt> to ping a topic, <tt>false</tt> to ping a queue.
+ *
+ * @throws Exception Any exceptions are allowed to fall through.
*/
- public void createProducer() throws JMSException
+ public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
+ String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
+ boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
+ boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate,
+ boolean pubsub) throws Exception
{
- if (getDestinationsCount() > DEFAULT_DESTINATION_COUNT)
+ // Check that one or more destinations were specified.
+ if (noOfDestinations < 1)
{
- // create producer with initial destination as null for test with multiple-destinations
- // In this case, a different destination will be used while sending the message
- _producer = (MessageProducer) getProducerSession().createProducer(null);
+ throw new IllegalArgumentException("There must be at least one destination.");
}
- else
- {
- // Create a producer with known destination to send the pings on.
- _producer = (MessageProducer) getProducerSession().createProducer(_pingDestination);
- }
-
- _producer.setDisableMessageTimestamp(true);
- _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- }
+ // Create a connection to the broker.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
- /**
- * Creates the temporary destination to listen to the responses
- *
- * @param selector
- * @throws JMSException
- */
- public void createConsumer(String selector) throws JMSException
- {
- // Create a temporary destination to get the pongs on.
- if (isPubSub())
- {
- _replyDestination = _consumerSession.createTemporaryTopic();
- }
- else
- {
- _replyDestination = _consumerSession.createTemporaryQueue();
- }
+ _connection = new AMQConnection(brokerDetails, username, password, clientID, virtualpath);
- // Create a message consumer to get the replies with and register this to be called back by it.
- MessageConsumer consumer = _consumerSession.createConsumer(_replyDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
- consumer.setMessageListener(this);
- }
+ // Create transactional or non-transactional sessions, based on the command line arguments.
+ _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
- /**
- * Creates consumer instances for each destination. This is used when test is being done with multiple destinations.
- *
- * @param selector
- * @throws JMSException
- */
- public void createConsumers(String selector) throws JMSException
- {
- for (int i = 0; i < getDestinationsCount(); i++)
+ // Set up a throttle to control the send rate, if a rate > 0 is specified.
+ if (rate > 0)
{
- MessageConsumer consumer =
- getConsumerSession().createConsumer(getDestination(i), PREFETCH, false, EXCLUSIVE, selector);
- consumer.setMessageListener(this);
+ _rateLimiter = new BatchedThrottle();
+ _rateLimiter.setRate(rate);
}
- }
-
- public Session getConsumerSession()
- {
- return _consumerSession;
- }
+ // Create the temporary queue for replies.
+ _replyDestination = _consumerSession.createTemporaryQueue();
- public Destination getPingDestination()
- {
- return _pingDestination;
- }
+ // Create the producer and the consumers for all reply destinations.
+ createProducer();
+ createPingDestinations(noOfDestinations, selector, destinationName, true);
+ createReplyConsumers(getReplyDestinations(), selector);
- protected void setPingDestination(Destination destination)
- {
- _pingDestination = destination;
+ // Keep all the remaining options.
+ _persistent = persistent;
+ _messageSize = messageSize;
+ _verbose = verbose;
+ _failAfterCommit = afterCommit;
+ _failBeforeCommit = beforeCommit;
+ _failAfterSend = afterSend;
+ _failBeforeSend = beforeSend;
+ _failOnce = failOnce;
+ _txBatchSize = txBatchSize;
+ _isPubSub = pubsub;
}
/**
- * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs
+ * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs
* to be started to bounce the pings back again.
- * <p/>
- * <p/>The command line takes from 2 to 4 arguments:
- * <p/><table>
- * <tr><td>brokerDetails <td> The broker connection string.
- * <tr><td>virtualPath <td> The virtual path.
- * <tr><td>transacted <td> A boolean flag, telling this client whether or not to use transactions.
- * <tr><td>size <td> The size of ping messages to use, in bytes.
- * </table>
*
- * @param args The command line arguments as defined above.
+ * @param args The command line arguments.
*/
public static void main(String[] args) throws Exception
{
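
Note on the hunk above: the class comment and the trafficLights field describe correlating replies with pings by keeping a shared map from correlation id to a latch that the sending thread waits on. The following is only a minimal sketch of that pattern, not the code from this commit. It leaves JMS out entirely, and the names PingCorrelationSketch, sendAndWait and onReply are hypothetical stand-ins chosen for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class PingCorrelationSketch
{
    // Shared map from correlation id to the latch that the sending thread waits on.
    private static final Map<String, CountDownLatch> trafficLights =
        Collections.synchronizedMap(new HashMap<String, CountDownLatch>());

    /**
     * Registers a latch for the id, runs the (hypothetical) sender, then waits for all
     * replies or the timeout. Returns the number of replies seen in that time.
     */
    public static long sendAndWait(String correlationId, int expectedReplies, long timeoutMillis, Runnable sender)
    {
        CountDownLatch latch = new CountDownLatch(expectedReplies);
        trafficLights.put(correlationId, latch);

        try
        {
            sender.run();
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt flag and fall through to report what arrived so far.
            Thread.currentThread().interrupt();
        }
        finally
        {
            trafficLights.remove(correlationId);
        }

        return expectedReplies - latch.getCount();
    }

    /** Stand-in for the message listener: counts down the latch registered for this id, if any. */
    public static boolean onReply(String correlationId)
    {
        CountDownLatch latch = trafficLights.get(correlationId);

        if (latch == null)
        {
            return false;
        }

        latch.countDown();

        return true;
    }
}
```

In this sketch a reply whose id has no registered latch is simply reported as unmatched, and a timed-out wait returns however many replies arrived before the timeout.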
@@ -372,22 +410,22 @@
}
String brokerDetails = config.getHost() + ":" + config.getPort();
- String virtualpath = "/test";
- String selector = config.getSelector();
+ String virtualpath = "test";
+ String selector = (config.getSelector() == null) ? DEFAULT_SELECTOR : config.getSelector();
boolean verbose = true;
boolean transacted = config.isTransacted();
boolean persistent = config.usePersistentMessages();
int messageSize = (config.getPayload() != 0) ? config.getPayload() : DEFAULT_MESSAGE_SIZE;
//int messageCount = config.getMessages();
int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DEFAULT_DESTINATION_COUNT;
- int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : DEFAULT_BATCH_SIZE;
+ int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : DEFAULT_TX_BATCH_SIZE;
int rate = (config.getRate() != 0) ? config.getRate() : DEFAULT_RATE;
boolean pubsub = config.isPubSub();
String destName = config.getDestination();
if (destName == null)
{
- destName = PING_DESTINATION_NAME;
+ destName = DEFAULT_PING_DESTINATION_NAME;
}
boolean afterCommit = false;
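
Note on the throttle calculation removed from the old private constructor in the first hunk: its comments explain that the total rate is split into a throttle rate and a batch size, with the batch size growing by powers of 100. The arithmetic can be reproduced in isolation as below. This is a sketch only; the class name ThrottleBatchSketch and the method split are hypothetical, and the commit itself replaces this calculation with BatchedThrottle.

```java
public class ThrottleBatchSketch
{
    /**
     * Splits a target rate (pings per second, must be > 0) into a batch size and a
     * throttle rate, using the same arithmetic as the removed constructor code.
     * Returns { batchSize, throttleRate }.
     */
    public static int[] split(int rate)
    {
        // Half of log base 10 gives the power of 100 that the rate falls into.
        int x = (int) (Math.log10(rate) / 2);
        int batchSize = (int) Math.pow(100, x);
        int throttleRate = rate / batchSize;

        return new int[] { batchSize, throttleRate };
    }

    public static void main(String[] args)
    {
        for (int rate : new int[] { 50, 5000, 20000 })
        {
            int[] result = split(rate);
            System.out.println(rate + " -> batch size " + result[0] + ", throttle rate " + result[1]);
        }
    }
}
```

For example, a rate of 5000 gives a batch size of 100 and a throttle rate of 50 cycles per second, while a rate of 50 is left unbatched with a batch size of 1.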
@@ -429,15 +467,13 @@
}
// Create a ping producer to handle the request/wait/reply cycle.
- PingPongProducer pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath,
- destName, selector, transacted, persistent, messageSize, verbose,
- afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
- destCount, rate, pubsub);
+ PingPongProducer pingProducer =
+ new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
+ transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
+ beforeSend, failOnce, batchSize, destCount, rate, pubsub);
pingProducer.getConnection().start();
- // Run a few priming pings to remove warm up time from test results.
- //pingProducer.prime(PRIMING_LOOPS);
// Create a shutdown hook to terminate the ping-pong producer.
Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
@@ -450,50 +486,105 @@
pingThread.join();
}
- private static void usage()
+ /**
+ * Convenience method for a short pause.
+ *
+ * @param sleepTime The time in milliseconds to pause for.
+ */
+ public static void pause(long sleepTime)
+ {
+ if (sleepTime > 0)
+ {
+ try
+ {
+ Thread.sleep(sleepTime);
+ }
+ catch (InterruptedException ie)
+ { }
+ }
+ }
+
+ /**
+ * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply
+ * to destination of this pinger.
+ *
+ * @return The single reply to destination of this pinger, wrapped in a list.
+ */
+ public List<Destination> getReplyDestinations()
+ {
+ _logger.debug("public List<Destination> getReplyDestinations(): called");
+
+ List<Destination> replyDestinations = new ArrayList<Destination>();
+ replyDestinations.add(_replyDestination);
+
+ return replyDestinations;
+ }
+
+ /**
+ * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery
+ * flag is set according to the ping producer creation options.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
+ public void createProducer() throws JMSException
{
- System.err.println("Usage: TestPingPublisher \n" + "-host : broker host" + "-port : broker port" +
- "-destinationname : queue/topic name\n" +
- "-transacted : (true/false). Default is false\n" +
- "-persistent : (true/false). Default is false\n" +
- "-pubsub : (true/false). Default is false\n" +
- "-selector : selector string\n" +
- "-payload : paylaod size. Default is 0\n" +
- //"-messages : no of messages to be sent (if 0, the ping loop will run indefinitely)\n" +
- "-destinationscount : no of destinations for multi-destinations test\n" +
- "-batchsize : batch size\n" +
- "-rate : thruput rate\n");
+ _logger.debug("public void createProducer(): called");
+
+ _producer = (MessageProducer) _producerSession.createProducer(null);
+ //_producer.setDisableMessageTimestamp(true);
+ _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
}
/**
- * Primes the test loop by sending a few messages, then introduces a short wait. This allows the bounce back client
- * on the other end a chance to configure its reply producer on the reply to destination. It is also worth calling
- * this a few times, in order to prime the JVMs JIT compilation.
+ * Creates consumers for the specified number of destinations. The destinations themselves are also created by
+ * this method.
+ *
+ * @param noOfDestinations The number of destinations to create consumers for.
+ * @param selector The message selector to filter the consumers with.
+ * @param rootName The root of the name, or actual name if only one is being created.
+ * @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share
+ * the numbering with all pingers on the same JVM.
*
- * @param x The number of priming loops to run.
- * @throws JMSException All underlying exceptions are allowed to fall through.
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
*/
- public void prime(int x) throws JMSException
+ public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
+ throws JMSException
{
- for (int i = 0; i < x; i++)
+ _logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations
+ + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = "
+ + unique + "): called");
+
+ // Create the desired number of ping destinations and consumers for them.
+ for (int i = 0; i < noOfDestinations; i++)
{
- // Create and send a small message.
- Message first = getTestMessage(_replyDestination, 0, false);
- sendMessage(first);
+ AMQDestination destination = null;
- commitTx();
+ int id;
- try
+ // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
+ if (unique)
{
- Thread.sleep(100);
+ id = _queueJVMSequenceID.incrementAndGet();
}
- catch (InterruptedException ignore)
+ else
{
-
+ id = _queueSharedId.incrementAndGet();
}
- }
+ // Check if this is a pub/sub pinger, in which case create topics.
+ if (_isPubSub)
+ {
+ destination = new AMQTopic(rootName + id);
+ }
+ // Otherwise this is a p2p pinger, in which case create queues.
+ else
+ {
+ destination = new AMQQueue(rootName + id);
+ }
+ // Keep the destination.
+ _pingDestinations.add(destination);
+ }
}
/**
@@ -505,60 +596,72 @@
*/
public void onMessage(Message message)
{
+ _logger.debug("public void onMessage(Message message): called");
try
{
-
- // Store the reply, if it has a correlation id that is expected.
+ // Extract the message's correlation id.
String correlationID = message.getJMSCorrelationID();
+ _logger.debug("correlationID = " + correlationID);
- if (_verbose)
- {
- _logger.info(timestampFormatter.format(new Date()) + ": Got reply with correlation id, " + correlationID);
- //_logger.debug("Received from : " + message.getJMSDestination());
- }
-
- // Turn the traffic light to green.
+ // Countdown on the traffic light if there is one for the matching correlation id.
CountDownLatch trafficLight = trafficLights.get(correlationID);
if (trafficLight != null)
{
- if (_messageListener != null)
- {
- synchronized (trafficLight)
- {
- _messageListener.onMessage(message);
- trafficLight.countDown();
- }
- }
- else
+ _logger.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+
+ // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
+ // method simultaneously with the same correlation id. Decrementing the latch in a synchronized block
+ // ensures that each thread will get a unique value for the remaining messages.
+ long trueCount = -1;
+ long remainingCount = -1;
+
+ synchronized (trafficLight)
{
trafficLight.countDown();
- }
- _logger.trace("Reply was expected, decrementing the latch for the id.");
+ trueCount = trafficLight.getCount();
+ remainingCount = trueCount - 1;
- long remainingCount = trafficLight.getCount();
+ _logger.debug("remainingCount = " + remainingCount);
+ _logger.debug("trueCount = " + trueCount);
- if ((remainingCount % _txBatchSize) == 0)
- {
- commitTx(getConsumerSession());
- }
+ // Commit on transaction batch size boundaries. At this point in time the waiting producer remains
+ // blocked, even on the last message.
+ if ((remainingCount % _txBatchSize) == 0)
+ {
+ commitTx(_consumerSession);
+ }
+
+ // Forward the message and remaining count to any interested chained message listener.
+ if (_chainedMessageListener != null)
+ {
+ _chainedMessageListener.onMessage(message, (int) remainingCount);
+ }
+ // Check if this is the last message, in which case release any waiting producers. This is done
+ // after the transaction has been committed and any listeners notified.
+ if (trueCount == 1)
+ {
+ trafficLight.countDown();
+ }
+ }
}
else
{
- _logger.trace("There was no thread waiting for reply: " + correlationID);
+ _logger.debug("There was no thread waiting for reply: " + correlationID);
}
+ // Print out ping times for every message in verbose mode only.
if (_verbose)
{
- Long timestamp = message.getLongProperty("timestamp");
+ Long timestamp = message.getLongProperty(MESSAGE_TIMESTAMP_PROPNAME);
if (timestamp != null)
{
- long diff = System.currentTimeMillis() - timestamp;
- _logger.trace("Time for round trip: " + diff);
+ long diff = System.nanoTime() - timestamp;
+ _logger.trace("Time for round trip (nanos): " + diff);
}
}
}
@@ -566,32 +669,70 @@
{
_logger.warn("There was a JMSException: " + e.getMessage(), e);
}
+
+ _logger.debug("public void onMessage(Message message): ending");
}
/**
* Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
- * before a reply arrives, then a null reply is returned from this method.
+ * before a reply arrives, then a null reply is returned from this method. This method generates a new unique
+ * correlation id for the messages.
*
* @param message The message to send.
* @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
+ *
* @return The number of replies received. This may be less than the number sent if the timeout terminated the
* wait for all prematurely.
+ *
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
{
- String messageCorrelationId = null;
+ _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ + timeout + "): called");
+
+ // Create a unique correlation id to put on the messages before sending them.
+ String messageCorrelationId = Long.toString(idGenerator.incrementAndGet());
+
+ return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId);
+ }
+
+ /**
+ * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
+ * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
+ * the correlation id.
+ *
+ * @param message The message to send.
+ * @param numPings The number of ping messages to send.
+ * @param timeout The timeout in milliseconds.
+ * @param messageCorrelationId The message correlation id.
+ *
+ * @return The number of replies received. This may be less than the number sent if the timeout terminated the
+ * wait for all prematurely.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
+ throws JMSException, InterruptedException
+ {
+ _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
try
{
- // Put a unique correlation id on the message before sending it.
- messageCorrelationId = Long.toString(getNewID());
+ // Create a count down latch to count the number of replies with. This is created before the messages are
+ // sent so that the replies cannot be received before the count down is created.
+ // One is added to this, so that the last reply becomes a special case. The special case is that the
+ // chained message listener must be called before this sender can be unblocked, but that decrementing the
+ // countdown needs to be done before the chained listener can be called.
+ CountDownLatch trafficLight = new CountDownLatch(numPings + 1);
+ trafficLights.put(messageCorrelationId, trafficLight);
+ // Send the specified number of messages.
pingNoWaitForReply(message, numPings, messageCorrelationId);
- CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
- // Block the current thread until a reply to the message is received, or it times out.
+ // Block the current thread until replies to all the messages are received, or it times out.
trafficLight.await(timeout, TimeUnit.MILLISECONDS);
// Work out how many replies were received.
@@ -606,45 +747,37 @@
_logger.info("Got all replies on id, " + messageCorrelationId);
}
- commitTx(getConsumerSession());
+ commitTx(_consumerSession);
+
+ _logger.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
return numReplies;
}
+ // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived,
+ // so will be a memory leak if this is not done.
finally
{
- removeLock(messageCorrelationId);
+ trafficLights.remove(messageCorrelationId);
}
}
- public long getNewID()
- {
- return idGenerator.incrementAndGet();
- }
-
- public CountDownLatch removeLock(String correlationID)
- {
- return trafficLights.remove(correlationID);
- }
-
-
- /*
- * Sends the specified ping message but does not wait for a correlating reply.
- *
- * @param message The message to send.
- * @param numPings The number of pings to send.
- * @return The reply, or null if no reply arrives before the timeout.
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- */
- public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException, InterruptedException
+ /**
+ * Sends the specified number of ping messages and does not wait for correlating replies.
+ *
+ * @param message The message to send.
+ * @param numPings The number of pings to send.
+ * @param messageCorrelationId A correlation id to place on all messages sent.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
{
- // Create a count down latch to count the number of replies with. This is created before the message is sent
- // so that the message is not received before the count down is created.
- CountDownLatch trafficLight = new CountDownLatch(numPings);
- trafficLights.put(messageCorrelationId, trafficLight);
+ _logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ + ", String messageCorrelationId = " + messageCorrelationId + "): called");
message.setJMSCorrelationID(messageCorrelationId);
- // Set up a committed flag to detect uncommitted message at the end of the send loop. This may occurr if the
+ // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occur if the
// transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is
// needed.
boolean committed = false;
@@ -652,55 +785,46 @@
// Send all of the ping messages.
for (int i = 0; i < numPings; i++)
{
- // Reset the committed flag to indicate that there are uncommitted message.
+ // Reset the committed flag to indicate that there are uncommitted messages.
committed = false;
// Re-timestamp the message.
- message.setLongProperty("timestamp", System.currentTimeMillis());
+ message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
- // Check if the test is with multiple-destinations, in which case round robin the destinations
- // as the messages are sent.
- if (getDestinationsCount() > DEFAULT_DESTINATION_COUNT)
- {
- sendMessage(getDestination(i % getDestinationsCount()), message);
- }
- else
- {
- sendMessage(message);
- }
+ // Round robin the destinations as the messages are sent.
+ //return _destinationCount;
+ sendMessage(_pingDestinations.get(i % _pingDestinations.size()), message);
- // Apply message rate throttling if a rate limit has been set up and the throttling batch limit has been
- // reached. See the comment on the throttle batch size for information about the use of batches here.
- if ((rateLimiter != null) && ((i % _throttleBatchSize) == 0))
+ // Apply message rate throttling if a rate limit has been set up.
+ if (_rateLimiter != null)
{
- rateLimiter.throttle();
+ _rateLimiter.throttle();
}
// Call commit every time the commit batch size is reached.
if ((i % _txBatchSize) == 0)
{
- commitTx();
+ commitTx(_producerSession);
committed = true;
}
+
+ // Spew out per-message timings, on every message, only in verbose mode.
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, "
+ + messageCorrelationId);
+ }
}
// Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages.
if (!committed)
{
- commitTx();
- }
-
- // Spew out per message timings only in verbose mode.
- if (_verbose)
- {
- _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
+ commitTx(_producerSession);
}
-
}
/**
- * The ping loop implementation. This send out pings of the configured size, persistence and transactionality, and
- * waits for replies and inserts short pauses in between each.
+ * The ping loop implementation. This sends out pings, waits for replies, and inserts short pauses in between each.
*/
public void pingLoop()
{
@@ -708,13 +832,13 @@
{
// Generate a sample message and time stamp it.
ObjectMessage msg = getTestMessage(_replyDestination, _messageSize, _persistent);
- msg.setLongProperty("timestamp", System.currentTimeMillis());
+ msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
// Send the message and wait for a reply.
- pingAndWaitForReply(msg, DEFAULT_BATCH_SIZE, TIMEOUT);
+ pingAndWaitForReply(msg, DEFAULT_TX_BATCH_SIZE, DEFAULT_TIMEOUT);
// Introduce a short pause if desired.
- pause(SLEEP_TIME);
+ pause(DEFAULT_SLEEP_TIME);
}
catch (JMSException e)
{
@@ -728,79 +852,300 @@
}
}
- public Destination getReplyDestination()
+ /*public Destination getReplyDestination()
{
return _replyDestination;
+ }*/
+
+ /**
+ * Sets a chained message listener. The message listener on this pinger chains all its messages to the one set
+ * here.
+ *
+ * @param messageListener The chained message listener.
+ */
+ public void setChainedMessageListener(ChainedMessageListener messageListener)
+ {
+ _chainedMessageListener = messageListener;
}
- protected void setReplyDestination(Destination destination)
+ /**
+ * Removes any chained message listeners from this pinger.
+ */
+ public void removeChainedMessageListener()
{
- _replyDestination = destination;
+ _chainedMessageListener = null;
}
- public void setMessageListener(MessageListener messageListener)
+ /**
+ * Generates a test message of the specified size, with the specified reply-to destination and persistence flag.
+ *
+ * @param replyQueue The reply-to destination for the message.
+ * @param messageSize The desired size of the message in bytes.
+ * @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
+ *
+ * @return A freshly generated test message.
+ *
+ * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
+ */
+ public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
{
- _messageListener = messageListener;
+ ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
+
+ // Timestamp the message in nanoseconds.
+ msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
+
+ return msg;
}
- public CountDownLatch getEndLock(String correlationID)
+ /**
+ * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
+ * flag has been cleared.
+ */
+ public void stop()
{
- return trafficLights.get(correlationID);
+ _publish = false;
}
- /*
- * When the test is being performed with multiple queues, then this method will be used, which has a loop to
- * pick up the next queue from the queues list and sends message to it.
- *
- * @param message
- * @param numPings
- * @throws JMSException
- */
- /*private void pingMultipleQueues(Message message, int numPings) throws JMSException
+ /**
+ * Implements a ping loop that repeatedly pings until the publish flag becomes false.
+ */
+ public void run()
{
- int queueIndex = 0;
- for (int i = 0; i < numPings; i++)
+ // Keep running until the publish flag is cleared.
+ while (_publish)
{
- // Re-timestamp the message.
- message.setLongProperty("timestamp", System.currentTimeMillis());
+ pingLoop();
+ }
+ }
- sendMessage(getDestination(queueIndex++), message);
+ /**
+ * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
+ * connection. It clears the publish flag, which in turn will halt the ping loop.
+ *
+ * @param e The exception that triggered this callback method.
+ */
+ public void onException(JMSException e)
+ {
+ _publish = false;
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
- // reset the counter to get the first queue
- if (queueIndex == (getDestinationsCount() - 1))
+ /**
+ * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered
+ * with the runtime system as a shutdown hook.
+ *
+ * @return A shutdown hook for the ping loop.
+ */
+ public Thread getShutdownHook()
+ {
+ return new Thread(new Runnable()
{
- queueIndex = 0;
- }
+ public void run()
+ {
+ stop();
+ }
+ });
+ }
+
+ /**
+ * Gets the underlying connection that this ping client is running on.
+ *
+ * @return The underlying connection that this ping client is running on.
+ */
+ public Connection getConnection()
+ {
+ return _connection;
+ }
+
+ /**
+ * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
+ *
+ * @param destinations The destinations to listen to.
+ * @param selector A selector to filter the messages with.
+ *
+ * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
+ */
+ public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
+ {
+ _logger.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+ + ", String selector = " + selector + "): called");
+
+ for (Destination destination : destinations)
+ {
+ // Create a consumer for the destination and set this pinger to listen to its messages.
+ MessageConsumer consumer =
+ _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE,
+ selector);
+ consumer.setMessageListener(this);
}
- }*/
+ }
/**
- * A connection listener that logs out any failover complete events. Could do more interesting things with this
- * at some point...
+ * Closes the pinger's connection.
+ *
+ * @throws JMSException All JMSException are allowed to fall through.
*/
- public static class FailoverNotifier implements ConnectionListener
+ public void close() throws JMSException
{
- public void bytesSent(long count)
+ _logger.debug("public void close(): called");
+
+ if (_connection != null)
{
+ _connection.close();
}
+ }
+
+ /**
+ * Convenience method to commit the transaction on the specified session. If the session to commit on is not
+ * a transactional session, this method does nothing (unless the failover after send flag is set).
+ *
+ * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit
+ * is applied. This flag applies whether the pinger is transactional or not.
+ *
+ * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
+ * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
+ * after the commit is applied. These flags will only apply if using a transactional pinger.
+ *
+ * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+ *
+ * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
+ * method, because commits only apply to transactional pingers, but fail after send applies to transactional
+ * and non-transactional alike.
+ */
+ protected void commitTx(Session session) throws JMSException
+ {
+ _logger.debug("protected void commitTx(Session session): called");
- public void bytesReceived(long count)
+ _logger.trace("Batch time reached");
+ if (_failAfterSend)
{
+ _logger.trace("Batch size reached");
+ if (_failOnce)
+ {
+ _failAfterSend = false;
+ }
+
+ _logger.trace("Failing After Send");
+ doFailover();
}
- public boolean preFailover(boolean redirect)
+ if (session.getTransacted())
{
- return true; //Allow failover
+ try
+ {
+ if (_failBeforeCommit)
+ {
+ if (_failOnce)
+ {
+ _failBeforeCommit = false;
+ }
+
+ _logger.trace("Failing Before Commit");
+ doFailover();
+ }
+
+ session.commit();
+
+ if (_failAfterCommit)
+ {
+ if (_failOnce)
+ {
+ _failAfterCommit = false;
+ }
+
+ _logger.trace("Failing After Commit");
+ doFailover();
+ }
+
+ _logger.trace("Session Commited.");
+ }
+ catch (JMSException e)
+ {
+ _logger.trace("JMSException on commit:" + e.getMessage(), e);
+
+ // Warn that the bounce back client is not available.
+ if (e.getLinkedException() instanceof AMQNoConsumersException)
+ {
+ _logger.debug("No consumers on queue.");
+ }
+
+ try
+ {
+ session.rollback();
+ _logger.trace("Message rolled back.");
+ }
+ catch (JMSException jmse)
+ {
+ _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+
+ // Both commit and rollback failed. Throw the rollback exception.
+ throw jmse;
+ }
+ }
+ }
+ }
+
+ /**
+ * Sends the message to the specified destination. If the destination is null, it gets sent to the default destination
+ * of the ping producer. If an explicit destination is set, this overrides the default.
+ *
+ * @param destination The destination to send to.
+ * @param message The message to send.
+ *
+ * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ protected void sendMessage(Destination destination, Message message) throws JMSException
+ {
+ if (_failBeforeSend)
+ {
+ if (_failOnce)
+ {
+ _failBeforeSend = false;
+ }
+
+ _logger.trace("Failing Before Send");
+ doFailover();
}
- public boolean preResubscribe()
+ if (destination == null)
+ {
+ _producer.send(message);
+ }
+ else
{
- return true; // Allow resubscription
+ _producer.send(destination, message);
}
+ }
- public void failoverComplete()
+ /**
+ * Prompts the user to terminate the broker, in order to test failover functionality. This method will block
+ * until the user supplies some input on the terminal.
+ */
+ protected void doFailover()
+ {
+ System.out.println("Kill Broker now then press return");
+ try
{
- _logger.info("App got failover complete callback.");
+ System.in.read();
}
+ catch (IOException e)
+ { }
+
+ System.out.println("Continuing.");
+ }
+
+ /**
+ * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's
+ * {@link PingPongProducer#onMessage} method is called, the chained listener set through the
+ * {@link PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected
+ * count of messages with that correlation id.
+ *
+ * Provided only one pinger is producing messages with that correlation id, the chained listener will always be
+ * given unique message counts. It will always be called while the producer waiting for all messages to arrive is
+ * still blocked.
+ */
+ public static interface ChainedMessageListener
+ {
+ public void onMessage(Message message, int remainingCount) throws JMSException;
}
}
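
Note on the traffic-light latch used by pingAndWaitForReply and onMessage in the diff above: the latch is created with numPings + 1 counts, so the sender stays blocked while the last reply is still being handled, and is released only after the chained listener has been called. The following is a minimal, standalone sketch of that counting scheme only. It uses no JMS; the names LatchSketch, CountListener, onReply and simulate are hypothetical and are not part of the commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, JMS-free sketch of the "one extra count" latch pattern.
class LatchSketch
{
    /** Hypothetical stand-in for the chained message listener. */
    interface CountListener
    {
        void onReply(int remainingCount);
    }

    /** Handles one reply in the same order as the onMessage method in the diff. */
    static void onReply(CountDownLatch trafficLight, CountListener listener)
    {
        synchronized (trafficLight)
        {
            // Count this reply off. The latch started at numPings + 1, so it cannot reach zero here.
            trafficLight.countDown();
            long trueCount = trafficLight.getCount();
            long remainingCount = trueCount - 1;

            // Notify the listener while the waiting sender is still blocked.
            if (listener != null)
            {
                listener.onReply((int) remainingCount);
            }

            // Last reply: use up the extra count to release the waiting sender.
            if (trueCount == 1)
            {
                trafficLight.countDown();
            }
        }
    }

    /** Simulates numPings replies on one thread and returns the counts the listener saw. */
    static List<Integer> simulate(int numPings)
    {
        CountDownLatch trafficLight = new CountDownLatch(numPings + 1);
        final List<Integer> seen = new ArrayList<Integer>();

        for (int i = 0; i < numPings; i++)
        {
            onReply(trafficLight, count -> seen.add(count));
        }

        try
        {
            // All replies have been handled, so the latch should already be at zero.
            if (!trafficLight.await(1, TimeUnit.SECONDS))
            {
                throw new IllegalStateException("latch was not released");
            }
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        return seen;
    }

    public static void main(String[] args)
    {
        System.out.println(simulate(3)); // prints [2, 1, 0]
    }
}
```

With three pings the listener sees remaining counts 2, 1 and 0, and the latch reaches zero only after the listener has been called for the final reply.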
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j?view=diff&rev=502624&r1=502623&r2=502624
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j Fri Feb 2 07:28:08 2007
@@ -24,19 +24,19 @@
log4j.logger.org.apache.qpid.requestreply=${amqj.test.logging.level}
log4j.logger.org.apache.qpid.pingpong=${amqj.test.logging.level}
+log4j.logger.org.apache.qpid.ping=${amqj.test.logging.level}
log4j.logger.org.apache.qpid.topic=${amqj.test.logging.level}
-log4j.logger.uk.co.thebadgerset.junit.extensions=info, console
+log4j.logger.uk.co.thebadgerset.junit.extensions=${badger.level}, console
log4j.additivity.uk.co.thebadgerset.junit.extensions=false
-log4j.logger.uk.co.thebadgerset.junit.extensions=info
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=all
log4j.appender.console.layout=org.apache.log4j.PatternLayout
-#log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
-log4j.appender.console.layout.ConversionPattern=%p [%c] %m%n
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
+#log4j.appender.console.layout.ConversionPattern=%t %p [%c] %m%n
log4j.appender.fileApp=org.apache.log4j.FileAppender
log4j.appender.fileApp.file=${log.dir}/perftests.volumetest.log