You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/03/27 18:48:34 UTC

svn commit: r522994 [1/3] - in /incubator/qpid/branches/M2/java: broker/src/main/java/org/apache/log4j/ integrationtests/src/main/java/org/apache/qpid/util/ management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ perftests/src/main...

Author: rgreig
Date: Tue Mar 27 09:48:23 2007
New Revision: 522994

URL: http://svn.apache.org/viewvc?view=rev&rev=522994
Log:
Test added for durability of messages under broker failure.

Added:
    incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
Modified:
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java
    incubator/qpid/branches/M2/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java
    incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
    incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    incubator/qpid/branches/M2/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
    incubator/qpid/branches/M2/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java
    incubator/qpid/branches/M2/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
    incubator/qpid/branches/M2/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java?view=diff&rev=522994&r1=522993&r2=522994
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java Tue Mar 27 09:48:23 2007
@@ -607,7 +607,7 @@
 
     /**
      * Sets initial conditions including date/time roll over information, first check, scheduledFilename, and calls
-     * <code>existingInit</code> to initialize the current # of backups.
+     * <code>existingInit</code> to establishConnection the current # of backups.
      */
     public void activateOptions()
     {

Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java?view=diff&rev=522994&r1=522993&r2=522994
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java Tue Mar 27 09:48:23 2007
@@ -33,7 +33,7 @@
  *
  * <p/>In order to test whether a class implements an interface or extends a class, the class must be loaded (unless
  * the class files were to be scanned directly). Using this collector can cause problems when it scans the classpath,
- * because loading classes will initialize their statics, which in turn may cause undesired side effects. For this
+ * because loading classes will establishConnection their statics, which in turn may cause undesired side effects. For this
  * reason, the collector should always be used with a regular expression, through which the class file names are
  * filtered, and only those that pass this filter will be tested. For example, if you define tests in classes that
  * end with the keyword "Test" then use the regular expression "Test$" to match this.

Modified: incubator/qpid/branches/M2/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java?view=diff&rev=522994&r1=522993&r2=522994
==============================================================================
--- incubator/qpid/branches/M2/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java (original)
+++ incubator/qpid/branches/M2/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java Tue Mar 27 09:48:23 2007
@@ -895,7 +895,7 @@
         return selectedNode;
     }
 	/**
-     * This is a callback that will allow us to create the viewer and initialize
+     * This is a callback that will allow us to create the viewer and establishConnection
      * it.
      */
 	public void createPartControl(Composite parent)

Modified: incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java?view=diff&rev=522994&r1=522993&r2=522994
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java (original)
+++ incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java Tue Mar 27 09:48:23 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.ping;
 
 import java.util.List;
+import java.util.Properties;
 
 import javax.jms.Destination;
 
@@ -31,54 +32,36 @@
  * to send replies to its pings. It simply listens to its own ping destinations, rather than seperate reply queues.
  * It is an all in one ping client, that produces and consumes its own pings.
  *
