You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2007/08/01 18:19:37 UTC

svn commit: r561855 [2/3] - in /incubator/qpid/branches/M2/java: integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/ integrationtests/src/main/java/org/apache/qpid/interop/testcases/ integrationtests/src/main/java/org/apache/qpid/pe...

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEnd.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEnd.java?view=diff&rev=561855&r1=561854&r2=561855
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEnd.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEnd.java Wed Aug  1 09:19:31 2007
@@ -74,4 +74,18 @@
      * @throws JMSException Any JMSExceptions occurring during the close are allowed to fall through.
      */
     public void close() throws JMSException;
+
+    /**
+     * Returns the message monitor for reporting on received messages on this circuit end.
+     *
+     * @return The message monitor for this circuit end.
+     */
+    public MessageMonitor getMessageMonitor();
+
+    /**
+     * Returns the exception monitor for reporting on exceptions received on this circuit end.
+     *
+     * @return The exception monitor for this circuit end.
+     */
+    public ExceptionMonitor getExceptionMonitor();
 }

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEndBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEndBase.java?view=diff&rev=561855&r1=561854&r2=561855
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEndBase.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitEndBase.java Wed Aug  1 09:19:31 2007
@@ -41,21 +41,31 @@
     /** Holds the single message consumer. */
     MessageConsumer consumer;
 
-    /** Holds the session for the circuit end. */
+    /** Holds the controlSession for the circuit end. */
     Session session;
 
+    /** Holds the message monitor for the circuit end. */
+    MessageMonitor messageMonitor;
+
+    /** Holds the exception monitor for the circuit end. */
+    ExceptionMonitor exceptionMonitor;
+
     /**
-     * Creates a circuit end point on the specified producer, consumer and session.
+     * Creates a circuit end point on the specified producer, consumer and controlSession.
      *
      * @param producer The message producer for the circuit end point.
      * @param consumer The message consumer for the circuit end point.
-     * @param session  The session for the circuit end point.
+     * @param session  The controlSession for the circuit end point.
      */
-    public CircuitEndBase(MessageProducer producer, MessageConsumer consumer, Session session)
+    public CircuitEndBase(MessageProducer producer, MessageConsumer consumer, Session session, MessageMonitor messageMonitor,
+        ExceptionMonitor exceptionMonitor)
     {
         this.producer = producer;
         this.consumer = consumer;
         this.session = session;
+
+        this.messageMonitor = messageMonitor;
+        this.exceptionMonitor = exceptionMonitor;
     }
 
     /**
@@ -115,5 +125,25 @@
         {
             consumer.close();
         }
+    }
+
+    /**
+     * Returns the message monitor for reporting on received messages on this circuit end.
+     *
+     * @return The message monitor for this circuit end.
+     */
+    public MessageMonitor getMessageMonitor()
+    {
+        return messageMonitor;
+    }
+
+    /**
+     * Returns the exception monitor for reporting on exceptions received on this circuit end.
+     *
+     * @return The exception monitor for this circuit end.
+     */
+    public ExceptionMonitor getExceptionMonitor()
+    {
+        return exceptionMonitor;
     }
 }

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java?view=diff&rev=561855&r1=561854&r2=561855
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ExceptionMonitor.java Wed Aug  1 09:19:31 2007
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.test.framework;
 
