You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/04/04 17:21:44 UTC
svn commit: r525536 - in
/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid:
ping/PingDurableClient.java requestreply/PingPongProducer.java
Author: rgreig
Date: Wed Apr 4 08:21:43 2007
New Revision: 525536
URL: http://svn.apache.org/viewvc?view=rev&rev=525536
Log:
Fixed dangling transaction problem by correctly binding queue.
Modified:
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Modified: incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java?view=diff&rev=525536&r1=525535&r2=525536
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java (original)
+++ incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java Wed Apr 4 08:21:43 2007
@@ -35,6 +35,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.requestreply.PingPongProducer;
+import org.apache.qpid.util.CommandLineParser;
import uk.co.thebadgerset.junit.extensions.util.MathUtils;
import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
@@ -71,6 +72,7 @@
* <tr><td> uniqueDests <td> false <td> Prevents destination names being timestamped.
* <tr><td> transacted <td> true <td> Only makes sense to test with transactions.
* <tr><td> persistent <td> true <td> Only makes sense to test persistent.
+ * <tr><td> durableDests <td> true <td> Should use durable queues with persistent messages.
* <tr><td> commitBatchSize <td> 10
* <tr><td> rate <td> 20 <td> Total default test time is 5 seconds.
* </table>
@@ -108,6 +110,7 @@
defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true");
defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10");
defaults.setProperty(RATE_PROPNAME, "20");
+ defaults.setProperty(DURABLE_DESTS_PROPNAME, "true");
}
/** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */
@@ -150,7 +153,7 @@
try
{
// Create a ping producer overriding its defaults with all options passed on the command line.
- Properties options = processCommandLine(args);
+ Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
PingDurableClient pingProducer = new PingDurableClient(options);
// Create a shutdown hook to terminate the ping-pong producer.
Modified: incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=525536&r1=525535&r2=525536
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Wed Apr 4 08:21:43 2007
@@ -35,13 +35,10 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.*;
import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
import org.apache.qpid.url.URLSyntaxException;
@@ -90,6 +87,7 @@
* <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
* <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
* <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all.
+ * <tr><td> durableDests <td> false <td> Whether or not durable destinations are used.
* <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
* 0 - SESSION_TRANSACTED
* 1 - AUTO_ACKNOWLEDGE
@@ -257,6 +255,9 @@
/** Defines the default value for the unique destinations property. */
public static final boolean UNIQUE_DESTS_DEFAULT = true;
+ public static final String DURABLE_DESTS_PROPNAME = "durableDests";
+ public static final boolean DURABLE_DESTS_DEFAULT = false;
+
/** Holds the name of the proeprty to get the message acknowledgement mode from. */
public static final String ACK_MODE_PROPNAME = "ackMode";
@@ -299,6 +300,7 @@
defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT);
+ defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT);
defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT);
defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT);
@@ -337,6 +339,9 @@
/** Flag used to indicate if the destinations should be unique client. */
protected boolean _isUnique;
+ /** Flag used to indicate that durable destination should be used. */
+ protected boolean _isDurable;
+
/** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
protected boolean _failBeforeCommit;
@@ -424,6 +429,7 @@
/** The prompt to display when asking the user to kill the broker for failover testing. */
private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
+ private String _clientID;
/**
* Creates a ping producer with the specified parameters, of which there are many. See the class level comments
@@ -463,6 +469,7 @@
_rate = properties.getPropertyAsInteger(RATE_PROPNAME);
_isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
_isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
+ _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
_ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
_pauseBatch = properties.getPropertyAsLong(PAUSE_AFTER_BATCH_PROPNAME);
@@ -498,10 +505,10 @@
// Generate a unique identifying name for this client, based on it ip address and the current time.
InetAddress address = InetAddress.getLocalHost();
- String clientID = address.getHostName() + System.currentTimeMillis();
+ _clientID = address.getHostName() + System.currentTimeMillis();
// Create a connection to the broker.
- createConnection(clientID);
+ createConnection(_clientID);
// Create transactional or non-transactional sessions, based on the command line arguments.
_producerSession = (Session) getConnection().createSession(_transacted, _ackMode);
@@ -509,7 +516,7 @@
// Create the destinations to send pings to and receive replies from.
_replyDestination = _consumerSession.createTemporaryQueue();
- createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique);
+ createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
// Create the message producer only if instructed to.
if (producer)
@@ -548,7 +555,7 @@
{
try
{
- Properties options = processCommandLine(args);
+ Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
// Create a ping producer overriding its defaults with all options passed on the command line.
PingPongProducer pingProducer = new PingPongProducer(options);
@@ -577,43 +584,6 @@
}
/**
- * Extracts all name=value pairs from the command line, sets them all as system properties and also returns
- * a map of properties containing them.
- *
- * @param args The command line.
- *
- * @return A set of properties containing all name=value pairs from the command line.
- *
- * @todo This is a commonly used piece of code. Make it accept a command line definition and move it into the
- * CommandLineParser class.
- */
- protected static Properties processCommandLine(String[] args)
- {
- // Use the command line parser to evaluate the command line.
- CommandLineParser commandLine = new CommandLineParser(new String[][] {});
-
- // Capture the command line arguments or display errors and correct usage and then exit.
- Properties options = null;
-
- try
- {
- options = commandLine.parseCommandLine(args);
-
- // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up
- // overridden values from there.
- commandLine.addCommandLineToSysProperties();
- }
- catch (IllegalArgumentException e)
- {
- System.out.println(commandLine.getErrors());
- System.out.println(commandLine.getUsage());
- System.exit(1);
- }
-
- return options;
- }
-
- /**
* Convenience method for a short pause.
*
* @param sleepTime The time in milliseconds to pause for.
@@ -677,11 +647,12 @@
*
* @throws JMSException Any JMSExceptions are allowed to fall through.
*/
- public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
- throws JMSException
+ public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
+ boolean durable) throws JMSException, AMQException
{
log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
- + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + "): called");
+ + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
+ + durable + "): called");
_pingDestinations = new ArrayList<Destination>();
@@ -709,13 +680,30 @@
// Check if this is a pub/sub pinger, in which case create topics.
if (_isPubSub)
{
- destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
- log.debug("Created topic " + destination);
+ if (!durable)
+ {
+ destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
+ log.debug("Created non-durable topic " + destination);
+ }
+ else
+ {
+ destination =
+ AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
+ _clientID, (AMQConnection) _connection);
+ log.debug("Created durable topic " + destination);
+ }
}
// Otherwise this is a p2p pinger, in which case create queues.
else
{
- destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id);
+ AMQShortString destinationName = new AMQShortString(rootName + id);
+ destination =
+ new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false,
+ _isDurable);
+ ((AMQSession) _producerSession).createQueue(destinationName, false, _isDurable, false);
+ ((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null,
+ ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+
log.debug("Created queue " + destination);
}