+ * <p/>The constructor increments a count of the number of ping clients created. It is assumed that where many
+ * are created they will all be run in parallel and be active in sending and consuming pings at the same time.
+ * If the unique destinations flag is not set and a pub/sub ping cycle is being run, this means that they will all hear
+ * pings sent by each other. The expected number of pings received will therefore be multiplied up by the number of
+ * active ping clients. The {@link #getConsumersPerTopic()} method is used to supply this multiplier under these
+ * conditions.
+ *
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Create a ping pong producer that listens to its own pings <td> {@link PingPongProducer}
+ * <tr><td> Create a ping producer that listens to its own pings <td> {@link PingPongProducer}
+ * <tr><td> Count the number of ping producers and produce multiplier for scaling up messages expected over topic pings.
  * </table>
  */
 public class PingClient extends PingPongProducer
 {
+    /** Used to count the number of ping clients created. */
     private static int _pingClientCount;
 
     /**
-     * Creates a ping producer with the specified parameters, of which there are many. See their individual comments
-     * for details. This constructor creates ping pong producer but de-registers its reply-to destination message
-     * listener, and replaces it by listening to all of its ping destinations.
+     * Creates a ping producer with the specified parameters, of which there are many. See the class level comments
+     * for {@link PingPongProducer} for details. This constructor creates a connection to the broker and creates
+     * producer and consumer sessions on it, to send and recieve its pings and replies on.
      *
-     * @param brokerDetails    The URL of the broker to send pings to.
-     * @param username         The username to log onto the broker with.
-     * @param password         The password to log onto the broker with.
-     * @param virtualpath      The virtual host name to use on the broker.
-     * @param destinationName  The name (or root where multiple destinations are used) of the desitination to send
-     *                         pings to.
-     * @param selector         The selector to filter replies with.
-     * @param transacted       Indicates whether or not pings are sent and received in transactions.
-     * @param persistent       Indicates whether pings are sent using peristent delivery.
-     * @param messageSize      Specifies the size of ping messages to send.
-     * @param verbose          Indicates that information should be printed to the console on every ping.
-     * @param afterCommit      Indicates that the user should be promted to terminate a broker after commits to test failover.
-     * @param beforeCommit     Indicates that the user should be promted to terminate a broker before commits to test failover.
-     * @param afterSend        Indicates that the user should be promted to terminate a broker after sends to test failover.
-     * @param beforeSend       Indicates that the user should be promted to terminate a broker before sends to test failover.
-     * @param failOnce         Indicates that the failover testing behaviour should only happen on the first commit, not all.
-     * @param txBatchSize      Specifies the number of pings to send in each transaction.
-     * @param noOfDestinations The number of destinations to ping. Must be 1 or more.
-     * @param rate             Specified the number of pings per second to send. Setting this to 0 means send as fast as
-     *                         possible, with no rate restriction.
-     * @param pubsub           True to ping topics, false to ping queues.
-     * @param unique           True to use unique destinations for each ping pong producer, false to share.
+     * @param  overrides Properties containing any desired overrides to the defaults.
      *
      * @throws Exception Any exceptions are allowed to fall through.
      */
-    public PingClient(String brokerDetails, String username, String password, String virtualpath, String destinationName,
-                      String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
-                      boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
-                      int txBatchSize, int noOfDestinations, int rate, boolean pubsub, boolean unique,
-                      int ackMode, long pausetime) throws Exception
+    public PingClient(Properties overrides) throws Exception
     {
-        super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize,
-              verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate,
-              pubsub, unique, ackMode, pausetime);
+        super(overrides);
 
         _pingClientCount++;
     }
@@ -94,6 +77,11 @@
         return _pingDestinations;
     }
 
