You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/04/04 17:21:44 UTC

svn commit: r525536 - in /incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid: ping/PingDurableClient.java requestreply/PingPongProducer.java

Author: rgreig
Date: Wed Apr  4 08:21:43 2007
New Revision: 525536

URL: http://svn.apache.org/viewvc?view=rev&rev=525536
Log:
Fixed dangling transaction problem by correctly binding queue.

Modified:
    incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
    incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java

Modified: incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java?view=diff&rev=525536&r1=525535&r2=525536
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java (original)
+++ incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java Wed Apr  4 08:21:43 2007
@@ -35,6 +35,7 @@
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.requestreply.PingPongProducer;
+import org.apache.qpid.util.CommandLineParser;
 
 import uk.co.thebadgerset.junit.extensions.util.MathUtils;
 import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
@@ -71,6 +72,7 @@
  * <tr><td> uniqueDests      <td> false    <td> Prevents destination names being timestamped.
  * <tr><td> transacted       <td> true     <td> Only makes sense to test with transactions.
  * <tr><td> persistent       <td> true     <td> Only makes sense to test persistent.
+ * <tr><td> durableDests     <td> true     <td> Should use durable queues with persistent messages.
  * <tr><td> commitBatchSize  <td> 10
  * <tr><td> rate             <td> 20       <td> Total default test time is 5 seconds.
  * </table>
@@ -108,6 +110,7 @@
         defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true");
         defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10");
         defaults.setProperty(RATE_PROPNAME, "20");
+        defaults.setProperty(DURABLE_DESTS_PROPNAME, "true");
     }
 
     /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */
@@ -150,7 +153,7 @@
         try
         {
             // Create a ping producer overriding its defaults with all options passed on the command line.
-            Properties options = processCommandLine(args);
+            Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
             PingDurableClient pingProducer = new PingDurableClient(options);
 
             // Create a shutdown hook to terminate the ping-pong producer.

Modified: incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=525536&r1=525535&r2=525536
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Wed Apr  4 08:21:43 2007
@@ -35,13 +35,10 @@
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.*;
 import org.apache.qpid.client.message.TestMessageFactory;
 import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.jms.MessageProducer;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.url.URLSyntaxException;
@@ -90,6 +87,7 @@
  * <tr><td> timeout          <td> 30000    <td> In milliseconds. The timeout to stop waiting for replies.
  * <tr><td> commitBatchSize  <td> 1        <td> The number of messages per transaction in transactional mode.
  * <tr><td> uniqueDests      <td> true     <td> Whether each receiver only listens to one ping destination or all.
+ * <tr><td> durableDests     <td> false    <td> Whether or not durable destinations are used.
  * <tr><td> ackMode          <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
  *                                               0 - SESSION_TRANSACTED
  *                                               1 - AUTO_ACKNOWLEDGE
@@ -257,6 +255,9 @@
     /** Defines the default value for the unique destinations property. */
     public static final boolean UNIQUE_DESTS_DEFAULT = true;
 
+    public static final String DURABLE_DESTS_PROPNAME = "durableDests";
+    public static final boolean DURABLE_DESTS_DEFAULT = false;
+
     /** Holds the name of the proeprty to get the message acknowledgement mode from. */
     public static final String ACK_MODE_PROPNAME = "ackMode";
 
@@ -299,6 +300,7 @@
         defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
         defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
         defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT);
+        defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
         defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT);
         defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT);
         defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT);
@@ -337,6 +339,9 @@
     /** Flag used to indicate if the destinations should be unique client. */
     protected boolean _isUnique;
 
+    /** Flag used to indicate that durable destination should be used. */
+    protected boolean _isDurable;
+
     /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
     protected boolean _failBeforeCommit;
 
@@ -424,6 +429,7 @@
 
     /** The prompt to display when asking the user to kill the broker for failover testing. */
     private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
