You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/08/23 14:10:43 UTC
svn commit: r568952 [2/2] - in /incubator/qpid/trunk/qpid/java/perftests/src:
main/java/org/apache/qpid/client/message/ main/java/org/apache/qpid/ping/
main/java/org/apache/qpid/requestreply/ test/
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=568952&r1=568951&r2=568952&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Thu Aug 23 05:10:42 2007
@@ -20,21 +20,8 @@
*/
package org.apache.qpid.requestreply;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.*;
-
import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.*;
@@ -51,6 +38,18 @@
import uk.co.thebadgerset.junit.extensions.Throttle;
import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import javax.jms.*;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may
* either be generated by another client (see {@link PingPongBouncer}, or an extension of it may be used that listens
@@ -86,10 +85,11 @@
* <tr><td> username <td> guest <td> The username to access the broker with.
* <tr><td> password <td> guest <td> The password to access the broker with.
* <tr><td> selector <td> null <td> Not used. Defines a message selector to filter pings with.
- * <tr><td> destinationCount <td> 1 <td> The number of receivers listening to the pings.
+ * <tr><td> destinationCount <td> 1 <td> The number of destinations to send pings to.
+ * <tr><td> numConsumers <td> 1 <td> The number of consumers on each destination.
* <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
* <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
- * <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all.
+ * <tr><td> uniqueDests <td> true <td> Whether each receivers only listens to one ping destination or all.
* <tr><td> durableDests <td> false <td> Whether or not durable destinations are used.
* <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
* 0 - SESSION_TRANSACTED
@@ -98,6 +98,10 @@
* 3 - DUPS_OK_ACKNOWLEDGE
* 257 - NO_ACKNOWLEDGE
* 258 - PRE_ACKNOWLEDGE
+ * <tr><td> consTransacted <td> false <td> Whether or not consumers use transactions. Defaults to the same value
+ * as the 'transacted' option if not seperately defined.
+ * <tr><td> consAckMode <td> AUTO_ACK <td> The message acknowledgement mode for consumers. Defaults to the same
+ * value as 'ackMode' if not seperately defined.
* <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of messages sent but not yet received.
* Limits the volume of messages currently buffered on the client
* or broker. Can help scale test clients by limiting amount of buffered
@@ -131,8 +135,9 @@
* Instead make mina use a bounded blocking buffer, or other form of back pressure, to stop data being written
* faster than it can be sent.
*/
-public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
+public class PingPongProducer implements Runnable /*, MessageListener*/, ExceptionListener
{
+ /** Used for debugging. */
private static final Logger log = Logger.getLogger(PingPongProducer.class);
/** Holds the name of the property to get the test message size from. */
@@ -159,6 +164,9 @@
/** Holds the transactional mode to use for the test. */
public static final boolean TRANSACTED_DEFAULT = false;
+ public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted";
+ public static final boolean CONSUMER_TRANSACTED_DEFAULT = false;
+
/** Holds the name of the property to get the test broker url from. */
public static final String BROKER_PROPNAME = "broker";
@@ -237,12 +245,18 @@
/** Holds the default message selector. */
public static final String SELECTOR_DEFAULT = "";
- /** Holds the name of the proeprty to get the destination count from. */
+ /** Holds the name of the property to get the destination count from. */
public static final String DESTINATION_COUNT_PROPNAME = "destinationCount";
/** Defines the default number of destinations to ping. */
public static final int DESTINATION_COUNT_DEFAULT = 1;
+ /** Holds the name of the property to get the number of consumers per destination from. */
+ public static final String NUM_CONSUMERS_PROPNAME = "numConsumers";
+
+ /** Defines the default number consumers per destination. */
+ public static final int NUM_CONSUMERS_DEFAULT = 1;
+
/** Holds the name of the property to get the waiting timeout for response messages. */
public static final String TIMEOUT_PROPNAME = "timeout";
@@ -270,6 +284,9 @@
/** Defines the default message acknowledgement mode. */
public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+ public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode";
+ public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+
public static final String MAX_PENDING_PROPNAME = "maxPending";
public static final int MAX_PENDING_DEFAULT = 0;
@@ -297,8 +314,10 @@
defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
+ defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT);
defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT);
defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+ defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT);
defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT);
defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
@@ -311,6 +330,7 @@
defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT);
defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT);
+ defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT);
defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
@@ -323,12 +343,15 @@
protected String _destinationName;
protected String _selector;
protected boolean _transacted;
+ protected boolean _consTransacted;
/** Determines whether this producer sends persistent messages. */
protected boolean _persistent;
/** Holds the acknowledgement mode used for sending and receiving messages. */
- private int _ackMode;
+ protected int _ackMode;
+
+ protected int _consAckMode;
/** Determines what size of messages this producer sends. */
protected int _messageSize;
@@ -363,7 +386,13 @@
/** Holds the number of sends that should be performed in every transaction when using transactions. */
protected int _txBatchSize;
+ /** Holds the number of destinations to ping. */
protected int _noOfDestinations;
+
+ /** Holds the number of consumers per destination. */
+ protected int _noOfConsumers;
+
+ /** Holds the maximum send rate in herz. */
protected int _rate;
/**
@@ -373,7 +402,7 @@
protected int _maxPendingSize;
/**
- * Holds a monitor which is used to synchronize sender and receiver threads, where the sender has elected
+ * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
* to wait until the number of unreceived message is reduced before continuing to send.
*/
protected Object _sendPauseMonitor = new Object();
@@ -397,10 +426,13 @@
/** Holds the connection to the broker. */
protected Connection _connection;
- /** Holds the session on which ping replies are received. */
- protected Session _consumerSession;
+ /** Holds the consumer connections. */
+ protected Connection[] _consumerConnection;
- /** Holds the producer session, needed to create ping messages. */
+ /** Holds the controlSession on which ping replies are received. */
+ protected Session[] _consumerSession;
+
+ /** Holds the producer controlSession, needed to create ping messages. */
protected Session _producerSession;
/** Holds the destination where the response messages will arrive. */
@@ -434,18 +466,15 @@
protected MessageProducer _producer;
/** Holds the message consumer to receive the ping replies through. */
- protected MessageConsumer _consumer;
-
- /**
- * Holds the number of consumers that will be attached to each topic. Each pings will result in a reply from each of the
- * attached clients
- */
- static int _consumersPerTopic = 1;
+ protected MessageConsumer[] _consumer;
/** The prompt to display when asking the user to kill the broker for failover testing. */
private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
private String _clientID;
+ /** Keeps count of the total messages sent purely for debugging purposes. */
+ private static AtomicInteger numSent = new AtomicInteger();
+
/**
* Creates a ping producer with the specified parameters, of which there are many. See the class level comments
* for details. This constructor creates a connection to the broker and creates producer and consumer sessions on
@@ -457,7 +486,7 @@
*/
public PingPongProducer(Properties overrides) throws Exception
{
- log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
+ // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
// Create a set of parsed properties from the defaults overriden by the passed in values.
ParsedProperties properties = new ParsedProperties(defaults);
@@ -471,6 +500,7 @@
_destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
_selector = properties.getProperty(SELECTOR_PROPNAME);
_transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+ _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME);
_persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
_messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
_verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME);
@@ -481,11 +511,13 @@
_failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME);
_txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME);
_noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME);
+ _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME);
_rate = properties.getPropertyAsInteger(RATE_PROPNAME);
_isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
_isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
_isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
_ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
+ _consAckMode = properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
_maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
// Check that one or more destinations were specified.
@@ -516,7 +548,7 @@
*/
public void establishConnection(boolean producer, boolean consumer) throws Exception
{
- log.debug("public void establishConnection(): called");
+ // log.debug("public void establishConnection(): called");
// Generate a unique identifying name for this client, based on it ip address and the current time.
InetAddress address = InetAddress.getLocalHost();
@@ -526,11 +558,17 @@
createConnection(_clientID);
// Create transactional or non-transactional sessions, based on the command line arguments.
- _producerSession = (Session) getConnection().createSession(_transacted, _ackMode);
- _consumerSession = (Session) getConnection().createSession(_transacted, _ackMode);
+ _producerSession = (Session) _connection.createSession(_transacted, _ackMode);
+
+ _consumerSession = new Session[_noOfConsumers];
+
+ for (int i = 0; i < _noOfConsumers; i++)
+ {
+ _consumerSession[i] = (Session) _consumerConnection[i].createSession(_consTransacted, _consAckMode);
+ }
// Create the destinations to send pings to and receive replies from.
- _replyDestination = _consumerSession.createTemporaryQueue();
+ _replyDestination = _consumerSession[0].createTemporaryQueue();
createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
// Create the message producer only if instructed to.
@@ -557,7 +595,20 @@
*/
protected void createConnection(String clientID) throws AMQException, URLSyntaxException
{
+ // log.debug("protected void createConnection(String clientID = " + clientID + "): called");
+
+ // log.debug("Creating a connection for the message producer.");
+
_connection = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath);
+
+ // log.debug("Creating " + _noOfConsumers + " connections for the consumers.");
+
+ _consumerConnection = new Connection[_noOfConsumers];
+
+ for (int i = 0; i < _noOfConsumers; i++)
+ {
+ _consumerConnection[i] = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath);
+ }
}
/**
@@ -570,20 +621,21 @@
{
try
{
- Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
+ Properties options =
+ CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());
// Create a ping producer overriding its defaults with all options passed on the command line.
PingPongProducer pingProducer = new PingPongProducer(options);
pingProducer.establishConnection(true, true);
// Start the ping producers dispatch thread running.
- pingProducer.getConnection().start();
+ pingProducer._connection.start();
// Create a shutdown hook to terminate the ping-pong producer.
Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
// Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
- pingProducer.getConnection().setExceptionListener(pingProducer);
+ pingProducer._connection.setExceptionListener(pingProducer);
// Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
Thread pingThread = new Thread(pingProducer);
@@ -624,12 +676,12 @@
*/
public List<Destination> getReplyDestinations()
{
- log.debug("public List<Destination> getReplyDestinations(): called");
+ // log.debug("public List<Destination> getReplyDestinations(): called");
List<Destination> replyDestinations = new ArrayList<Destination>();
replyDestinations.add(_replyDestination);
- log.debug("replyDestinations = " + replyDestinations);
+ // log.debug("replyDestinations = " + replyDestinations);
return replyDestinations;
}
@@ -642,12 +694,12 @@
*/
public void createProducer() throws JMSException
{
- log.debug("public void createProducer(): called");
+ // log.debug("public void createProducer(): called");
_producer = (MessageProducer) _producerSession.createProducer(null);
_producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
+ // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
}
/**
@@ -665,14 +717,14 @@
public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
boolean durable) throws JMSException, AMQException
{
- log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
+ /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
+ selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
- + durable + "): called");
+ + durable + "): called");*/
_pingDestinations = new ArrayList<Destination>();
// Create the desired number of ping destinations and consumers for them.
- log.debug("Creating " + noOfDestinations + " destinations to ping.");
+ // log.debug("Creating " + noOfDestinations + " destinations to ping.");
for (int i = 0; i < noOfDestinations; i++)
{
@@ -683,12 +735,12 @@
// Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
if (unique)
{
- log.debug("Creating unique destinations.");
+ // log.debug("Creating unique destinations.");
id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID();
}
else
{
- log.debug("Creating shared destinations.");
+ // log.debug("Creating shared destinations.");
id = "_" + _queueSharedID.incrementAndGet();
}
@@ -698,14 +750,14 @@
if (!durable)
{
destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
- log.debug("Created non-durable topic " + destination);
+ // log.debug("Created non-durable topic " + destination);
}
else
{
destination =
AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
_clientID, (AMQConnection) _connection);
- log.debug("Created durable topic " + destination);
+ // log.debug("Created durable topic " + destination);
}
}
// Otherwise this is a p2p pinger, in which case create queues.
@@ -719,7 +771,7 @@
((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null,
ExchangeDefaults.DIRECT_EXCHANGE_NAME);
- log.debug("Created queue " + destination);
+ // log.debug("Created queue " + destination);
}
// Keep the destination.
@@ -737,20 +789,36 @@
*/
public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
{
- log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
- + ", String selector = " + selector + "): called");
+ /*log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+ + ", String selector = " + selector + "): called");*/
- log.debug("Creating " + destinations.size() + " reply consumers.");
+ // log.debug("There are " + destinations.size() + " destinations.");
+ // log.debug("Creating " + _noOfConsumers + " consumers on each destination.");
+ // log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers));
for (Destination destination : destinations)
{
- // Create a consumer for the destination and set this pinger to listen to its messages.
- _consumer =
- _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
- selector);
- _consumer.setMessageListener(this);
+ _consumer = new MessageConsumer[_noOfConsumers];
+
+ for (int i = 0; i < _noOfConsumers; i++)
+ {
+ // Create a consumer for the destination and set this pinger to listen to its messages.
+ _consumer[i] =
+ _consumerSession[i].createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
+ selector);
+
+ final int consumerNo = i;
+
+ _consumer[i].setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ onMessageWithConsumerNo(message, consumerNo);
+ }
+ });
- log.debug("Set this to listen to replies sent to destination: " + destination);
+ // log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination);
+ }
}
}
@@ -761,97 +829,123 @@
*
* @param message The received message.
*/
- public void onMessage(Message message)
+ public void onMessageWithConsumerNo(Message message, int consumerNo)
{
- // log.debug("public void onMessage(Message message): called");
-
+ // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called");
try
{
+ long now = System.nanoTime();
+ long timestamp = getTimestamp(message);
+ long pingTime = now - timestamp;
+
+ // NDC.push("cons" + consumerNo);
+
// Extract the messages correlation id.
String correlationID = message.getJMSCorrelationID();
// log.debug("correlationID = " + correlationID);
- // Countdown on the traffic light if there is one for the matching correlation id.
- PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
+ int num = message.getIntProperty("MSG_NUM");
+ // log.info("Message " + num + " received.");
- if (perCorrelationId != null)
+ boolean isRedelivered = message.getJMSRedelivered();
+ // log.debug("isRedelivered = " + isRedelivered);
+
+ if (!isRedelivered)
{
- CountDownLatch trafficLight = perCorrelationId.trafficLight;
+ // Countdown on the traffic light if there is one for the matching correlation id.
+ PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
+
+ if (perCorrelationId != null)
+ {
+ CountDownLatch trafficLight = perCorrelationId.trafficLight;
- // Restart the timeout timer on every message.
- perCorrelationId.timeOutStart = System.nanoTime();
+ // Restart the timeout timer on every message.
+ perCorrelationId.timeOutStart = System.nanoTime();
- // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+ // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
- // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
- // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
- // ensures that each thread will get a unique value for the remaining messages.
- long trueCount = -1;
- long remainingCount = -1;
+ // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
+ // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
+ // ensures that each thread will get a unique value for the remaining messages.
+ long trueCount = -1;
+ long remainingCount = -1;
- synchronized (trafficLight)
- {
- trafficLight.countDown();
+ synchronized (trafficLight)
+ {
+ trafficLight.countDown();
- trueCount = trafficLight.getCount();
- remainingCount = trueCount - 1;
+ trueCount = trafficLight.getCount();
+ remainingCount = trueCount - 1;
- // Decrement the count of sent but not yet received messages.
- int unreceived = _unreceived.decrementAndGet();
- int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
+ // Decrement the count of sent but not yet received messages.
+ int unreceived = _unreceived.decrementAndGet();
+ int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
- // Release a waiting sender if there is one.
- synchronized (_sendPauseMonitor)
- {
- if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize))
- // && (_sendPauseBarrier.getNumberWaiting() == 1))
+ // Release a waiting sender if there is one.
+ synchronized (_sendPauseMonitor)
{
- log.debug("unreceived size estimate under limit = " + unreceivedSize);
-
- // Wait on the send pause barrier for the limit to be re-established.
- /*try
- {*/
- // _sendPauseBarrier.await();
- _sendPauseMonitor.notify();
- /*}
- catch (InterruptedException e)
+ if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize))
+ // && (_sendPauseBarrier.getNumberWaiting() == 1))
{
- throw new RuntimeException(e);
+ // log.debug("unreceived size estimate under limit = " + unreceivedSize);
+
+ // Wait on the send pause barrier for the limit to be re-established.
+ /*try
+ {*/
+ // _sendPauseBarrier.await();
+ _sendPauseMonitor.notify();
+ /*}
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (BrokenBarrierException e)
+ {
+ throw new RuntimeException(e);
+ }*/
}
- catch (BrokenBarrierException e)
- {
- throw new RuntimeException(e);
- }*/
}
- }
- // log.debug("remainingCount = " + remainingCount);
- // log.debug("trueCount = " + trueCount);
+ // NDC.push("/rem" + remainingCount);
- // Commit on transaction batch size boundaries. At this point in time the waiting producer remains
- // blocked, even on the last message.
- if ((remainingCount % _txBatchSize) == 0)
- {
- commitTx(_consumerSession);
- }
+ // log.debug("remainingCount = " + remainingCount);
+ // log.debug("trueCount = " + trueCount);
- // Forward the message and remaining count to any interested chained message listener.
- if (_chainedMessageListener != null)
- {
- _chainedMessageListener.onMessage(message, (int) remainingCount);
- }
+ // Commit on transaction batch size boundaries. At this point in time the waiting producer remains
+ // blocked, even on the last message.
+ // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on
+ // each batch boundary. For pub/sub each consumer gets every message so no division is done.
+ long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers);
+ // log.debug("commitCount = " + commitCount);
- // Check if this is the last message, in which case release any waiting producers. This is done
- // after the transaction has been committed and any listeners notified.
- if (trueCount == 1)
- {
- trafficLight.countDown();
+ if ((commitCount % _txBatchSize) == 0)
+ {
+ // log.debug("Trying commit for consumer " + consumerNo + ".");
+ commitTx(_consumerSession[consumerNo]);
+ }
+
+ // Forward the message and remaining count to any interested chained message listener.
+ if (_chainedMessageListener != null)
+ {
+ _chainedMessageListener.onMessage(message, (int) remainingCount, pingTime);
+ }
+
+ // Check if this is the last message, in which case release any waiting producers. This is done
+ // after the transaction has been committed and any listeners notified.
+ if (trueCount == 1)
+ {
+ trafficLight.countDown();
+ }
}
}
+ else
+ {
+ log.warn("Got unexpected message with correlationId: " + correlationID);
+ }
}
else
{
- log.warn("Got unexpected message with correlationId: " + correlationID);
+ log.warn("Got redelivered message, ignoring.");
}
// Print out ping times for every message in verbose mode only.
@@ -870,8 +964,11 @@
{
log.warn("There was a JMSException: " + e.getMessage(), e);
}
-
- // log.debug("public void onMessage(Message message): ending");
+ finally
+ {
+ // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending");
+ // NDC.clear();
+ }
}
/**
@@ -893,8 +990,8 @@
public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
throws JMSException, InterruptedException
{
- log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
- + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
// Generate a unique correlation id to put on the messages before sending them, if one was not specified.
if (messageCorrelationId == null)
@@ -904,6 +1001,8 @@
try
{
+ // NDC.push("prod");
+
// Create a count down latch to count the number of replies with. This is created before the messages are
// sent so that the replies cannot be received before the count down is created.
// One is added to this, so that the last reply becomes a special case. The special case is that the
@@ -935,16 +1034,16 @@
allMessagesReceived = numReplies == getExpectedNumPings(numPings);
- log.debug("numReplies = " + numReplies);
- log.debug("allMessagesReceived = " + allMessagesReceived);
+ // log.debug("numReplies = " + numReplies);
+ // log.debug("allMessagesReceived = " + allMessagesReceived);
// Recheck the timeout condition.
long now = System.nanoTime();
long lastMessageReceievedAt = perCorrelationId.timeOutStart;
timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000);
- log.debug("now = " + now);
- log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
+ // log.debug("now = " + now);
+ // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
}
while (!timedOut && !allMessagesReceived);
@@ -957,9 +1056,9 @@
log.info("Got all replies on id, " + messageCorrelationId);
}
- commitTx(_consumerSession);
+ // commitTx(_consumerSession);
- log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
+ // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
return numReplies;
}
@@ -967,6 +1066,7 @@
// so will be a memory leak if this is not done.
finally
{
+ // NDC.pop();
perCorrelationIds.remove(messageCorrelationId);
}
}
@@ -982,8 +1082,8 @@
*/
public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
{
- log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
- + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
if (message == null)
{
@@ -1067,7 +1167,7 @@
if (unreceivedSize > _maxPendingSize)
{
- log.debug("unreceived size estimate over limit = " + unreceivedSize);
+ // log.debug("unreceived size estimate over limit = " + unreceivedSize);
// Wait on the send pause barrier for the limit to be re-established.
try
@@ -1096,11 +1196,19 @@
// Send the message either to its round robin destination, or its default destination.
if (destination == null)
{
+ int num = numSent.incrementAndGet();
+ message.setIntProperty("MSG_NUM", num);
+ setTimestamp(message);
_producer.send(message);
+ // log.info("Message " + num + " sent.");
}
else
{
+ int num = numSent.incrementAndGet();
+ message.setIntProperty("MSG_NUM", num);
+ setTimestamp(message);
_producer.send(destination, message);
+ // log.info("Message " + num + " sent.");
}
// Increase the unreceived size, this may actually happen aftern the message is recevied.
@@ -1118,6 +1226,7 @@
// Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
if (((i + 1) % _txBatchSize) == 0)
{
+ // log.debug("Trying commit on producer session.");
committed = commitTx(_producerSession);
}
@@ -1135,7 +1244,7 @@
{
// Generate a sample message and time stamp it.
Message msg = getTestMessage(_replyDestination, _messageSize, _persistent);
- setTimestamp(msg);
+ // setTimestamp(msg);
// Send the message and wait for a reply.
pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null);
@@ -1143,12 +1252,12 @@
catch (JMSException e)
{
_publish = false;
- log.debug("There was a JMSException: " + e.getMessage(), e);
+ // log.debug("There was a JMSException: " + e.getMessage(), e);
}
catch (InterruptedException e)
{
_publish = false;
- log.debug("There was an interruption: " + e.getMessage(), e);
+ // log.debug("There was an interruption: " + e.getMessage(), e);
}
}
@@ -1186,7 +1295,7 @@
// Timestamp the message in nanoseconds.
- setTimestamp(msg);
+ // setTimestamp(msg);
return msg;
}
@@ -1227,6 +1336,16 @@
_publish = false;
}
+ public void start() throws JMSException
+ {
+ _connection.start();
+
+ for (int i = 0; i < _noOfConsumers; i++)
+ {
+ _consumerConnection[i].start();
+ }
+ }
+
/** Implements a ping loop that repeatedly pings until the publish flag becomes false. */
public void run()
{
@@ -1245,7 +1364,7 @@
*/
public void onException(JMSException e)
{
- log.debug("public void onException(JMSException e = " + e + "): called", e);
+ // log.debug("public void onException(JMSException e = " + e + "): called", e);
_publish = false;
}
@@ -1267,30 +1386,29 @@
}
/**
- * Gets the underlying connection that this ping client is running on.
- *
- * @return The underlying connection that this ping client is running on.
- */
- public Connection getConnection()
- {
- return _connection;
- }
-
- /**
* Closes the pingers connection.
*
* @throws JMSException All JMSException are allowed to fall through.
*/
public void close() throws JMSException
{
- log.debug("public void close(): called");
+ // log.debug("public void close(): called");
try
{
if (_connection != null)
{
_connection.close();
- log.debug("Close connection.");
+ // log.debug("Close connection.");
+ }
+
+ for (int i = 0; i < _noOfConsumers; i++)
+ {
+ if (_consumerConnection[i] != null)
+ {
+ _consumerConnection[i].close();
+ // log.debug("Closed consumer connection.");
+ }
}
}
finally
@@ -1298,6 +1416,7 @@
_connection = null;
_producerSession = null;
_consumerSession = null;
+ _consumerConnection = null;
_producer = null;
_consumer = null;
_pingDestinations = null;
@@ -1306,8 +1425,8 @@
}
/**
- * Convenience method to commit the transaction on the specified session. If the session to commit on is not a
- * transactional session, this method does nothing (unless the failover after send flag is set).
+ * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a
+ * transactional controlSession, this method does nothing (unless the failover after send flag is set).
*
* <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is
* applied. This flag applies whether the pinger is transactional or not.
@@ -1316,9 +1435,9 @@
* is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the
* commit is applied. These flags will only apply if using a transactional pinger.
*
- * @param session The session to commit
+ * @param session The controlSession to commit
*
- * @return <tt>true</tt> if the session was committed, <tt>false</tt> if it was not.
+ * @return <tt>true</tt> if the controlSession was committed, <tt>false</tt> if it was not.
*
* @throws javax.jms.JMSException If the commit fails and then the rollback fails.
*
@@ -1347,6 +1466,8 @@
if (session.getTransacted())
{
+ // log.debug("Session is transacted.");
+
try
{
if (_failBeforeCommit)
@@ -1360,10 +1481,10 @@
waitForUser(KILL_BROKER_PROMPT);
}
- // long l = System.nanoTime();
+ long start = System.nanoTime();
session.commit();
committed = true;
- // log.debug("Time taken to commit :" + ((System.nanoTime() - l) / 1000000f) + " ms");
+ // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms");
if (_failAfterCommit)
{
@@ -1376,26 +1497,26 @@
waitForUser(KILL_BROKER_PROMPT);
}
- // log.trace("Session Commited.");
+ // log.debug("Session Commited.");
}
catch (JMSException e)
{
- log.debug("JMSException on commit:" + e.getMessage(), e);
+ // log.debug("JMSException on commit:" + e.getMessage(), e);
// Warn that the bounce back client is not available.
if (e.getLinkedException() instanceof AMQNoConsumersException)
{
- log.debug("No consumers on queue.");
+ // log.debug("No consumers on queue.");
}
try
{
session.rollback();
- log.debug("Message rolled back.");
+ // log.debug("Message rolled back.");
}
catch (JMSException jmse)
{
- log.debug("JMSE on rollback:" + jmse.getMessage(), jmse);
+ // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse);
// Both commit and rollback failed. Throw the rollback exception.
throw jmse;
@@ -1428,23 +1549,34 @@
}
/**
- * This value will be changed by PingClient to represent the number of clients connected to each topic.
+ * Gets the number of consumers that are listening to each destination in the test.
*
* @return int The number of consumers subscribing to each topic.
*/
- public int getConsumersPerTopic()
+ public int getConsumersPerDestination()
{
- return _consumersPerTopic;
+ return _noOfConsumers;
}
+ /**
+ * Calculates how many pings are expected to be received for the given number sent.
+ *
+ * @param numpings The number of pings that will be sent.
+ *
+ * @return The number that should be received, for the test to pass.
+ */
public int getExpectedNumPings(int numpings)
{
- return numpings * getConsumersPerTopic();
+ // log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called");
+
+ // log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers.");
+
+ return numpings * (_isPubSub ? getConsumersPerDestination() : 1);
}
/**
* Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link
- * PingPongProducer#onMessage} method is called, the chained listener set through the {@link
+ * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link
* PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of
* messages with that correlation id.
*
@@ -1454,7 +1586,17 @@
*/
public static interface ChainedMessageListener
{
- public void onMessage(Message message, int remainingCount) throws JMSException;
+ /**
+ * Notifies interested listeners about message arrival and important test stats, the number of messages
+ * remaining in the test, and the messages send timestamp.
+ *
+ * @param message The newly arrived message.
+ * @param remainingCount The number of messages left to complete the test.
+ * @param latency The nanosecond latency of the message.
+ *
+ * @throws JMSException Any JMS exceptions is allowed to fall through.
+ */
+ public void onMessage(Message message, int remainingCount, long latency) throws JMSException;
}
/**
Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java?rev=568952&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java Thu Aug 23 05:10:42 2007
@@ -0,0 +1,251 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import junit.framework.Assert;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+import javax.jms.*;
+
+/**
+ * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run
+ * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from
+ * a producer to a conumer, then the consumer replies to the message on a temporary queue.
+ *
+ * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of the number
+ * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled
+ * up using a suitable JUnit test runner. See {@link uk.co.thebadgerset.junit.extensions.TKTestRunner} for more
+ * information on how to do this.
+ *
+ * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
+ * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads
+ * gets its own connection/producer/consumer, this is only re-established if the connection is lost.
+ *
+ * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
+ * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come
+ * back on the temporary queue.
+ *
+ * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ */
+public class PingPongTestPerf extends AsymptoticTestCase
+{
+ private static Logger _logger = Logger.getLogger(PingPongTestPerf.class);
+
+ /** Thread local to hold the per-thread test setup fields. */
+ ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
+
+ // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
+ // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
+ // of the test parameters to log with the results. It also providers some basic type parsing convenience methods.
+ // private Properties testParameters = System.getProperties();
+ private ParsedProperties testParameters =
+ TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/);
+
+ public PingPongTestPerf(String name)
+ {
+ super(name);
+
+ _logger.debug(testParameters);
+
+ // Sets up the test parameters with defaults.
+ /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT));
+ testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
+ PingPongProducer.PING_QUEUE_NAME_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
+ Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
+ Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME,
+ Boolean.toString(PingPongProducer.VERBOSE_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME,
+ Boolean.toString(PingPongProducer.PUBSUB_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME,
+ Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
+ PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
+ PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
+ PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
+ PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME,
+ Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME,
+ Integer.toString(PingPongProducer.ACK_MODE_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME,
+ PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/
+ }
+
+ /**
+ * Compile all the tests into a test suite.
+ */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping-Pong Performance Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingPongTestPerf("testPingPongOk"));
+
+ return suite;
+ }
+
+ private static void setSystemPropertyIfNull(String propName, String propValue)
+ {
+ if (System.getProperty(propName) == null)
+ {
+ System.setProperty(propName, propValue);
+ }
+ }
+
+ public void testPingPongOk(int numPings) throws Exception
+ {
+ // Get the per thread test setup to run the test through.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+ Message msg =
+ perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+ // Send the message and wait for a reply.
+ int numReplies =
+ perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null);
+
+ // Fail the test if the timeout was exceeded.
+ if (numReplies != numPings)
+ {
+ Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings);
+ }
+ }
+
+ /**
+ * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+ */
+ public void threadSetUp()
+ {
+ try
+ {
+ PerThreadSetup perThreadSetup = new PerThreadSetup();
+
+ // Extract the test set up paramaeters.
+ String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
+ String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
+ String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
+ String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME);
+ String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
+ boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
+ boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
+ String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
+ boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
+ boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME);
+
+ synchronized (this)
+ {
+ // Establish a bounce back client on the ping queue to bounce back the pings.
+ perThreadSetup._testPingBouncer =
+ new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent,
+ transacted, selector, verbose, pubsub);
+
+ // Start the connections for client and producer running.
+ perThreadSetup._testPingBouncer.getConnection().start();
+
+ // Establish a ping-pong client on the ping queue to send the pings and receive replies with.
+ perThreadSetup._testPingProducer = new PingPongProducer(testParameters);
+ perThreadSetup._testPingProducer.establishConnection(true, true);
+ perThreadSetup._testPingProducer.start();
+ }
+
+ // Attach the per-thread set to the thread.
+ threadSetup.set(perThreadSetup);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+
+ /**
+ * Performs test fixture clean
+ */
+ public void threadTearDown()
+ {
+ _logger.debug("public void threadTearDown(): called");
+
+ try
+ {
+ // Get the per thread test fixture.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Close the pingers so that it cleans up its connection cleanly.
+ synchronized (this)
+ {
+ perThreadSetup._testPingProducer.close();
+ // perThreadSetup._testPingBouncer.close();
+ }
+
+ // Ensure the per thread fixture is reclaimed.
+ threadSetup.remove();
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("There was an exception during per thread tear down.");
+ }
+ }
+
+ protected static class PerThreadSetup
+ {
+ /**
+ * Holds the test ping-pong producer.
+ */
+ private PingPongProducer _testPingProducer;
+
+ /**
+ * Holds the test ping client.
+ */
+ private PingPongBouncer _testPingBouncer;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java
------------------------------------------------------------------------------
svn:eol-style = native