You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/06/04 11:47:59 UTC

svn commit: r544109 - in /incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid: interop/coordinator/ interop/coordinator/testcases/pubsub/ interop/testclient/ interop/testclient/testcases/ sustained/

Author: ritchiem
Date: Mon Jun  4 02:47:53 2007
New Revision: 544109

URL: http://svn.apache.org/viewvc?view=rev&rev=544109
Log:
Addition of a sustained test client. This is currently setup for running a pub/sub test. 

The test allows for multiple clients to connect and participate in testing the broker throughput.

A single producer continually sends messages to a topic which the clients then send batched results back about.

The producer uses the timings in the reports to update the rate at which it sends messages. Ideally reaching a steady state where all messages produced are received by everyone within a specified time frame.

Added:
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java   (with props)
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java   (with props)
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/pubsub/
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java   (with props)
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java   (with props)
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java   (with props)
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java   (with props)
Modified:
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java

Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java?view=diff&rev=544109&r1=544108&r2=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java Mon Jun  4 02:47:53 2007
@@ -21,10 +21,7 @@
  */
 package org.apache.qpid.interop.coordinator;
 
-import java.util.Collection;
 import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.jms.*;
 
@@ -74,13 +71,13 @@
     private static final Logger log = Logger.getLogger(CoordinatingTestCase.class);
 
     /** Holds the contact details for the sending test client. */
-    TestClientDetails sender;
+    protected TestClientDetails sender;
 
     /** Holds the contact details for the receving test client. */
-    TestClientDetails receiver;
+    protected TestClientDetails receiver;
 
     /** Holds the conversation factory over which to coordinate the test. */
-    ConversationFactory conversationFactory;
+    protected ConversationFactory conversationFactory;
 
     /**
      * Creates a new coordinating test case with the specified name.

Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java?view=diff&rev=544109&r1=544108&r2=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java Mon Jun  4 02:47:53 2007
@@ -65,28 +65,28 @@
     public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties";
 
     /** Holds the URL of the broker to coordinate the tests on. */
-    String brokerUrl;
+    protected String brokerUrl;
 
     /** Holds the virtual host to coordinate the tests on. If <tt>null</tt>, then the default virtual host is used. */
-    String virtualHost;
+    protected String virtualHost;
 
     /** Holds the list of all clients that enlisted, when the compulsory invite was issued. */
-    Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
+    protected Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
 
     /** Holds the conversation helper for the control conversation. */
-    private ConversationFactory conversationFactory;
+    protected ConversationFactory conversationFactory;
 
     /** Holds the connection that the coordinating messages are sent over. */
-    private Connection connection;
+    protected Connection connection;
 
     /**
      * Holds the name of the class of the test currently being run. Ideally passed into the {@link #createTestResult}
      * method, but as the signature is already fixed for this, the current value gets pushed here as a member variable.
      */
-    private String currentTestClassName;
+    protected String currentTestClassName;
 
     /** Holds the path of the directory to output test results too, if one is defined. */
-    private static String reportDir;
+    protected static String _reportDir;
 
     /**
      * Creates an interop test coordinator on the specified broker and virtual host.
@@ -94,7 +94,7 @@
      * @param brokerUrl   The URL of the broker to connect to.
      * @param virtualHost The virtual host to run all tests on. Optional, may be <tt>null</tt>.
      */
-    Coordinator(String brokerUrl, String virtualHost)
+    public Coordinator(String brokerUrl, String virtualHost)
     {
         log.debug("Coordinator(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
 
@@ -121,38 +121,36 @@
             // Use the command line parser to evaluate the command line with standard handling behaviour (print errors
             // and usage then exist if there are errors).
             Properties options =
-                CommandLineParser.processCommandLine(args,
-                    new CommandLineParser(
-                        new String[][]
-                        {
-                            { "b", "The broker URL.", "broker", "false" },
-                            { "h", "The virtual host to use.", "virtual host", "false" },
-                            { "o", "The name of the directory to output test timings to.", "dir", "false" }
-                        }));
+                    CommandLineParser.processCommandLine(args,
+                                                         new CommandLineParser(
+                                                                 new String[][]
+                                                                         {
+                                                                                 {"b", "The broker URL.", "broker", "false"},
+                                                                                 {"h", "The virtual host to use.", "virtual host", "false"},
+                                                                                 {"o", "The name of the directory to output test timings to.", "dir", "false"}
+                                                                         }));
 
             // Extract the command line options.
             String brokerUrl = options.getProperty("b");
             String virtualHost = options.getProperty("h");
-            reportDir = options.getProperty("o");
-            reportDir = (reportDir == null) ? "." : reportDir;
+            _reportDir = options.getProperty("o");
+            _reportDir = (_reportDir == null) ? "." : _reportDir;
 
             // Scan for available test cases using a classpath scanner.
             Collection<Class<? extends CoordinatingTestCase>> testCaseClasses =
-                new ArrayList<Class<? extends CoordinatingTestCase>>();
+                    new ArrayList<Class<? extends CoordinatingTestCase>>();
             // ClasspathScanner.getMatches(CoordinatingTestCase.class, "^Test.*", true);
             // Hard code the test classes till the classpath scanner is fixed.
             Collections.addAll(testCaseClasses,
-                new Class[]
-                {
-                    CoordinatingTestCase1DummyRun.class, CoordinatingTestCase2BasicP2P.class,
-                    CoordinatingTestCase3BasicPubSub.class
-                });
+                               CoordinatingTestCase1DummyRun.class,
+                               CoordinatingTestCase2BasicP2P.class,
+                               CoordinatingTestCase3BasicPubSub.class);
 
             // Check that some test classes were actually found.
-            if ((testCaseClasses == null) || testCaseClasses.isEmpty())
+            if (testCaseClasses.isEmpty())
             {
                 throw new RuntimeException(
-                    "No test classes implementing CoordinatingTestCase were found on the class path.");
+                        "No test classes implementing CoordinatingTestCase were found on the class path.");
             }
 
             int i = 0;
@@ -199,7 +197,7 @@
     public TestResult start(String[] testClassNames) throws Exception
     {
         log.debug("public TestResult start(String[] testClassNames = " + PrettyPrintingUtils.printArray(testClassNames)
-            + ": called");
+                  + ": called");
 
         // Connect to the broker.
         connection = TestClient.createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
