You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2007/11/09 16:31:04 UTC
svn commit: r593566 [4/4] - in /incubator/qpid/trunk/qpid/java/perftests: ./
etc/jndi/ etc/scripts/ src/main/java/org/apache/qpid/ping/
src/main/java/org/apache/qpid/requestreply/
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=593566&r1=593565&r2=593566&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Fri Nov 9 07:31:03 2007
@@ -21,23 +21,27 @@
package org.apache.qpid.requestreply;
import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
-import org.apache.qpid.client.message.TestMessageFactory;
-import org.apache.qpid.util.CommandLineParser;
+import org.apache.qpid.test.framework.TestUtils;
import uk.co.thebadgerset.junit.extensions.BatchedThrottle;
import uk.co.thebadgerset.junit.extensions.Throttle;
+import uk.co.thebadgerset.junit.extensions.util.CommandLineParser;
import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
import javax.jms.*;
import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
-import java.io.IOException;
+import java.io.*;
import java.net.InetAddress;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -48,16 +52,16 @@
* to its own messages and does not send replies (see {@link org.apache.qpid.ping.PingClient}). The intention of ping
* pong producer is that it is a swiss-army knife test client that makes almost every aspect of its behaviour
* configurable.
- * <p/>
+ *
* <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. This
* means that this class has to do some work to correlate pings with pongs; it expectes the original message correlation
* id in the ping to be bounced back in the reply correlation id.
- * <p/>
+ *
* <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. It
* can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings within
* transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover
* testing. A complete list of accepted parameters, default values and comments on their usage is provided here:
- * <p/>
+ *
* <p/><table><caption>Parameters</caption>
* <tr><th> Parameter <th> Default <th> Comments
* <tr><td> messageSize <td> 0 <td> Message size in bytes. Not including any headers.
@@ -84,317 +88,256 @@
* <tr><td> uniqueDests <td> true <td> Whether each receivers only listens to one ping destination or all.
* <tr><td> durableDests <td> false <td> Whether or not durable destinations are used.
* <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
- * 0 - SESSION_TRANSACTED
- * 1 - AUTO_ACKNOWLEDGE
- * 2 - CLIENT_ACKNOWLEDGE
- * 3 - DUPS_OK_ACKNOWLEDGE
- * 257 - NO_ACKNOWLEDGE
- * 258 - PRE_ACKNOWLEDGE
+ * 0 - SESSION_TRANSACTED
+ * 1 - AUTO_ACKNOWLEDGE
+ * 2 - CLIENT_ACKNOWLEDGE
+ * 3 - DUPS_OK_ACKNOWLEDGE
+ * 257 - NO_ACKNOWLEDGE
+ * 258 - PRE_ACKNOWLEDGE
* <tr><td> consTransacted <td> false <td> Whether or not consumers use transactions. Defaults to the same value
- * as the 'transacted' option if not seperately defined.
+ * as the 'transacted' option if not separately defined.
* <tr><td> consAckMode <td> AUTO_ACK <td> The message acknowledgement mode for consumers. Defaults to the same
- * value as 'ackMode' if not seperately defined.
+ * value as 'ackMode' if not separately defined.
* <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of messages sent but not yet received.
- * Limits the volume of messages currently buffered on the client
- * or broker. Can help scale test clients by limiting amount of buffered
- * data to avoid out of memory errors.
+ * Limits the volume of messages currently buffered on the client
+ * or broker. Can help scale test clients by limiting amount of buffered
+ * data to avoid out of memory errors.
* </table>
- * <p/>
+ *
* <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
* does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so by
* starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is also
* registered to terminate the ping-pong loop cleanly.
- * <p/>
+ *
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Provide a ping and wait for all responses cycle.
* <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
* </table>
*
- * @todo Make the message listener a static for all replies to be sent to? It won't be any more of a bottle neck than having
- * one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process messages
- * concurrently for different ids. Needs to be static so that when using a chained message listener and shared
- * destinations between multiple PPPs, it gets notified about all replies, not just those that happen to be picked up
- * by the PPP that it is atteched to.
* @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock pair.
- * Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a
- * block that obtains the write lock for the very last message, releases any waiting producer. Means that the last
- * message waits until all other messages have been handled before releasing producers but allows messages to be
- * processed concurrently, unlike the current synchronized block.
- * @todo Get rid of pauses between batches, it will impact the timing statistics, and generate meanigless timings.
- * Instead make mina use a bounded blocking buffer, or other form of back pressure, to stop data being written
- * faster than it can be sent.
+ * Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a
+ * block that obtains the write lock for the very last message, releases any waiting producer. Means that the last
+ * message waits until all other messages have been handled before releasing producers but allows messages to be
+ * processed concurrently, unlike the current synchronized block.
*/
-public class PingPongProducer implements Runnable /*, MessageListener*/, ExceptionListener
+public class PingPongProducer implements Runnable, ExceptionListener
{
- /**
- * Used for debugging.
- */
- private static final Logger _log = Logger.getLogger(PingPongProducer.class);
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(PingPongProducer.class);
- /**
- * Helds the factory name
- */
+ /** Holds the name of the property to determine whether or not client id is overridden at connection time. */
+ public static final String OVERRIDE_CLIENT_ID_PROPNAME = "overrideClientId";
+
+ /** Holds the default value of the override client id flag. */
+ public static final String OVERRIDE_CLIENT_ID_DEAFULT = "false";
+
+ /** Holds the name of the property to define the JNDI factory name with. */
public static final String FACTORY_NAME_PROPNAME = "factoryName";
+
+ /** Holds the default JNDI name of the connection factory. */
public static final String FACTORY_NAME_DEAFULT = "local";
- /**
- * Helds the file properties name
- */
+ /** Holds the name of the property to set the JNDI initial context properties with. */
public static final String FILE_PROPERTIES_PROPNAME = "properties";
- public static final String FILE_PROPERTIES_DEAFULT = "/perftests.properties";
- /**
- * Holds the name of the property to get the test message size from.
- */
+ /** Holds the default file name of the JNDI initial context properties. */
+ public static final String FILE_PROPERTIES_DEAFULT = "perftests.properties";
+
+ /** Holds the name of the property to get the test message size from. */
public static final String MESSAGE_SIZE_PROPNAME = "messageSize";
- /**
- * Used to set up a default message size.
- */
+ /** Used to set up a default message size. */
public static final int MESSAGE_SIZE_DEAFULT = 0;
- /**
- * Holds the name of the property to get the ping queue name from.
- */
+ /** Holds the name of the property to get the ping queue name from. */
public static final String PING_QUEUE_NAME_PROPNAME = "destinationName";
- /**
- * Holds the name of the default destination to send pings on.
- */
+ /** Holds the name of the default destination to send pings on. */
public static final String PING_QUEUE_NAME_DEFAULT = "ping";
- /**
- * Holds the name of the property to get the test delivery mode from.
- */
+ /** Holds the name of the property to get the queue name postfix from. */
+ public static final String QUEUE_NAME_POSTFIX_PROPNAME = "queueNamePostfix";
+
+ /** Holds the default queue name postfix value. */
+ public static final String QUEUE_NAME_POSTFIX_DEFAULT = "";
+
+ /** Holds the name of the property to get the test delivery mode from. */
public static final String PERSISTENT_MODE_PROPNAME = "persistent";
- /**
- * Holds the message delivery mode to use for the test.
- */
+ /** Holds the message delivery mode to use for the test. */
public static final boolean PERSISTENT_MODE_DEFAULT = false;
- /**
- * Holds the name of the property to get the test transactional mode from.
- */
+ /** Holds the name of the property to get the test transactional mode from. */
public static final String TRANSACTED_PROPNAME = "transacted";
- /**
- * Holds the transactional mode to use for the test.
- */
+ /** Holds the transactional mode to use for the test. */
public static final boolean TRANSACTED_DEFAULT = false;
+ /** Holds the name of the property to get the test consumer transacted mode from. */
public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted";
+
+ /** Holds the consumer transactional mode default setting. */
public static final boolean CONSUMER_TRANSACTED_DEFAULT = false;
- /**
- * Holds the name of the property to get the message rate from.
- */
+ /** Holds the name of the property to get the test broker url from. */
+ public static final String BROKER_PROPNAME = "broker";
+
+ /** Holds the default broker url for the test. */
+ public static final String BROKER_DEFAULT = "tcp://localhost:5672";
+
+ /** Holds the name of the property to get the test broker virtual path from. */
+ public static final String VIRTUAL_HOST_PROPNAME = "virtualHost";
+
+ /** Holds the default virtual path for the test. */
+ public static final String VIRTUAL_HOST_DEFAULT = "";
+
+ /** Holds the name of the property to get the message rate from. */
public static final String RATE_PROPNAME = "rate";
- /**
- * Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction.
- */
+ /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */
public static final int RATE_DEFAULT = 0;
- /**
- * Holds the name of the property to get the verbose mode proeprty from.
- */
+ /** Holds the name of the property to get the verbose mode property from. */
public static final String VERBOSE_PROPNAME = "verbose";
- /**
- * Holds the default verbose mode.
- */
+ /** Holds the default verbose mode. */
public static final boolean VERBOSE_DEFAULT = false;
- /**
- * Holds the name of the property to get the p2p or pub/sub messaging mode from.
- */
+ /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */
public static final String PUBSUB_PROPNAME = "pubsub";
- /**
- * Holds the pub/sub mode default, true means ping a topic, false means ping a queue.
- */
+ /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */
public static final boolean PUBSUB_DEFAULT = false;
- /**
- * Holds the name of the property to get the fail after commit flag from.
- */
+ /** Holds the name of the property to get the fail after commit flag from. */
public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit";
- /**
- * Holds the default failover after commit test flag.
- */
+ /** Holds the default failover after commit test flag. */
public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false;
- /**
- * Holds the name of the proeprty to get the fail before commit flag from.
- */
+ /** Holds the name of the property to get the fail before commit flag from. */
public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit";
- /**
- * Holds the default failover before commit test flag.
- */
+ /** Holds the default failover before commit test flag. */
public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false;
- /**
- * Holds the name of the proeprty to get the fail after send flag from.
- */
+ /** Holds the name of the property to get the fail after send flag from. */
public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend";
- /**
- * Holds the default failover after send test flag.
- */
+ /** Holds the default failover after send test flag. */
public static final boolean FAIL_AFTER_SEND_DEFAULT = false;
- /**
- * Holds the name of the property to get the fail before send flag from.
- */
+ /** Holds the name of the property to get the fail before send flag from. */
public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend";
- /**
- * Holds the default failover before send test flag.
- */
+ /** Holds the default failover before send test flag. */
public static final boolean FAIL_BEFORE_SEND_DEFAULT = false;
- /**
- * Holds the name of the property to get the fail once flag from.
- */
+ /** Holds the name of the property to get the fail once flag from. */
public static final String FAIL_ONCE_PROPNAME = "failOnce";
- /**
- * The default failover once flag, true means only do one failover, false means failover on every commit cycle.
- */
+ /** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */
public static final boolean FAIL_ONCE_DEFAULT = true;
- /**
- * Holds the name of the property to get the broker access username from.
- */
+ /** Holds the name of the property to get the broker access username from. */
public static final String USERNAME_PROPNAME = "username";
- /**
- * Holds the default broker _log on username.
- */
+ /** Holds the default broker log on username. */
public static final String USERNAME_DEFAULT = "guest";
- /**
- * Holds the name of the property to get the broker access password from.
- */
+ /** Holds the name of the property to get the broker access password from. */
public static final String PASSWORD_PROPNAME = "password";
- /**
- * Holds the default broker _log on password.
- */
+ /** Holds the default broker log on password. */
public static final String PASSWORD_DEFAULT = "guest";
- /**
- * Holds the name of the proeprty to get the.
- */
+ /** Holds the name of the property to get the message selector from. */
public static final String SELECTOR_PROPNAME = "selector";
- /**
- * Holds the default message selector.
- */
+ /** Holds the default message selector. */
public static final String SELECTOR_DEFAULT = "";
- /**
- * Holds the name of the property to get the destination count from.
- */
+ /** Holds the name of the property to get the destination count from. */
public static final String DESTINATION_COUNT_PROPNAME = "destinationCount";
- /**
- * Defines the default number of destinations to ping.
- */
+ /** Defines the default number of destinations to ping. */
public static final int DESTINATION_COUNT_DEFAULT = 1;
- /**
- * Holds the name of the property to get the number of consumers per destination from.
- */
+ /** Holds the name of the property to get the number of consumers per destination from. */
public static final String NUM_CONSUMERS_PROPNAME = "numConsumers";
- /**
- * Defines the default number consumers per destination.
- */
+ /** Defines the default number consumers per destination. */
public static final int NUM_CONSUMERS_DEFAULT = 1;
- /**
- * Holds the name of the property to get the waiting timeout for response messages.
- */
+ /** Holds the name of the property to get the waiting timeout for response messages. */
public static final String TIMEOUT_PROPNAME = "timeout";
- /**
- * Default time to wait before assuming that a ping has timed out.
- */
+ /** Default time to wait before assuming that a ping has timed out. */
public static final long TIMEOUT_DEFAULT = 30000;
- /**
- * Holds the name of the property to get the commit batch size from.
- */
+ /** Holds the name of the property to get the commit batch size from. */
public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize";
- /**
- * Defines the default number of pings to send in each transaction when running transactionally.
- */
+ /** Defines the default number of pings to send in each transaction when running transactionally. */
public static final int TX_BATCH_SIZE_DEFAULT = 1;
- /**
- * Holds the name of the property to get the unique destinations flag from.
- */
+ /** Holds the name of the property to get the unique destinations flag from. */
public static final String UNIQUE_DESTS_PROPNAME = "uniqueDests";
- /**
- * Defines the default value for the unique destinations property.
- */
+ /** Defines the default value for the unique destinations property. */
public static final boolean UNIQUE_DESTS_DEFAULT = true;
- /**
- * Holds the name of the proeprty to get the message acknowledgement mode from.
- */
+ /** Holds the name of the property to get the durable destinations flag from. */
+ public static final String DURABLE_DESTS_PROPNAME = "durableDests";
+
+ /** Defines the default value of the durable destinations flag. */
+ public static final boolean DURABLE_DESTS_DEFAULT = false;
+
+ /** Holds the name of the property to get the message acknowledgement mode from. */
public static final String ACK_MODE_PROPNAME = "ackMode";
- /**
- * Defines the default message acknowledgement mode.
- */
+ /** Defines the default message acknowledgement mode. */
public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+ /** Holds the name of the property to get the consumers message acknowledgement mode from. */
public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode";
+
+ /** Defines the default consumers message acknowledgement mode. */
public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+ /** Holds the name of the property to get the maximum pending message size setting from. */
public static final String MAX_PENDING_PROPNAME = "maxPending";
+
+ /** Defines the default value for the maximum pending message size setting. 0 means no limit. */
public static final int MAX_PENDING_DEFAULT = 0;
- /**
- * Defines the default prefetch size to use when consuming messages.
- */
+ /** Defines the default prefetch size to use when consuming messages. */
public static final int PREFETCH_DEFAULT = 100;
- /**
- * Defines the default value of the no local flag to use when consuming messages.
- */
+ /** Defines the default value of the no local flag to use when consuming messages. */
public static final boolean NO_LOCAL_DEFAULT = false;
- /**
- * Defines the default value of the exclusive flag to use when consuming messages.
- */
+ /** Defines the default value of the exclusive flag to use when consuming messages. */
public static final boolean EXCLUSIVE_DEFAULT = false;
- /**
- * Holds the name of the property to store nanosecond timestamps in ping messages with.
- */
+ /** Holds the name of the property to store nanosecond timestamps in ping messages with. */
public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
- /**
- * Holds the default configuration properties.
- */
+ /** Holds the default configuration properties. */
public static ParsedProperties defaults = new ParsedProperties();
static
{
+ defaults.setPropertyIfNull(OVERRIDE_CLIENT_ID_PROPNAME, OVERRIDE_CLIENT_ID_DEAFULT);
defaults.setPropertyIfNull(FILE_PROPERTIES_PROPNAME, FILE_PROPERTIES_DEAFULT);
defaults.setPropertyIfNull(FACTORY_NAME_PROPNAME, FACTORY_NAME_DEAFULT);
+ defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT);
defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT);
+ defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT);
defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
+ defaults.setPropertyIfNull(QUEUE_NAME_POSTFIX_PROPNAME, QUEUE_NAME_POSTFIX_DEFAULT);
defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT);
@@ -405,6 +348,7 @@
defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT);
+ defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT);
defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT);
defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT);
@@ -418,92 +362,91 @@
defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
}
+ /** Allows setting of client ID on the connection, rather than through the connection URL. */
+ protected boolean _overrideClientId;
+
+ /** Holds the JNDI name of the JMS connection factory. */
protected String _factoryName;
+
+ /** Holds the name of the properties file to configure JNDI with. */
protected String _fileProperties;
+
+ /** Holds the broker url. */
protected String _brokerDetails;
+
+ /** Holds the username to access the broker with. */
protected String _username;
+
+ /** Holds the password to access the broker with. */
protected String _password;
+
+ /** Holds the virtual host on the broker to run the tests through. */
protected String _virtualpath;
+
+ /** Holds the root name from which to generate test destination names. */
protected String _destinationName;
+
+ /** Holds the queue name postfix value. */
+ protected String _queueNamePostfix;
+
+ /** Holds the message selector to filter the pings with. */
protected String _selector;
+
+ /** Holds the producers transactional mode flag. */
protected boolean _transacted;
+
+ /** Holds the consumers transactional mode flag. */
protected boolean _consTransacted;
- /**
- * Determines whether this producer sends persistent messages.
- */
+ /** Determines whether this producer sends persistent messages. */
protected boolean _persistent;
- /**
- * Holds the acknowledgement mode used for sending and receiving messages.
- */
+ /** Holds the acknowledgement mode used for the producers. */
protected int _ackMode;
+ /** Holds the acknowledgement mode setting for the consumers. */
protected int _consAckMode;
- /**
- * Determines what size of messages this producer sends.
- */
+ /** Determines what size of messages this producer sends. */
protected int _messageSize;
- /**
- * Used to indicate that the ping loop should print out whenever it pings.
- */
+ /** Used to indicate that the ping loop should print out whenever it pings. */
protected boolean _verbose;
- /**
- * Flag used to indicate if this is a point to point or pub/sub ping client.
- */
+ /** Flag used to indicate if this is a point to point or pub/sub ping client. */
protected boolean _isPubSub;
- /**
- * Flag used to indicate if the destinations should be unique client.
- */
+ /** Flag used to indicate if the destinations should be unique client. */
protected boolean _isUnique;
- /**
- * Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit.
- */
+ /** Flag used to indicate that durable destinations should be used. */
+ protected boolean _isDurable;
+
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
protected boolean _failBeforeCommit;
- /**
- * Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit.
- */
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
protected boolean _failAfterCommit;
- /**
- * Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send.
- */
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
protected boolean _failBeforeSend;
- /**
- * Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send.
- */
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
protected boolean _failAfterSend;
- /**
- * Flag used to indicate that failover prompting should only be done on the first commit, not on every commit.
- */
+ /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
protected boolean _failOnce;
- /**
- * Holds the number of sends that should be performed in every transaction when using transactions.
- */
+ /** Holds the number of sends that should be performed in every transaction when using transactions. */
protected int _txBatchSize;
- /**
- * Holds the number of destinations to ping.
- */
+ /** Holds the number of destinations to ping. */
protected int _noOfDestinations;
- /**
- * Holds the number of consumers per destination.
- */
+ /** Holds the number of consumers per destination. */
protected int _noOfConsumers;
- /**
- * Holds the maximum send rate in herz.
- */
+ /** Holds the maximum send rate in hertz. */
protected int _rate;
/**
@@ -512,72 +455,47 @@
*/
protected int _maxPendingSize;
- /**
- * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
- * to wait until the number of unreceived message is reduced before continuing to send.
- */
- protected Object _sendPauseMonitor = new Object();
+ /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
+ private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
- /**
- * Keeps a count of the number of message currently sent but not received.
- */
- protected AtomicInteger _unreceived = new AtomicInteger(0);
+ /** A source for providing sequential unique ids for instances of this class to be identified with. */
+ private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0);
- /**
- * A source for providing sequential unique correlation ids. These will be unique within the same JVM.
- */
- private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
+ /** Holds this instance's unique id. */
+ private int instanceId;
/**
* Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple
* ping producers on the same JVM.
*/
private static Map<String, PerCorrelationId> perCorrelationIds =
- Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
- /**
- * A convenient formatter to use when time stamping output.
- */
+ /** A convenient formatter to use when time stamping output. */
protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
- /**
- * Holds the connection to the broker.
- */
+ /** Holds the connection for the message producer. */
protected Connection _connection;
- /**
- * Holds the consumer connections.
- */
+ /** Holds the consumer connections. */
protected Connection[] _consumerConnection;
- /**
- * Holds the controlSession on which ping replies are received.
- */
+ /** Holds the controlSession on which ping replies are received. */
protected Session[] _consumerSession;
- /**
- * Holds the producer controlSession, needed to create ping messages.
- */
- protected Session _producerSession ;
+ /** Holds the producer controlSession, needed to create ping messages. */
+ protected Session _producerSession;
- /**
- * Holds the destination where the response messages will arrive.
- */
+ /** Holds the destination where the response messages will arrive. */
protected Destination _replyDestination;
- /**
- * Holds the set of destinations that this ping producer pings.
- */
+ /** Holds the set of destinations that this ping producer pings. */
protected List<Destination> _pingDestinations;
- /**
- * Used to restrict the sending rate to a specified limit.
- */
+ /** Used to restrict the sending rate to a specified limit. */
protected Throttle _rateLimiter;
- /**
- * Holds a message listener that this message listener chains all its messages to.
- */
+ /** Holds a message listener that this message listener chains all its messages to. */
protected ChainedMessageListener _chainedMessageListener = null;
/**
@@ -592,30 +510,34 @@
*/
protected AtomicInteger _queueSharedID = new AtomicInteger();
- /**
- * Used to tell the ping loop when to terminate, it only runs while this is true.
- */
+ /** Used to tell the ping loop when to terminate, it only runs while this is true. */
protected boolean _publish = true;
- /**
- * Holds the message producer to send the pings through.
- */
+ /** Holds the message producer to send the pings through. */
protected MessageProducer _producer;
- /**
- * Holds the message consumer to receive the ping replies through.
- */
+ /** Holds the message consumer to receive the ping replies through. */
protected MessageConsumer[] _consumer;
- /**
- * The prompt to display when asking the user to kill the broker for failover testing.
- */
+ /** The prompt to display when asking the user to kill the broker for failover testing. */
private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
+ /** Holds the name for this test client to be identified to the broker with. */
+ private String _clientID;
+
+ /** Keeps count of the total messages sent purely for debugging purposes. */
+ private static AtomicInteger numSent = new AtomicInteger();
+
/**
- * Keeps count of the total messages sent purely for debugging purposes.
+ * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
+ * to wait until the number of unreceived messages is reduced before continuing to send. This monitor is a
+ * fair SynchronousQueue because that provides fair scheduling, to ensure that all producer threads get an
+ * equal chance to produce messages.
*/
- private static AtomicInteger numSent = new AtomicInteger();
+ static final SynchronousQueue _sendPauseMonitor = new SynchronousQueue(true);
+
+ /** Keeps a count of the number of messages currently sent but not received. */
+ static AtomicInteger _unreceived = new AtomicInteger(0);
/**
* Creates a ping producer with the specified parameters, of which there are many. See the class level comments
@@ -623,20 +545,28 @@
* it, to send and recieve its pings and replies on.
*
* @param overrides Properties containing any desired overrides to the defaults.
+ *
* @throws Exception Any exceptions are allowed to fall through.
*/
public PingPongProducer(Properties overrides) throws Exception
{
- // _log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
+ // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
+ instanceId = _instanceIdGenerator.getAndIncrement();
+
// Create a set of parsed properties from the defaults overriden by the passed in values.
ParsedProperties properties = new ParsedProperties(defaults);
properties.putAll(overrides);
+
// Extract the configuration properties to set the pinger up with.
+ _overrideClientId = properties.getPropertyAsBoolean(OVERRIDE_CLIENT_ID_PROPNAME);
_factoryName = properties.getProperty(FACTORY_NAME_PROPNAME);
_fileProperties = properties.getProperty(FILE_PROPERTIES_PROPNAME);
+ _brokerDetails = properties.getProperty(BROKER_PROPNAME);
_username = properties.getProperty(USERNAME_PROPNAME);
_password = properties.getProperty(PASSWORD_PROPNAME);
+ _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME);
_destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
+ _queueNamePostfix = properties.getProperty(QUEUE_NAME_POSTFIX_PROPNAME);
_selector = properties.getProperty(SELECTOR_PROPNAME);
_transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
_consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME);
@@ -654,20 +584,26 @@
_rate = properties.getPropertyAsInteger(RATE_PROPNAME);
_isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
_isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
- _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
- _consAckMode = properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
+ _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
+ _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
+ _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
_maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
+
// Check that one or more destinations were specified.
if (_noOfDestinations < 1)
{
throw new IllegalArgumentException("There must be at least one destination.");
}
+
// Set up a throttle to control the send rate, if a rate > 0 is specified.
if (_rate > 0)
{
_rateLimiter = new BatchedThrottle();
_rateLimiter.setRate(_rate);
}
+
+ // Create the connection and message producers/consumers.
+ // establishConnection(true, true);
}
/**
@@ -676,15 +612,17 @@
*
* @param producer Flag to indicate whether or not the producer should be set up.
* @param consumer Flag to indicate whether or not the consumers should be set up.
+ *
* @throws Exception Any exceptions are allowed to fall through.
*/
public void establishConnection(boolean producer, boolean consumer) throws Exception
{
- // _log.debug("public void establishConnection(): called");
+ // log.debug("public void establishConnection(): called");
// Generate a unique identifying name for this client, based on it ip address and the current time.
InetAddress address = InetAddress.getLocalHost();
- String _clientID = address.getHostName() + System.currentTimeMillis();
+ // _clientID = address.getHostName() + System.currentTimeMillis();
+ _clientID = "perftest_" + instanceId;
// Create a connection to the broker.
createConnection(_clientID);
@@ -701,7 +639,7 @@
// Create the destinations to send pings to and receive replies from.
_replyDestination = _consumerSession[0].createTemporaryQueue();
- createPingDestinations(_noOfDestinations, _selector, _destinationName);
+ createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
// Create the message producer only if instructed to.
if (producer)
@@ -721,17 +659,30 @@
* created with.
*
* @param clientID The clients identifier.
- * @throws Exception Any underlying exceptions are allowed to fall through.
+ *
+ * @throws JMSException Underlying exceptions allowed to fall through.
+ * @throws NamingException Underlying exceptions allowed to fall through.
+ * @throws IOException Underlying exceptions allowed to fall through.
*/
- protected void createConnection(String clientID) throws Exception
+ protected void createConnection(String clientID) throws JMSException, NamingException, IOException
{
// _log.debug("protected void createConnection(String clientID = " + clientID + "): called");
// _log.debug("Creating a connection for the message producer.");
- Context context = InitialContextHelper.getInitialContext(_fileProperties);
+ File propsFile = new File(_fileProperties);
+ InputStream is = new FileInputStream(propsFile);
+ Properties properties = new Properties();
+ properties.load(is);
+
+ Context context = new InitialContext(properties);
ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName);
_connection = factory.createConnection(_username, _password);
+ if (_overrideClientId)
+ {
+ _connection.setClientID(clientID);
+ }
+
// _log.debug("Creating " + _noOfConsumers + " connections for the consumers.");
_consumerConnection = new Connection[_noOfConsumers];
@@ -739,6 +690,7 @@
for (int i = 0; i < _noOfConsumers; i++)
{
_consumerConnection[i] = factory.createConnection(_username, _password);
+ // _consumerConnection[i].setClientID(clientID);
}
}
@@ -752,8 +704,8 @@
{
try
{
- Properties options = CommandLineParser
- .processCommandLine(args, new CommandLineParser(new String[][]{}), System.getProperties());
+ Properties options =
+ CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());
// Create a ping producer overriding its defaults with all options passed on the command line.
PingPongProducer pingProducer = new PingPongProducer(options);
@@ -776,7 +728,7 @@
catch (Exception e)
{
System.err.println(e.getMessage());
- _log.error("Top level handler caught execption.", e);
+ log.error("Top level handler caught execption.", e);
System.exit(1);
}
}
@@ -795,9 +747,7 @@
Thread.sleep(sleepTime);
}
catch (InterruptedException ie)
- {
- // do nothing
- }
+ { }
}
}
@@ -809,12 +759,12 @@
*/
public List<Destination> getReplyDestinations()
{
- // _log.debug("public List<Destination> getReplyDestinations(): called");
+ // log.debug("public List<Destination> getReplyDestinations(): called");
List<Destination> replyDestinations = new ArrayList<Destination>();
replyDestinations.add(_replyDestination);
- // _log.debug("replyDestinations = " + replyDestinations);
+ // log.debug("replyDestinations = " + replyDestinations);
return replyDestinations;
}
@@ -827,12 +777,12 @@
*/
public void createProducer() throws JMSException
{
- _producer = _producerSession.createProducer(null);
+ // log.debug("public void createProducer(): called");
+
+ _producer = (MessageProducer) _producerSession.createProducer(null);
_producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- if (_log.isDebugEnabled())
- {
- _log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
- }
+
+ // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
}
/**
@@ -842,43 +792,57 @@
* @param noOfDestinations The number of destinations to create consumers for.
* @param selector The message selector to filter the consumers with.
* @param rootName The root of the name, or actual name if only one is being created.
+ * @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share the
+ * numbering with all pingers on the same JVM.
+ * @param durable If the destinations are durable topics.
+ *
* @throws JMSException Any JMSExceptions are allowed to fall through.
*/
- public void createPingDestinations(int noOfDestinations, String selector, String rootName) throws JMSException
+ public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
+ boolean durable) throws JMSException
{
- if (_log.isDebugEnabled())
- {
- _log.debug(
- "public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " + selector + ", String rootName = " + rootName);
- }
+ /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
+ + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
+ + durable + "): called");*/
+
_pingDestinations = new ArrayList<Destination>();
+
// Create the desired number of ping destinations and consumers for them.
- if (_log.isDebugEnabled())
- {
- _log.debug("Creating " + noOfDestinations + " destinations to ping.");
- }
- String id;
+ // log.debug("Creating " + noOfDestinations + " destinations to ping.");
+
for (int i = 0; i < noOfDestinations; i++)
{
Destination destination;
- id = "_" + _queueSharedID.incrementAndGet();
+ String id;
+
+ // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
+ if (unique)
+ {
+ // log.debug("Creating unique destinations.");
+ id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID();
+ }
+ else
+ {
+ // log.debug("Creating shared destinations.");
+ id = "_" + _queueSharedID.incrementAndGet();
+ }
+
// Check if this is a pub/sub pinger, in which case create topics.
if (_isPubSub)
{
destination = _producerSession.createTopic(rootName + id);
- if (_log.isDebugEnabled())
+ // log.debug("Created non-durable topic " + destination);
+
+ if (durable)
{
- _log.debug("Created topic " + rootName + id);
+ _producerSession.createDurableSubscriber((Topic) destination, _connection.getClientID());
}
}
// Otherwise this is a p2p pinger, in which case create queues.
else
{
- destination = _producerSession.createQueue(rootName + id);
- if (_log.isDebugEnabled())
- {
- _log.debug("Created queue " + rootName + id);
- }
+ destination = _producerSession.createQueue(rootName + id + _queueNamePostfix);
+ // log.debug("Created queue " + destination);
}
// Keep the destination.
@@ -891,16 +855,17 @@
*
* @param destinations The destinations to listen to.
* @param selector A selector to filter the messages with.
+ *
* @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
*/
public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
{
- /*_log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+ /*log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+ ", String selector = " + selector + "): called");*/
- // _log.debug("There are " + destinations.size() + " destinations.");
- // _log.debug("Creating " + _noOfConsumers + " consumers on each destination.");
- // _log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers));
+ log.debug("There are " + destinations.size() + " destinations.");
+ log.debug("Creating " + _noOfConsumers + " consumers on each destination.");
+ log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers));
for (Destination destination : destinations)
{
@@ -914,14 +879,14 @@
final int consumerNo = i;
_consumer[i].setMessageListener(new MessageListener()
- {
- public void onMessage(Message message)
{
- onMessageWithConsumerNo(message, consumerNo);
- }
- });
+ public void onMessage(Message message)
+ {
+ onMessageWithConsumerNo(message, consumerNo);
+ }
+ });
- // _log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination);
+ log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination);
}
}
}
@@ -931,28 +896,29 @@
* correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the
* replies map.
*
- * @param message The received message.
+ * @param message The received message.
+ * @param consumerNo The consumer number within this test pinger instance.
*/
public void onMessageWithConsumerNo(Message message, int consumerNo)
{
- // _log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called");
+ // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called");
try
{
long now = System.nanoTime();
long timestamp = getTimestamp(message);
long pingTime = now - timestamp;
- // NDC.push("cons" + consumerNo);
+ // NDC.push("id" + instanceId + "/cons" + consumerNo);
// Extract the messages correlation id.
String correlationID = message.getJMSCorrelationID();
- // _log.debug("correlationID = " + correlationID);
+ // log.debug("correlationID = " + correlationID);
- int num = message.getIntProperty("MSG_NUM");
- // _log.info("Message " + num + " received.");
+ // int num = message.getIntProperty("MSG_NUM");
+ // log.info("Message " + num + " received.");
boolean isRedelivered = message.getJMSRedelivered();
- // _log.debug("isRedelivered = " + isRedelivered);
+ // log.debug("isRedelivered = " + isRedelivered);
if (!isRedelivered)
{
@@ -966,13 +932,34 @@
// Restart the timeout timer on every message.
perCorrelationId.timeOutStart = System.nanoTime();
- // _log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+ // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+
+ // Release waiting senders if there are some and using maxPending limit.
+ if ((_maxPendingSize > 0))
+ {
+ // Decrement the count of sent but not yet received messages.
+ int unreceived = _unreceived.decrementAndGet();
+ int unreceivedSize =
+ (unreceived * ((_messageSize == 0) ? 1 : _messageSize))
+ / (_isPubSub ? getConsumersPerDestination() : 1);
+
+ // log.debug("unreceived = " + unreceived);
+ // log.debug("unreceivedSize = " + unreceivedSize);
+
+ // synchronized (_sendPauseMonitor)
+ // {
+ if (unreceivedSize < _maxPendingSize)
+ {
+ _sendPauseMonitor.poll();
+ }
+ // }
+ }
// Decrement the countdown latch. Before this point, it is possible that two threads might enter this
// method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
// ensures that each thread will get a unique value for the remaining messages.
- long trueCount = -1;
- long remainingCount = -1;
+ long trueCount;
+ long remainingCount;
synchronized (trafficLight)
{
@@ -981,51 +968,33 @@
trueCount = trafficLight.getCount();
remainingCount = trueCount - 1;
- // Decrement the count of sent but not yet received messages.
- int unreceived = _unreceived.decrementAndGet();
- int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
-
- // Release a waiting sender if there is one.
- synchronized (_sendPauseMonitor)
- {
- if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize))
- // && (_sendPauseBarrier.getNumberWaiting() == 1))
- {
- // _log.debug("unreceived size estimate under limit = " + unreceivedSize);
-
- // Wait on the send pause barrier for the limit to be re-established.
- /*try
- {*/
- // _sendPauseBarrier.await();
- _sendPauseMonitor.notify();
- /*}
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- catch (BrokenBarrierException e)
- {
- throw new RuntimeException(e);
- }*/
- }
- }
-
// NDC.push("/rem" + remainingCount);
- // _log.debug("remainingCount = " + remainingCount);
- // _log.debug("trueCount = " + trueCount);
+ // log.debug("remainingCount = " + remainingCount);
+ // log.debug("trueCount = " + trueCount);
- // Commit on transaction batch size boundaries. At this point in time the waiting producer remains
- // blocked, even on the last message.
+ // Commit on transaction batch size boundaries. At this point in time the waiting producer
+ // remains blocked, even on the last message.
// Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on
// each batch boundary. For pub/sub each consumer gets every message so no division is done.
+ // When running in client ack mode, an ack is done instead of a commit, on the commit batch
+ // size boundaries.
long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers);
- // _log.debug("commitCount = " + commitCount);
+ // log.debug("commitCount = " + commitCount);
if ((commitCount % _txBatchSize) == 0)
{
- // _log.debug("Trying commit for consumer " + consumerNo + ".");
- commitTx(_consumerSession[consumerNo]);
+ if (_consAckMode == 2)
+ {
+ // log.debug("Doing client ack for consumer " + consumerNo + ".");
+ message.acknowledge();
+ }
+ else
+ {
+ // log.debug("Trying commit for consumer " + consumerNo + ".");
+ commitTx(_consumerSession[consumerNo]);
+ // log.info("Tx committed on consumer " + consumerNo);
+ }
}
// Forward the message and remaining count to any interested chained message listener.
@@ -1044,33 +1013,21 @@
}
else
{
- _log.warn("Got unexpected message with correlationId: " + correlationID);
+ log.warn("Got unexpected message with correlationId: " + correlationID);
}
}
else
{
- _log.warn("Got redelivered message, ignoring.");
+ log.warn("Got redelivered message, ignoring.");
}
-
- // Print out ping times for every message in verbose mode only.
- /*if (_verbose)
- {
- Long timestamp = message.getLongProperty(MESSAGE_TIMESTAMP_PROPNAME);
-
- if (timestamp != null)
- {
- long diff = System.nanoTime() - timestamp;
- //_log.trace("Time for round trip (nanos): " + diff);
- }
- }*/
}
catch (JMSException e)
{
- _log.warn("There was a JMSException: " + e.getMessage(), e);
+ log.warn("There was a JMSException: " + e.getMessage(), e);
}
finally
{
- // _log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending");
+ // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending");
// NDC.clear();
}
}
@@ -1084,15 +1041,17 @@
* @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
* @param messageCorrelationId The message correlation id. If this is null, one is generated.
+ *
* @return The number of replies received. This may be less than the number sent if the timeout terminated the wait
* for all prematurely.
+ *
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
* @throws InterruptedException When interrupted by a timeout
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
- throws JMSException, InterruptedException
+ throws JMSException, InterruptedException
{
- /*_log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
// Generate a unique correlation id to put on the messages before sending them, if one was not specified.
@@ -1122,9 +1081,9 @@
// Send the specifed number of messages.
pingNoWaitForReply(message, numPings, messageCorrelationId);
- boolean timedOut = false;
- boolean allMessagesReceived = false;
- int numReplies = 0;
+ boolean timedOut;
+ boolean allMessagesReceived;
+ int numReplies;
do
{
@@ -1136,31 +1095,31 @@
allMessagesReceived = numReplies == getExpectedNumPings(numPings);
- // _log.debug("numReplies = " + numReplies);
- // _log.debug("allMessagesReceived = " + allMessagesReceived);
+ // log.debug("numReplies = " + numReplies);
+ // log.debug("allMessagesReceived = " + allMessagesReceived);
// Recheck the timeout condition.
long now = System.nanoTime();
long lastMessageReceievedAt = perCorrelationId.timeOutStart;
timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000);
- // _log.debug("now = " + now);
- // _log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
+ // log.debug("now = " + now);
+ // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
}
while (!timedOut && !allMessagesReceived);
if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
{
- _log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+ log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
}
else if (_verbose)
{
- _log.info("Got all replies on id, " + messageCorrelationId);
+ log.info("Got all replies on id, " + messageCorrelationId);
}
// commitTx(_consumerSession);
- // _log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
+ // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
return numReplies;
}
@@ -1179,11 +1138,12 @@
* @param message The message to send.
* @param numPings The number of pings to send.
* @param messageCorrelationId A correlation id to place on all messages sent.
+ *
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
{
- /*_log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
if (message == null)
@@ -1201,9 +1161,6 @@
// Send all of the ping messages.
for (int i = 0; i < numPings; i++)
{
- // Reset the committed flag to indicate that there may be uncommitted messages.
- committed = false;
-
// Re-timestamp the message.
// message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
@@ -1213,7 +1170,7 @@
// Spew out per message timings on every message sonly in verbose mode.
/*if (_verbose)
{
- _log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
+ log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
}*/
}
@@ -1232,104 +1189,168 @@
*
* @param i The count of messages sent so far in a loop of multiple calls to this send method.
* @param message The message to send.
+ *
* @return <tt>true</tt> if the messages were committed, <tt>false</tt> otherwise.
+ *
* @throws JMSException All underlyiung JMSExceptions are allowed to fall through.
*/
protected boolean sendMessage(int i, Message message) throws JMSException
{
- // _log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called");
- // _log.debug("_txBatchSize = " + _txBatchSize);
+ try
+ {
+ NDC.push("id" + instanceId + "/prod");
- // Round robin the destinations as the messages are sent.
- Destination destination = _pingDestinations.get(i % _pingDestinations.size());
+ // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called");
+ // log.debug("_txBatchSize = " + _txBatchSize);
- // Prompt the user to kill the broker when doing failover testing.
- if (_failBeforeSend)
- {
- if (_failOnce)
- {
- _failBeforeSend = false;
- }
+ // Round robin the destinations as the messages are sent.
+ Destination destination = _pingDestinations.get(i % _pingDestinations.size());
- // _log.trace("Failing Before Send");
- waitForUser(KILL_BROKER_PROMPT);
- }
+ // Prompt the user to kill the broker when doing failover testing.
+ _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend);
- // If necessary, wait until the max pending message size comes within its limit.
- synchronized (_sendPauseMonitor)
- {
- while ((_maxPendingSize > 0))
- {
- // Get the size estimate of sent but not yet received messages.
- int unreceived = _unreceived.get();
- int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
+ // Get the test setup for the correlation id.
+ String correlationID = message.getJMSCorrelationID();
+ PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
- if (unreceivedSize > _maxPendingSize)
+ // If necessary, wait until the max pending message size comes within its limit.
+ if (_maxPendingSize > 0)
+ {
+ synchronized (_sendPauseMonitor)
{
- // _log.debug("unreceived size estimate over limit = " + unreceivedSize);
+ // Used to keep track of the number of times that send has to wait.
+ int numWaits = 0;
- // Wait on the send pause barrier for the limit to be re-established.
- try
- {
- // _sendPauseBarrier.await();
- _sendPauseMonitor.wait(1000);
- }
- catch (InterruptedException e)
+ // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with
+ // the test timeout.
+ int waitLimit = (int) (TIMEOUT_DEFAULT / 10000);
+
+ while (true)
{
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
+ // Get the size estimate of sent but not yet received messages.
+ int unreceived = _unreceived.get();
+ int unreceivedSize =
+ (unreceived * ((_messageSize == 0) ? 1 : _messageSize))
+ / (_isPubSub ? getConsumersPerDestination() : 1);
+
+ // log.debug("unreceived = " + unreceived);
+ // log.debug("unreceivedSize = " + unreceivedSize);
+ // log.debug("_maxPendingSize = " + _maxPendingSize);
+
+ if (unreceivedSize > _maxPendingSize)
+ {
+ // log.debug("unreceived size estimate over limit = " + unreceivedSize);
+
+ // Fail the test if the send has had to wait more than the maximum allowed number of times.
+ if (numWaits > waitLimit)
+ {
+ String errorMessage =
+ "Send has had to wait for the unreceivedSize (" + unreceivedSize
+ + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit
+ + " times.";
+ log.warn(errorMessage);
+ throw new RuntimeException(errorMessage);
+ }
+
+ // Wait on the send pause monitor for the limit to be re-established.
+ try
+ {
+ long start = System.nanoTime();
+ // _sendPauseMonitor.wait(10000);
+ _sendPauseMonitor.offer(new Object(), 10000, TimeUnit.MILLISECONDS);
+ long end = System.nanoTime();
+
+ // Count the wait only if it was for > 99% of the requested wait time.
+ if (((float) (end - start) / (float) (10000 * 1000000L)) > 0.99)
+ {
+ numWaits++;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ else
+ {
+ break;
+ }
}
- /*catch (BrokenBarrierException e)
- {
- throw new RuntimeException(e);
- }*/
- }
- else
- {
- break;
}
}
- }
- // Send the message either to its round robin destination, or its default destination.
- if (destination == null)
- {
- int num = numSent.incrementAndGet();
- message.setIntProperty("MSG_NUM", num);
+ // Send the message either to its round robin destination, or its default destination.
+ // int num = numSent.incrementAndGet();
+ // message.setIntProperty("MSG_NUM", num);
setTimestamp(message);
- _producer.send(message);
- // _log.info("Message " + num + " sent.");
- }
- else
- {
- int num = numSent.incrementAndGet();
- message.setIntProperty("MSG_NUM", num);
- setTimestamp(message);
- _producer.send(destination, message);
- // _log.info("Message " + num + " sent.");
- }
- // Increase the unreceived size, this may actually happen aftern the message is recevied.
- _unreceived.getAndIncrement();
+ if (destination == null)
+ {
+ _producer.send(message);
+ }
+ else
+ {
+ _producer.send(destination, message);
+ }
+
+ // Increase the unreceived size, this may actually happen after the message is received.
+ // The unreceived size is incremented by the number of consumers that will get a copy of the message,
+ // in pub/sub mode.
+ if (_maxPendingSize > 0)
+ {
+ int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
+ // log.debug("newUnreceivedCount = " + newUnreceivedCount);
+ }
- // Apply message rate throttling if a rate limit has been set up.
- if (_rateLimiter != null)
+ // Apply message rate throttling if a rate limit has been set up.
+ if (_rateLimiter != null)
+ {
+ _rateLimiter.throttle();
+ }
+
+ // Call commit every time the commit batch size is reached.
+ boolean committed = false;
+
+ // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
+ if (((i + 1) % _txBatchSize) == 0)
+ {
+ // log.debug("Trying commit on producer session.");
+ committed = commitTx(_producerSession);
+ }
+
+ return committed;
+ }
+ finally
{
- _rateLimiter.throttle();
+ NDC.clear();
}
+ }
- // Call commit every time the commit batch size is reached.
- boolean committed = false;
-
- // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
- if (((i + 1) % _txBatchSize) == 0)
+ /**
+ * If the specified fail flag is set, this method waits for the user to cause a failure and then indicate to the
+ * test that the failure has occurred, before the method returns.
+ *
+ * @param failFlag The fail flag to test.
+ *
+ * @return The new value for the fail flag. If the {@link #_failOnce} flag is set, then each fail flag is only
+ * used once, then reset.
+ */
+ private boolean waitForUserToPromptOnFailure(boolean failFlag)
+ {
+ if (failFlag)
{
- // _log.debug("Trying commit on producer session.");
- committed = commitTx(_producerSession);
+ if (_failOnce)
+ {
+ failFlag = false;
+ }
+
+ // log.debug("Failing Before Send");
+ waitForUser(KILL_BROKER_PROMPT);
}
- return committed;
+ return failFlag;
}
/**
@@ -1351,12 +1372,12 @@
catch (JMSException e)
{
_publish = false;
- // _log.debug("There was a JMSException: " + e.getMessage(), e);
+ // log.debug("There was a JMSException: " + e.getMessage(), e);
}
catch (InterruptedException e)
{
_publish = false;
- // _log.debug("There was an interruption: " + e.getMessage(), e);
+ // log.debug("There was an interruption: " + e.getMessage(), e);
}
}
@@ -1371,9 +1392,7 @@
_chainedMessageListener = messageListener;
}
- /**
- * Removes any chained message listeners from this pinger.
- */
+ /** Removes any chained message listeners from this pinger. */
public void removeChainedMessageListener()
{
_chainedMessageListener = null;
@@ -1385,32 +1404,61 @@
* @param replyQueue The reply-to destination for the message.
* @param messageSize The desired size of the message in bytes.
* @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
+ *
* @return A freshly generated test message.
+ *
* @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
*/
public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
{
- ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
-
- // Timestamp the message in nanoseconds.
-
- // setTimestamp(msg);
-
- return msg;
+ // return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
+ return TestUtils.createTestMessageOfSize(_producerSession, messageSize);
}
+ /**
+ * Sets the current time in nanoseconds as the timestamp on the message.
+ *
+ * @param msg The message to timestamp.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
protected void setTimestamp(Message msg) throws JMSException
{
+ /*if (((AMQSession)_producerSession).isStrictAMQP())
+ {
+ ((AMQMessage)msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime());
+ }
+ else
+ {*/
msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
+ // }
}
+ /**
+ * Extracts the nanosecond timestamp from a message.
+ *
+ * @param msg The message to extract the time stamp from.
+ *
+ * @return The timestamp in nanos.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
protected long getTimestamp(Message msg) throws JMSException
{
+ /*if (((AMQSession)_producerSession).isStrictAMQP())
+ {
+ Long value = ((AMQMessage)msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME));
+
+ return (value == null) ? 0L : value;
+ }
+ else
+ {*/
return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
+ // }
}
/**
- * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this flag
+ * Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag
* has been cleared.
*/
public void stop()
@@ -1418,19 +1466,26 @@
_publish = false;
}
+ /**
+ * Starts the producer and consumer connections.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
public void start() throws JMSException
{
+ // log.debug("public void start(): called");
+
_connection.start();
+ // log.debug("Producer started.");
for (int i = 0; i < _noOfConsumers; i++)
{
_consumerConnection[i].start();
+ // log.debug("Consumer " + i + " started.");
}
}
- /**
- * Implements a ping loop that repeatedly pings until the publish flag becomes false.
- */
+ /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */
public void run()
{
// Keep running until the publish flag is cleared.
@@ -1448,7 +1503,7 @@
*/
public void onException(JMSException e)
{
- // _log.debug("public void onException(JMSException e = " + e + "): called", e);
+ // log.debug("public void onException(JMSException e = " + e + "): called", e);
_publish = false;
}
@@ -1461,37 +1516,39 @@
public Thread getShutdownHook()
{
return new Thread(new Runnable()
- {
- public void run()
- {
- stop();
- }
- });
+ {
+ public void run()
+ {
+ stop();
+ }
+ });
}
/**
- * Closes the pingers connection.
+ * Closes all of the producer and consumer connections.
*
* @throws JMSException All JMSException are allowed to fall through.
*/
public void close() throws JMSException
{
- // _log.debug("public void close(): called");
+ // log.debug("public void close(): called");
try
{
if (_connection != null)
{
+ // log.debug("Before close producer connection.");
_connection.close();
- // _log.debug("Close connection.");
+ // log.debug("Closed producer connection.");
}
for (int i = 0; i < _noOfConsumers; i++)
{
if (_consumerConnection[i] != null)
{
+ // log.debug("Before close consumer connection " + i + ".");
_consumerConnection[i].close();
- // _log.debug("Closed consumer connection.");
+ // log.debug("Closed consumer connection " + i + ".");
}
}
}
@@ -1511,86 +1568,61 @@
/**
* Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a
* transactional controlSession, this method does nothing (unless the failover after send flag is set).
- * <p/>
+ *
* <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is
* applied. This flag applies whether the pinger is transactional or not.
- * <p/>
+ *
* <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit
* is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the
* commit is applied. These flags will only apply if using a transactional pinger.
*
* @param session The controlSession to commit
+ *
* @return <tt>true</tt> if the controlSession was committed, <tt>false</tt> if it was not.
+ *
* @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+ *
* @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
* method, because commits only apply to transactional pingers, but fail after send applied to transactional and
* non-transactional alike.
*/
protected boolean commitTx(Session session) throws JMSException
{
- // _log.debug("protected void commitTx(Session session): called");
+ // log.debug("protected void commitTx(Session session): called");
boolean committed = false;
- // _log.trace("Batch time reached");
- if (_failAfterSend)
- {
- // _log.trace("Batch size reached");
- if (_failOnce)
- {
- _failAfterSend = false;
- }
-
- // _log.trace("Failing After Send");
- waitForUser(KILL_BROKER_PROMPT);
- }
+ _failAfterSend = waitForUserToPromptOnFailure(_failAfterSend);
if (session.getTransacted())
{
- // _log.debug("Session is transacted.");
+ // log.debug("Session is transacted.");
try
{
- if (_failBeforeCommit)
- {
- if (_failOnce)
- {
- _failBeforeCommit = false;
- }
-
- // _log.trace("Failing Before Commit");
- waitForUser(KILL_BROKER_PROMPT);
- }
+ _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit);
long start = System.nanoTime();
session.commit();
committed = true;
- // _log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms");
+ // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms");
- if (_failAfterCommit)
- {
- if (_failOnce)
- {
- _failAfterCommit = false;
- }
+ _failAfterCommit = waitForUserToPromptOnFailure(_failAfterCommit);
- // _log.trace("Failing After Commit");
- waitForUser(KILL_BROKER_PROMPT);
- }
-
- // _log.debug("Session Commited.");
+ // log.debug("Session Committed.");
}
catch (JMSException e)
{
- // _log.debug("JMSException on commit:" + e.getMessage(), e);
+ // log.debug("JMSException on commit:" + e.getMessage(), e);
+
try
{
session.rollback();
- // _log.debug("Message rolled back.");
+ // log.debug("Message rolled back.");
}
catch (JMSException jmse)
{
- // _log.debug("JMSE on rollback:" + jmse.getMessage(), jmse);
+ // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse);
// Both commit and rollback failed. Throw the rollback exception.
throw jmse;
@@ -1636,13 +1668,14 @@
* Calculates how many pings are expected to be received for the given number sent.
*
* @param numpings The number of pings that will be sent.
+ *
* @return The number that should be received, for the test to pass.
*/
public int getExpectedNumPings(int numpings)
{
- // _log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called");
+ // log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called");
- // _log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers.");
+ // log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers.");
return numpings * (_isPubSub ? getConsumersPerDestination() : 1);
}
@@ -1652,7 +1685,7 @@
* PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link
* PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of
* messages with that correlation id.
- * <p/>
+ *
* <p/>Provided only one pinger is producing messages with that correlation id, the chained listener will always be
* given unique message counts. It will always be called while the producer waiting for all messages to arrive is
* still blocked.
@@ -1666,6 +1699,7 @@
* @param message The newly arrived message.
* @param remainingCount The number of messages left to complete the test.
* @param latency The nanosecond latency of the message.
+ *
* @throws JMSException Any JMS exceptions is allowed to fall through.
*/
public void onMessage(Message message, int remainingCount, long latency) throws JMSException;
@@ -1677,14 +1711,10 @@
*/
protected static class PerCorrelationId
{
- /**
- * Holds a countdown on number of expected messages.
- */
+ /** Holds a countdown on number of expected messages. */
CountDownLatch trafficLight;
- /**
- * Holds the last timestamp that the timeout was reset to.
- */
+ /** Holds the last timestamp that the timeout was reset to. */
Long timeOutStart;
}
}