You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/02/05 15:43:17 UTC
svn commit: r503703 [2/2] - in
/incubator/qpid/branches/perftesting/qpid/java: ./ broker/etc/
broker/src/main/java/org/apache/qpid/server/configuration/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/q...
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Mon Feb 5 06:43:14 2007
@@ -50,21 +50,21 @@
/**
* PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
* client (see {@link PingPongBouncer} for the bounce back client).
- *
+ * <p/>
* <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings.
* This means that this class has to do some work to correlate pings with pongs; it expectes the original message
* correlation id in the ping to be bounced back in the reply correlation id.
- *
+ * <p/>
* <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor.
* It can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings
* within transactions; control the number of pings to send in each transaction; limit its sending rate; and perform
* failover testing.
- *
+ * <p/>
* <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
* does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so
* by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
* also registered to terminate the ping-pong loop cleanly.
- *
+ * <p/>
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Provide a ping and wait for all responses cycle.
@@ -72,25 +72,21 @@
* </table>
*
* @todo The use of a ping rate {@link #DEFAULT_RATE} and waits between pings {@link #DEFAULT_SLEEP_TIME} are overlapping.
- * Use the rate and throttling only. Ideally, optionally pass the rate throttle into the ping method, throttle to
- * be created and configured by the test runner from the -f command line option and made available through
- * the timing controller on timing aware tests or by throttling rate of calling tests methods on non-timing aware
- * tests.
- *
+ * Use the rate and throttling only. Ideally, optionally pass the rate throttle into the ping method, throttle to
+ * be created and configured by the test runner from the -f command line option and made available through
+ * the timing controller on timing aware tests or by throttling rate of calling tests methods on non-timing aware
+ * tests.
* @todo Make acknowledege mode a test option.
- *
* @todo Make the message listener a static for all replies to be sent to? It won't be any more of a bottle neck than
- * having one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process
- * messages concurrently for different ids. Needs to be static so that when using a chained message listener and
- * shared destinations between multiple PPPs, it gets notified about all replies, not just those that happen to
- * be picked up by the PPP that it is atteched to.
- *
+ * having one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process
+ * messages concurrently for different ids. Needs to be static so that when using a chained message listener and
+ * shared destinations between multiple PPPs, it gets notified about all replies, not just those that happen to
+ * be picked up by the PPP that it is attached to.
* @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock
- * pair. Obtian read lock on all messages, before decrementing the message count. At the end of the on message
- * method add a block that obtains the write lock for the very last message, releases any waiting producer. Means
- * that the last message waits until all other messages have been handled before releasing producers but allows
- * messages to be processed concurrently, unlike the current synchronized block.
- *
+ * pair. Obtain read lock on all messages, before decrementing the message count. At the end of the on message
+ * method add a block that obtains the write lock for the very last message, releases any waiting producer. Means
+ * that the last message waits until all other messages have been handled before releasing producers but allows
+ * messages to be processed concurrently, unlike the current synchronized block.
* @todo Need to multiply up the number of expected messages for pubsub tests as each can be received by many consumers?
*/
public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
@@ -228,9 +224,6 @@
/** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
- /** A source for providing unique ids to PingPongProducer. */
- private static AtomicInteger _pingProducerIdGenerator;
-
/**
* Holds a map from message ids to latches on which threads wait for replies. This map is shared accross
* multiple ping producers on the same JVM.
@@ -238,7 +231,7 @@
/*private static Map<String, CountDownLatch> trafficLights =
Collections.synchronizedMap(new HashMap<String, CountDownLatch>());*/
private static Map<String, PerCorrelationId> perCorrelationIds =
- Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
/** A convenient formatter to use when time stamping output. */
protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
@@ -273,6 +266,9 @@
/** Flag used to indicate if this is a point to point or pub/sub ping client. */
protected boolean _isPubSub = false;
+ /** Flag used to indicate if the destinations should be unique to each ping pong producer. */
+ protected static boolean _isUnique = false;
+
/**
* This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
* on the same JVM using this id generator will allow them to ping on the same queues.
@@ -313,6 +309,12 @@
protected int _txBatchSize = 1;
/**
+ * Holds the number of consumers that will be attached to each topic.
+ * Each ping will result in a reply from each of the attached clients.
+ */
+ static int _consumersPerTopic = 1;
+
+ /**
* Creates a ping producer with the specified parameters, of which there are many. See their individual comments
* for details. This constructor creates a connection to the broker and creates producer and consumer sessions on it,
* to send and recieve its pings and replies on. The other options are kept, and control how this pinger behaves.
@@ -339,7 +341,6 @@
* possible, with no rate restriction.
* @param pubsub True to ping topics, false to ping queues.
* @param unique True to use unique destinations for each ping pong producer, false to share.
- *
* @throws Exception Any exceptions are allowed to fall through.
*/
public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
@@ -358,6 +359,19 @@
+ txBatchSize + ", int noOfDestinations = " + noOfDestinations + ", int rate = " + rate
+ ", boolean pubsub = " + pubsub + ", boolean unique = " + unique + "): called");
+ // Keep all the relevant options.
+ _persistent = persistent;
+ _messageSize = messageSize;
+ _verbose = verbose;
+ _failAfterCommit = afterCommit;
+ _failBeforeCommit = beforeCommit;
+ _failAfterSend = afterSend;
+ _failBeforeSend = beforeSend;
+ _failOnce = failOnce;
+ _txBatchSize = txBatchSize;
+ _isPubSub = pubsub;
+ _isUnique = unique;
+
// Check that one or more destinations were specified.
if (noOfDestinations < 1)
{
@@ -388,18 +402,6 @@
createProducer();
createPingDestinations(noOfDestinations, selector, destinationName, unique);
createReplyConsumers(getReplyDestinations(), selector);
-
- // Keep all the remaining options.
- _persistent = persistent;
- _messageSize = messageSize;
- _verbose = verbose;
- _failAfterCommit = afterCommit;
- _failBeforeCommit = beforeCommit;
- _failAfterSend = afterSend;
- _failBeforeSend = beforeSend;
- _failOnce = failOnce;
- _txBatchSize = txBatchSize;
- _isPubSub = pubsub;
}
/**
@@ -407,6 +409,7 @@
* to be started to bounce the pings back again.
*
* @param args The command line arguments.
+ * @throws Exception When something went wrong with the test
*/
public static void main(String[] args) throws Exception
{
@@ -421,7 +424,7 @@
}
String brokerDetails = config.getHost() + ":" + config.getPort();
- String virtualpath = "test";
+ String virtualpath = "/test";
String selector = (config.getSelector() == null) ? DEFAULT_SELECTOR : config.getSelector();
boolean verbose = true;
boolean transacted = config.isTransacted();
@@ -479,9 +482,9 @@
// Create a ping producer to handle the request/wait/reply cycle.
PingPongProducer pingProducer =
- new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
- transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
- beforeSend, failOnce, batchSize, destCount, rate, pubsub, false);
+ new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
+ transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
+ beforeSend, failOnce, batchSize, destCount, rate, pubsub, false);
pingProducer.getConnection().start();
@@ -511,7 +514,9 @@
Thread.sleep(sleepTime);
}
catch (InterruptedException ie)
- { }
+ {
+ //ignore
+ }
}
}
@@ -555,11 +560,10 @@
* @param rootName The root of the name, or actual name if only one is being created.
* @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share
* the numbering with all pingers on the same JVM.
- *
* @throws JMSException Any JMSExceptions are allowed to fall through.
*/
public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
- throws JMSException
+ throws JMSException
{
_logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations
+ ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = "
@@ -568,28 +572,32 @@
// Create the desired number of ping destinations and consumers for them.
for (int i = 0; i < noOfDestinations; i++)
{
- AMQDestination destination = null;
+ AMQDestination destination;
int id;
// Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
if (unique)
{
+ _logger.debug("Creating unique destinations.");
id = _queueJVMSequenceID.incrementAndGet();
}
else
{
+ _logger.debug("Creating shared destinations.");
id = _queueSharedId.incrementAndGet();
}
// Check if this is a pub/sub pinger, in which case create topics.
if (_isPubSub)
{
+ _logger.debug("Creating topics.");
destination = new AMQTopic(rootName + id);
}
// Otherwise this is a p2p pinger, in which case create queues.
else
{
+ _logger.debug("Creating queues.");
destination = new AMQQueue(rootName + id);
}
@@ -697,11 +705,10 @@
* @param message The message to send.
* @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
- *
* @return The number of replies received. This may be less than the number sent if the timeout terminated the
* wait for all prematurely.
- *
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ * @throws InterruptedException If the calling thread is interrupted while waiting for replies.
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
{
@@ -723,14 +730,13 @@
* @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
* @param messageCorrelationId The message correlation id.
- *
* @return The number of replies received. This may be less than the number sent if the timeout terminated the
* wait for all prematurely.
- *
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ * @throws InterruptedException If the calling thread is interrupted while waiting for replies.
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
- throws JMSException, InterruptedException
+ throws JMSException, InterruptedException
{
_logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
@@ -743,7 +749,8 @@
// chained message listener must be called before this sender can be unblocked, but that decrementing the
// countdown needs to be done before the chained listener can be called.
PerCorrelationId perCorrelationId = new PerCorrelationId();
- perCorrelationId.trafficLight = new CountDownLatch(numPings + 1);
+
+ perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1);
perCorrelationIds.put(messageCorrelationId, perCorrelationId);
// Set up the current time as the start time for pinging on the correlation id. This is used to determine
@@ -763,11 +770,12 @@
perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS);
// Work out how many replies were receieved.
- numReplies = numPings - (int) perCorrelationId.trafficLight.getCount();
- allMessagesReceived = numReplies >= numPings;
+ numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount();
+
+ allMessagesReceived = numReplies == getExpectedNumPings(numPings);
- _logger.debug("numReplies = "+ numReplies);
- _logger.debug("allMessagesReceived = "+ allMessagesReceived);
+ _logger.debug("numReplies = " + numReplies);
+ _logger.debug("allMessagesReceived = " + allMessagesReceived);
// Recheck the timeout condition.
long now = System.nanoTime();
@@ -779,7 +787,7 @@
}
while (!timedOut && !allMessagesReceived);
- if ((numReplies < numPings) && _verbose)
+ if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
{
_logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
}
@@ -808,7 +816,6 @@
* @param message The message to send.
* @param numPings The number of pings to send.
* @param messageCorrelationId A correlation id to place on all messages sent.
- *
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
@@ -864,9 +871,7 @@
}
}
- /**
- * The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each.
- */
+ /** The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each. */
public void pingLoop()
{
try
@@ -909,9 +914,7 @@
_chainedMessageListener = messageListener;
}
- /**
- * Removes any chained message listeners from this pinger.
- */
+ /** Removes any chained message listeners from this pinger. */
public void removeChainedMessageListener()
{
_chainedMessageListener = null;
@@ -923,9 +926,7 @@
* @param replyQueue The reply-to destination for the message.
* @param messageSize The desired size of the message in bytes.
* @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
- *
* @return A freshly generated test message.
- *
* @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
*/
public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
@@ -947,9 +948,7 @@
_publish = false;
}
- /**
- * Implements a ping loop that repeatedly pings until the publish flag becomes false.
- */
+ /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */
public void run()
{
// Keep running until the publish flag is cleared.
@@ -980,12 +979,12 @@
public Thread getShutdownHook()
{
return new Thread(new Runnable()
+ {
+ public void run()
{
- public void run()
- {
- stop();
- }
- });
+ stop();
+ }
+ });
}
/**
@@ -1003,7 +1002,6 @@
*
* @param destinations The destinations to listen to.
* @param selector A selector to filter the messages with.
- *
* @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
*/
public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
@@ -1015,8 +1013,8 @@
{
// Create a consumer for the destination and set this pinger to listen to its messages.
MessageConsumer consumer =
- _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE,
- selector);
+ _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE,
+ selector);
consumer.setMessageListener(this);
}
}
@@ -1039,19 +1037,20 @@
/**
* Convenience method to commit the transaction on the specified session. If the session to commit on is not
* a transactional session, this method does nothing (unless the failover after send flag is set).
- *
+ * <p/>
* <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit
* is applied. This flag applies whether the pinger is transactional or not.
- *
+ * <p/>
* <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
* commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
* after the commit is applied. These flags will only apply if using a transactional pinger.
*
+ * @param session The session to commit
* @throws javax.jms.JMSException If the commit fails and then the rollback fails.
- *
- * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
- * method, because commits only apply to transactional pingers, but fail after send applied to transactional
- * and non-transactional alike.
+ * <p/>
+ * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
+ * method, because commits only apply to transactional pingers, but fail after send applied to transactional
+ * and non-transactional alike.
*/
protected void commitTx(Session session) throws JMSException
{
@@ -1132,7 +1131,6 @@
*
* @param destination The destination to send to.
* @param message The message to send.
- *
* @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
*/
protected void sendMessage(Destination destination, Message message) throws JMSException
@@ -1170,17 +1168,35 @@
System.in.read();
}
catch (IOException e)
- { }
+ {
+ //ignore
+ }
System.out.println("Continuing.");
}
/**
+ * This value will be changed by PingClient to represent the number of clients connected to each topic.
+ *
+ * @return int The number of consumers subscribing to each topic.
+ */
+ public int getConsumersPerTopic()
+ {
+ return _consumersPerTopic;
+ }
+
+ public int getExpectedNumPings(int numpings)
+ {
+ return numpings * getConsumersPerTopic();
+ }
+
+
+ /**
* Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's
* {@link PingPongProducer#onMessage} method is called, the chained listener set through the
* {@link PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected
* count of messages with that correlation id.
- *
+ * <p/>
* Provided only one pinger is producing messages with that correlation id, the chained listener will always be
* given unique message counts. It will always be called while the producer waiting for all messages to arrive is
* still blocked.
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Mon Feb 5 06:43:14 2007
@@ -70,7 +70,7 @@
/** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */
private Map<String, PerCorrelationId> perCorrelationIds =
- Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
/** Holds the batched results listener, that does logging on batch boundaries. */
private BatchedResultsListener batchedResultsListener = null;
@@ -91,6 +91,7 @@
/**
* Compile all the tests into a test suite.
+ * @return The test suite to run. Should only contain testAsyncPingOk method.
*/
public static Test suite()
{
@@ -128,6 +129,7 @@
* all replies have been received or a time out occurs before exiting this method.
*
* @param numPings The number of pings to send.
+ * @throws Exception pass all errors out to the test harness
*/
public void testAsyncPingOk(int numPings) throws Exception
{
@@ -151,7 +153,7 @@
PerCorrelationId perCorrelationId = new PerCorrelationId();
TimingController tc = getTimingController().getControllerForCurrentThread();
perCorrelationId._tc = tc;
- perCorrelationId._expectedCount = numPings;
+ perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings);
perCorrelationIds.put(messageCorrelationId, perCorrelationId);
// Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
@@ -160,18 +162,18 @@
// Generate a sample message of the specified size.
ObjectMessage msg =
- pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+ pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
// Send the requested number of messages, and wait until they have all been received.
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, messageCorrelationId);
// Check that all the replies were received and log a fail if they were not.
- if (numReplies < numPings)
+ if (numReplies < perCorrelationId._expectedCount)
{
- tc.completeTest(false, numPings - numReplies);
+ tc.completeTest(false, numPings - perCorrelationId._expectedCount);
}
// Remove the chained message listener from the ping producer.
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Mon Feb 5 06:43:14 2007
@@ -110,6 +110,7 @@
/**
* Compile all the tests into a test suite.
+ * @return The test method testPingOk.
*/
public static Test suite()
{
@@ -139,18 +140,18 @@
// Generate a sample message. This message is already time stamped and has its reply-to destination set.
ObjectMessage msg =
- perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(
- PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(
- PingPongProducer.PERSISTENT_MODE_PROPNAME));
+ perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(
+ PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(
+ PingPongProducer.PERSISTENT_MODE_PROPNAME));
// start the test
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout);
// Fail the test if the timeout was exceeded.
- if (numReplies != numPings)
+ if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings))
{
Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = "
+ numReplies);
@@ -191,7 +192,7 @@
// Extract the test set up paramaeters.
int destinationscount =
- Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
+ Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
// This is synchronized because there is a race condition, which causes one connection to sleep if
// all threads try to create connection concurrently.
Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java Mon Feb 5 06:43:14 2007
@@ -52,7 +52,7 @@
public ConcurrencyTest() throws Exception
{
- _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false,
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false,
new DefaultQueueRegistry()));
}
Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java Mon Feb 5 06:43:14 2007
@@ -21,10 +21,6 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.ConcurrentDeliveryManager;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.DeliveryManagerTest;
public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest
{
@@ -33,7 +29,7 @@
try
{
System.setProperty("concurrentdeliverymanager","true");
- _mgr = new ConcurrentDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
+ _mgr = new ConcurrentSelectorDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
new DefaultQueueRegistry()));
}
catch (Throwable t)
Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java?view=diff&rev=503703&r1=503702&r2=503703
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java Mon Feb 5 06:43:14 2007
@@ -169,7 +169,6 @@
{
TestSuite suite = new TestSuite();
suite.addTestSuite(ConcurrentDeliveryManagerTest.class);
- suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
return suite;
}
}