@@ -233,7 +231,7 @@
             // Record the current test class, so that the test results can be output to a file incorporating this name.
             this.currentTestClassName = testClassName;
 
-            result = super.start(new String[] { testClassName });
+            result = super.start(new String[]{testClassName});
         }
 
         // At this point in time, all tests have completed. Broadcast the shutdown message.
@@ -257,7 +255,7 @@
     public static Set<TestClientDetails> extractEnlists(Collection<Message> enlists) throws JMSException
     {
         log.debug("public static Set<TestClientDetails> extractEnlists(Collection<Message> enlists = " + enlists
-            + "): called");
+                  + "): called");
 
         Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
 
@@ -315,9 +313,9 @@
             targetTest = new WrappedSuiteTestDecorator(suite);
             log.debug("Wrapped with a WrappedSuiteTestDecorator.");
         }
-
         // Wrap the tests in an inviting test decorator, to perform the invite/test cycle.
-        targetTest = new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
+
+        targetTest = newTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
 
         TestSuite suite = new TestSuite();
         suite.addTest(targetTest);
@@ -328,6 +326,11 @@
         return super.doRun(suite, wait);
     }
 
+    protected WrappedSuiteTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, Set<TestClientDetails> enlistedClients, ConversationFactory conversationFactory, Connection connection)
+    {
+        return new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
+    }
+
     /**
      * Creates the TestResult object to be used for test runs.
      *
@@ -340,10 +343,10 @@
         TKTestResult result = new TKTestResult(fPrinter.getWriter(), delay, verbose, testCaseName);
 
         // Check if a directory to output reports to has been specified and attach test listeners if so.
-        if (reportDir != null)
+        if (_reportDir != null)
         {
             // Create the report directory if it does not already exist.
-            File reportDirFile = new File(reportDir);
+            File reportDirFile = new File(_reportDir);
 
             if (!reportDirFile.exists())
             {
@@ -381,5 +384,10 @@
         }
 
         return result;
+    }
+
+    public void setReportDir(String reportDir)
+    {
+        _reportDir = reportDir;
     }
 }

Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java?view=diff&rev=544109&r1=544108&r2=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java Mon Jun  4 02:47:53 2007
@@ -21,7 +21,6 @@
 package org.apache.qpid.interop.coordinator;
 
 import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -30,7 +29,6 @@
 
 import junit.framework.Test;
 import junit.framework.TestResult;
-import junit.framework.TestSuite;
 
 import org.apache.log4j.Logger;
 
@@ -107,7 +105,7 @@
             CoordinatingTestCase coordTest = (CoordinatingTestCase) test;
 
             // Broadcast the invitation to find out what clients are available to test.
-            Set<TestClientDetails> enlists = null;
+            Set<TestClientDetails> enlists;
             try
             {
                 Message invite = conversationFactory.getSession().createMessage();

Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java?view=auto&rev=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java Mon Jun  4 02:47:53 2007
@@ -0,0 +1,28 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.interop.coordinator;
+
+import javax.jms.Message;
+
+public interface ListeningCoordinatorTest
+{
+    public void latejoin(Message message);
+}

Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java?view=auto&rev=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java Mon Jun  4 02:47:53 2007
@@ -0,0 +1,200 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.coordinator;
+
+import junit.framework.Test;
+import junit.framework.TestResult;
+import org.apache.log4j.Logger;
+import org.apache.qpid.util.ConversationFactory;
+import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Broadcast test
+ * invitations and collect enlists. <td> {@link ConversationFactory}. <tr><td> Output test failures for clients
+ * unwilling to run the test case. <td> {@link Coordinator} <tr><td> Execute coordinated test cases. <td> {@link
+ * CoordinatingTestCase} </table>
+ */
+public class ListeningTestDecorator extends WrappedSuiteTestDecorator implements MessageListener
+{
+    private static final Logger log = Logger.getLogger(ListeningTestDecorator.class);
+
+    /** Holds the contact information for all test clients that are available and that may take part in the test. */
+    Set<TestClientDetails> allClients;
+
+    /** Holds the conversation helper for the control level conversation for coordinating the test through. */
+    ConversationFactory conversationFactory;
+
+    /** Holds the connection that the control conversation is held over. */
+    Connection connection;
+
+    /** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */
+    WrappedSuiteTestDecorator testSuite;
+
+    /** Hold the current running test case. */
+    CoordinatingTestCase _currentTest = null;
+
+    /**
+     * Creates a wrapped suite test decorator from another one.
+     *
+     * @param suite               The test suite.
+     * @param availableClients    The list of all clients that responded to the compulsory invite.
+     * @param controlConversation The conversation helper for the control level, test coordination conversation.
+     * @param controlConnection   The connection that the coordination messages are sent over.
+     */
+    public ListeningTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> availableClients,
+                                  ConversationFactory controlConversation, Connection controlConnection)
+    {
+        super(suite);
+
+        log.debug("public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> allClients = "
+                  + availableClients + ", ConversationHelper controlConversation = " + controlConversation + "): called");
+
+        testSuite = suite;
+        allClients = availableClients;
+        conversationFactory = controlConversation;
+        connection = controlConnection;
+    }
+
+    /**
+     * Broadcasts a test invitation and accetps enlisting from participating clients. The wrapped test case is then
+     * repeated for every combination of test clients (provided the wrapped test case extends {@link
+     * CoordinatingTestCase}.
+     *
+     * <p/>Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime
+     * exceptions, resulting in the non-completion of the test run.
+     *
+     * @param testResult The the results object to monitor the test results with.
+     *
+     * @todo Better error recovery for failure of the invite/enlist conversation could be added.
+     */
+    public void run(TestResult testResult)
+    {
+        log.debug("public void run(TestResult testResult): called");
+
+        Collection<Test> tests = testSuite.getAllUnderlyingTests();
+
+        for (Test test : tests)
+        {
+            CoordinatingTestCase coordTest = (CoordinatingTestCase) test;
+
+            Set<TestClientDetails> enlists = signupClients(coordTest);
+
+            if (enlists.size() == 0)
+            {
+                throw new RuntimeException("No clients to test with");
+            }
+
+            Iterator<TestClientDetails> clients = enlists.iterator();
+            coordTest.setSender(clients.next());
+
+            while (clients.hasNext())
+            {
+                // Set the sending and receiving client details on the test case.
+                coordTest.setReceiver(clients.next());
+            }
+
+            // Pass down the connection to hold the coordination conversation over.
+            coordTest.setConversationFactory(conversationFactory);
+
+
+            if (coordTest instanceof ListeningCoordinatorTest)
+            {
+                _currentTest = coordTest;
+            }
+            // Execute the test case.
+            coordTest.run(testResult);
+
+            _currentTest = null;
+        }
+    }
+
+    private Set<TestClientDetails> signupClients(CoordinatingTestCase coordTest)
+    {
+        // Broadcast the invitation to find out what clients are available to test.
+        Set<TestClientDetails> enlists;
+        try
+        {
+            Message invite = conversationFactory.getSession().createMessage();
+            Destination controlTopic = conversationFactory.getSession().createTopic("iop.control");
+            ConversationFactory.Conversation conversation = conversationFactory.startConversation();
+
+            invite.setStringProperty("CONTROL_TYPE", "INVITE");
+            invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName()));
+
+            conversation.send(controlTopic, invite);
+
+            // Wait for a short time, to give test clients an opportunity to reply to the invitation.
+            Collection<Message> replies = conversation.receiveAll(allClients.size(), 5000);
+
+            log.debug("Received " + replies.size() + " enlist replies");
+            
+            enlists = Coordinator.extractEnlists(replies);
+
+            //Create topic to listen on for latejoiners
+            Destination listenTopic = conversationFactory.getSession().createTopic("iop.control.test." + coordTest.getTestCaseNameForTestMethod(coordTest.getName()));
+
+            //Listen for joiners
+            conversationFactory.getSession().createConsumer(listenTopic).setMessageListener(this);
+            log.debug("Created consumer on :" + listenTopic);
+        }
+        catch (JMSException e)
+        {
+            throw new RuntimeException("There was a JMSException during the invite/enlist conversation.", e);
+        }
+
+        return enlists;
+    }
+
+    /**
+     * Prints a string summarizing this test decorator, mainly for debugging purposes.
+     *
+     * @return String representation for debugging purposes.
+     */
+    public String toString()
+    {
+        return "ListeningTestDecorator: [ testSuite = " + testSuite + " ]";
+    }
+
+
+    public void onMessage(Message message)
+    {
+        try
+        {
+            if (message.getStringProperty("CONTROL_TYPE").equals("LATEJOIN"))
+            {
+                ((ListeningCoordinatorTest) _currentTest).latejoin(message);
+            }
+        }
+        catch (JMSException e)
+        {
+            log.debug("Unable to process message:" + message);
+        }
+    }
+}

Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java?view=diff&rev=544109&r1=544108&r2=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java Mon Jun  4 02:47:53 2007
@@ -75,16 +75,17 @@
     /** Holds all the test cases loaded from the classpath. */
     Map<String, InteropClientTestCase> testCases = new HashMap<String, InteropClientTestCase>();
 