+import org.apache.log4j.Logger;
+
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 
@@ -39,6 +41,9 @@
  */
 public class ExceptionMonitor implements ExceptionListener
 {
+    /** Used for debugging. */
+    private final Logger log = Logger.getLogger(ExceptionMonitor.class);
+
     /** Holds the received exceptions. */
     List<JMSException> exceptions = new ArrayList<JMSException>();
 
@@ -49,6 +54,8 @@
      */
     public void onException(JMSException e)
     {
+        log.debug("public void onException(JMSException e): called");
+
         exceptions.add(e);
     }
 

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkBaseCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkBaseCase.java?view=diff&rev=561855&r1=561854&r2=561855
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkBaseCase.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkBaseCase.java Wed Aug  1 09:19:31 2007
@@ -22,13 +22,16 @@
 
 import junit.framework.TestCase;
 
+import org.apache.log4j.Logger;
 import org.apache.log4j.NDC;
 
 import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.test.framework.sequencers.TestCaseSequencer;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.test.framework.localcircuit.CircuitImpl;
+import org.apache.qpid.test.framework.localcircuit.LocalCircuitImpl;
+import org.apache.qpid.test.framework.sequencers.CircuitFactory;
+import org.apache.qpid.util.ConversationFactory;
 
+import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
 import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 
 import java.util.ArrayList;
@@ -47,10 +50,13 @@
  * <tr><td> Convert failed assertions to error messages.
  * </table>
  */
-public class FrameworkBaseCase extends TestCase
+public class FrameworkBaseCase extends AsymptoticTestCase
 {
+    /** Used for debugging purposes. */
+    private static final Logger log = Logger.getLogger(FrameworkBaseCase.class);
+
     /** Holds the test sequencer to create and run test circuits with. */
-    protected TestCaseSequencer testSequencer = new DefaultTestSequencer();
+    protected CircuitFactory circuitFactory = new DefaultCircuitFactory();
 
     /**
      * Creates a new test case with the specified name.
@@ -69,20 +75,20 @@
      *
      * @return The test case sequencer.
      */
-    protected TestCaseSequencer getTestSequencer()
+    protected CircuitFactory getCircuitFactory()
     {
-        return testSequencer;
+        return circuitFactory;
     }
 
     /**
-     * Overrides the default test sequencer. Test decorators can use this to supply distributed test sequencers or other
-     * test sequencer specializations.
+     * Overrides the default test circuit factory. Test decorators can use this to supply distributed test sequencers or
+     * other test circuit factory specializations.
      *
-     * @param sequencer The new test sequencer.
+     * @param circuitFactory The new test circuit factory.
      */
-    public void setTestSequencer(TestCaseSequencer sequencer)
+    public void setCircuitFactory(CircuitFactory circuitFactory)
     {
-        this.testSequencer = sequencer;
+        this.circuitFactory = circuitFactory;
     }
 
     /**
@@ -112,6 +118,8 @@
      */
     protected void assertNoFailures(List<Assertion> asserts)
     {
+        log.debug("protected void assertNoFailures(List<Assertion> asserts = " + asserts + "): called");
+
         // Check if there are no assertion failures, and return without doing anything if so.
         if ((asserts == null) || asserts.isEmpty())
         {
@@ -175,10 +183,25 @@
     }
 
     /**
-     * DefaultTestSequencer is a test sequencer that creates test circuits with publishing and receiving ends rooted
+     * Should provide a translation from the junit method name of a test to its test case name as known to the test
+     * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test
+     * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case
+     * name "TC2_BasicP2P".
+     *
+     * @param methodName The name of the JUnit test method.
+     *
+     * @return The name of the corresponding interop test case.
+     */
+    public String getTestCaseNameForTestMethod(String methodName)
+    {
+        return methodName;
+    }
+
+    /**
+     * DefaultCircuitFactory is a test sequencer that creates test circuits with publishing and receiving ends rooted
      * on the same JVM.
      */
-    public class DefaultTestSequencer implements TestCaseSequencer
+    public class DefaultCircuitFactory implements CircuitFactory
     {
         /**
          * Holds a test coordinating conversation with the test clients. This should consist of assigning the test roles,
@@ -201,7 +224,57 @@
          */
         public Circuit createCircuit(ParsedProperties testProperties)
         {
-            return CircuitImpl.createCircuit(testProperties);
+            return LocalCircuitImpl.createCircuit(testProperties);
+        }
+
+        /**
+         * Sets the sender test client to coordinate the test with.
+         *
+         * @param sender The contact details of the sending client in the test.
+         */
+        public void setSender(TestClientDetails sender)
+        {
+            throw new RuntimeException("Not implemented.");
+        }
+
+        /**
+         * Sets the receiving test client to coordinate the test with.
+         *
+         * @param receiver The contact details of the receiving client in the test.
+         */
+        public void setReceiver(TestClientDetails receiver)
+        {
+            throw new RuntimeException("Not implemented.");
+        }
+
+        /**
+         * Supplies the sending test client.
+         *
+         * @return The sending test client.
+         */
+        public TestClientDetails getSender()
+        {
+            throw new RuntimeException("Not implemented.");
+        }
+
+        /**
+         * Supplies the receiving test clients.
+         *
+         * @return The receiving test clients.
+         */
+        public List<TestClientDetails> getReceivers()
+        {
+            throw new RuntimeException("Not implemented.");
+        }
+
+        /**
+         * Accepts the conversation factory over which to hold the test coordinating conversation.
+         *
+         * @param conversationFactory The conversation factory to coordinate the test over.
+         */
+        public void setConversationFactory(ConversationFactory conversationFactory)
+        {
+            throw new RuntimeException("Not implemented.");
         }
     }
 }

Added: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkClientBaseCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkClientBaseCase.java?view=auto&rev=561855
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkClientBaseCase.java (added)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkClientBaseCase.java Wed Aug  1 09:19:31 2007
@@ -0,0 +1,11 @@
+package org.apache.qpid.test.framework;
+
+/**
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td>
+ * </table>
+ */
+public class FrameworkClientBaseCase
+{
+}

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java?view=diff&rev=561855&r1=561854&r2=561855
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessageMonitor.java Wed Aug  1 09:19:31 2007
@@ -20,27 +20,86 @@
  */
 package org.apache.qpid.test.framework;
 
+import org.apache.log4j.Logger;
+
 import javax.jms.Message;
 import javax.jms.MessageListener;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * MessageMonitor is used to record information about messages received. This will provide methods to check various
  * properties, such as the type, number and content of messages received in order to verify the correct behaviour of
  * tests.
  *
- * <p/>At the moment this monitor does not do anything.
- *
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Count incoming messages.
+ * <tr><td> Record time elapsed since the arrival of the first message.
+ * <tr><td> Reset all counts and timings.
  * </table>
  */
 public class MessageMonitor implements MessageListener
 {
+    /** Used for debugging. */
+    private final Logger log = Logger.getLogger(MessageMonitor.class);
+
+    /** Holds the count of messages received since the last query. */
+    protected AtomicInteger numMessages = new AtomicInteger();
+
+    /** Holds the time of arrival of the first message. */
+    protected Long firstMessageTime = null;
+
     /**
      * Handles received messages. Does Nothing.
      *
      * @param message The message. Ignored.
      */
     public void onMessage(Message message)
-    { }
+    {
+        log.debug("public void onMessage(Message message): called");
+
+        numMessages.getAndIncrement();
+    }
+
+    /**
+     * Gets the count of messages.
+     *
+     * @return The count of messages.
+     */
+    public int getNumMessage()
+    {
+        if (firstMessageTime == null)
+        {
+            firstMessageTime = System.nanoTime();
+        }
+
+        return numMessages.get();
+    }
+
+    /**
+     * Gets the time elapsed since the first message arrived, in nanos, or zero if no messages have arrived yet.
+     *
+     * @return The time elapsed since the first message arrived, in nanos, or zero if no messages have arrived yet.
+     */
+    public long getTime()
+    {
+        if (firstMessageTime != null)
+        {
+            return System.nanoTime() - firstMessageTime;
+        }
+        else
+        {
+            return 0L;
+        }
+    }
+
+    /**
+     * Resets the message count and timer to zero.
+     */
+    public void reset()
+    {
+        numMessages.set(0);
+        firstMessageTime = null;
+    }
 }

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessagingTestConfigProperties.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessagingTestConfigProperties.java?view=diff&rev=561855&r1=561854&r2=561855
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessagingTestConfigProperties.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessagingTestConfigProperties.java Wed Aug  1 09:19:31 2007
@@ -24,6 +24,8 @@
 
 import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 
+import java.util.Properties;
+
 /**
  * MessagingTestConfigProperties defines a set of property names and default values for specifying a messaging topology,
  * and test parameters for running a messaging test over that topology. A Properties object holding some of these
@@ -67,8 +69,11 @@
  * <tr><th> Responsibilities <th> Collaborations
  * <tr><td> Provide the names and defaults of all test parameters.
  * </table>
+ *
+ * @todo Put a type-safe wrapper around these properties, but continue to store the parameters as properties. This is
+ *       simply to ensure that it is a simple matter to serialize/deserialize string/string pairs onto messages.
  */
-public class MessagingTestConfigProperties
+public class MessagingTestConfigProperties extends ParsedProperties
 {
     // ====================== Connection Properties ==================================
 
@@ -131,6 +136,18 @@
     /** Holds the default value of the receivers consumer flag. */
     public static final boolean RECEIVER_CONSUMER_BIND_DEFAULT = true;
 
+    /** Holds the name of the property to get the publishers consumer active flag from. */
+    public static final String PUBLISHER_CONSUMER_ACTIVE_PROPNAME = "publisherConsumerActive";
+
+    /** Holds the default value of the publishers consumer active flag. */
+    public static final boolean PUBLISHER_CONSUMER_ACTIVE_DEFAULT = true;
+
+    /** Holds the name of the property to get the receivers consumer active flag from. */
+    public static final String RECEIVER_CONSUMER_ACTIVE_PROPNAME = "receiverConsumerActive";
+
+    /** Holds the default value of the receivers consumer active flag. */
+    public static final boolean RECEIVER_CONSUMER_ACTIVE_DEFAULT = true;
+
     /** Holds the name of the property to get the destination name root from. */
     public static final String SEND_DESTINATION_NAME_ROOT_PROPNAME = "sendDestinationRoot";
 
@@ -214,7 +231,7 @@
     public static final boolean DURABLE_DESTS_DEFAULT = false;
 
     /** Holds the name of the proeprty to set the prefetch size from. */
-    public static final String PREFECTH_PROPNAME = "prefetch";
+    public static final String PREFETCH_PROPNAME = "prefetch";
 
     /** Defines the default prefetch size to use when consuming messages. */
     public static final int PREFETCH_DEFAULT = 100;
@@ -275,6 +292,8 @@
         defaults.setPropertyIfNull(PUBLISHER_CONSUMER_BIND_PROPNAME, PUBLISHER_CONSUMER_BIND_DEFAULT);
         defaults.setPropertyIfNull(RECEIVER_PRODUCER_BIND_PROPNAME, RECEIVER_PRODUCER_BIND_DEFAULT);
         defaults.setPropertyIfNull(RECEIVER_CONSUMER_BIND_PROPNAME, RECEIVER_CONSUMER_BIND_DEFAULT);
+        defaults.setPropertyIfNull(PUBLISHER_CONSUMER_ACTIVE_PROPNAME, PUBLISHER_CONSUMER_ACTIVE_DEFAULT);
+        defaults.setPropertyIfNull(RECEIVER_CONSUMER_ACTIVE_PROPNAME, RECEIVER_CONSUMER_ACTIVE_DEFAULT);
         defaults.setPropertyIfNull(SEND_DESTINATION_NAME_ROOT_PROPNAME, SEND_DESTINATION_NAME_ROOT_DEFAULT);
         defaults.setPropertyIfNull(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME, RECEIVE_DESTINATION_NAME_ROOT_DEFAULT);
         defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT);
@@ -294,10 +313,173 @@
         defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
         defaults.setPropertyIfNull(DURABLE_SUBSCRIPTION_PROPNAME, DURABLE_SUBSCRIPTION_DEFAULT);
         defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
-        defaults.setPropertyIfNull(PREFECTH_PROPNAME, PREFETCH_DEFAULT);
+        defaults.setPropertyIfNull(PREFETCH_PROPNAME, PREFETCH_DEFAULT);
         defaults.setPropertyIfNull(NO_LOCAL_PROPNAME, NO_LOCAL_DEFAULT);
         defaults.setPropertyIfNull(EXCLUSIVE_PROPNAME, EXCLUSIVE_DEFAULT);
         defaults.setPropertyIfNull(IMMEDIATE_PROPNAME, IMMEDIATE_DEFAULT);
         defaults.setPropertyIfNull(MANDATORY_PROPNAME, MANDATORY_DEFAULT);
+    }
+
+    /**
+     * Creates a test configuration based on the defaults.
+     */
+    public MessagingTestConfigProperties()
+    {
+        super(defaults);
+    }
+
+    /**
+     * Creates a test configuration based on the supplied properties.
+     *
+     * @param properties The test configuration.
+     */
+    public MessagingTestConfigProperties(Properties properties)
+    {
+        super(properties);
+    }
+
+    public int getMessageSize()
+    {
+        return getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
+    }
+
+    public boolean getPublisherProducerBind()
+    {
+        return getPropertyAsBoolean(PUBLISHER_PRODUCER_BIND_PROPNAME);
+    }
+
+    public boolean getPublisherConsumerBind()
+    {
+        return getPropertyAsBoolean(PUBLISHER_CONSUMER_BIND_PROPNAME);
+    }
+
+    public boolean getReceiverProducerBind()
+    {
+        return getPropertyAsBoolean(RECEIVER_PRODUCER_BIND_PROPNAME);
+    }
+
+    public boolean getReceiverConsumerBind()
+    {
+        return getPropertyAsBoolean(RECEIVER_CONSUMER_BIND_PROPNAME);
+    }
+
+    public boolean getPublisherConsumerActive()
+    {
+        return getPropertyAsBoolean(PUBLISHER_CONSUMER_ACTIVE_PROPNAME);
+    }
+
+    public boolean getReceiverConsumerActive()
+    {
+        return getPropertyAsBoolean(RECEIVER_CONSUMER_ACTIVE_PROPNAME);
+    }
+
+    public String getSendDestinationNameRoot()
+    {
+        return getProperty(SEND_DESTINATION_NAME_ROOT_PROPNAME);
+    }
+
+    public String getReceiveDestinationNameRoot()
+    {
+        return getProperty(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME);
+    }
+
+    public boolean getPersistentMode()
+    {
+        return getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
+    }
+
+    public boolean getTransacted()
+    {
+        return getPropertyAsBoolean(TRANSACTED_PROPNAME);
+    }
+
+    public String getBroker()
+    {
+        return getProperty(BROKER_PROPNAME);
+    }
+
+    public String getVirtualHost()
+    {
+        return getProperty(VIRTUAL_HOST_PROPNAME);
+    }
+
+    public String getRate()
+    {
+        return getProperty(RATE_PROPNAME);
+    }
+
+    public boolean getPubsub()
+    {
+        return getPropertyAsBoolean(PUBSUB_PROPNAME);
+    }
+
+    public String getUsername()
+    {
+        return getProperty(USERNAME_PROPNAME);
+    }
+
+    public String getPassword()
+    {
+        return getProperty(PASSWORD_PROPNAME);
+    }
+
+    public int getDestinationCount()
+    {
+        return getPropertyAsInteger(DESTINATION_COUNT_PROPNAME);
+    }
+
+    public long getTimeout()
+    {
+        return getPropertyAsLong(TIMEOUT_PROPNAME);
+    }
+
+    public int getTxBatchSize()
+    {
+        return getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME);
+    }
+
+    public boolean getDurableDests()
+    {
+        return getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
+    }
+
+    public int getAckMode()
+    {
+        return getPropertyAsInteger(ACK_MODE_PROPNAME);
+    }
+
+    public boolean getDurableSubscription()
+    {
+        return getPropertyAsBoolean(DURABLE_SUBSCRIPTION_PROPNAME);
+    }
+
+    public int getMaxPending()
+    {
+        return getPropertyAsInteger(MAX_PENDING_PROPNAME);
+    }
+
+    public int getPrefecth()
+    {
+        return getPropertyAsInteger(PREFETCH_PROPNAME);
+    }
+
+    public boolean getNoLocal()
+    {
+        return getPropertyAsBoolean(NO_LOCAL_PROPNAME);
+    }
+
+    public boolean getExclusive()
+    {
+        return getPropertyAsBoolean(EXCLUSIVE_PROPNAME);
+    }
+
+    public boolean getImmediate()
+    {
+        return getPropertyAsBoolean(IMMEDIATE_PROPNAME);
+    }
+
+    public boolean getMandatory()
+    {
+        return getPropertyAsBoolean(MANDATORY_PROPNAME);
     }
 }

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Publisher.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Publisher.java?view=diff&rev=561855&r1=561854&r2=561855
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Publisher.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Publisher.java Wed Aug  1 09:19:31 2007
@@ -21,17 +21,17 @@
 package org.apache.qpid.test.framework;
 
 /**
- * A Publisher is a {@link CircuitEnd} that represents one end of a test circuit. Its main purpose is to
- * provide assertions that can be applied to test the behaviour of the publisher.
+ * A Publisher is a {@link CircuitEnd} that represents the status of the publishing side of a test circuit. Its main
+ * purpose is to provide assertions that can be applied to test the behaviour of the publishers.
  *
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities
- * <tr><td> Provide assertion that the publisher received no exceptions.
- * <tr><td> Provide assertion that the publisher received a no consumers error code.
- * <tr><td> Provide assertion that the publisher received a no route error code.
+ * <tr><td> Provide assertion that the publishers received no exceptions.
+ * <tr><td> Provide assertion that the publishers received a no consumers error code on every message.
+ * <tr><td> Provide assertion that the publishers received a no route error code on every message.
  * </table>
  */