+    private String _clientID;
 
     /**
      * Creates a ping producer with the specified parameters, of which there are many. See the class level comments
@@ -463,6 +469,7 @@
         _rate = properties.getPropertyAsInteger(RATE_PROPNAME);
         _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
         _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
+        _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
         _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
         _pauseBatch = properties.getPropertyAsLong(PAUSE_AFTER_BATCH_PROPNAME);
 
@@ -498,10 +505,10 @@
 
         // Generate a unique identifying name for this client, based on it ip address and the current time.
         InetAddress address = InetAddress.getLocalHost();
-        String clientID = address.getHostName() + System.currentTimeMillis();
+        _clientID = address.getHostName() + System.currentTimeMillis();
 
         // Create a connection to the broker.
-        createConnection(clientID);
+        createConnection(_clientID);
 
         // Create transactional or non-transactional sessions, based on the command line arguments.
         _producerSession = (Session) getConnection().createSession(_transacted, _ackMode);
@@ -509,7 +516,7 @@
 
         // Create the destinations to send pings to and receive replies from.
         _replyDestination = _consumerSession.createTemporaryQueue();
-        createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique);
+        createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
 
         // Create the message producer only if instructed to.
         if (producer)
@@ -548,7 +555,7 @@
     {
         try
         {
-            Properties options = processCommandLine(args);
+            Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
 
             // Create a ping producer overriding its defaults with all options passed on the command line.
             PingPongProducer pingProducer = new PingPongProducer(options);
@@ -577,43 +584,6 @@
     }
 
     /**
-     * Extracts all name=value pairs from the command line, sets them all as system properties and also returns
-     * a map of properties containing them.
-     *
-     * @param args The command line.
-     *
-     * @return A set of properties containing all name=value pairs from the command line.
-     *
-     * @todo This is a commonly used piece of code. Make it accept a command line definition and move it into the
-     *       CommandLineParser class.
-     */
-    protected static Properties processCommandLine(String[] args)
-    {
-        // Use the command line parser to evaluate the command line.
-        CommandLineParser commandLine = new CommandLineParser(new String[][] {});
-
-        // Capture the command line arguments or display errors and correct usage and then exit.
-        Properties options = null;
-
-        try
-        {
-            options = commandLine.parseCommandLine(args);
-
-            // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up
-            // overridden values from there.
-            commandLine.addCommandLineToSysProperties();
-        }
-        catch (IllegalArgumentException e)
-        {
-            System.out.println(commandLine.getErrors());
-            System.out.println(commandLine.getUsage());
-            System.exit(1);
-        }
-
-        return options;
-    }
-
-    /**
      * Convenience method for a short pause.
      *
      * @param sleepTime The time in milliseconds to pause for.
@@ -677,11 +647,12 @@
      *
      * @throws JMSException Any JMSExceptions are allowed to fall through.
      */
-    public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
-        throws JMSException
+    public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
+        boolean durable) throws JMSException, AMQException
     {
         log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
-            + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + "): called");
+            + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
+            + durable + "): called");
 
         _pingDestinations = new ArrayList<Destination>();
 
@@ -709,13 +680,30 @@
             // Check if this is a pub/sub pinger, in which case create topics.
             if (_isPubSub)
             {
-                destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
-                log.debug("Created topic " + destination);
+                if (!durable)
+                {
+                    destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
+                    log.debug("Created non-durable topic " + destination);
+                }
+                else
+                {
+                    destination =
+                        AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
+                            _clientID, (AMQConnection) _connection);
+                    log.debug("Created durable topic " + destination);
+                }
             }
             // Otherwise this is a p2p pinger, in which case create queues.
             else
             {
-                destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id);
+                AMQShortString destinationName = new AMQShortString(rootName + id);
+                destination =
+                    new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false,
+                        _isDurable);
+                ((AMQSession) _producerSession).createQueue(destinationName, false, _isDurable, false);
+                ((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null,
+                    ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+
                 log.debug("Created queue " + destination);
             }