-    InteropClientTestCase currentTestCase;
+    protected InteropClientTestCase currentTestCase;
 
-    private MessageProducer producer;
-    private Session session;
+    protected Connection _connection;
+    protected MessageProducer producer;
+    protected Session session;
 
-    private String clientName = CLIENT_NAME;
+    protected String clientName = CLIENT_NAME;
 
     /**
-     * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified
-     * client identifying name.
+     * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client
+     * identifying name.
      *
      * @param brokerUrl   The url of the broker to connect to.
      * @param virtualHost The virtual host to conect to.
@@ -93,7 +94,7 @@
     public TestClient(String brokerUrl, String virtualHost, String clientName)
     {
         log.debug("public TestClient(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost
-            + ", String clientName = " + clientName + "): called");
+                  + ", String clientName = " + clientName + "): called");
 
         // Retain the connection parameters.
         this.brokerUrl = brokerUrl;
@@ -117,13 +118,13 @@
     {
         // Use the command line parser to evaluate the command line.
         CommandLineParser commandLine =
-            new CommandLineParser(
-                new String[][]
-                {
-                    { "b", "The broker URL.", "broker", "false" },
-                    { "h", "The virtual host to use.", "virtual host", "false" },
-                    { "n", "The test client name.", "name", "false" }
-                });
+                new CommandLineParser(
+                        new String[][]
+                                {
+                                        {"b", "The broker URL.", "broker", "false"},
+                                        {"h", "The virtual host to use.", "virtual host", "false"},
+                                        {"n", "The test client name.", "name", "false"}
+                                });
 
         // Capture the command line arguments or display errors and correct usage and then exit.
         Properties options = null;
@@ -151,9 +152,17 @@
         // Create a test client and start it running.
         TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName);
 
+        // Use a class path scanner to find all the interop test case implementations.
+        Collection<Class<? extends InteropClientTestCase>> testCaseClasses =
+                new ArrayList<Class<? extends InteropClientTestCase>>();
+        // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
+        // Hard code the test classes till the classpath scanner is fixed.
+        Collections.addAll(testCaseClasses,
+                           new Class[]{TestCase1DummyRun.class, TestCase2BasicP2P.class, TestClient.class});
+
         try
         {
-            client.start();
+            client.start(testCaseClasses);
         }
         catch (Exception e)
         {
@@ -165,20 +174,12 @@
     /**
      * Starts the interop test client running. This causes it to start listening for incoming test invites.
      *
-     * @throws JMSException Any underlying JMSExceptions are allowed to fall through.
+     * @throws JMSException Any underlying JMSExceptions are allowed to fall through. @param testCaseClasses
      */
