You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/04/05 13:47:51 UTC
svn commit: r525800 [2/3] - in /incubator/qpid/trunk/qpid: ./
java/integrationtests/src/main/java/org/apache/qpid/util/
java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/
java/perftests/src/main/java/org/apache/qpid/ping/...
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=525800&r1=525799&r2=525800
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Thu Apr 5 04:47:50 2007
@@ -34,20 +34,22 @@
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.topic.Config;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.util.CommandLineParser;
import uk.co.thebadgerset.junit.extensions.BatchedThrottle;
import uk.co.thebadgerset.junit.extensions.Throttle;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
/**
* PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may
@@ -65,31 +67,37 @@
* transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover
* testing. A complete list of accepted parameters, default values and comments on their usage is provided here:
*
- * <p/><table><caption>CRC Card</caption>
- * <tr><th> Parameter <th> Default <th> Comments
- * <tr><td> messageSize <td> 0 <td> Message size in bytes. Not including any headers.
- * <tr><td> destinationName <td> ping <td> The root name to use to generate destination names to ping.
- * <tr><td> persistent <td> false <td> Determines whether peristent delivery is used.
- * <tr><td> transacted <td> false <td> Determines whether messages are sent/received in transactions.
+ * <p/><table><caption>Parameters</caption>
+ * <tr><th> Parameter <th> Default <th> Comments
+ * <tr><td> messageSize <td> 0 <td> Message size in bytes. Not including any headers.
+ * <tr><td> destinationName <td> ping <td> The root name to use to generate destination names to ping.
+ * <tr><td> persistent <td> false <td> Determines whether persistent delivery is used.
+ * <tr><td> transacted <td> false <td> Determines whether messages are sent/received in transactions.
* <tr><td> broker <td> tcp://localhost:5672 <td> Determines the broker to connect to.
- * <tr><td> virtualHost <td> test <td> Determines the virtual host to send all ping over.
- * <tr><td> rate <td> 0 <td> The maximum rate (in hertz) to send messages at. 0 means no limit.
- * <tr><td> verbose <td> false <td> The verbose flag for debugging. Prints to console on every message.
- * <tr><td> pubsub <td> false <td> Whether to ping topics or queues. Uses p2p by default.
- * <tr><td> failAfterCommit <td> false <td> Whether to prompt user to kill broker after a commit batch.
- * <tr><td> failBeforeCommit <td> false <td> Whether to prompt user to kill broker before a commit batch.
- * <tr><td> failAfterSend <td> false <td> Whether to prompt user to kill broker after a send.
- * <tr><td> failBeforeSend <td> false <td> Whether to prompt user to kill broker before a send.
- * <tr><td> failOnce <td> true <td> Whether to prompt for failover only once.
- * <tr><td> username <td> guest <td> The username to access the broker with.
- * <tr><td> password <td> guest <td> The password to access the broker with.
- * <tr><td> selector <td> null <td> Not used. Defines a message selector to filter pings with.
- * <tr><td> destinationCount <td> 1 <td> The number of receivers listening to the pings.
- * <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
- * <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
- * <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all.
- * <tr><td> ackMode <td> NO_ACK <td> The message acknowledgement mode.
- * <tr><td> pauseBatch <td> 0 <td> In milliseconds. A pause to insert between transaction batches.
+ * <tr><td> virtualHost <td> test <td> Determines the virtual host to send all ping over.
+ * <tr><td> rate <td> 0 <td> The maximum rate (in hertz) to send messages at. 0 means no limit.
+ * <tr><td> verbose <td> false <td> The verbose flag for debugging. Prints to console on every message.
+ * <tr><td> pubsub <td> false <td> Whether to ping topics or queues. Uses p2p by default.
+ * <tr><td> failAfterCommit <td> false <td> Whether to prompt user to kill broker after a commit batch.
+ * <tr><td> failBeforeCommit <td> false <td> Whether to prompt user to kill broker before a commit batch.
+ * <tr><td> failAfterSend <td> false <td> Whether to prompt user to kill broker after a send.
+ * <tr><td> failBeforeSend <td> false <td> Whether to prompt user to kill broker before a send.
+ * <tr><td> failOnce <td> true <td> Whether to prompt for failover only once.
+ * <tr><td> username <td> guest <td> The username to access the broker with.
+ * <tr><td> password <td> guest <td> The password to access the broker with.
+ * <tr><td> selector <td> null <td> Not used. Defines a message selector to filter pings with.
+ * <tr><td> destinationCount <td> 1 <td> The number of receivers listening to the pings.
+ * <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
+ * <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
+ * <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all.
+ * <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
+ * 0 - SESSION_TRANSACTED
+ * 1 - AUTO_ACKNOWLEDGE
+ * 2 - CLIENT_ACKNOWLEDGE
+ * 3 - DUPS_OK_ACKNOWLEDGE
+ * 257 - NO_ACKNOWLEDGE
+ * 258 - PRE_ACKNOWLEDGE
+ * <tr><td> pauseBatch <td> 0 <td> In milliseconds. A pause to insert between transaction batches.
* </table>
*
* <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
@@ -121,7 +129,7 @@
*/
public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
{
- private static final Logger _logger = Logger.getLogger(PingPongProducer.class);
+ private static final Logger log = Logger.getLogger(PingPongProducer.class);
/** Holds the name of the property to get the test message size from. */
public static final String MESSAGE_SIZE_PROPNAME = "messageSize";
@@ -181,31 +189,31 @@
public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit";
/** Holds the default failover after commit test flag. */
- public static final String FAIL_AFTER_COMMIT_DEFAULT = "false";
+ public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false;
/** Holds the name of the proeprty to get the fail before commit flag from. */
public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit";
/** Holds the default failover before commit test flag. */
- public static final String FAIL_BEFORE_COMMIT_DEFAULT = "false";
+ public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false;
/** Holds the name of the proeprty to get the fail after send flag from. */
public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend";
/** Holds the default failover after send test flag. */
- public static final String FAIL_AFTER_SEND_DEFAULT = "false";
+ public static final boolean FAIL_AFTER_SEND_DEFAULT = false;
/** Holds the name of the property to get the fail before send flag from. */
public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend";
/** Holds the default failover before send test flag. */
- public static final String FAIL_BEFORE_SEND_DEFAULT = "false";
+ public static final boolean FAIL_BEFORE_SEND_DEFAULT = false;
/** Holds the name of the property to get the fail once flag from. */
public static final String FAIL_ONCE_PROPNAME = "failOnce";
/** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */
- public static final String FAIL_ONCE_DEFAULT = "true";
+ public static final boolean FAIL_ONCE_DEFAULT = true;
/** Holds the name of the property to get the broker access username from. */
public static final String USERNAME_PROPNAME = "username";
@@ -223,7 +231,7 @@
public static final String SELECTOR_PROPNAME = "selector";
/** Holds the default message selector. */
- public static final String SELECTOR_DEFAULT = null;
+ public static final String SELECTOR_DEFAULT = "";
/** Holds the name of the proeprty to get the destination count from. */
public static final String DESTINATION_COUNT_PROPNAME = "destinationCount";
@@ -253,7 +261,7 @@
public static final String ACK_MODE_PROPNAME = "ackMode";
/** Defines the default message acknowledgement mode. */
- public static final int ACK_MODE_DEFAULT = Session.NO_ACKNOWLEDGE;
+ public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
/** Holds the name of the property to get the pause between batches property from. */
public static final String PAUSE_AFTER_BATCH_PROPNAME = "pauseBatch";
@@ -273,6 +281,86 @@
/** Holds the name of the property to store nanosecond timestamps in ping messages with. */
public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
+ /** Holds the default configuration properties. */
+ public static ParsedProperties defaults = new ParsedProperties();
+
+ static
+ {
+ defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
+ defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT);
+ defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT);
+ defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT);
+ defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
+ defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
+ defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
+ defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT);
+ defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+ defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT);
+ defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
+ defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
+ defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT);
+ defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT);
+ defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT);
+ defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT);
+ defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT);
+ defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT);
+ defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
+ defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT);
+ defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
+ defaults.setPropertyIfNull(PAUSE_AFTER_BATCH_PROPNAME, PAUSE_AFTER_BATCH_DEFAULT);
+ defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
+ }
+
+ protected String _brokerDetails;
+ protected String _username;
+ protected String _password;
+ protected String _virtualpath;
+ protected String _destinationName;
+ protected String _selector;
+ protected boolean _transacted;
+
+ /** Determines whether this producer sends persistent messages. */
+ protected boolean _persistent;
+
+ /** Holds the acknowledgement mode used for sending and receiving messages. */
+ private int _ackMode;
+
+ /** Determines what size of messages this producer sends. */
+ protected int _messageSize;
+
+ /** Used to indicate that the ping loop should print out whenever it pings. */
+ protected boolean _verbose;
+
+ /** Flag used to indicate if this is a point to point or pub/sub ping client. */
+ protected boolean _isPubSub;
+
+ /** Flag used to indicate if the destinations should be unique client. */
+ protected boolean _isUnique;
+
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
+ protected boolean _failBeforeCommit;
+
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
+ protected boolean _failAfterCommit;
+
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
+ protected boolean _failBeforeSend;
+
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
+ protected boolean _failAfterSend;
+
+ /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
+ protected boolean _failOnce;
+
+ /** Holds the number of sends that should be performed in every transaction when using transactions. */
+ protected int _txBatchSize;
+
+ protected int _noOfDestinations;
+ protected int _rate;
+
+ /** Holds the wait time to insert between every batch of messages committed. */
+ private long _pauseBatch;
+
/** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
@@ -286,83 +374,47 @@
/** A convenient formatter to use when time stamping output. */
protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
- /**
- * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
- * creating multiple ping producers in the same JVM.
- */
- protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
-
- /** Holds the destination where the response messages will arrive. */
- private Destination _replyDestination;
-
- /** Determines whether this producer sends persistent messages. */
- protected boolean _persistent;
+ /** Holds the connection to the broker. */
+ protected Connection _connection;
- /** Holds the acknowledgement mode used for sending and receiving messages. */
- private int _ackMode = Session.NO_ACKNOWLEDGE;
+ /** Holds the session on which ping replies are received. */
+ protected Session _consumerSession;
- /** Determines what size of messages this producer sends. */
- protected int _messageSize;
+ /** Holds the producer session, needed to create ping messages. */
+ protected Session _producerSession;
- /** Used to indicate that the ping loop should print out whenever it pings. */
- protected boolean _verbose = VERBOSE_DEFAULT;
+ /** Holds the destination where the response messages will arrive. */
+ protected Destination _replyDestination;
- /** Holds the session on which ping replies are received. */
- protected Session _consumerSession;
+ /** Holds the set of destinations that this ping producer pings. */
+ protected List<Destination> _pingDestinations;
/** Used to restrict the sending rate to a specified limit. */
- private Throttle _rateLimiter = null;
+ protected Throttle _rateLimiter;
/** Holds a message listener that this message listener chains all its messages to. */
- private ChainedMessageListener _chainedMessageListener = null;
-
- /** Flag used to indicate if this is a point to point or pub/sub ping client. */
- protected boolean _isPubSub = PUBSUB_DEFAULT;
+ protected ChainedMessageListener _chainedMessageListener = null;
- /** Flag used to indicate if the destinations should be unique client. */
- protected static boolean _isUnique = UNIQUE_DESTS_DEFAULT;
+ /**
+ * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
+ * creating multiple ping producers in the same JVM.
+ */
+ protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
/**
* This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
* on the same JVM using this id generator will allow them to ping on the same queues.
*/
- protected AtomicInteger _queueSharedId = new AtomicInteger();
+ protected AtomicInteger _queueSharedID = new AtomicInteger();
/** Used to tell the ping loop when to terminate, it only runs while this is true. */
protected boolean _publish = true;
- /** Holds the connection to the broker. */
- private Connection _connection;
-
- /** Holds the producer session, needed to create ping messages. */
- private Session _producerSession;
-
- /** Holds the set of destinations that this ping producer pings. */
- protected List<Destination> _pingDestinations = new ArrayList<Destination>();
-
/** Holds the message producer to send the pings through. */
protected MessageProducer _producer;
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
- protected boolean _failBeforeCommit = false;
-
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
- protected boolean _failAfterCommit = false;
-
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
- protected boolean _failBeforeSend = false;
-
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
- protected boolean _failAfterSend = false;
-
- /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
- protected boolean _failOnce = true;
-
- /** Holds the number of sends that should be performed in every transaction when using transactions. */
- protected int _txBatchSize = TX_BATCH_SIZE_DEFAULT;
-
- /** Holds the wait time to insert between every batch of messages committed. */
- private static long _pauseBatch = PAUSE_AFTER_BATCH_DEFAULT;
+ /** Holds the message consumer to receive the ping replies through. */
+ protected MessageConsumer _consumer;
/**
* Holds the number of consumers that will be attached to each topic. Each pings will result in a reply from each of the
@@ -370,202 +422,195 @@
*/
static int _consumersPerTopic = 1;
+ /** The prompt to display when asking the user to kill the broker for failover testing. */
+ private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
+
/**
- * Creates a ping producer with the specified parameters, of which there are many. See their individual comments for
- * details. This constructor creates a connection to the broker and creates producer and consumer sessions on it, to send
- * and recieve its pings and replies on. The other options are kept, and control how this pinger behaves.
- *
- * @param brokerDetails The URL of the broker to send pings to.
- * @param username The username to log onto the broker with.
- * @param password The password to log onto the broker with.
- * @param virtualpath The virtual host name to use on the broker.
- * @param destinationName The name (or root where multiple destinations are used) of the desitination to send pings to.
- * @param selector The selector to filter replies with.
- * @param transacted Indicates whether or not pings are sent and received in transactions.
- * @param persistent Indicates whether pings are sent using peristent delivery.
- * @param messageSize Specifies the size of ping messages to send.
- * @param verbose Indicates that information should be printed to the console on every ping.
- * @param afterCommit Indicates that the user should be promted to terminate a broker after commits to test
- * failover.
- * @param beforeCommit Indicates that the user should be promted to terminate a broker before commits to test
- * failover.
- * @param afterSend Indicates that the user should be promted to terminate a broker after sends to test failover.
- * @param beforeSend Indicates that the user should be promted to terminate a broker before sends to test
- * failover.
- * @param failOnce Indicates that the failover testing behaviour should only happen on the first commit, not
- * all.
- * @param txBatchSize Specifies the number of pings to send in each transaction.
- * @param noOfDestinations The number of destinations to ping. Must be 1 or more.
- * @param rate Specified the number of pings per second to send. Setting this to 0 means send as fast as
- * possible, with no rate restriction.
- * @param pubsub True to ping topics, false to ping queues.
- * @param unique True to use unique destinations for each ping pong producer, false to share.
+ * Creates a ping producer with the specified parameters, of which there are many. See the class level comments
+ * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on
+ * it, to send and receive its pings and replies on.
+ *
+ * @param overrides Properties containing any desired overrides to the defaults.
*
* @throws Exception Any exceptions are allowed to fall through.
*/
- public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
- String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
- boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
- boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate,
- boolean pubsub, boolean unique, int ackMode, long pause) throws Exception
- {
- _logger.debug("public PingPongProducer(String brokerDetails = " + brokerDetails + ", String username = " + username
- + ", String password = " + password + ", String virtualpath = " + virtualpath
- + ", String destinationName = " + destinationName + ", String selector = " + selector
- + ", boolean transacted = " + transacted + ", boolean persistent = " + persistent
- + ", int messageSize = " + messageSize + ", boolean verbose = " + verbose + ", boolean afterCommit = "
- + afterCommit + ", boolean beforeCommit = " + beforeCommit + ", boolean afterSend = " + afterSend
- + ", boolean beforeSend = " + beforeSend + ", boolean failOnce = " + failOnce + ", int txBatchSize = "
- + txBatchSize + ", int noOfDestinations = " + noOfDestinations + ", int rate = " + rate
- + ", boolean pubsub = " + pubsub + ", boolean unique = " + unique + ", ackMode = " + ackMode
- + "): called");
-
- // Keep all the relevant options.
- _persistent = persistent;
- _messageSize = messageSize;
- _verbose = verbose;
- _failAfterCommit = afterCommit;
- _failBeforeCommit = beforeCommit;
- _failAfterSend = afterSend;
- _failBeforeSend = beforeSend;
- _failOnce = failOnce;
- _txBatchSize = txBatchSize;
- _isPubSub = pubsub;
- _isUnique = unique;
- _pauseBatch = pause;
+ public PingPongProducer(Properties overrides) throws Exception
+ {
+ log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
- if (ackMode != 0)
- {
- _ackMode = ackMode;
- }
+ // Create a set of parsed properties from the defaults overridden by the passed in values.
+ ParsedProperties properties = new ParsedProperties(defaults);
+ properties.putAll(overrides);
+
+ // Extract the configuration properties to set the pinger up with.
+ _brokerDetails = properties.getProperty(BROKER_PROPNAME);
+ _username = properties.getProperty(USERNAME_PROPNAME);
+ _password = properties.getProperty(PASSWORD_PROPNAME);
+ _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME);
+ _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
+ _selector = properties.getProperty(SELECTOR_PROPNAME);
+ _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+ _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
+ _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
+ _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME);
+ _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME);
+ _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME);
+ _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME);
+ _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME);
+ _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME);
+ _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME);
+ _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME);
+ _rate = properties.getPropertyAsInteger(RATE_PROPNAME);
+ _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
+ _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
+ _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
+ _pauseBatch = properties.getPropertyAsLong(PAUSE_AFTER_BATCH_PROPNAME);
// Check that one or more destinations were specified.
- if (noOfDestinations < 1)
+ if (_noOfDestinations < 1)
{
throw new IllegalArgumentException("There must be at least one destination.");
}
- // Create a connection to the broker.
+ // Set up a throttle to control the send rate, if a rate > 0 is specified.
+ if (_rate > 0)
+ {
+ _rateLimiter = new BatchedThrottle();
+ _rateLimiter.setRate(_rate);
+ }
+
+ // Create the connection and message producers/consumers.
+ // establishConnection(true, true);
+ }
+
+ /**
+ * Establishes a connection to the broker and creates message consumers and producers based on the parameters
+ * that this ping client was created with.
+ *
+ * @param producer Flag to indicate whether or not the producer should be set up.
+ * @param consumer Flag to indicate whether or not the consumers should be set up.
+ *
+ * @throws Exception Any exceptions are allowed to fall through.
+ */
+ public void establishConnection(boolean producer, boolean consumer) throws Exception
+ {
+ log.debug("public void establishConnection(): called");
+
+ // Generate a unique identifying name for this client, based on its host name and the current time.
InetAddress address = InetAddress.getLocalHost();
String clientID = address.getHostName() + System.currentTimeMillis();
- _connection = new AMQConnection(brokerDetails, username, password, clientID, virtualpath);
+ // Create a connection to the broker.
+ createConnection(clientID);
// Create transactional or non-transactional sessions, based on the command line arguments.
- _producerSession = (Session) getConnection().createSession(transacted, _ackMode);
- _consumerSession = (Session) getConnection().createSession(transacted, _ackMode);
+ _producerSession = (Session) getConnection().createSession(_transacted, _ackMode);
+ _consumerSession = (Session) getConnection().createSession(_transacted, _ackMode);
- // Set up a throttle to control the send rate, if a rate > 0 is specified.
- if (rate > 0)
+ // Create the destinations to send pings to and receive replies from.
+ _replyDestination = _consumerSession.createTemporaryQueue();
+ createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique);
+
+ // Create the message producer only if instructed to.
+ if (producer)
{
- _rateLimiter = new BatchedThrottle();
- _rateLimiter.setRate(rate);
+ createProducer();
}
- // Create the temporary queue for replies.
- _replyDestination = _consumerSession.createTemporaryQueue();
+ // Create the message consumer only if instructed to.
+ if (consumer)
+ {
+ createReplyConsumers(getReplyDestinations(), _selector);
+ }
+ }
- // Create the producer and the consumers for all reply destinations.
- createProducer();
- createPingDestinations(noOfDestinations, selector, destinationName, unique);
- createReplyConsumers(getReplyDestinations(), selector);
+ /**
+ * Establishes a connection to the broker, based on the configuration parameters that this ping client was
+ * created with.
+ *
+ * @param clientID The clients identifier.
+ *
+ * @throws AMQException Any underlying exceptions are allowed to fall through.
+ * @throws URLSyntaxException Any underlying exceptions are allowed to fall through.
+ */
+ protected void createConnection(String clientID) throws AMQException, URLSyntaxException
+ {
+ _connection = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath);
}
/**
- * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs to be
- * started to bounce the pings back again.
+ * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs
+ * to be started to bounce the pings back again.
*
* @param args The command line arguments.
- *
- * @throws Exception When something went wrong with the test
*/
- public static void main(String[] args) throws Exception
+ public static void main(String[] args)
{
- // Extract the command line.
- Config config = new Config();
- config.setOptions(args);
- if (args.length == 0)
+ try
{
- _logger.info("Running test with default values...");
- // usage();
- // System.exit(0);
- }
+ Properties options = processCommandLine(args);
+
+ // Create a ping producer overriding its defaults with all options passed on the command line.
+ PingPongProducer pingProducer = new PingPongProducer(options);
+ pingProducer.establishConnection(true, true);
+
+ // Start the ping producers dispatch thread running.
+ pingProducer.getConnection().start();
- String brokerDetails = config.getHost() + ":" + config.getPort();
- String virtualpath = VIRTUAL_HOST_DEFAULT;
- String selector = (config.getSelector() == null) ? SELECTOR_DEFAULT : config.getSelector();
- boolean verbose = true;
- boolean transacted = config.isTransacted();
- boolean persistent = config.usePersistentMessages();
- int messageSize = (config.getPayload() != 0) ? config.getPayload() : MESSAGE_SIZE_DEAFULT;
- // int messageCount = config.getMessages();
- int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DESTINATION_COUNT_DEFAULT;
- int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : TX_BATCH_SIZE_DEFAULT;
- int rate = (config.getRate() != 0) ? config.getRate() : RATE_DEFAULT;
- boolean pubsub = config.isPubSub();
+ // Create a shutdown hook to terminate the ping-pong producer.
+ Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
- String destName = config.getDestination();
- if (destName == null)
+ // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
+ pingProducer.getConnection().setExceptionListener(pingProducer);
+
+ // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
+ Thread pingThread = new Thread(pingProducer);
+ pingThread.run();
+ pingThread.join();
+ }
+ catch (Exception e)
{
- destName = PING_QUEUE_NAME_DEFAULT;
+ System.err.println(e.getMessage());
+ log.error("Top level handler caught execption.", e);
+ System.exit(1);
}
+ }
+
+ /**
+ * Extracts all name=value pairs from the command line, sets them all as system properties and also returns
+ * a map of properties containing them.
+ *
+ * @param args The command line.
+ *
+ * @return A set of properties containing all name=value pairs from the command line.
+ *
+ * @todo This is a commonly used piece of code. Make it accept a command line definition and move it into the
+ * CommandLineParser class.
+ */
+ protected static Properties processCommandLine(String[] args)
+ {
+ // Use the command line parser to evaluate the command line.
+ CommandLineParser commandLine = new CommandLineParser(new String[][] {});
- boolean afterCommit = false;
- boolean beforeCommit = false;
- boolean afterSend = false;
- boolean beforeSend = false;
- boolean failOnce = false;
+ // Capture the command line arguments or display errors and correct usage and then exit.
+ Properties options = null;
- for (String arg : args)
+ try
{
- if (arg.startsWith("failover:"))
- {
- // failover:<before|after>:<send:commit> | failover:once
- String[] parts = arg.split(":");
- if (parts.length == 3)
- {
- if (parts[2].equals("commit"))
- {
- afterCommit = parts[1].equals("after");
- beforeCommit = parts[1].equals("before");
- }
-
- if (parts[2].equals("send"))
- {
- afterSend = parts[1].equals("after");
- beforeSend = parts[1].equals("before");
- }
+ options = commandLine.parseCommandLine(args);
- if (parts[1].equals("once"))
- {
- failOnce = true;
- }
- }
- else
- {
- System.out.println("Unrecognized failover request:" + arg);
- }
- }
+ // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up
+ // overridden values from there.
+ commandLine.addCommandLineToSysProperties();
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(commandLine.getErrors());
+ System.out.println(commandLine.getUsage());
+ System.exit(1);
}
- // Create a ping producer to handle the request/wait/reply cycle.
- PingPongProducer pingProducer =
- new PingPongProducer(brokerDetails, USERNAME_DEFAULT, PASSWORD_DEFAULT, virtualpath, destName, selector,
- transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
- beforeSend, failOnce, batchSize, destCount, rate, pubsub, false, 0, 0);
-
- pingProducer.getConnection().start();
-
- // Create a shutdown hook to terminate the ping-pong producer.
- Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
-
- // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
- pingProducer.getConnection().setExceptionListener(pingProducer);
-
- // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
- Thread pingThread = new Thread(pingProducer);
- pingThread.run();
- pingThread.join();
+ return options;
}
/**
@@ -582,9 +627,7 @@
Thread.sleep(sleepTime);
}
catch (InterruptedException ie)
- {
- // ignore
- }
+ { }
}
}
@@ -596,27 +639,30 @@
*/
public List<Destination> getReplyDestinations()
{
- _logger.debug("public List<Destination> getReplyDestinations(): called");
+ log.debug("public List<Destination> getReplyDestinations(): called");
List<Destination> replyDestinations = new ArrayList<Destination>();
replyDestinations.add(_replyDestination);
+ log.debug("replyDestinations = " + replyDestinations);
+
return replyDestinations;
}
/**
- * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery flag
- * is set accoring the ping producer creation options.
+ * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery
+ * flag is set according to the ping producer creation options.
*
* @throws JMSException Any JMSExceptions are allowed to fall through.
*/
public void createProducer() throws JMSException
{
- _logger.debug("public void createProducer(): called");
+ log.debug("public void createProducer(): called");
_producer = (MessageProducer) _producerSession.createProducer(null);
- // _producer.setDisableMessageTimestamp(true);
_producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
}
/**
@@ -632,13 +678,16 @@
* @throws JMSException Any JMSExceptions are allowed to fall through.
*/
public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
- throws JMSException
+ throws JMSException
{
- _logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations
- + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = "
- + unique + "): called");
+ log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
+ + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + "): called");
+
+ _pingDestinations = new ArrayList<Destination>();
// Create the desired number of ping destinations and consumers for them.
+ log.debug("Creating " + noOfDestinations + " destinations to ping.");
+
for (int i = 0; i < noOfDestinations; i++)
{
AMQDestination destination;
@@ -648,26 +697,26 @@
// Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
if (unique)
{
- _logger.debug("Creating unique destinations.");
+ log.debug("Creating unique destinations.");
id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID();
}
else
{
- _logger.debug("Creating shared destinations.");
- id = "_" + _queueSharedId.incrementAndGet();
+ log.debug("Creating shared destinations.");
+ id = "_" + _queueSharedID.incrementAndGet();
}
// Check if this is a pub/sub pinger, in which case create topics.
if (_isPubSub)
{
destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
- _logger.debug("Created topic " + destination);
+ log.debug("Created topic " + destination);
}
// Otherwise this is a p2p pinger, in which case create queues.
else
{
destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id);
- _logger.debug("Created queue " + destination);
+ log.debug("Created queue " + destination);
}
// Keep the destination.
@@ -676,6 +725,29 @@
}
/**
+ * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
+ *
+ * @param destinations The destinations to listen to.
+ * @param selector A selector to filter the messages with.
+ *
+ * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
+ */
+ public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
+ {
+ log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+ + ", String selector = " + selector + "): called");
+
+ for (Destination destination : destinations)
+ {
+ // Create a consumer for the destination and set this pinger to listen to its messages.
+ _consumer =
+ _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
+ selector);
+ _consumer.setMessageListener(this);
+ }
+ }
+
+ /**
* Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a correlating
* reply may be waiting on. This is only done if the reply has a correlation id that is expected in the replies map.
*
@@ -683,13 +755,13 @@
*/
public void onMessage(Message message)
{
- _logger.debug("public void onMessage(Message message): called");
+ log.debug("public void onMessage(Message message): called");
try
{
// Extract the messages correlation id.
String correlationID = message.getJMSCorrelationID();
- _logger.debug("correlationID = " + correlationID);
+ log.debug("correlationID = " + correlationID);
// Countdown on the traffic light if there is one for the matching correlation id.
PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
@@ -701,7 +773,7 @@
// Restart the timeout timer on every message.
perCorrelationId.timeOutStart = System.nanoTime();
- _logger.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+ log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
// Decrement the countdown latch. Before this point, it is possible that two threads might enter this
// method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
@@ -716,20 +788,14 @@
trueCount = trafficLight.getCount();
remainingCount = trueCount - 1;
- _logger.debug("remainingCount = " + remainingCount);
- _logger.debug("trueCount = " + trueCount);
+ log.debug("remainingCount = " + remainingCount);
+ log.debug("trueCount = " + trueCount);
// Commit on transaction batch size boundaries. At this point in time the waiting producer remains
// blocked, even on the last message.
if ((remainingCount % _txBatchSize) == 0)
{
commitTx(_consumerSession);
- if (!_consumerSession.getTransacted() &&
- _consumerSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- // Acknowledge the messages when the session is not transacted but client_ack
- ((AMQSession) _consumerSession).acknowledge();
- }
}
// Forward the message and remaining count to any interested chained message listener.
@@ -748,7 +814,7 @@
}
else
{
- _logger.warn("Got unexpected message with correlationId: " + correlationID);
+ log.warn("Got unexpected message with correlationId: " + correlationID);
}
// Print out ping times for every message in verbose mode only.
@@ -759,48 +825,16 @@
if (timestamp != null)
{
long diff = System.nanoTime() - timestamp;
- _logger.trace("Time for round trip (nanos): " + diff);
+ log.trace("Time for round trip (nanos): " + diff);
}
}
}
catch (JMSException e)
{
- _logger.warn("There was a JMSException: " + e.getMessage(), e);
+ log.warn("There was a JMSException: " + e.getMessage(), e);
}
- _logger.debug("public void onMessage(Message message): ending");
- }
-
- /**
- * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out before a
- * reply arrives, then a null reply is returned from this method. This method generates a new unqiue correlation id for
- * the messages.
- *
- * @param message The message to send.
- * @param numPings The number of ping messages to send.
- * @param timeout The timeout in milliseconds.
- *
- * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait for
- * all prematurely.
- *
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- * @throws InterruptedException When interrupted by a timeout.
- */
- public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
- {
- _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
- + timeout + "): called");
-
- // Create a unique correlation id to put on the messages before sending them.
- String messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet());
-
- return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId);
- }
-
- public int pingAndWaitForReply(int numPings, long timeout, String messageCorrelationId)
- throws JMSException, InterruptedException
- {
- return pingAndWaitForReply(null, numPings, timeout, messageCorrelationId);
+ log.debug("public void onMessage(Message message): ending");
}
/**
@@ -808,10 +842,10 @@
* reply arrives, then a null reply is returned from this method. This method allows the caller to specify the
* correlation id.
*
- * @param message The message to send.
+ * @param message The message to send. If this is null, one is generated.
* @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
- * @param messageCorrelationId The message correlation id.
+ * @param messageCorrelationId The message correlation id. If this is null, one is generated.
*
* @return The number of replies received. This may be less than the number sent if the timeout terminated the wait for
* all prematurely.
@@ -820,10 +854,16 @@
* @throws InterruptedException When interrupted by a timeout
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
- throws JMSException, InterruptedException
+ throws JMSException, InterruptedException
{
- _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
- + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+
+ // Generate a unique correlation id to put on the messages before sending them, if one was not specified.
+ if (messageCorrelationId == null)
+ {
+ messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet());
+ }
try
{
@@ -858,31 +898,31 @@
allMessagesReceived = numReplies == getExpectedNumPings(numPings);
- _logger.debug("numReplies = " + numReplies);
- _logger.debug("allMessagesReceived = " + allMessagesReceived);
+ log.debug("numReplies = " + numReplies);
+ log.debug("allMessagesReceived = " + allMessagesReceived);
// Recheck the timeout condition.
long now = System.nanoTime();
long lastMessageReceievedAt = perCorrelationId.timeOutStart;
timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000);
- _logger.debug("now = " + now);
- _logger.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
+ log.debug("now = " + now);
+ log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
}
while (!timedOut && !allMessagesReceived);
if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
{
- _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+ log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
}
else if (_verbose)
{
- _logger.info("Got all replies on id, " + messageCorrelationId);
+ log.info("Got all replies on id, " + messageCorrelationId);
}
commitTx(_consumerSession);
- _logger.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
+ log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
return numReplies;
}
@@ -905,8 +945,8 @@
*/
public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
{
- _logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
- + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ + ", String messageCorrelationId = " + messageCorrelationId + "): called");
if (message == null)
{
@@ -923,40 +963,19 @@
// Send all of the ping messages.
for (int i = 0; i < numPings; i++)
{
- // Reset the committed flag to indicate that there are uncommitted messages.
+ // Reset the committed flag to indicate that there may be uncommitted messages.
committed = false;
// Re-timestamp the message.
message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
- // Round robin the destinations as the messages are sent.
- // return _destinationCount;
- sendMessage(_pingDestinations.get(i % _pingDestinations.size()), message);
-
- // Apply message rate throttling if a rate limit has been set up.
- if (_rateLimiter != null)
- {
- _rateLimiter.throttle();
- }
-
- // Call commit every time the commit batch size is reached.
- if ((i % _txBatchSize) == 0)
- {
- commitTx(_producerSession);
- committed = true;
-
- /* This pause is required for some cases. eg in load testing when sessions are non-transacted the
- Mina IO layer can't clear the cache in time. So this pause gives enough time for mina to clear
- the cache (without this mina throws OutOfMemoryError). pause() will check if time is != 0
- */
- pause(_pauseBatch);
- }
+ // Send the message, passing in the message count.
+ committed = sendMessage(i, message);
// Spew out per message timings on every message sonly in verbose mode.
if (_verbose)
{
- _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, "
- + messageCorrelationId);
+ log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
}
}
@@ -968,7 +987,70 @@
}
/**
- * The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each.
+ * Sends the specified message, applies rate limiting and possibly commits the current transaction. The count of
+ * messages sent so far must be specified and is used to round robin the ping destinations (where there are more
+ * than one), and to determine if the transaction batch size has been reached and the sent messages should be
+ * committed.
+ *
+ * @param i The count of messages sent so far in a loop of multiple calls to this send method.
+ * @param message The message to send.
+ *
+ * @return <tt>true</tt> if the messages were committed, <tt>false</tt> otherwise.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ protected boolean sendMessage(int i, Message message) throws JMSException
+ {
+ // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called");
+ // log.debug("_txBatchSize = " + _txBatchSize);
+
+ // Round robin the destinations as the messages are sent.
+ Destination destination = _pingDestinations.get(i % _pingDestinations.size());
+
+ // Prompt the user to kill the broker when doing failover testing.
+ if (_failBeforeSend)
+ {
+ if (_failOnce)
+ {
+ _failBeforeSend = false;
+ }
+
+ log.trace("Failing Before Send");
+ waitForUser(KILL_BROKER_PROMPT);
+ }
+
+ // Send the message either to its round robin destination, or its default destination.
+ if (destination == null)
+ {
+ _producer.send(message);
+ }
+ else
+ {
+ _producer.send(destination, message);
+ }
+
+ // Apply message rate throttling if a rate limit has been set up.
+ if (_rateLimiter != null)
+ {
+ _rateLimiter.throttle();
+ }
+
+ // Call commit every time the commit batch size is reached.
+ boolean committed = false;
+
+ // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
+ if (((i + 1) % _txBatchSize) == 0)
+ {
+ committed = commitTx(_producerSession);
+ }
+
+ return committed;
+ }
+
+ /**
+ * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction
+ * batch size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared,
+ * which will terminate the pinger.
*/
public void pingLoop()
{
@@ -979,25 +1061,20 @@
msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
// Send the message and wait for a reply.
- pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT);
+ pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null);
}
catch (JMSException e)
{
_publish = false;
- _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ log.debug("There was a JMSException: " + e.getMessage(), e);
}
catch (InterruptedException e)
{
_publish = false;
- _logger.debug("There was an interruption: " + e.getMessage(), e);
+ log.debug("There was an interruption: " + e.getMessage(), e);
}
}
- public Destination getReplyDestination()
- {
- return getReplyDestinations().get(0);
- }
-
/**
* Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set here.
*
@@ -1038,8 +1115,8 @@
}
/**
- * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this flag has
- * been cleared.
+ * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
+ * flag has been cleared.
*/
public void stop()
{
@@ -1066,8 +1143,8 @@
*/
public void onException(JMSException e)
{
+ log.debug("public void onException(JMSException e = " + e + "): called", e);
_publish = false;
- _logger.debug("There was a JMSException: " + e.getMessage(), e);
}
/**
@@ -1079,12 +1156,12 @@
public Thread getShutdownHook()
{
return new Thread(new Runnable()
- {
- public void run()
{
- stop();
- }
- });
+ public void run()
+ {
+ stop();
+ }
+ });
}
/**
@@ -1098,40 +1175,30 @@
}
/**
- * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
- *
- * @param destinations The destinations to listen to.
- * @param selector A selector to filter the messages with.
- *
- * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
- */
- public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
- {
- _logger.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
- + ", String selector = " + selector + "): called");
-
- for (Destination destination : destinations)
- {
- // Create a consumer for the destination and set this pinger to listen to its messages.
- MessageConsumer consumer =
- _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
- selector);
- consumer.setMessageListener(this);
- }
- }
-
- /**
* Closes the pingers connection.
*
* @throws JMSException All JMSException are allowed to fall through.
*/
public void close() throws JMSException
{
- _logger.debug("public void close(): called");
+ log.debug("public void close(): called");
- if (_connection != null)
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+ finally
{
- _connection.close();
+ _connection = null;
+ _producerSession = null;
+ _consumerSession = null;
+ _producer = null;
+ _consumer = null;
+ _pingDestinations = null;
+ _replyDestination = null;
}
}
@@ -1150,25 +1217,29 @@
*
* @throws javax.jms.JMSException If the commit fails and then the rollback fails.
*
+ * @return <tt>true</tt> if the session was committed, <tt>false</tt> if it was not.
+ *
* @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
* method, because commits only apply to transactional pingers, but fail after send applied to transactional and
* non-transactional alike.
*/
- protected void commitTx(Session session) throws JMSException
+ protected boolean commitTx(Session session) throws JMSException
{
- _logger.debug("protected void commitTx(Session session): called");
+ log.debug("protected void commitTx(Session session): called");
+
+ boolean committed = false;
- _logger.trace("Batch time reached");
+ log.trace("Batch time reached");
if (_failAfterSend)
{
- _logger.trace("Batch size reached");
+ log.trace("Batch size reached");
if (_failOnce)
{
_failAfterSend = false;
}
- _logger.trace("Failing After Send");
- doFailover();
+ log.trace("Failing After Send");
+ waitForUser(KILL_BROKER_PROMPT);
}
if (session.getTransacted())
@@ -1182,13 +1253,14 @@
_failBeforeCommit = false;
}
- _logger.trace("Failing Before Commit");
- doFailover();
+ log.trace("Failing Before Commit");
+ waitForUser(KILL_BROKER_PROMPT);
}
- long l = System.currentTimeMillis();
+ long l = System.nanoTime();
session.commit();
- _logger.debug("Time taken to commit :" + (System.currentTimeMillis() - l) + " ms");
+ committed = true;
+ log.debug("Time taken to commit :" + ((System.nanoTime() - l) / 1000000f) + " ms");
if (_failAfterCommit)
{
@@ -1197,84 +1269,56 @@
_failAfterCommit = false;
}
- _logger.trace("Failing After Commit");
- doFailover();
+ log.trace("Failing After Commit");
+ waitForUser(KILL_BROKER_PROMPT);
}
- _logger.trace("Session Commited.");
+ log.trace("Session Commited.");
}
catch (JMSException e)
{
- _logger.trace("JMSException on commit:" + e.getMessage(), e);
+ log.trace("JMSException on commit:" + e.getMessage(), e);
// Warn that the bounce back client is not available.
if (e.getLinkedException() instanceof AMQNoConsumersException)
{
- _logger.debug("No consumers on queue.");
+ log.debug("No consumers on queue.");
}
try
{
session.rollback();
- _logger.trace("Message rolled back.");
+ log.trace("Message rolled back.");
}
catch (JMSException jmse)
{
- _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+ log.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
// Both commit and rollback failed. Throw the rollback exception.
throw jmse;
}
}
}
+
+ return committed;
}
/**
- * Sends the message to the specified destination. If the destination is null, it gets sent to the default destination of
- * the ping producer. If an explicit destination is set, this overrides the default.
+ * Outputs a prompt to the console and waits for the user to press return.
*
- * @param destination The destination to send to.
- * @param message The message to send.
- *
- * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
+ * @param prompt The prompt to display on the console.
*/
- protected void sendMessage(Destination destination, Message message) throws JMSException
+ protected void waitForUser(String prompt)
{
- if (_failBeforeSend)
- {
- if (_failOnce)
- {
- _failBeforeSend = false;
- }
-
- _logger.trace("Failing Before Send");
- doFailover();
- }
-
- if (destination == null)
- {
- _producer.send(message);
- }
- else
- {
- _producer.send(destination, message);
- }
- }
+ System.out.println(prompt);
- /**
- * Prompts the user to terminate the broker, in order to test failover functionality. This method will block until the
- * user supplied some input on the terminal.
- */
- protected void doFailover()
- {
- System.out.println("Kill Broker now then press return");
try
{
System.in.read();
}
catch (IOException e)
{
- // ignore
+ // Ignored.
}
System.out.println("Continuing.");
Modified: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java?view=diff&rev=525800&r1=525799&r2=525800
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Thu Apr 5 04:47:50 2007
@@ -86,7 +86,7 @@
// Sets up the test parameters with defaults.
testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
- Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT));
+ Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT));
}
/**
@@ -159,7 +159,7 @@
// Send the requested number of messages, and wait until they have all been received.
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = pingClient.pingAndWaitForReply(numPings, timeout, perThreadSetup._correlationId);
+ int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId);
// Check that all the replies were received and log a fail if they were not.
if (numReplies < perCorrelationId._expectedCount)
@@ -247,8 +247,8 @@
String correlationId = message.getJMSCorrelationID();
_logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount
- + "): called on batch boundary for message id: " + correlationId + " with thread id: "
- + Thread.currentThread().getId());
+ + "): called on batch boundary for message id: " + correlationId + " with thread id: "
+ + Thread.currentThread().getId());
// Get the details for the correlation id and check that they are not null. They can become null
// if a test times out.
Modified: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java?view=diff&rev=525800&r1=525799&r2=525800
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java Thu Apr 5 04:47:50 2007
@@ -175,7 +175,7 @@
// Send the requested number of messages, and wait until they have all been received.
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout);
+ int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
// Check that all the replies were received and log a fail if they were not.
if (numReplies < numPings)
Modified: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=diff&rev=525800&r1=525799&r2=525800
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Thu Apr 5 04:47:50 2007
@@ -66,19 +66,22 @@
ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
/** Holds a property reader to extract the test parameters from. */
- protected ParsedProperties testParameters = new TestContextProperties(System.getProperties());
+ protected ParsedProperties testParameters =
+ TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/);
public PingTestPerf(String name)
{
super(name);
+ _logger.debug("testParameters = " + testParameters);
+
// Sets up the test parameters with defaults.
- testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT);
+ /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, PingPongProducer.MESSAGE_SIZE_DEAFULT);
testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
- PingPongProducer.PING_QUEUE_NAME_DEFAULT);
+ PingPongProducer.PING_QUEUE_NAME_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
- PingPongProducer.PERSISTENT_MODE_DEFAULT);
+ PingPongProducer.PERSISTENT_MODE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, PingPongProducer.TRANSACTED_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT);
@@ -90,20 +93,20 @@
testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, PingPongProducer.TIMEOUT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME,
- PingPongProducer.DESTINATION_COUNT_DEFAULT);
+ PingPongProducer.DESTINATION_COUNT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
- PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
+ PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
- PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
+ PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
- PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
+ PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
- PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
+ PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, PingPongProducer.UNIQUE_DESTS_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, PingPongProducer.ACK_MODE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME,
- PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);
+ PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/
}
/**
@@ -139,20 +142,18 @@
// Generate a sample message. This message is already time stamped and has its reply-to destination set.
ObjectMessage msg =
perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(
- PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(
- PingPongProducer.PERSISTENT_MODE_PROPNAME));
+ testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
// start the test
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout);
+ int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
// Fail the test if the timeout was exceeded.
if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings))
{
Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = "
- + numReplies);
+ + numReplies);
}
}
@@ -167,43 +168,13 @@
{
PerThreadSetup perThreadSetup = new PerThreadSetup();
- // Extract the test set up paramaeters.
- String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
- String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
- String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
- String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME);
- String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
- boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
- boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
- String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
- boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
- int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME);
- int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME);
- boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME);
- boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME);
- boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME);
- boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME);
- boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
- int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.TX_BATCH_SIZE_PROPNAME);
- Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
- boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_DESTS_PROPNAME);
- int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME);
- int pausetime = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME);
-
- // Extract the test set up paramaeters.
- int destinationscount =
- Integer.parseInt(testParameters.getProperty(PingPongProducer.DESTINATION_COUNT_PROPNAME));
-
// This is synchronized because there is a race condition, which causes one connection to sleep if
// all threads try to create connection concurrently.
synchronized (this)
{
// Establish a client to ping a Destination and listen the reply back from same Destination
- perThreadSetup._pingClient = new PingClient(brokerDetails, username, password, virtualPath, destinationName,
- selector, transacted, persistent, messageSize, verbose,
- failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend,
- failOnce, batchSize, destinationscount, rate, pubsub, unique,
- ackMode, pausetime);
+ perThreadSetup._pingClient = new PingClient(testParameters);
+ perThreadSetup._pingClient.establishConnection(true, true);
}
// Start the client connection
perThreadSetup._pingClient.getConnection().start();