-public interface Publisher extends CircuitEnd
+public interface Publisher
 {
     /**
      * Provides an assertion that the publisher encountered no exceptions.

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Receiver.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Receiver.java?view=diff&rev=561855&r1=561854&r2=561855
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Receiver.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Receiver.java Wed Aug  1 09:19:31 2007
@@ -21,8 +21,8 @@
 package org.apache.qpid.test.framework;
 
 /**
- * A Receiver is a {@link CircuitEnd} that represents one end of a test circuit. Its main purpose is to
- * provide assertions that can be applied to test the behaviour of the receivers.
+ * A Receiver is a {@link CircuitEnd} that represents the status of the receiving side of a test circuit. Its main
+ * purpose is to provide assertions that can be applied to check the behaviour of the receivers.
  *
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities
@@ -30,7 +30,7 @@
  * <tr><td> Provide assertion that the receivers received all test messages sent to it.
  * </table>
  */
-public interface Receiver extends CircuitEnd
+public interface Receiver
 {
     /**
      * Provides an assertion that the receivers encountered no exceptions.

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java?view=diff&rev=561855&r1=561854&r2=561855
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedCircuitImpl.java Wed Aug  1 09:19:31 2007
@@ -20,11 +20,21 @@
  */
 package org.apache.qpid.test.framework.distributedcircuit;
 
-import org.apache.qpid.test.framework.Assertion;
-import org.apache.qpid.test.framework.Circuit;
-import org.apache.qpid.test.framework.Publisher;
-import org.apache.qpid.test.framework.Receiver;
+import org.apache.log4j.Logger;
 
+import org.apache.qpid.test.framework.*;
+import org.apache.qpid.util.ConversationFactory;
+
+import uk.co.thebadgerset.junit.extensions.TimingController;
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -42,9 +52,199 @@
  * <tr><td> Send test messages over the circuit.
  * <tr><td> Perform the default test procedure on the circuit.
  * </table>
+ *
+ * @todo There is a short pause after receiving sender reports before asking for receiver reports, because receivers may
+ *       not have finished receiving all their test messages before the report request arrives. This is going to be a
+ *       problem for taking test timings and needs to be eliminated. Suggested solution: have receiver send back reports
+ *       asynchronously, on test batch size boundaries, and do so automatically rather than having to have the report
+ *       request sent to them. Number each test run, or otherwise uniquely identify it, when a receiver does not get
+ *       any more messages on a test run for more than a timeout, it can assume the test is complete and send a final
+ *       report. On the coordinator end a future will need to be created to wait for all final reports to come in, and
+ *       to register results and timings for the test. This must work in such a way that a new test cycle can be started
+ *       without waiting for the results of the old one to come in.
+ *
+ * @todo Test circuits to be created per test thread, not per test method call. Per-thread setup and tear down to be
+ *       responsible for circuit creation and clean up. Many individual test method calls to run over the same circuit.
+ *       Important, otherwise test results will be skewed by circuit creation overheads.
+ *
  */
-public class DistributedCircuitImpl implements Circuit
+public class DistributedCircuitImpl implements Circuit, TimingControllerAware
 {
+    /** Used for debugging purposes. */
+    private static Logger log = Logger.getLogger(DistributedCircuitImpl.class);
+
+    /** Holds the conversation factory over which to coordinate the test. */
+    protected ConversationFactory conversationFactory;
+
+    /** Holds the controlSession over which to hold the control conversation. */
+    protected Session controlSession;
+
+    /** Holds the sender nodes in the test circuit. */
+    protected List<TestClientDetails> senders;
+
+    /** Holds the receiver nodes in the test circuit. */
+    protected List<TestClientDetails> receivers;
+
+    /** Holds the sender control conversations. */
+    protected ConversationFactory.Conversation[] senderConversation;
+
+    /** Holds the receiver control conversations. */
+    protected ConversationFactory.Conversation[] receiverConversation;
+
+    /** Holds the control topics for the senders in the test circuit. */
+    protected Destination[] senderControlTopic;
+
+    /** Holds the control topics for the receivers in the test circuit. */
+    protected Destination[] receiverControlTopic;
+
+    /** Holds the number of messages to send per test run. */
+    protected int numMessages;
+
+    /**
+     * Holds the timing controller for the circuit. This is used to log test times asynchronously, when receiver nodes
+     * return their reports after senders have completed a test case.
+     */
+    TimingController timingController;
+
+    /**
+     * Creates a distributed test circuit on the specified senders and receivers.
+     *
+     * @param session              The controlSession for all control conversations.
+     * @param senders              The senders.
+     * @param receivers            The receivers.
+     * @param senderConversation   A control conversation with the senders.
+     * @param receiverConversation A control conversation with the receivers.
+     * @param senderControlTopic   The senders control topic.
+     * @param receiverControlTopic The receivers control topic.
+     */
+    protected DistributedCircuitImpl(Session session, List<TestClientDetails> senders, List<TestClientDetails> receivers,
+        ConversationFactory.Conversation[] senderConversation, ConversationFactory.Conversation[] receiverConversation,
+        Destination[] senderControlTopic, Destination[] receiverControlTopic)
+    {
+        this.controlSession = session;
+        this.senders = senders;
+        this.receivers = receivers;
+        this.senderConversation = senderConversation;
+        this.receiverConversation = receiverConversation;
+        this.senderControlTopic = senderControlTopic;
+        this.receiverControlTopic = receiverControlTopic;
+    }
+
+    /**
+     * Creates a distributed test circuit from the specified test parameters, on the senders and receivers
+     * given.
+     *
+     * @param testProps           The test parameters.
+     * @param senders             The sender ends in the test circuit.
+     * @param receivers           The receiver ends in the test circuit.
+     * @param conversationFactory A conversation factory for creating the control conversations with senders and receivers.
+     *
+     * @return A connected and ready to start, test circuit.
+     */
+    public static Circuit createCircuit(ParsedProperties testProps, List<TestClientDetails> senders,
+        List<TestClientDetails> receivers, ConversationFactory conversationFactory)
+    {
+        log.debug("public static Circuit createCircuit(ParsedProperties testProps, List<TestClientDetails> senders, "
+            + " List<TestClientDetails> receivers, ConversationFactory conversationFactory)");
+
+        try
+        {
+            Session session = conversationFactory.getSession();
+
+            // Create control conversations with each of the senders.
+            ConversationFactory.Conversation[] senderConversation = new ConversationFactory.Conversation[senders.size()];
+            Destination[] senderControlTopic = new Destination[senders.size()];
+
+            for (int i = 0; i < senders.size(); i++)
+            {
+                TestClientDetails sender = senders.get(i);
+
+                senderControlTopic[i] = session.createTopic(sender.privateControlKey);
+                senderConversation[i] = conversationFactory.startConversation();
+            }
+
+            log.debug("Sender conversations created.");
+
+            // Create control conversations with each of the receivers.
+            ConversationFactory.Conversation[] receiverConversation = new ConversationFactory.Conversation[receivers.size()];
+            Destination[] receiverControlTopic = new Destination[receivers.size()];
+
+            for (int i = 0; i < receivers.size(); i++)
+            {
+                TestClientDetails receiver = receivers.get(i);
+
+                receiverControlTopic[i] = session.createTopic(receiver.privateControlKey);
+                receiverConversation[i] = conversationFactory.startConversation();
+            }
+
+            log.debug("Receiver conversations created.");
+
+            // Assign the sender role to each of the sending test clients.
+            for (int i = 0; i < senders.size(); i++)
+            {
+                TestClientDetails sender = senders.get(i);
+
+                Message assignSender = conversationFactory.getSession().createMessage();
+                TestUtils.setPropertiesOnMessage(assignSender, testProps);
+                assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
+                assignSender.setStringProperty("ROLE", "SENDER");
+
+                senderConversation[i].send(senderControlTopic[i], assignSender);
+            }
+
+            log.debug("Sender role assignments sent.");
+
+            // Assign the receivers role to each of the receiving test clients.
+            for (int i = 0; i < receivers.size(); i++)
+            {
+                TestClientDetails receiver = receivers.get(i);
+
+                Message assignReceiver = session.createMessage();
+                TestUtils.setPropertiesOnMessage(assignReceiver, testProps);
+                assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
+                assignReceiver.setStringProperty("ROLE", "RECEIVER");
+
+                receiverConversation[i].send(receiverControlTopic[i], assignReceiver);
+            }
+
+            log.debug("Receiver role assignments sent.");
+
+            // Wait for the senders and receivers to confirm their roles.
+            for (int i = 0; i < senders.size(); i++)
+            {
+                senderConversation[i].receive();
+            }
+
+            log.debug("Got all sender role confirmations");
+
+            for (int i = 0; i < receivers.size(); i++)
+            {
+                receiverConversation[i].receive();
+            }
+
+            log.debug("Got all receiver role confirmations");
+
+            // Package everything up as a circuit.
+            return new DistributedCircuitImpl(session, senders, receivers, senderConversation, receiverConversation,
+                    senderControlTopic, receiverControlTopic);
+        }
+        catch (JMSException e)
+        {
+            throw new RuntimeException("JMSException not handled.");
+        }
+    }
+
+    /**
+     * Used by test cases that can supply a {@link uk.co.thebadgerset.junit.extensions.TimingController} to set the
+     * controller on an aware test.
+     *
+     * @param controller The timing controller.
+     */
+    public void setTimingController(TimingController controller)
+    {
+        this.timingController = controller;
+    }
+
     /**
      * Gets the interface on the publishing end of the circuit.
      *
@@ -70,16 +270,132 @@
      */
     public void start()
     {
-        throw new RuntimeException("Not Implemented.");
+        log.debug("public void start(): called");
+
+        try
+        {
+            // Start the test on each of the senders.
+            Message start = controlSession.createMessage();
+            start.setStringProperty("CONTROL_TYPE", "START");
+            start.setIntProperty("MESSAGE_COUNT", numMessages);
+
+            for (int i = 0; i < senders.size(); i++)
+            {
+                senderConversation[i].send(senderControlTopic[i], start);
+            }
+
+            log.debug("All senders told to start their tests.");
+        }
+        catch (JMSException e)
+        {
+            throw new RuntimeException("Unhandled JMSException.", e);
+        }
     }
 
     /**
      * Checks the test circuit. The effect of this is to gather the circuits state, for both ends of the circuit,
      * into a report, against which assertions may be checked.
+     *
+     * @todo Replace the asynch receiver report thread with a choice of direct or asynch executor, so that asynch
+     *       or synch logging of test timings is optional. Also need to provide an onMessage method that is capable
+     *       of receiving timing reports that receivers will generate during an ongoing test, on the test sample
+     *       size boundaries. The message timing logging code should be factored out as a common method that can
+     *       be called in response to the final report responses, or the onMessage method. Another alternative is
+     *       to abandon the final report request altogether and just use the onMessage method? I think the two
+     *       differ though, as the final report is used to apply assertions, and the ongoing report is just for
+     *       periodic timing results... In which case, maybe there needs to be a way for the onMessage method
+     *       to process just some of the incoming messages, and forward the rest on to the conversation helper, as
+     *       a sort of pre-conversation helper filter?
      */
     public void check()
     {
-        throw new RuntimeException("Not Implemented.");
+        log.debug("public void check(): called");
+
+        try
+        {
+            // Wait for all the test senders to return their reports.
+            for (int i = 0; i < senders.size(); i++)
+            {
+                Message senderReport = senderConversation[i].receive();
+                log.debug("Sender " + senderReport.getStringProperty("CLIENT_NAME") + " reports message count: "
+                    + senderReport.getIntProperty("MESSAGE_COUNT"));
+                log.debug("Sender " + senderReport.getStringProperty("CLIENT_NAME") + " reports message time: "
+                    + senderReport.getLongProperty("TEST_TIME"));
+            }
+
+            log.debug("Got all sender test reports.");
+
+            // Apply sender assertions to pass/fail the tests.
+
+            // Inject a short pause to give the receivers time to finish receiving their test messages.
+            TestUtils.pause(500);
+
+            // Ask the receivers for their reports.
+            Message statusRequest = controlSession.createMessage();
+            statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST");
+
+            for (int i = 0; i < receivers.size(); i++)
+            {
+                receiverConversation[i].send(receiverControlTopic[i], statusRequest);
+            }
+
+            log.debug("All receiver test reports requested.");
+
+            // Wait for all receiver reports to come in, but do so asynchronously.
+            Runnable gatherAllReceiverReports =
+                new Runnable()
+                {
+                    public void run()
+                    {
+                        try
+                        {
+                            // Wait for all the receivers to send their reports.
+                            for (int i = 0; i < receivers.size(); i++)
+                            {
+                                Message receiverReport = receiverConversation[i].receive();
+
+                                String clientName = receiverReport.getStringProperty("CLIENT_NAME");
+                                int messageCount = receiverReport.getIntProperty("MESSAGE_COUNT");
+                                long testTime = receiverReport.getLongProperty("TEST_TIME");
+
+                                log.debug("Receiver " + clientName + " reports message count: " + messageCount);
+                                log.debug("Receiver " + receiverReport.getStringProperty("CLIENT_NAME")
+                                    + " reports message time: " + testTime);
+
+                                // Apply receiver assertions to pass/fail the tests.
+
+                                // Log the test timings on the asynchronous test timing controller.
+                                try
+                                {
+                                    timingController.completeTest(true, messageCount, testTime);
+                                }
+                                // The timing controller can throw InterruptedException if the current test is to be
+                                // interrupted.
+                                catch (InterruptedException e)
+                                {
+                                    e.printStackTrace();
+                                }
+                            }
+
+                            log.debug("All receiver test reports received.");
+                        }
+                        catch (JMSException e)
+                        {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                };
+
+            Thread receiverReportsThread = new Thread(gatherAllReceiverReports);
+            receiverReportsThread.start();
+
+            // return new Message[] { senderReport, receiverReport };
+
+        }
+        catch (JMSException e)
+        {
+            throw new RuntimeException("Unhandled JMSException.", e);
+        }
     }
 
     /**
@@ -87,19 +403,34 @@
      */
     public void close()
     {
-        throw new RuntimeException("Not Implemented.");
+        log.debug("public void close(): called");
+
+        // End the current test on all senders and receivers.
     }
 
     /**
-     * Applied a list of assertions against the test circuit. The {@link #check()} method should be called before doing
+     * Applies a list of assertions against the test circuit. The {@link #check()} method should be called before doing
      * this, to ensure that the circuit has gathered its state into a report to assert against.
      *
      * @param assertions The list of assertions to apply.
+     *
      * @return Any assertions that failed.
      */
     public List<Assertion> applyAssertions(List<Assertion> assertions)
     {
-        throw new RuntimeException("Not Implemented.");
+        log.debug("public List<Assertion> applyAssertions(List<Assertion> assertions = " + assertions + "): called");
+
+        List<Assertion> failures = new LinkedList<Assertion>();
+
+        for (Assertion assertion : assertions)
+        {
+            if (!assertion.apply())
+            {
+                failures.add(assertion);
+            }
+        }
+
+        return failures;
     }
 
     /**
@@ -107,10 +438,34 @@
      *
      * @param numMessages The number of messages to send using the default test procedure.
      * @param assertions  The list of assertions to apply.
+     *
      * @return Any assertions that failed.
+     *
+     * @todo From check onwards needs to be handled as a future. The future must call back onto the test case to
+     *       report results asynchronously.
      */
     public List<Assertion> test(int numMessages, List<Assertion> assertions)
     {
-        throw new RuntimeException("Not Implemented.");
+        log.debug("public List<Assertion> test(int numMessages = " + numMessages + ", List<Assertion> assertions = "
+            + assertions + "): called");
+
+        // Keep the number of messages to send per test run, where the send method can reference it.
+        this.numMessages = numMessages;
+
+        // Start the test running on all sender circuit ends.
+        start();
+
+        // Request status reports to be handed in.
+        check();
+
+        // Assert conditions on the publishing end of the circuit.
+        // Assert conditions on the receiving end of the circuit.
+        List<Assertion> failures = applyAssertions(assertions);
+
+        // Close the circuit ending the current test case.
+        close();
+
+        // Pass with no failed assertions or fail with a list of failed assertions.
+        return failures;
     }
 }

Added: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedPublisherImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedPublisherImpl.java?view=auto&rev=561855
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedPublisherImpl.java (added)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedPublisherImpl.java Wed Aug  1 09:19:31 2007
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.framework.distributedcircuit;
+
+import org.apache.qpid.test.framework.Assertion;
+import org.apache.qpid.test.framework.Publisher;
+
+/**
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td>
+ * </table>
+ */
+public class DistributedPublisherImpl implements Publisher
+{
+    /**
+     * Provides an assertion that the publisher encountered no exceptions.
+     *
+     * @return An assertion that the publisher encountered no exceptions.
+     */
+    public Assertion noExceptionsAssertion()
+    {
+        throw new RuntimeException("Not implemented.");
+    }
+
+    /**
+     * Provides an assertion that the publisher got a no consumers exception on every message.
+     *
+     * @return An assertion that the publisher got a no consumers exception on every message.
+     */
+    public Assertion noConsumersAssertion()
+    {
+        throw new RuntimeException("Not implemented.");
+    }
+
+    /**
+     * Provides an assertion that the publisher got a no route exception on every message.
+     *
+     * @return An assertion that the publisher got a no route exception on every message.
+     */
+    public Assertion noRouteAssertion()
+    {
+        throw new RuntimeException("Not implemented.");
+    }
+}

Added: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedReceiverImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedReceiverImpl.java?view=auto&rev=561855
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedReceiverImpl.java (added)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/DistributedReceiverImpl.java Wed Aug  1 09:19:31 2007
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.framework.distributedcircuit;
+
+import org.apache.qpid.test.framework.Assertion;
+import org.apache.qpid.test.framework.Receiver;
+
+/**
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td>
+ * </table>
+ */
+public class DistributedReceiverImpl implements Receiver
+{
+    /**
+     * Provides an assertion that the receivers encountered no exceptions.
+     *
+     * @return An assertion that the receivers encountered no exceptions.
+     */
+    public Assertion noExceptionsAssertion()
+    {
+        throw new RuntimeException("Not implemented.");
+    }
+
+    /**
+     * Provides an assertion that the receivers got all messages that were sent to it.
+     *
+     * @return An assertion that the receivers got all messages that were sent to it.
+     */
+    public Assertion allMessagesAssertion()
+    {
+        throw new RuntimeException("Not implemented.");
+    }
+}

Added: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java?view=auto&rev=561855
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java (added)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedcircuit/TestClientCircuitEnd.java Wed Aug  1 09:19:31 2007
@@ -0,0 +1,315 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.framework.distributedcircuit;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.test.framework.*;
+import org.apache.qpid.test.framework.distributedtesting.TestClientControlledTest;
+import org.apache.qpid.test.framework.localcircuit.LocalCircuitImpl;
+
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+import javax.jms.*;
+
+/**
+ * A TestClientCircuitEnd is a {@link CircuitEnd} that may be controlled from a
+ * {@link org.apache.qpid.test.framework.distributedtesting.TestClient}, and that forms a single publishing or
+ * receiving end point in a distributed test {@link org.apache.qpid.test.framework.Circuit}.
+ *
+ * <p/>When operating in the SENDER role, this circuit end is capable of acting as part of the default circuit test
+ * procedure (described in the class comment for {@link org.apache.qpid.test.framework.Circuit}). That is, it will
+ * send the number of test messages required, using the test configuration parameters given in the test invite, and
+ * return a report on its activities to the circuit controller.
+ *
+ * <p/>When operating in the RECEIVER role, this circuit end acts as part of the default circuit test procedure. It will
+ * receive test messages, on the setup specified in the test configuration parameters, and keep count of the messages
+ * received, and time taken to receive them. When requested by the circuit controller to provide a report, it will
+ * return this report of its activities.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td>
+ * </table>
+ */
+public class TestClientCircuitEnd implements CircuitEnd, TestClientControlledTest
+{
+    /** Used for debugging. */
+    Logger log = Logger.getLogger(TestClientCircuitEnd.class);
+
+    /** Holds the test parameters. */
+    ParsedProperties testProps;
+
+    /** The number of test messages to send. */
+    private int numMessages;
+
+    /** The role to be played by the test. */
+    private Roles role;
+
+    /** The connection to send the test messages on. */
+    private Connection connection;
+
+    /** Holds the circuit end for this test. */
+    CircuitEnd circuitEnd;
+
+    /**
+     * Holds a message monitor for this circuit end, either the monitor on the consumer when in RECEIVER mode, or
+     * a monitor updated on every message sent, when acting as a SENDER.
+     */
+    MessageMonitor messageMonitor;
+
+    /**
+     * Should provide the name of the test case that this class implements. The exact names are defined in the
+     * interop testing spec.
+     *
+     * @return The name of the test case that this implements.
+     */
+    public String getName()
+    {
+        return "DEFAULT_CIRCUIT_TEST";
+    }
+
+    /**
+     * Determines whether the test invite that matched this test case is acceptable.
+     *
+     * @param inviteMessage The invitation to accept or reject.
+     * @return <tt>true</tt> to accept the invitation, <tt>false</tt> to reject it.
+     *
+     * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
+     */
+    public boolean acceptInvite(Message inviteMessage) throws JMSException
+    {
+        log.debug("public boolean acceptInvite(Message inviteMessage): called");
+
+        // Populate the test parameters from the invitation.
+        testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+
+        for (Object key : testProps.keySet())
+        {
+            String propName = (String) key;
+
+            // If the test parameter is overridden by the invitation, use it instead.
+            String inviteValue = inviteMessage.getStringProperty(propName);
+
+            if (inviteValue != null)
+            {
+                testProps.setProperty(propName, inviteValue);
+                log.debug("Test invite supplied override to " + propName + " of " + inviteValue);
+            }
+
+        }
+
+        // Accept the invitation.
+        return true;
+    }
+
+    /**
+     * Assigns the role to be played by this test case. The test parameters are fully specified in the
+     * assignment message. When this method returns the test case will be ready to execute.
+     *
+     * @param role              The role to be played; sender or receivers.
+     * @param assignRoleMessage The role assignment message, contains the full test parameters.
+     *
+     * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
+     */
+    public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
+    {
+        log.debug("public void assignRole(Roles role, Message assignRoleMessage): called");
+
+        // Take note of the role to be played.
+        this.role = role;
+
+        // Extract and retain the test parameters.
+        numMessages = 1; // assignRoleMessage.getIntProperty("NUM_MESSAGES");
+
+        // Connect using the test parameters.
+        connection = TestUtils.createConnection(testProps);
+
+        // Create a circuit end that matches the assigned role and test parameters.
+        switch (role)
+        {
+        // Check if the sender role is being assigned, and set up a message producer if so.
+        case SENDER:
+
+            // Set up the publisher.
+            circuitEnd = LocalCircuitImpl.createPublisherCircuitEnd(connection, testProps, 0L);
+
+            // Create a custom message monitor that will be updated on every message sent.
+            messageMonitor = new MessageMonitor();
+
+            break;
+
+        // Otherwise the receivers role is being assigned, so set this up to listen for messages.
+        case RECEIVER:
+
+            // Set up the receiver.
+            circuitEnd = LocalCircuitImpl.createReceiverCircuitEnd(connection, testProps, 0L);
+
+            // Use the message monitor from the consumer for stats.
+            messageMonitor = getMessageMonitor();
+
+            break;
+        }
+
+        // Reset all messaging stats for the report.
+        messageMonitor.reset();
+
+        connection.start();
+    }
+
+    /**
+     * Performs the test case actions. Returning from here indicates that the sending role has completed its test.
+     *
+     * @param numMessages The number of test messages to send.
+     *
+     * @throws JMSException Any JMSException raised while creating or sending messages is allowed to fall through.
+     *
+     * @todo Add round robin on destinations where multiple destinations being used.
+     *
+     * @todo Add rate limiting when rate limit specified on publishers.
+     *
+     * @todo Add Max pending message size protection. The receiver will have to send back some acks once in a while,
+     *       to notify the publisher that its messages are being consumed. This makes the safety valve harder to
+     *       implement than in the single VM case. For example, if the limit is 1000 messages, might want to get back
+     *       an ack every 500, to notify the publisher that it can keep sending. What about pub/sub tests? Will it be
+     *       necessary to wait for an ack from every receiver? This will have the effect of rate limiting to slow
+     *       consumers too.
+     *
+     * @todo Add commits on every commit batch size boundary.
+     */
+    public void start(int numMessages) throws JMSException
+    {
+        log.debug("public void start(int numMessages = " + numMessages + "): called");
+
+        // If in the SENDER role, send the specified number of test messages to the circuit destinations.
+        if (role.equals(Roles.SENDER))
+        {
+            Message testMessage = getSession().createMessage(); // The same message instance is re-sent each time.
+
+            for (int i = 0; i < numMessages; i++)
+            {
+                getProducer().send(testMessage);
+
+                // Increment the message count and timings.
+                messageMonitor.onMessage(testMessage);
+            }
+        }
+    }
+
+    /**
+     * Gets a report on the actions performed by the test case in its assigned role.
+     *
+     * @param session The controlSession to create the report message in.
+     * @return The report message, carrying the CONTROL_TYPE, MESSAGE_COUNT and TEST_TIME properties.
+     *
+     * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through.
+     */
+    public Message getReport(Session session) throws JMSException
+    {
+        Message report = session.createMessage();
+        report.setStringProperty("CONTROL_TYPE", "REPORT");
+
+        // Add the count of messages sent/received to the report.
+        report.setIntProperty("MESSAGE_COUNT", messageMonitor.getNumMessage());
+
+        // Add the time to send/receive messages to the report.
+        report.setLongProperty("TEST_TIME", messageMonitor.getTime());
+
+        // TODO: Add any exceptions detected to the report; this is not implemented yet.
+
+        return report;
+    }
+
+    /**
+     * Gets the message producer at this circuit end point, as held by the underlying circuit end.
+     *
+     * @return The message producer at this circuit end point.
+     */
+    public MessageProducer getProducer()
+    {
+        return circuitEnd.getProducer();
+    }
+
+    /**
+     * Gets the message consumer at this circuit end point, as held by the underlying circuit end.
+     *
+     * @return The message consumer at this circuit end point.
+     */
+    public MessageConsumer getConsumer()
+    {
+        return circuitEnd.getConsumer();
+    }
+
+    /**
+     * Sends the specified message over the producer at this end point.
+     *
+     * @param message The message to send.
+     *
+     * @throws JMSException Any JMS exception occurring during the send is allowed to fall through.
+     */
+    public void send(Message message) throws JMSException
+    {
+        // Delegate the send to the underlying circuit end.
+        circuitEnd.send(message);
+    }
+
+    /**
+     * Gets the JMS Session associated with this circuit end point, as held by the underlying circuit end.
+     *
+     * @return The JMS Session associated with this circuit end point.
+     */
+    public Session getSession()
+    {
+        return circuitEnd.getSession();
+    }
+
+    /**
+     * Closes the message producers and consumers and the sessions, associated with this circuit end point.
+     *
+     * @throws JMSException Any JMSExceptions occurring during the close are allowed to fall through.
+     */
+    public void close() throws JMSException
+    {
+        // Delegate the close to the underlying circuit end.
+        circuitEnd.close();
+    }
+
+    /**
+     * Returns the message monitor of the underlying circuit end, for reporting on messages received on it.
+     *
+     * @return The message monitor for this circuit end.
+     */
+    public MessageMonitor getMessageMonitor()
+    {
+        return circuitEnd.getMessageMonitor();
+    }
+
+    /**
+     * Returns the exception monitor of the underlying circuit end, for reporting on exceptions received on it.
+     *
+     * @return The exception monitor for this circuit end.
+     */
+    public ExceptionMonitor getExceptionMonitor()
+    {
+        return circuitEnd.getExceptionMonitor();
+    }
+}

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java?view=diff&rev=561855&r1=561854&r2=561855
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/Coordinator.java Wed Aug  1 09:19:31 2007
@@ -25,18 +25,23 @@
 import junit.framework.TestSuite;
 
 import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
 
+import org.apache.qpid.test.framework.FrameworkBaseCase;
 import org.apache.qpid.test.framework.MessagingTestConfigProperties;
-import org.apache.qpid.test.framework.TestUtils;
 import org.apache.qpid.test.framework.TestClientDetails;
+import org.apache.qpid.test.framework.TestUtils;
 import org.apache.qpid.test.framework.listeners.XMLTestListener;
 import org.apache.qpid.util.ConversationFactory;
 import org.apache.qpid.util.PrettyPrintingUtils;
 
+import uk.co.thebadgerset.junit.extensions.AsymptoticTestDecorator;
 import uk.co.thebadgerset.junit.extensions.TKTestResult;
 import uk.co.thebadgerset.junit.extensions.TKTestRunner;
 import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
+import uk.co.thebadgerset.junit.extensions.listeners.CSVTestListener;
 import uk.co.thebadgerset.junit.extensions.util.CommandLineParser;
+import uk.co.thebadgerset.junit.extensions.util.MathUtils;
 import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
 import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
 
@@ -120,17 +125,43 @@
     /** Flag that indicates that all test clients should be terminated upon completion of the test cases. */
     protected boolean terminate;
 
+    /** Flag that indicates the CSV results listener should be used to output results. */
+    protected boolean csvResults;
+
+    /** Flag that indicates the XML results listener should be used to output results. */
+    protected boolean xmlResults;
+
     /**
      * Creates an interop test coordinator on the specified broker and virtual host.
      *
-     * @param brokerUrl   The URL of the broker to connect to.
-     * @param virtualHost The virtual host to run all tests on. Optional, may be <tt>null</tt>.
-     * @param reportDir   The directory to write out test results to.
-     * @param engine      The distributed test engine type to run the tests with.
+     * @param repetitions   The number of times to repeat the test, or test batch size.
+     * @param duration      The length of time to run the tests for. -1 means no duration has been set.
+     * @param threads       The concurrency levels to ramp up to.
+     * @param delay         A delay in milliseconds between test runs.
+     * @param params        The sets of 'size' parameters to pass to test.
+     * @param testCaseName  The name of the test case to run.
+     * @param reportDir     The directory to output the test results to.
+     * @param runName       The name of the test run; used to name the output file.
+     * @param verbose       Whether to print comments during test run.
+     * @param brokerUrl     The URL of the broker to connect to.
+     * @param virtualHost   The virtual host to run all tests on. Optional, may be <tt>null</tt>.
+     * @param engine        The distributed test engine type to run the tests with.
+     * @param terminate     <tt>true</tt> if test client nodes should be terminated at the end of the tests.
+     * @param csv           <tt>true</tt> if the CSV results listener should be attached.
+     * @param xml           <tt>true</tt> if the XML results listener should be attached.
      */
-    public Coordinator(String brokerUrl, String virtualHost, String reportDir, TestEngine engine, boolean terminate)
+    public Coordinator(Integer repetitions, Long duration, int[] threads, int delay, int[] params, String testCaseName,
+        String reportDir, String runName, boolean verbose, String brokerUrl, String virtualHost, TestEngine engine,
+        boolean terminate, boolean csv, boolean xml)
     {
-        log.debug("Coordinator(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
+        super(repetitions, duration, threads, delay, params, testCaseName, reportDir, runName, verbose);
+
+        log.debug("public Coordinator(Integer repetitions = " + repetitions + " , Long duration = " + duration
+            + ", int[] threads = " + Arrays.toString(threads) + ", int delay = " + delay + ", int[] params = "
+            + Arrays.toString(params) + ", String testCaseName = " + testCaseName + ", String reportDir = " + reportDir
+            + ", String runName = " + runName + ", boolean verbose = " + verbose + ", String brokerUrl = " + brokerUrl
+            + ", String virtualHost =" + virtualHost + ", TestEngine engine = " + engine + ", boolean terminate = "
+            + terminate + ", boolean csv = " + csv + ", boolean xml = " + xml + "): called");
 
         // Retain the connection parameters.
         this.brokerUrl = brokerUrl;
@@ -138,6 +169,8 @@
         this.reportDir = reportDir;
         this.engine = engine;
         this.terminate = terminate;
+        this.csvResults = csv;
+        this.xmlResults = xml;
     }
 
     /**
@@ -158,6 +191,8 @@
      */
     public static void main(String[] args)
     {
+        NDC.push("coordinator");
+        log.debug("public static void main(String[] args = " + Arrays.toString(args) + "): called");
         console.info("Qpid Distributed Test Coordinator.");
 
         // Override the default broker url to be localhost:5672.
@@ -181,7 +216,25 @@
                                     "e", "The test execution engine to use. Default is interop.", "engine", "interop",
                                     "^interop$|^fanout$", "true"
                                 },
-                                { "t", "Terminate test clients on completion of tests.", "flag", "false" }
+                                { "t", "Terminate test clients on completion of tests.", null, "false" },
+                                { "-csv", "Output test results in CSV format.", null, "false" },
+                                { "-xml", "Output test results in XML format.", null, "false" },
+                                {
+                                    "c", "The number of tests to run concurrently.", "num", "false",
+                                    MathUtils.SEQUENCE_REGEXP
+                                },
+                                { "r", "The number of times to repeat each test.", "num", "false" },
+                                {
+                                    "d", "The length of time to run the tests for.", "duration", "false",
+                                    MathUtils.DURATION_REGEXP
+                                },
+                                {
+                                    "f", "The maximum rate to call the tests at.", "frequency", "false",
+                                    "^([1-9][0-9]*)/([1-9][0-9]*)$"
+                                },
+                                { "s", "The size parameter to run tests with.", "size", "false", MathUtils.SEQUENCE_REGEXP },
+                                { "v", "Verbose mode.", null, "false" },
+                                { "n", "A name for this test run, used to name the output file.", "name", "true" }
                             }), testContextProperties));
 
             // Extract the command line options.
@@ -192,16 +245,29 @@
             String testEngine = options.getProperty("e");
             TestEngine engine = "fanout".equals(testEngine) ? TestEngine.FANOUT : TestEngine.INTEROP;
             boolean terminate = options.getPropertyAsBoolean("t");
+            boolean csvResults = options.getPropertyAsBoolean("-csv");
+            boolean xmlResults = options.getPropertyAsBoolean("-xml");
+
+            String threadsString = options.getProperty("c");
+            Integer repetitions = options.getPropertyAsInteger("r");
+            String durationString = options.getProperty("d");
+            String paramsString = options.getProperty("s");
+            boolean verbose = options.getPropertyAsBoolean("v");
+            String testRunName = options.getProperty("n");
+
+            int[] threads = (threadsString == null) ? null : MathUtils.parseSequence(threadsString);
+            int[] params = (paramsString == null) ? null : MathUtils.parseSequence(paramsString);
+            Long duration = (durationString == null) ? null : MathUtils.parseDuration(durationString);
 
             // If broker or virtual host settings were specified as command line options, override the defaults in the
             // test context properties with them.
 
             // Collection all of the test cases to be run.
-            Collection<Class<? extends DistributedTestCase>> testCaseClasses =
-                new ArrayList<Class<? extends DistributedTestCase>>();
+            Collection<Class<? extends FrameworkBaseCase>> testCaseClasses =
+                new ArrayList<Class<? extends FrameworkBaseCase>>();
 
             // Scan for available test cases using a classpath scanner.
-            // ClasspathScanner.getMatches(InteropTestCase.class, "^Test.*", true);
+            // ClasspathScanner.getMatches(DistributedTestCase.class, "^Test.*", true);
 
             // Hard code the test classes till the classpath scanner is fixed.
             // Collections.addAll(testCaseClasses, InteropTestCase1DummyRun.class, InteropTestCase2BasicP2P.class,
@@ -222,7 +288,7 @@
                 {
                     Class nextClass = Class.forName(nextFreeArg);
 
-                    if (DistributedTestCase.class.isAssignableFrom(nextClass))
+                    if (FrameworkBaseCase.class.isAssignableFrom(nextClass))
                     {
                         testCaseClasses.add(nextClass);
                         console.info("Found distributed test case: " + nextFreeArg);
@@ -237,7 +303,8 @@
             // Check that some test classes were actually found.
             if (testCaseClasses.isEmpty())
             {
-                throw new RuntimeException("No test cases implementing InteropTestCase were specified on the command line.");
+                throw new RuntimeException(
+                    "No test cases implementing DistributedTestCase were specified on the command line.");
             }
 
             // Extract the names of all the test classes, to pass to the start method.
@@ -250,7 +317,9 @@
             }
 
             // Create a coordinator and begin its test procedure.
-            Coordinator coordinator = new Coordinator(brokerUrl, virtualHost, reportDir, engine, terminate);
+            Coordinator coordinator =
+                new Coordinator(repetitions, duration, threads, 0, params, null, reportDir, testRunName, verbose, brokerUrl,
+                    virtualHost, engine, terminate, csvResults, xmlResults);
 
             TestResult testResult = coordinator.start(testClassNames);
 
@@ -306,10 +375,15 @@
         conversation.send(controlTopic, invite);
 
         // Wait for a short time, to give test clients an opportunity to reply to the invitation.
-        Collection<Message> enlists = conversation.receiveAll(0, 3000);
-
+        Collection<Message> enlists = conversation.receiveAll(0, 500);
         enlistedClients = extractEnlists(enlists);
 
+        for (TestClientDetails client : enlistedClients)
+        {
+            log.debug("Got enlisted test client: " + client);
+            console.info("Test node " + client.clientName + " available.");
+        }
+
         // Run the test in the suite using JUnit.
         TestResult result = null;
 
@@ -357,7 +431,20 @@
             clientDetails.clientName = enlist.getStringProperty("CLIENT_NAME");
             clientDetails.privateControlKey = enlist.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY");
 
-            enlistedClients.add(clientDetails);
+            String replyType = enlist.getStringProperty("CONTROL_TYPE");
+
+            if ("ENLIST".equals(replyType))
+            {
+                enlistedClients.add(clientDetails);
+            }
+            else if ("DECLINE".equals(replyType))
+            {
+                log.debug("Test client " + clientDetails.clientName + " declined the invite.");
+            }
+            else
+            {
+                log.warn("Got an unknown reply type, " + replyType + ", to the invite.");
+            }
         }
 
         return enlistedClients;
@@ -395,9 +482,9 @@
                 Test nextTest = suite.testAt(i);
                 log.debug("suite.testAt(" + i + ") = " + nextTest);
 
-                if (nextTest instanceof DistributedTestCase)
+                if (nextTest instanceof FrameworkBaseCase)
                 {
-                    log.debug("nextTest is a DistributedTestCase");
+                    log.debug("nextTest is a FrameworkBaseCase");
                 }
             }
 