-    private void start() throws JMSException
+    protected void start(Collection<Class<? extends InteropClientTestCase>> testCaseClasses) throws JMSException
     {
         log.debug("private void start(): called");
 
-        // Use a class path scanner to find all the interop test case implementations.
-        Collection<Class<? extends InteropClientTestCase>> testCaseClasses =
-            new ArrayList<Class<? extends InteropClientTestCase>>();
-        // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
-        // Hard code the test classes till the classpath scanner is fixed.
-        Collections.addAll(testCaseClasses,
-            new Class[] { TestCase1DummyRun.class, TestCase2BasicP2P.class, TestCase3BasicPubSub.class });
-
         // Create all the test case implementations and index them by the test names.
         for (Class<? extends InteropClientTestCase> nextClass : testCaseClasses)
         {
@@ -200,9 +201,9 @@
         }
 
         // Open a connection to communicate with the coordinator on.
-        Connection connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
+        _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
 
-        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         // Set this up to listen for control messages.
         MessageConsumer consumer = session.createConsumer(session.createTopic("iop.control." + clientName));
@@ -215,7 +216,7 @@
         producer = session.createProducer(null);
 
         // Start listening for incoming control messages.
-        connection.start();
+        _connection.start();
     }
 
     /**
@@ -232,22 +233,25 @@
      * @param virtualHost             The virtual host to connectio to, <tt>null</tt> to use the default.
      *
      * @return A JMS conneciton.
+     *
+     * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it to a
+     * Utils library class.
      */
     public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost)
     {
         log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
-            + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
+                  + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
 
         try
         {
             Properties connectionProps =
-                PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream(
-                        connectionPropsResource));
+                    PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream(
+                            connectionPropsResource));
 
             if (brokerUrl != null)
             {
                 String connectionString =
-                    "amqp://guest:guest/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
+                        "amqp://guest:guest/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
                 connectionProps.setProperty(CONNECTION_PROPERTY, connectionString);
             }
 
@@ -286,21 +290,21 @@
             String controlType = message.getStringProperty("CONTROL_TYPE");
             String testName = message.getStringProperty("TEST_NAME");
 
+            log.info("onMessage(Message message = " + message + "): for '" + controlType + "' to '" + testName + "'");
+
             // Check if the message is a test invite.
             if ("INVITE".equals(controlType))
             {
-                String testCaseName = message.getStringProperty("TEST_NAME");
-
                 // Flag used to indicate that an enlist should be sent. Only enlist to compulsory invites or invites
                 // for which test cases exist.
                 boolean enlist = false;
 
-                if (testCaseName != null)
+                if (testName != null)
                 {
-                    log.debug("Got an invite to test: " + testCaseName);
+                    log.debug("Got an invite to test: " + testName);
 
                     // Check if the requested test case is available.
-                    InteropClientTestCase testCase = testCases.get(testCaseName);
+                    InteropClientTestCase testCase = testCases.get(testName);
 
                     if (testCase != null)
                     {
@@ -308,6 +312,10 @@
                         currentTestCase = testCase;
                         enlist = true;
                     }
+                    else
+                    {
+                        log.warn("'" + testName + "' not part of this clients tests.");
+                    }
                 }
                 else
                 {
@@ -325,6 +333,8 @@
                     enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName);
                     enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID());
 
+                    log.info("Sending Message '" + enlistMessage + "'. to " + message.getJMSReplyTo());
+
                     producer.send(message.getJMSReplyTo(), enlistMessage);
                 }
             }