+    /**
+     * Supplies the multiplier for the number of ping clients that will hear each ping when doing pub/sub pinging.
+     *
+     * @return The scaling up of the number of expected pub/sub pings.
+     */
     public int getConsumersPerTopic()
     {
         if (_isUnique)

Added: incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java?view=auto&rev=522994
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java (added)
+++ incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java Tue Mar 27 09:48:23 2007
@@ -0,0 +1,389 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.util.MathUtils;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+/**
+ * PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and
+ * receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop
+ * sending. It then waits for another signal before it re-opens a fresh connection and attempts to receive all of the
+ * pings that it has succesfully sent. It is intended to be an interactive test that lets a user experiment with
+ * failure conditions when using durable messaging.
+ *
+ * <p/>The events that can stop it from sending are input from the user on the console, failure of its connection to
+ * the broker, completion of sending a specified number of messages, or expiry of a specified duration. In all cases
+ * it will do its best to clean up and close the connection before opening a fresh connection to receive the pings
+ * with.
+ *
+ * <p/>The event to re-connect and attempt to recieve the pings is input from the user on the console.
+ *
+ * <p/>This ping client inherits the configuration properties of its parent class ({@link PingPongProducer}) and
+ * additionally accepts the following parameters:
+ *
+ * <p/><table><caption>Parameters</caption>
+ * <tr><th> Parameter        <th> Default  <th> Comments
+ * <tr><td> numMessages      <th> 100      <th> The total number of messages to send.
+ * <tr><td> duration         <th> 30S      <th> The length of time to ping for. (Format dDhHmMsS, for d days, h hours,
+ *                                              m minutes and s seconds).
+ * </table>
+ *
+ * <p/>This ping client also overrides some of the defaults of its parent class, to provide a reasonable set up
+ * when no parameters are specified.
+ *
+ * <p/><table><caption>Parameters</caption>
+ * <tr><th> Parameter        <th> Default  <th> Comments
+ * <tr><td> uniqueDests      <td> false    <td> Prevents destination names being timestamped.
+ * <tr><td> transacted       <td> true     <td> Only makes sense to test with transactions.
+ * <tr><td> persistent       <td> true     <td> Only makes sense to test persistent.
+ * <tr><td> commitBatchSize  <td> 10
+ * <tr><td> rate             <td> 20       <td> Total default test time is 5 seconds.
+ * </table>
+ *
+ * <p/>When a number of messages or duration is specified, this ping client will ping until the first of those limits
+ * is reached. Reaching the limit will be interpreted as the first signal to stop sending, and the ping client will
+ * wait for the second signal before receiving its pings.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Send and receive pings.
+ * <tr><td> Accept user input to signal stop sending.
+ * <tr><td> Accept user input to signal start receiving.
+ * <tr><td> Provide feedback on pings sent versus pings received.
+ * </table>
+ */
+public class PingDurableClient extends PingPongProducer implements ExceptionListener
+{
+    private static final Logger log = Logger.getLogger(PingDurableClient.class);
+
+    public static final String NUM_MESSAGES_PROPNAME = "numMessages";
+    public static final String NUM_MESSAGES_DEFAULT = "100";
+    public static final String DURATION_PROPNAME = "duration";
+    public static final String DURATION_DEFAULT = "30S";
+
+    /** The maximum length of time to wait whilst receiving pings before assuming that no more are coming. */
+    private static final long TIME_OUT = 3000;
+
+    static
+    {
+        defaults.setProperty(NUM_MESSAGES_PROPNAME, NUM_MESSAGES_DEFAULT);
+        defaults.setProperty(DURATION_PROPNAME, DURATION_DEFAULT);
+        defaults.setProperty(UNIQUE_DESTS_PROPNAME, "false");
+        defaults.setProperty(TRANSACTED_PROPNAME, "true");
+        defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true");
+        defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10");
+        defaults.setProperty(RATE_PROPNAME, "20");
+    }
+
+    /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */
+    private int numMessages;
+
+    /** Sepcifies how long to ping for, if larger than 0. 0 means send until told to stop. */
+    private long duration;
+
+    /** Used to indciate that this application should terminate. Set by the shutdown hook. */
+    private boolean terminate = false;
+
+    /**
+     * @throws Exception Any exceptions are allowed to fall through.
+     */
+    public PingDurableClient(Properties overrides) throws Exception
+    {
+        super(overrides);
+        log.debug("public PingDurableClient(Properties overrides = " + overrides + "): called");
+
+        // Extract the additional configuration parameters.
+        ParsedProperties properties = new ParsedProperties(defaults);
+        properties.putAll(overrides);
+
+        numMessages = properties.getPropertyAsInteger(NUM_MESSAGES_PROPNAME);
+        String durationSpec = properties.getProperty(DURATION_PROPNAME);
+
+        if (durationSpec != null)
+        {
+            duration = MathUtils.parseDuration(durationSpec) * 1000000;
+        }
+    }
+
+    /**
+     * Starts the ping/wait/receive process.
+     *
+     * @param args The command line arguments.
+     */
+    public static void main(String[] args)
+    {
+        try
+        {
+            // Create a ping producer overriding its defaults with all options passed on the command line.
+            Properties options = processCommandLine(args);
+            PingDurableClient pingProducer = new PingDurableClient(options);
+
+            // Create a shutdown hook to terminate the ping-pong producer.
+            Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
+
+            // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
+            // pingProducer.getConnection().setExceptionListener(pingProducer);
+
+            // Run the test procedure.
+            int sent = pingProducer.send();
+            pingProducer.waitForUser("Press return to begin receiving the pings.");
+            pingProducer.receive(sent);
+
+            System.exit(0);
+        }
+        catch (Exception e)
+        {
+            System.err.println(e.getMessage());
+            log.error("Top level handler caught execption.", e);
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Performs the main test procedure implemented by this ping client. See the class level comment for details.
+     */
+    public int send() throws Exception
+    {
+        log.debug("public void sendWaitReceive(): called");
+
+        log.debug("duration = " + duration);
+        log.debug("numMessages = " + numMessages);
+
+        if (duration > 0)
+        {
+            System.out.println("Sending for up to " + (duration / 1000000000f) + " seconds.");
+        }
+
+        if (_rate > 0)
+        {
+            System.out.println("Sending at " + _rate + " messages per second.");
+        }
+
+        if (numMessages > 0)
+        {
+            System.out.println("Sending up to " + numMessages + " messages.");
+        }
+
+        // Establish the connection and the message producer.
+        establishConnection(true, false);
+        getConnection().start();
+
+        Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
+
+        // Send pings until a terminating condition is received.
+        boolean endCondition = false;
+        int messagesSent = 0;
+        int messagesCommitted = 0;
+        int messagesNotCommitted = 0;
+        long start = System.nanoTime();
+
+        // Clear console in.
+        clearConsole();
+
+        while (!endCondition)
+        {
+            boolean committed = false;
+
+            try
+            {
+                committed = sendMessage(messagesSent, message) && _transacted;
+
+                messagesSent++;
+                messagesNotCommitted++;
+
+                // Keep count of the number of messsages currently committed and pending commit.
+                if (committed)
+                {
+                    log.debug("Adding " + messagesNotCommitted + " messages to the committed count.");
+                    messagesCommitted += messagesNotCommitted;
+                    messagesNotCommitted = 0;
+
+                    System.out.println("Commited: " + messagesCommitted);
+                }
+            }
+            catch (JMSException e)
+            {
+                log.debug("Got JMSException whilst sending.");
+                _publish = false;
+            }
+
+            // Determine if the end condition has been met, based on the number of messages, time passed, errors on
+            // the connection or user input.
+            long now = System.nanoTime();
+
+            if ((duration != 0) && ((now - start) > duration))
+            {
+                System.out.println("Send halted because duration expired.");
+                endCondition = true;
+            }
+            else if ((numMessages != 0) && (messagesSent >= numMessages))
+            {
+                System.out.println("Send halted because # messages completed.");
+                endCondition = true;
+            }
+            else if (System.in.available() > 0)
+            {
+                System.out.println("Send halted by user input.");
+                endCondition = true;
+
+                clearConsole();
+            }
+            else if (!_publish)
+            {
+                System.out.println("Send halted by error on the connection.");
+                endCondition = true;
+            }
+        }
+
+        log.debug("messagesSent = " + messagesSent);
+        log.debug("messagesCommitted = " + messagesCommitted);
+        log.debug("messagesNotCommitted = " + messagesNotCommitted);
+
+        System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted
+            + ", Messages not Committed = " + messagesNotCommitted);
+
+        // Clean up the connection.
+        try
+        {
+            close();
+        }
+        catch (JMSException e)
+        {
+            // Ignore as did best could manage to clean up.
+        }
+
+        return messagesSent;
+    }
+
+    private void receive(int messagesSent) throws Exception
+    {
+        // Re-establish the connection and the message consumer.
+        _queueJVMSequenceID = new AtomicInteger();
+        _queueSharedID = new AtomicInteger();
+
+        establishConnection(false, true);
+        _consumer.setMessageListener(null);
+        _connection.start();
+
+        // Try to receive all of the pings that were successfully sent.
+        int messagesReceived = 0;
+        boolean endCondition = false;
+
+        while (!endCondition)
+        {
+            // Message received = _consumer.receiveNoWait();
+            Message received = _consumer.receive(TIME_OUT);
+            log.debug("received = " + received);
+
+            if (received != null)
+            {
+                messagesReceived++;
+            }
+
+            // Determine if the end condition has been met, based on the number of messages and time passed since last
+            // receiving a message.
+            if (received == null)
+            {
+                System.out.println("Timed out.");
+                endCondition = true;
+            }
+            else if (messagesReceived >= messagesSent)
+            {
+                System.out.println("Got all messages.");
+                endCondition = true;
+            }
+        }
+
+        log.debug("messagesReceived = " + messagesReceived);
+
+        System.out.println("Messages received: " + messagesReceived);
+
+        // Clean up the connection.
+        close();
+    }
+
+    /**
+     * Clears any pending input from the console.
+     */
+    private void clearConsole()
+    {
+        try
+        {
+            BufferedReader bis = new BufferedReader(new InputStreamReader(System.in));
+
+            // System.in.skip(System.in.available());
+            while (bis.ready())
+            {
+                bis.readLine();
+            }
+        }
+        catch (IOException e)
+        { }
+    }
+
+    /**
+     * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the
+     * effect of making this pinger listen to its own pings.
+     *
+     * @return The ping destinations.
+     */
+    public List<Destination> getReplyDestinations()
+    {
+        return _pingDestinations;
+    }
+
+    /**
+     * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with
+     * the runtime system as a shutdown hook. This shutdown hook sets an additional terminate flag, compared with the
+     * shutdown hook in {@link PingPongProducer}, because the publish flag is used to indicate that sending or receiving
+     * message should stop, not that the application should termiante.
+     *
+     * @return A shutdown hook for the ping loop.
+     */
+    public Thread getShutdownHook()
+    {
+        return new Thread(new Runnable()
+                {
+                    public void run()
+                    {
+                        stop();
+                        terminate = true;
+                    }
+                });
+    }
+}