You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by bh...@apache.org on 2007/03/01 18:57:25 UTC
svn commit: r513419 - in /incubator/qpid/branches/perftesting/qpid/java:
broker/etc/ broker/src/main/java/org/apache/qpid/server/queue/
perftests/src/main/java/org/apache/qpid/ping/
perftests/src/main/java/org/apache/qpid/requestreply/ perftests/src/te...
Author: bhupendrab
Date: Thu Mar 1 09:57:24 2007
New Revision: 513419
URL: http://svn.apache.org/viewvc?view=rev&rev=513419
Log:
QPID-395 : pause between batches of messages
QPID-367 : config updation for message alerts
Modified:
incubator/qpid/branches/perftesting/qpid/java/broker/etc/virtualhosts.xml
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/etc/virtualhosts.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/etc/virtualhosts.xml?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/etc/virtualhosts.xml (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/etc/virtualhosts.xml Thu Mar 1 09:57:24 2007
@@ -22,10 +22,12 @@
<virtualhosts>
<virtualhost>
<name>localhost</name>
-
<localhost>
<minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
<maximumMessageCount>5000</maximumMessageCount>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge>
<queue>
<name>queue</name>
<queue>
@@ -47,11 +49,13 @@
</localhost>
</virtualhost>
<virtualhost>
- <name>development</name>
-
+ <name>development</name>
<development>
- <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
- <maximumMessageCount>5000</maximumMessageCount>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge>
<queue>
<name>queue</name>
<queue>
@@ -72,30 +76,32 @@
</queue>
</development>
</virtualhost>
- <virtualhost>
- <name>test</name>
-
- <test>
+ <virtualhost>
+ <name>test</name>
+ <test>
<minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
<maximumMessageCount>5000</maximumMessageCount>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge>
+ <queue>
+ <name>queue</name>
<queue>
- <name>queue</name>
- <queue>
- <exchange>amq.direct</exchange>
- <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
- <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
- <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
- </queue>
- </queue>
- <queue>
- <name>ping</name>
- <ping>
- <exchange>amq.direct</exchange>
- <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
- <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
- <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
- </ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
</queue>
- </test>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>22352</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>11176</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>6000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </test>
</virtualhost>
</virtualhosts>
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Mar 1 09:57:24 2007
@@ -79,7 +79,7 @@
/** max allowed number of messages on a queue. */
@Configured(path = "maximumMessageCount", defaultValue = "0")
- public int _maximumMessageCount;
+ public int _maximumMessageCount = 5000;
/** max queue depth for the queue */
@Configured(path = "maximumQueueDepth", defaultValue = "0")
@@ -89,7 +89,7 @@
* maximum message age before alerts occur
*/
@Configured(path = "maximumMessageAge", defaultValue = "0")
- public long _maximumMessageAge = 30000; //0
+ public long _maximumMessageAge = 30000;
/*
* the minimum interval between sending out consequetive alerts of the same type
@@ -506,10 +506,10 @@
protected void updateReceivedMessageCount(AMQMessage msg)
{
- if (msg.isRedelivered())
- return;
-
- _totalMessagesReceived++;
+ if (!msg.isRedelivered())
+ {
+ _totalMessagesReceived++;
+ }
_managedObject.checkForNotification(msg);
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Mar 1 09:57:24 2007
@@ -177,7 +177,8 @@
public Long getMaximumQueueDepth()
{
- return _queue.getMaximumQueueDepth();
+ long queueDepthInBytes = _queue.getMaximumQueueDepth();
+ return queueDepthInBytes >> 10;
}
public void setMaximumQueueDepth(Long value)
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java Thu Mar 1 09:57:24 2007
@@ -74,11 +74,11 @@
String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
int txBatchSize, int noOfDestinations, int rate, boolean pubsub, boolean unique,
- int ackMode) throws Exception
+ int ackMode, long pausetime) throws Exception
{
super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize,
verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate,
- pubsub, unique, ackMode);
+ pubsub, unique, ackMode, pausetime);
_pingClientCount++;
}
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Thu Mar 1 09:57:24 2007
@@ -146,6 +146,8 @@
public static final String ACK_MODE_PROPNAME = "ackMode";
+ public static final String PAUSE_AFTER_BATCH_PROPNAME = "pausetimeAfterEachBatch";
+
/** Used to set up a default message size. */
public static final int DEFAULT_MESSAGE_SIZE = 0;
@@ -186,7 +188,7 @@
public static final String DEFAULT_BROKER = "tcp://localhost:5672";
/** Holds the default virtual path for the test. */
- public static final String DEFAULT_VIRTUAL_PATH = "test";
+ public static final String DEFAULT_VIRTUAL_PATH = "/test";
/** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */
public static final boolean DEFAULT_PUBSUB = false;
@@ -314,6 +316,8 @@
/** Holds the number of sends that should be performed in every transaction when using transactions. */
protected int _txBatchSize = 1;
+ private static long _pausetimeAfterEachBatch = 0;
+
/**
* Holds the number of consumers that will be attached to each topic.
* Each pings will result in a reply from each of the attached clients
@@ -353,7 +357,7 @@
String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate,
- boolean pubsub, boolean unique, int ackMode) throws Exception
+ boolean pubsub, boolean unique, int ackMode, long pause) throws Exception
{
_logger.debug("public PingPongProducer(String brokerDetails = " + brokerDetails + ", String username = " + username
+ ", String password = " + password + ", String virtualpath = " + virtualpath
@@ -378,6 +382,7 @@
_txBatchSize = txBatchSize;
_isPubSub = pubsub;
_isUnique = unique;
+ _pausetimeAfterEachBatch = pause;
if (ackMode != 0)
{
_ackMode = ackMode;
@@ -435,7 +440,7 @@
}
String brokerDetails = config.getHost() + ":" + config.getPort();
- String virtualpath = "/test";
+ String virtualpath = DEFAULT_VIRTUAL_PATH;
String selector = (config.getSelector() == null) ? DEFAULT_SELECTOR : config.getSelector();
boolean verbose = true;
boolean transacted = config.isTransacted();
@@ -495,7 +500,7 @@
PingPongProducer pingProducer =
new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
- beforeSend, failOnce, batchSize, destCount, rate, pubsub, false, 0);
+ beforeSend, failOnce, batchSize, destCount, rate, pubsub, false, 0, 0);
pingProducer.getConnection().start();
@@ -732,6 +737,12 @@
return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId);
}
+ public int pingAndWaitForReply(int numPings, long timeout, String messageCorrelationId)
+ throws JMSException, InterruptedException
+ {
+ return pingAndWaitForReply(null, numPings, timeout, messageCorrelationId);
+ }
+
/**
* Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
* before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
@@ -834,6 +845,11 @@
_logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ if (message == null)
+ {
+ message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
+ }
+
message.setJMSCorrelationID(messageCorrelationId);
// Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the
@@ -865,6 +881,12 @@
{
commitTx(_producerSession);
committed = true;
+ /* This pause is required for some cases. eg in load testing when sessions are non-transacted the
+ Mina IO layer can't clear the cache in time. So this pause gives enough time for mina to clear
+ the cache (without this mina throws OutOfMemoryError). pause() will check if time is != 0
+ */
+ pause(_pausetimeAfterEachBatch);
+ //_logger.info("committed " + _txBatchSize + " " + i);
}
// Spew out per message timings on every message sonly in verbose mode.
@@ -909,10 +931,10 @@
}
}
- /*public Destination getReplyDestination()
+ public Destination getReplyDestination()
{
- return _replyDestination;
- }*/
+ return getReplyDestinations().get(0);
+ }
/**
* Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set
@@ -1095,7 +1117,9 @@
doFailover();
}
+ long l = System.currentTimeMillis();
session.commit();
+ _logger.debug("Time taken to commit :" + (System.currentTimeMillis() - l) + " ms" );
if (_failAfterCommit)
{
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Thu Mar 1 09:57:24 2007
@@ -158,30 +158,16 @@
perCorrelationId._tc = tc;
perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings);
perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId);
-
- // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
- // messages.
- //pingClient.setChainedMessageListener(batchedResultsListener);
-
- // Generate a sample message of the specified size.
- perThreadSetup._message =
- pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
-
// Send the requested number of messages, and wait until they have all been received.
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = pingClient.pingAndWaitForReply(perThreadSetup._message, numPings, timeout, perThreadSetup._correlationId);
+ int numReplies = pingClient.pingAndWaitForReply(numPings, timeout, perThreadSetup._correlationId);
// Check that all the replies were received and log a fail if they were not.
if (numReplies < perCorrelationId._expectedCount)
{
perCorrelationId._tc.completeTest(false, numPings - perCorrelationId._expectedCount);
}
-
- // Remove the chained message listener from the ping producer.
- //pingClient.removeChainedMessageListener();
// Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up.
perCorrelationIds.remove(perThreadSetup._correlationId);
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Thu Mar 1 09:57:24 2007
@@ -108,6 +108,7 @@
testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_PROPNAME, PingPongProducer.DEFAULT_UNIQUE);
testParameters.setSysPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME,
Integer.toString(PingPongProducer.DEFAULT_ACK_MODE));
+ testParameters.setSysPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, 0l);
}
/**
@@ -192,6 +193,7 @@
Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_PROPNAME);
int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME);
+ int pausetime = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME);
// Extract the test set up paramaeters.
int destinationscount =
@@ -206,7 +208,7 @@
selector, transacted, persistent, messageSize, verbose,
failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend,
failOnce, batchSize, destinationscount, rate, pubsub,
- unique, ackMode);
+ unique, ackMode, pausetime);
}
// Start the client connection
perThreadSetup._pingClient.getConnection().start();
@@ -256,7 +258,6 @@
* Holds the test ping client.
*/
protected PingClient _pingClient;
- protected Message _message;
protected String _correlationId;
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java?view=diff&rev=513419&r1=513418&r2=513419
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java Thu Mar 1 09:57:24 2007
@@ -113,6 +113,7 @@
ParsedProperties.setSysPropertyIfNull(PingPongProducer.UNIQUE_PROPNAME, Boolean.toString(PingPongProducer.DEFAULT_UNIQUE));
ParsedProperties.setSysPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME,
Integer.toString(PingPongProducer.DEFAULT_ACK_MODE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, 0l);
}
/**
@@ -191,6 +192,7 @@
Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_PROPNAME);
int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME);
+ long pause = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME);
synchronized (this)
{
@@ -209,7 +211,7 @@
messageSize, verbose, failAfterCommit,
failBeforeCommit, failAfterSend, failBeforeSend,
failOnce, batchSize, 0, rate, pubsub,
- unique, ackMode);
+ unique, ackMode, pause);
perThreadSetup._testPingProducer.getConnection().start();
}