@@ -369,9 +379,10 @@
             }
             else if ("TERMINATE".equals(controlType))
             {
-                System.out.println("Received termination instruction from coordinator.");
+                log.info("Received termination instruction from coordinator.");
 
                 // Is a cleaner shutdown needed?
+                _connection.close();
                 System.exit(0);
             }
             else

Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java?view=diff&rev=544109&r1=544108&r2=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java Mon Jun  4 02:47:53 2007
@@ -5,7 +5,6 @@
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.interop.testclient.InteropClientTestCase;
-import org.apache.qpid.interop.testclient.TestClient;
 
 /**
  * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the
@@ -120,8 +119,8 @@
             session = new Session[1];
 
             connection[0] =
-                TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl,
-                    TestClient.virtualHost);
+                org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+                    org.apache.qpid.interop.testclient.TestClient.virtualHost);
             session[0] = connection[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
 
             // Extract and retain the test parameters.
@@ -140,8 +139,8 @@
             for (int i = 0; i < numReceivers; i++)
             {
                 connection[i] =
-                    TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl,
-                        TestClient.virtualHost);
+                    org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+                        org.apache.qpid.interop.testclient.TestClient.virtualHost);
                 session[i] = connection[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
 
                 sendDestination = session[i].createTopic(sendKey);

Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java?view=auto&rev=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java Mon Jun  4 02:47:53 2007
@@ -0,0 +1,567 @@
+package org.apache.qpid.sustained;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the
+ * default topic exchange, using the specified number of receiver connections. Produces reports on the actual number of
+ * messages sent/received.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Supply the name
+ * of the test case that this implements. <tr><td> Accept/Reject invites based on test parameters. <tr><td> Adapt to
+ * assigned roles. <tr><td> Send required number of test messages using pub/sub. <tr><td> Generate test reports.
+ * </table>
+ */
+public class SustainedTestClient extends TestCase3BasicPubSub implements ExceptionListener
+{
+    /** Used for debugging. */
+    private static final Logger log = Logger.getLogger(SustainedTestClient.class);
+
+    /** The role to be played by the test. */
+    private Roles role;
+
+    /** The number of test messages to send. */
+//    private int numMessages;
+
+    /** The number of receiver connection to use. */
+    private int numReceivers;
+
+    /** The routing key to send them to on the default direct exchange. */
+    private Destination sendDestination;
+
+    /** The routing key to send updates to on the default direct exchange. */
+    private Destination sendUpdateDestination;
+
+
+    /** The connections to send/receive the test messages on. */
+    private Connection[] connection;
+
+    /** The sessions to send/receive the test messages on. */
+    private Session[] session;
+
+    /** The producer to send the test messages with. */
+    MessageProducer producer;
+
+    /** Adapter that adjusts the send rate based on the updates from clients. */
+    SustainedRateAdapter _rateAdapter;
+
+    /**  */
+    int updateInterval;
+
+    private boolean _running = true;
+
+    /**
+     * Should provide the name of the test case that this class implements. The exact names are defined in the interop
+     * testing spec.
+     *
+     * @return The name of the test case that this implements.
+     */
+    public String getName()
+    {
+        log.debug("public String getName(): called");
+
+        return "Perf_SustainedPubSub";
+    }
+
+    /**
+     * Assigns the role to be played by this test case. The test parameters are fully specified in the assignment
+     * message. When this method return the test case will be ready to execute.
+     *
+     * @param role              The role to be played; sender or receiver.
+     * @param assignRoleMessage The role assingment message, contains the full test parameters.
+     *
+     * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
+     */
+    public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
+    {
+        log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage
+                  + "): called");
+
+        // Take note of the role to be played.
+        this.role = role;
+
+        // Extract and retain the test parameters.
+        numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS");
+        updateInterval = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL");
+        String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY");
+        String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY");
+        int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE");
+
+        log.debug("numReceivers = " + numReceivers);
+        log.debug("updateInterval = " + updateInterval);
+        log.debug("ackMode = " + ackMode);
+        log.debug("sendKey = " + sendKey);
+        log.debug("sendUpdateKey = " + sendUpdateKey);
+        log.debug("role = " + role);
+
+        switch (role)
+        {
+            // Check if the sender role is being assigned, and set up a single message producer if so.
+            case SENDER:
+                log.info("*********** Creating SENDER");
+                // Create a new connection to pass the test messages on.
+                connection = new Connection[1];
+                session = new Session[1];
+
+                connection[0] =
+                        org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+                                                                                       org.apache.qpid.interop.testclient.TestClient.virtualHost);
+                session[0] = connection[0].createSession(false, ackMode);
+
+                // Extract and retain the test parameters.
+                sendDestination = session[0].createTopic(sendKey);
+
+                connection[0].setExceptionListener(this);
+
+                producer = session[0].createProducer(sendDestination);
+
+                sendUpdateDestination = session[0].createTopic(sendUpdateKey);
+                MessageConsumer updateConsumer = session[0].createConsumer(sendUpdateDestination);
+
+                _rateAdapter = new SustainedRateAdapter(this);
+                updateConsumer.setMessageListener(_rateAdapter);
+
+
+                break;
+
+                // Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number
+                // of receiver connections.
+            case RECEIVER:
+                log.info("*********** Creating RECEIVER");
+                // Create the required number of receiver connections.
+                connection = new Connection[numReceivers];
+                session = new Session[numReceivers];
+
+                for (int i = 0; i < numReceivers; i++)
+                {
+                    connection[i] =
+                            org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE,
+                                                                                           org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+                                                                                           org.apache.qpid.interop.testclient.TestClient.virtualHost);
+                    session[i] = connection[i].createSession(false, ackMode);
+
+                    sendDestination = session[i].createTopic(sendKey);
+
+                    sendUpdateDestination = session[i].createTopic(sendUpdateKey);
+
+                    MessageConsumer consumer = session[i].createConsumer(sendDestination);
+
+                    consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, updateInterval, session[i], sendUpdateDestination));
+                }
+
+                break;
+        }
+
+        // Start all the connection dispatcher threads running.
+        for (int i = 0; i < connection.length; i++)
+        {
+            connection[i].start();
+        }
+    }
+
+    /** Performs the test case actions. */
+    public void start() throws JMSException
+    {
+        log.debug("public void start(): called");
+
+        // Check that the sender role is being performed.
+        switch (role)
+        {
+            // Check if the sender role is being assigned, and set up a single message producer if so.
+            case SENDER:
+                Message testMessage = session[0].createTextMessage("test");
+
+//            for (int i = 0; i < numMessages; i++)
+                while (_running)
+                {
+                    producer.send(testMessage);
+
+                    _rateAdapter.sentMessage();
+                }
+                break;
+            case RECEIVER:
+
+        }
+    }
+
+    /**
+     * Gets a report on the actions performed by the test case in its assigned role.
+     *
+     * @param session The session to create the report message in.
+     *
+     * @return The report message.
+     *
+     * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through.
+     */
+    public Message getReport(Session session) throws JMSException
+    {
+        log.debug("public Message getReport(Session session): called");
+
+        // Close the test connections.
+        for (int i = 0; i < connection.length; i++)
+        {
+            connection[i].close();
+        }
+
+        Message report = session.createMessage();
+        report.setStringProperty("CONTROL_TYPE", "REPORT");
+
+        return report;
+    }
+
+    public void onException(JMSException jmsException)
+    {
+        Exception linked = jmsException.getLinkedException();
+
+        if (linked != null)
+        {
+            if (linked instanceof AMQNoRouteException)
+            {
+                log.warn("No route .");
+            }
+            else if (linked instanceof AMQNoConsumersException)
+            {
+                log.warn("No clients currently available for message:" + ((AMQNoConsumersException) linked).getUndeliveredMessage());
+            }
+            else
+            {
+
+                log.warn("LinkedException:" + linked);
+            }
+
+            _rateAdapter.NO_CLIENTS = true;
+        }
+        else
+        {
+            log.warn("Exception:" + linked);
+        }
+    }
+
+    class SustainedListener implements MessageListener
+    {
+        private int _received = 0;
+        private int _updateInterval = 0;
+        private Long _time;
+        MessageProducer _updater;
+        Session _session;
+        String _client;
+
+
+        public SustainedListener(String clientname, int updateInterval, Session session, Destination sendDestination) throws JMSException
+        {
+            _updateInterval = updateInterval;
+            _client = clientname;
+            _session = session;
+            _updater = session.createProducer(sendDestination);
+        }
+
+        public void setReportInterval(int reportInterval)
+        {
+            _updateInterval = reportInterval;
+            _received = 0;
+        }
+
+        public void onMessage(Message message)
+        {
+            if (log.isDebugEnabled())
+            {
+                log.debug("Message " + _received + "received in listener");
+            }
+
+            if (message instanceof TextMessage)
+            {
+
+                try
+                {
+                    if (((TextMessage) message).getText().equals("test"))
+                    {
+                        if (_received == 0)
+                        {
+                            _time = System.nanoTime();
+                            sendStatus(0, _received);
+                        }
+
+                        _received++;
+
+                        if (_received % _updateInterval == 0)
+                        {
+                            Long currentTime = System.nanoTime();
+
+                            try
+                            {
+                                sendStatus(currentTime - _time, _received);
+                                _time = currentTime;
+                            }
+                            catch (JMSException e)
+                            {
+                                log.error("Unable to send update.");
+                            }
+                        }
+
+                    }
+                }
+                catch (JMSException e)
+                {
+                    //ignore error
+                }
+            }
+        }
+
+        private void sendStatus(long time, int received) throws JMSException
+        {
+            Message updateMessage = _session.createTextMessage("update");
+            updateMessage.setStringProperty("CLIENT_ID", _client);
+            updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE");
+            updateMessage.setLongProperty("RECEIVED", received);
+            updateMessage.setLongProperty("DURATION", time);
+
+            log.info("**** SENDING **** CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+
+            _updater.send(updateMessage);
+        }
+
+    }
+
+    class SustainedRateAdapter implements MessageListener
+    {
+        private SustainedTestClient _client;
+        private long _variance = 250; //no. messages to allow drifting
+        private volatile long _delay;   //in nanos
+        private long _sent;
+        private Map<String, Long> _slowClients = new HashMap<String, Long>();
+        private static final long PAUSE_SLEEP = 10; // 10 ms
+        private static final long NO_CLIENT_SLEEP = 1000; // 1s 
+        private static final long MAX_MESSAGE_DRIFT = 1000; // no messages drifted from producer
+        private volatile boolean NO_CLIENTS = true;
+        private int _delayShifting;
+        private static final int REPORTS_WITHOUT_CHANGE = 10;
+        private static final double MAXIMUM_DELAY_SHIFT = .02; //2% 
+
+        SustainedRateAdapter(SustainedTestClient client)
+        {
+            _client = client;
+        }
+
+        public void onMessage(Message message)
+        {
+            if (log.isDebugEnabled())
+            {
+                log.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called");
+            }
+
+            try
+            {
+                String controlType = message.getStringProperty("CONTROL_TYPE");
+
+                // Check if the message is a test invite.
+                if ("UPDATE".equals(controlType))
+                {
+                    NO_CLIENTS = false;
+                    long duration = message.getLongProperty("DURATION");
+                    long received = message.getLongProperty("RECEIVED");
+                    String client = message.getStringProperty("CLIENT_ID");
+
+                    log.info("**** SENDING **** CLIENT_ID:" + client + " RECEIVED:" + received + " DURATION:" + duration);
+
+
+                    recordSlow(client, received);
+
+                    adjustDelay(client, received, duration);
+                }
+            }
+            catch (JMSException e)
+            {
+                //
+            }
+        }
+
+        class Pair<X, Y>
+        {
+            X item1;
+            Y item2;
+
+            Pair(X i1, Y i2)
+            {
+                item1 = i1;
+                item2 = i2;
+            }
+
+            X getItem1()
+            {
+                return item1;
+            }
+
+            Y getItem2()
+            {
+                return item2;
+            }
+        }
+
+        Map<String, Pair<Long, Long>> delays = new HashMap<String, Pair<Long, Long>>();
+        Long totalReceived = 0L;
+        Long totalDuration = 0L;
+
+        private void adjustDelay(String client, long received, long duration)
+        {
+            Pair<Long, Long> current = delays.get(client);
+
+            if (current == null)
+            {
+                delays.put(client, new Pair<Long, Long>(received, duration));
+            }
+            else
+            {
+                //reduce totals
+                totalReceived -= current.getItem1();
+                totalDuration -= current.getItem2();
+            }
+
+            totalReceived += received;
+            totalDuration += duration;
+
+            long averageDuration = totalDuration / delays.size();
+
+            long diff = Math.abs(_delay - averageDuration);
+
+            //if the averageDuration differs from the current by more than the specified variane then adjust delay.
+            if (diff > _variance)
+            {
+                if (averageDuration > _delay)
+                {
+                    // we can go faster
+                    _delay -= diff;
+                    if (_delay < 0)
+                    {
+                        _delay = 0;
+                    }
+                }
+                else
+                {
+                    // we need to slow down
+                    _delay += diff;
+                }
+                delayChanged();
+            }
+            else
+            {
+                delayStable();
+            }
+
+        }
+
+        private void delayChanged()
+        {
+            _delayShifting = REPORTS_WITHOUT_CHANGE;
+        }
+
+        private void delayStable()
+        {
+            _delayShifting--;
+
+            if (_delayShifting < 0)
+            {
+                _delayShifting = 0;
+                log.info("Delay stabilised:" + _delay);
+            }
+        }
+
+        // Record Slow clients
+        private void recordSlow(String client, long received)
+        {
+            if (received < (_sent - _variance))
+            {
+                _slowClients.put(client, received);
+            }
+            else
+            {
+                _slowClients.remove(client);
+            }
+        }
+
+        public void sentMessage()
+        {
+            if (_sent % updateInterval == 0)
+            {
+
+                // Cause test to pause when we have slow
+                if (!_slowClients.isEmpty() || NO_CLIENTS)
+                {
+                    log.info("Pausing for slow clients");
+
+                    //_delay <<= 1;
+
+                    while (!_slowClients.isEmpty())
+                    {
+                        sleep(PAUSE_SLEEP);
+                    }
+
+                    if (NO_CLIENTS)
+                    {
+                        sleep(NO_CLIENT_SLEEP);
+                    }
+
+                    log.debug("Continuing");
+                    return;
+                }
+                else
+                {
+                    log.info("Delay:" + _delay);
+                }
+            }
+
+            _sent++;
+
+            if (_delay > 0)
+            {
+                // less than 10ms sleep doesn't work.
+                // _delay is in nano seconds
+                if (_delay < 1000000)
+                {
+                    sleep(0, (int) _delay);
+                }
+                else
+                {
+                    if (_delay < 30000000000L)
+                    {
+                        sleep(_delay / 1000000, (int) (_delay % 1000000));
+                    }
+                }
+            }
+        }
+
+        private void sleep(long sleep)
+        {
+            sleep(sleep, 0);
+        }
+
+        private void sleep(long milli, int nano)
+        {
+            try
+            {
+                log.debug("Sleep:" + milli + ":" + nano);
+                Thread.sleep(milli, nano);
+            }
+            catch (InterruptedException e)
+            {
+                //
+            }
+        }
+    }
+
+}

Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java?view=auto&rev=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java Mon Jun  4 02:47:53 2007
@@ -0,0 +1,219 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.sustained;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.interop.coordinator.ListeningCoordinatorTest;
+import org.apache.qpid.interop.coordinator.TestClientDetails;
+import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase3BasicPubSub;
+import org.apache.qpid.util.ConversationFactory;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub implements ListeningCoordinatorTest
+{
+    /** Used for debugging. */
+    private static final Logger log = Logger.getLogger(SustainedTestCoordinator.class);
+    private List<TestClientDetails> _receivers;
+    private static final String SUSTAINED_KEY = "Perf_SustainedPubSub";
+    Map<String, Object> _testProperties;
+
+    /**
+     * Creates a new coordinating test case with the specified name.
+     *
+     * @param name The test case name.
+     */
+    public SustainedTestCoordinator(String name)
+    {
+        super(name);
+        _receivers = new LinkedList();
+    }
+
+    /**
+     * Adds a receiver to this test.
+     *
+     * @param receiver The contact details of the sending client in the test.
+     */
+    public void setReceiver(TestClientDetails receiver)
+    {
+        _receivers.add(receiver);
+    }
+
+
+    /**
+     * Performs the a single test run
+     *
+     * @throws Exception if there was a problem running the test.
+     */
+    public void testBasicPubSub() throws Exception
+    {
+        log.debug("public void testSinglePubSubCycle(): called");
+
+        Map<String, Object> testConfig = new HashMap<String, Object>();
+        testConfig.put("TEST_NAME", "Perf_SustainedPubSub");
+        testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY);
+        //testConfig.put("SUSTAINED_MSG_RATE", 10);
+        testConfig.put("SUSTAINED_NUM_RECEIVERS", 2);
+        testConfig.put("SUSTAINED_UPDATE_INTERVAL", 25);
+        testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE");
+        testConfig.put("ACKNOWLEDGE_MODE", AMQSession.NO_ACKNOWLEDGE);
+
+        sequenceTest(testConfig);
+    }
+
+    /**
+     * Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner loop
+     * of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports from the
+     * participants.
+     *
+     * @param testProperties The test case definition.
+     *
+     * @return The test results from the senders and receivers.
+     *
+     * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
+     */
+    protected Message[] sequenceTest(Map<String, Object> testProperties) throws JMSException
+    {
+        log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called");
+
+        Session session = conversationFactory.getSession();
+        Destination senderControlTopic = session.createTopic(sender.privateControlKey);
+
+        ConversationFactory.Conversation senderConversation = conversationFactory.startConversation();
+
+        // Assign the sender role to the sending test client.
+        Message assignSender = conversationFactory.getSession().createMessage();
+        setPropertiesOnMessage(assignSender, testProperties);
+        assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
+        assignSender.setStringProperty("ROLE", "SENDER");
+
+        senderConversation.send(senderControlTopic, assignSender);
+
+        //Assign and wait for the receiver ckuebts to be ready.
+        _testProperties = testProperties;
+
+        // Wait for the senders to confirm their roles.
+        senderConversation.receive();
+
+        assignReceivers();
+
+        // Start the test.
+        Message start = session.createMessage();
+        start.setStringProperty("CONTROL_TYPE", "START");
+
+        senderConversation.send(senderControlTopic, start);
+
+        // Wait for the test sender to return its report.
+        Message senderReport = senderConversation.receive();
+
+        try
+        {
+            Thread.sleep(500);
+        }
+        catch (InterruptedException e)
+        {
+        }
+
+        // Ask the receiver for its report.
+        Message statusRequest = session.createMessage();
+        statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST");
+
+
+        return new Message[]{senderReport};
+    }
+
+    private void assignReceivers()
+    {
+        for (TestClientDetails receiver : _receivers)
+        {
+            registerReceiver(receiver);
+        }
+    }
+
+    private void registerReceiver(TestClientDetails receiver)
+    {
+        log.info("registerReceiver called for receiver:" + receiver);
+        try
+        {
+            Session session = conversationFactory.getSession();
+            Destination receiverControlTopic = session.createTopic(receiver.privateControlKey);
+            ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation();
+            // Assign the receiver role the receiving client.
+            Message assignReceiver = session.createMessage();
+            setPropertiesOnMessage(assignReceiver, _testProperties);
+            assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
+            assignReceiver.setStringProperty("ROLE", "RECEIVER");
+
+            receiverConversation.send(receiverControlTopic, assignReceiver);
+
+            //Don't wait for receiver to be ready.... we can't this is being done in
+            // the dispatcher thread, and most likely the acceptance message we
+            // want is sitting in the Dispatcher._queue waiting its turn for being
+            // dispatched so if we block here we won't can't get the message.             
+            // So assume consumer is ready for action.
+            //receiverConversation.receive();
+        }
+        catch (JMSException e)
+        {
+            log.warn("Unable to assign receiver:" + receiver + ". Due to:" + e.getMessage());
+        }
+    }
+
+    public void latejoin(Message message)
+    {
+        try
+        {
+
+            TestClientDetails clientDetails = new TestClientDetails();
+            clientDetails.clientName = message.getStringProperty("CLIENT_NAME");
+            clientDetails.privateControlKey = message.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY");
+
+
+            registerReceiver(clientDetails);
+        }
+        catch (JMSException e)
+        {
+            //swallow
+        }
+    }
+
+    /**
+     * Should provide a translation from the junit method name of a test to its test case name as defined in the interop
+     * testing specification. For example the method "testP2P" might map onto the interop test case name
+     * "TC2_BasicP2P".
+     *
+     * @param methodName The name of the JUnit test method.
+     *
+     * @return The name of the corresponding interop test case.
+     */
+    public String getTestCaseNameForTestMethod(String methodName)
+    {
+        return "Perf_SustainedPubSub";
+    }
+}

Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java?view=auto&rev=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java Mon Jun  4 02:47:53 2007
@@ -0,0 +1,157 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.sustained;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.interop.testclient.InteropClientTestCase;
+import org.apache.qpid.util.CommandLineParser;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+
+public class TestClient extends org.apache.qpid.interop.testclient.TestClient
+{
+    private static Logger log = Logger.getLogger(TestClient.class);
+
+    /**
+     * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified client
+     * identifying name.
+     *
+     * @param brokerUrl   The url of the broker to connect to.
+     * @param virtualHost The virtual host to conect to.
+     * @param clientName  The client name to use.
+     */
+    public TestClient(String brokerUrl, String virtualHost, String clientName)
+    {
+        super(brokerUrl, virtualHost, clientName);
+    }
+
+    /**
+     * The entry point for the interop test coordinator. This client accepts the following command line arguments:
+     *
+     * <p/><table> <tr><td> -b         <td> The broker URL.       <td> Optional. <tr><td> -h         <td> The virtual
+     * host.     <td> Optional. <tr><td> -n         <td> The test client name. <td> Optional. <tr><td> name=value <td>
+     * Trailing argument define name/value pairs. Added to system properties. <td> Optional. </table>
+     *
+     * @param args The command line arguments.
+     */
+    public static void main(String[] args)
+    {
+        // Use the command line parser to evaluate the command line.
+        CommandLineParser commandLine =
+                new CommandLineParser(
+                        new String[][]
+                                {
+                                        {"b", "The broker URL.", "broker", "false"},
+                                        {"h", "The virtual host to use.", "virtual host", "false"},
+                                        {"n", "The test client name.", "name", "false"},
+                                        {"j", "Join this test client to running test.", "join", ""}
+                                });
+
+        // Capture the command line arguments or display errors and correct usage and then exit.
+        Properties options = null;
+
+        try
+        {
+            options = commandLine.parseCommandLine(args);
+        }
+        catch (IllegalArgumentException e)
+        {
+            System.out.println(commandLine.getErrors());
+            System.out.println(commandLine.getUsage());
+            System.exit(1);
+        }
+
+        // Extract the command line options.
+        String brokerUrl = options.getProperty("b");
+        String virtualHost = options.getProperty("h");
+        String clientName = options.getProperty("n");
+        String join = options.getProperty("j");
+
+        // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up
+        // overridden values from there.
+        commandLine.addCommandLineToSysProperties();
+
+        // Create a test client and start it running.
+        TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName);
+
+        // Use a class path scanner to find all the interop test case implementations.
+        Collection<Class<? extends InteropClientTestCase>> testCaseClasses =
+                new ArrayList<Class<? extends InteropClientTestCase>>();
+        // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
+        // Hard code the test classes till the classpath scanner is fixed.
+        Collections.addAll(testCaseClasses,
+                           SustainedTestClient.class);
+
+
+        try
+        {
+            client.start(testCaseClasses, join);
+        }
+        catch (Exception e)
+        {
+            log.error("The test client was unable to start.", e);
+            System.exit(1);
+        }
+    }
+
+    protected void start(Collection<Class<? extends InteropClientTestCase>> testCaseClasses, String join) throws JMSException, ClassNotFoundException
+    {
+        super.start(testCaseClasses);
+        log.debug("private void start(): called");
+
+        if (join != null && !join.equals(""))
+        {
+            Message latejoin = session.createMessage();
+
+            try
+            {
+                Object test = Class.forName(join).newInstance();
+                if (test instanceof InteropClientTestCase)
+                {
+                    currentTestCase = (InteropClientTestCase) test;
+                }
+                else
+                {
+                    throw new RuntimeException("Requested to join class '" + join + "' but this is not a InteropClientTestCase.");
+                }
+
+                latejoin.setStringProperty("CONTROL_TYPE", "LATEJOIN");
+                latejoin.setStringProperty("CLIENT_NAME", clientName);
+                latejoin.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName);
+                producer.send(session.createTopic("iop.control.test." + currentTestCase.getName()), latejoin);
+            }
+            catch (InstantiationException e)
+            {
+                log.warn("Unable to request latejoining of test:" + currentTestCase);
+            }
+            catch (IllegalAccessException e)
+            {
+                log.warn("Unable to request latejoining of test:" + currentTestCase);
+            }
+        }
+    }
+
+}

Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java?view=auto&rev=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java Mon Jun  4 02:47:53 2007
@@ -0,0 +1,117 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.sustained;
+
+import org.apache.qpid.interop.coordinator.Coordinator;
+import org.apache.qpid.interop.coordinator.ListeningTestDecorator;
+import org.apache.qpid.interop.coordinator.TestClientDetails;
+import org.apache.qpid.util.CommandLineParser;
+import org.apache.qpid.util.ConversationFactory;
+import org.apache.log4j.Logger;
+
+import java.util.Properties;
+import java.util.Set;
+
+import junit.framework.TestResult;
+import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
+
+import javax.jms.Connection;
+
+public class TestCoordinator extends Coordinator
+{
+
+    private static final Logger log = Logger.getLogger(TestCoordinator.class);
+
+    /**
+     * Creates an interop test coordinator on the specified broker and virtual host.
+     *
+     * @param brokerUrl   The URL of the broker to connect to.
+     * @param virtualHost The virtual host to run all tests on. Optional, may be <tt>null</tt>.
+     */
+    TestCoordinator(String brokerUrl, String virtualHost)
+    {
+        super(brokerUrl, virtualHost);
+    }
+
+    protected WrappedSuiteTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, Set<TestClientDetails> enlistedClients, ConversationFactory conversationFactory, Connection connection)
+    {
+        return  new ListeningTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
+    }
+
+
+    /**
+     * The entry point for the interop test coordinator. This client accepts the following command line arguments:
+     *
+     * <p/><table> <tr><td> -b         <td> The broker URL.   <td> Mandatory. <tr><td> -h         <td> The virtual host.
+     * <td> Optional. <tr><td> name=value <td> Trailing argument define name/value pairs. Added to system properties.
+     * <td> Optional. </table>
+     *
+     * @param args The command line arguments.
+     */
+    public static void main(String[] args)
+    {
+        try
+        {
+            // Use the command line parser to evaluate the command line with standard handling behaviour (print errors
+            // and usage then exist if there are errors).
+            Properties options =
+                    CommandLineParser.processCommandLine(args,
+                                                         new CommandLineParser(
+                                                                 new String[][]
+                                                                         {
+                                                                                 {"b", "The broker URL.", "broker", "false"},
+                                                                                 {"h", "The virtual host to use.", "virtual host", "false"},
+                                                                                 {"o", "The name of the directory to output test timings to.", "dir", "false"}
+                                                                         }));
+
+            // Extract the command line options.
+            String brokerUrl = options.getProperty("b");
+            String virtualHost = options.getProperty("h");
+            String reportDir = options.getProperty("o");
+            reportDir = (reportDir == null) ? "." : reportDir;
+
+
+            String[] testClassNames = {SustainedTestCoordinator.class.getName()};
+
+            // Create a coordinator and begin its test procedure.
+            Coordinator coordinator = new TestCoordinator(brokerUrl, virtualHost);
+
+            coordinator.setReportDir(reportDir);
+
+            TestResult testResult = coordinator.start(testClassNames);
+
+            if (testResult.failureCount() > 0)
+            {
+                System.exit(FAILURE_EXIT);
+            }
+            else
+            {
+                System.exit(SUCCESS_EXIT);
+            }
+        }
+        catch (Exception e)
+        {
+            System.err.println(e.getMessage());
+            log.error("Top level handler caught execption.", e);
+            System.exit(EXCEPTION_EXIT);
+        }
+    }
+}

Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date