@@ -408,13 +495,13 @@
         // Wrap the tests in a suitable distributed test decorator, to perform the invite/test cycle.
         targetTest = newTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
 
-        TestSuite suite = new TestSuite();
-        suite.addTest(targetTest);
+        // TestSuite suite = new TestSuite();
+        // suite.addTest(targetTest);
 
         // Wrap the tests in a scaled test decorator to them them as a 'batch' in one thread.
         // targetTest = new ScaledTestDecorator(targetTest, new int[] { 1 });
 
-        return super.doRun(suite, wait);
+        return super.doRun(targetTest, wait);
     }
 
     /**
@@ -466,20 +553,48 @@
             // Create the results file (make the name of this configurable as a command line parameter).
             Writer timingsWriter;
 
-            try
+            // Set up an XML results listener to output the timings to the results file, if requested on the command line.
+            if (xmlResults)
             {
-                File timingsFile = new File(reportDirFile, "TEST." + currentTestClassName + ".xml");
-                timingsWriter = new BufferedWriter(new FileWriter(timingsFile), 20000);
+                try
+                {
+                    File timingsFile = new File(reportDirFile, "TEST." + currentTestClassName + ".xml");
+                    timingsWriter = new BufferedWriter(new FileWriter(timingsFile), 20000);
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException("Unable to create the log file to write test results to: " + e, e);
+                }
+
+                XMLTestListener listener = new XMLTestListener(timingsWriter, currentTestClassName);
+                result.addListener(listener);
+                result.addTKTestListener(listener);
+
+                registerShutdownHook(listener);
             }
-            catch (IOException e)
+
+            // Set up a CSV results listener to output the timings to the results file, if requested on the command line.
+            if (csvResults)
             {
-                throw new RuntimeException("Unable to create the log file to write test results to: " + e, e);
-            }
+                try
+                {
+                    File timingsFile =
+                        new File(reportDirFile, testRunName + "-" + TIME_STAMP_FORMAT.format(new Date()) + "-timings.csv");
+                    timingsWriter = new BufferedWriter(new FileWriter(timingsFile), 20000);
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException("Unable to create the log file to write test results to: " + e, e);
+                }
 
-            // Set up an XML results listener to output the timings to the results file.
-            XMLTestListener listener = new XMLTestListener(timingsWriter, currentTestClassName);
-            result.addListener(listener);
-            result.addTKTestListener(listener);
+                CSVTestListener listener = new CSVTestListener(timingsWriter);
+                result.addListener(listener);
+                result.addTKTestListener(listener);
+
+                // Register the results listeners shutdown hook to flush its data if the test framework is shutdown
+                // prematurely.
+                registerShutdownHook(listener);
+            }
 
             // Register the results listeners shutdown hook to flush its data if the test framework is shutdown
             // prematurely.

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/DistributedTestDecorator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/DistributedTestDecorator.java?view=diff&rev=561855&r1=561854&r2=561855
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/DistributedTestDecorator.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/distributedtesting/DistributedTestDecorator.java Wed Aug  1 09:19:31 2007
@@ -24,8 +24,9 @@
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.test.framework.sequencers.DistributedTestSequencer;
+import org.apache.qpid.test.framework.FrameworkBaseCase;
 import org.apache.qpid.test.framework.TestClientDetails;
+import org.apache.qpid.test.framework.sequencers.CircuitFactory;
 import org.apache.qpid.util.ConversationFactory;
 
 import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
@@ -42,10 +43,9 @@
  * distributed test cases. It provides a helper method, {@link #signupClients}, that broadcasts an invitation and
  * returns the set of test clients that are available to particiapte in the test.
  *
- * <p/>When used to wrap a {@link org.apache.qpid.test.framework.FrameworkBaseCase} test, it replaces the default
- * {@link org.apache.qpid.test.framework.sequencers.TestCaseSequencer} implementations with a suitable
- * {@link org.apache.qpid.test.framework.sequencers.DistributedTestSequencer}. Concrete implementations
- * can use this to configure the sending and receiving roles on the test.
+ * <p/>When used to wrap a {@link FrameworkBaseCase} test, it replaces the default {@link CircuitFactory} implementations
+ * with a suitable circuit factory for distributed tests. Concrete implementations can use this to configure the sending
+ * and receiving roles on the test.
  *
  * <p><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
@@ -66,7 +66,7 @@
     /** Holds the connection that the control conversation is held over. */
     Connection connection;
 
-    /** Holds the underlying {@link DistributedTestCase}s that this decorator wraps. */
+    /** Holds the underlying test suite that this decorator wraps. */
     WrappedSuiteTestDecorator testSuite;
 
     /** Holds the control topic, on which test invitations are broadcast. */
@@ -117,7 +117,7 @@
      *
      * @return A distributed test sequencer.
      */
-    public abstract DistributedTestSequencer getDistributedTestSequencer();
+    public abstract CircuitFactory getTestSequencer();
 
     /**
      * Broadcasts an invitation to participate in a coordinating test case to find out what clients are available to
@@ -127,7 +127,7 @@
      *
      * @return A set of test clients that accepted the invitation.
      */
-    protected Set<TestClientDetails> signupClients(DistributedTestCase coordTest)
+    protected Set<TestClientDetails> signupClients(FrameworkBaseCase coordTest)
     {
         // Broadcast the invitation to find out what clients are available to test.
         Set<TestClientDetails> enlists;
@@ -143,7 +143,7 @@
             conversation.send(controlTopic, invite);
 
             // Wait for a short time, to give test clients an opportunity to reply to the invitation.
-            Collection<Message> replies = conversation.receiveAll(allClients.size(), 3000);
+            Collection<Message> replies = conversation.receiveAll(allClients.size(), 500);
             enlists = Coordinator.extractEnlists(replies);
         }
         catch (JMSException e)