You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/06/04 11:47:59 UTC
svn commit: r544109 - in
/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid:
interop/coordinator/ interop/coordinator/testcases/pubsub/
interop/testclient/ interop/testclient/testcases/ sustained/
Author: ritchiem
Date: Mon Jun 4 02:47:53 2007
New Revision: 544109
URL: http://svn.apache.org/viewvc?view=rev&rev=544109
Log:
Addition of a sustained test client. This is currently set up for running a pub/sub test.
The test allows for multiple clients to connect and participate in testing the broker throughput.
A single producer continually sends messages to a topic, about which the clients then send back batched results.
The producer uses the timings in the reports to update the rate at which it sends messages, ideally reaching a steady state where all messages produced are received by everyone within a specified time frame.
Added:
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java (with props)
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java (with props)
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/testcases/pubsub/
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java (with props)
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java (with props)
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java (with props)
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java (with props)
Modified:
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java?view=diff&rev=544109&r1=544108&r2=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java Mon Jun 4 02:47:53 2007
@@ -21,10 +21,7 @@
*/
package org.apache.qpid.interop.coordinator;
-import java.util.Collection;
import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.LinkedBlockingQueue;
import javax.jms.*;
@@ -74,13 +71,13 @@
private static final Logger log = Logger.getLogger(CoordinatingTestCase.class);
/** Holds the contact details for the sending test client. */
- TestClientDetails sender;
+ protected TestClientDetails sender;
/** Holds the contact details for the receving test client. */
- TestClientDetails receiver;
+ protected TestClientDetails receiver;
/** Holds the conversation factory over which to coordinate the test. */
- ConversationFactory conversationFactory;
+ protected ConversationFactory conversationFactory;
/**
* Creates a new coordinating test case with the specified name.
Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java?view=diff&rev=544109&r1=544108&r2=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java Mon Jun 4 02:47:53 2007
@@ -65,28 +65,28 @@
public static final String DEFAULT_CONNECTION_PROPS_RESOURCE = "org/apache/qpid/interop/connection.properties";
/** Holds the URL of the broker to coordinate the tests on. */
- String brokerUrl;
+ protected String brokerUrl;
/** Holds the virtual host to coordinate the tests on. If <tt>null</tt>, then the default virtual host is used. */
- String virtualHost;
+ protected String virtualHost;
/** Holds the list of all clients that enlisted, when the compulsory invite was issued. */
- Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
+ protected Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
/** Holds the conversation helper for the control conversation. */
- private ConversationFactory conversationFactory;
+ protected ConversationFactory conversationFactory;
/** Holds the connection that the coordinating messages are sent over. */
- private Connection connection;
+ protected Connection connection;
/**
* Holds the name of the class of the test currently being run. Ideally passed into the {@link #createTestResult}
* method, but as the signature is already fixed for this, the current value gets pushed here as a member variable.
*/
- private String currentTestClassName;
+ protected String currentTestClassName;
/** Holds the path of the directory to output test results too, if one is defined. */
- private static String reportDir;
+ protected static String _reportDir;
/**
* Creates an interop test coordinator on the specified broker and virtual host.
@@ -94,7 +94,7 @@
* @param brokerUrl The URL of the broker to connect to.
* @param virtualHost The virtual host to run all tests on. Optional, may be <tt>null</tt>.
*/
- Coordinator(String brokerUrl, String virtualHost)
+ public Coordinator(String brokerUrl, String virtualHost)
{
log.debug("Coordinator(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
@@ -121,38 +121,36 @@
// Use the command line parser to evaluate the command line with standard handling behaviour (print errors
// and usage then exist if there are errors).
Properties options =
- CommandLineParser.processCommandLine(args,
- new CommandLineParser(
- new String[][]
- {
- { "b", "The broker URL.", "broker", "false" },
- { "h", "The virtual host to use.", "virtual host", "false" },
- { "o", "The name of the directory to output test timings to.", "dir", "false" }
- }));
+ CommandLineParser.processCommandLine(args,
+ new CommandLineParser(
+ new String[][]
+ {
+ {"b", "The broker URL.", "broker", "false"},
+ {"h", "The virtual host to use.", "virtual host", "false"},
+ {"o", "The name of the directory to output test timings to.", "dir", "false"}
+ }));
// Extract the command line options.
String brokerUrl = options.getProperty("b");
String virtualHost = options.getProperty("h");
- reportDir = options.getProperty("o");
- reportDir = (reportDir == null) ? "." : reportDir;
+ _reportDir = options.getProperty("o");
+ _reportDir = (_reportDir == null) ? "." : _reportDir;
// Scan for available test cases using a classpath scanner.
Collection<Class<? extends CoordinatingTestCase>> testCaseClasses =
- new ArrayList<Class<? extends CoordinatingTestCase>>();
+ new ArrayList<Class<? extends CoordinatingTestCase>>();
// ClasspathScanner.getMatches(CoordinatingTestCase.class, "^Test.*", true);
// Hard code the test classes till the classpath scanner is fixed.
Collections.addAll(testCaseClasses,
- new Class[]
- {
- CoordinatingTestCase1DummyRun.class, CoordinatingTestCase2BasicP2P.class,
- CoordinatingTestCase3BasicPubSub.class
- });
+ CoordinatingTestCase1DummyRun.class,
+ CoordinatingTestCase2BasicP2P.class,
+ CoordinatingTestCase3BasicPubSub.class);
// Check that some test classes were actually found.
- if ((testCaseClasses == null) || testCaseClasses.isEmpty())
+ if (testCaseClasses.isEmpty())
{
throw new RuntimeException(
- "No test classes implementing CoordinatingTestCase were found on the class path.");
+ "No test classes implementing CoordinatingTestCase were found on the class path.");
}
int i = 0;
@@ -199,7 +197,7 @@
public TestResult start(String[] testClassNames) throws Exception
{
log.debug("public TestResult start(String[] testClassNames = " + PrettyPrintingUtils.printArray(testClassNames)
- + ": called");
+ + ": called");
// Connect to the broker.
connection = TestClient.createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
@@ -233,7 +231,7 @@
// Record the current test class, so that the test results can be output to a file incorporating this name.
this.currentTestClassName = testClassName;
- result = super.start(new String[] { testClassName });
+ result = super.start(new String[]{testClassName});
}
// At this point in time, all tests have completed. Broadcast the shutdown message.
@@ -257,7 +255,7 @@
public static Set<TestClientDetails> extractEnlists(Collection<Message> enlists) throws JMSException
{
log.debug("public static Set<TestClientDetails> extractEnlists(Collection<Message> enlists = " + enlists
- + "): called");
+ + "): called");
Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
@@ -315,9 +313,9 @@
targetTest = new WrappedSuiteTestDecorator(suite);
log.debug("Wrapped with a WrappedSuiteTestDecorator.");
}
-
// Wrap the tests in an inviting test decorator, to perform the invite/test cycle.
- targetTest = new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
+
+ targetTest = newTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
TestSuite suite = new TestSuite();
suite.addTest(targetTest);
@@ -328,6 +326,11 @@
return super.doRun(suite, wait);
}
+ protected WrappedSuiteTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, Set<TestClientDetails> enlistedClients, ConversationFactory conversationFactory, Connection connection)
+ {
+ return new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
+ }
+
/**
* Creates the TestResult object to be used for test runs.
*
@@ -340,10 +343,10 @@
TKTestResult result = new TKTestResult(fPrinter.getWriter(), delay, verbose, testCaseName);
// Check if a directory to output reports to has been specified and attach test listeners if so.
- if (reportDir != null)
+ if (_reportDir != null)
{
// Create the report directory if it does not already exist.
- File reportDirFile = new File(reportDir);
+ File reportDirFile = new File(_reportDir);
if (!reportDirFile.exists())
{
@@ -381,5 +384,10 @@
}
return result;
+ }
+
+ public void setReportDir(String reportDir)
+ {
+ _reportDir = reportDir;
}
}
Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java?view=diff&rev=544109&r1=544108&r2=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java Mon Jun 4 02:47:53 2007
@@ -21,7 +21,6 @@
package org.apache.qpid.interop.coordinator;
import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -30,7 +29,6 @@
import junit.framework.Test;
import junit.framework.TestResult;
-import junit.framework.TestSuite;
import org.apache.log4j.Logger;
@@ -107,7 +105,7 @@
CoordinatingTestCase coordTest = (CoordinatingTestCase) test;
// Broadcast the invitation to find out what clients are available to test.
- Set<TestClientDetails> enlists = null;
+ Set<TestClientDetails> enlists;
try
{
Message invite = conversationFactory.getSession().createMessage();
Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java?view=auto&rev=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java Mon Jun 4 02:47:53 2007
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.interop.coordinator;
+
+import javax.jms.Message;
+
+public interface ListeningCoordinatorTest
+{
+ public void latejoin(Message message);
+}
Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningCoordinatorTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java?view=auto&rev=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java Mon Jun 4 02:47:53 2007
@@ -0,0 +1,200 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop.coordinator;
+
+import junit.framework.Test;
+import junit.framework.TestResult;
+import org.apache.log4j.Logger;
+import org.apache.qpid.util.ConversationFactory;
+import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Broadcast test
+ * invitations and collect enlists. <td> {@link ConversationFactory}. <tr><td> Output test failures for clients
+ * unwilling to run the test case. <td> {@link Coordinator} <tr><td> Execute coordinated test cases. <td> {@link
+ * CoordinatingTestCase} </table>
+ */
+public class ListeningTestDecorator extends WrappedSuiteTestDecorator implements MessageListener
+{
+ private static final Logger log = Logger.getLogger(ListeningTestDecorator.class);
+
+ /** Holds the contact information for all test clients that are available and that may take part in the test. */
+ Set<TestClientDetails> allClients;
+
+ /** Holds the conversation helper for the control level conversation for coordinating the test through. */
+ ConversationFactory conversationFactory;
+
+ /** Holds the connection that the control conversation is held over. */
+ Connection connection;
+
+ /** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */
+ WrappedSuiteTestDecorator testSuite;
+
+ /** Hold the current running test case. */
+ CoordinatingTestCase _currentTest = null;
+
+ /**
+ * Creates a wrapped suite test decorator from another one.
+ *
+ * @param suite The test suite.
+ * @param availableClients The list of all clients that responded to the compulsory invite.
+ * @param controlConversation The conversation helper for the control level, test coordination conversation.
+ * @param controlConnection The connection that the coordination messages are sent over.
+ */
+ public ListeningTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> availableClients,
+ ConversationFactory controlConversation, Connection controlConnection)
+ {
+ super(suite);
+
+ log.debug("public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> allClients = "
+ + availableClients + ", ConversationHelper controlConversation = " + controlConversation + "): called");
+
+ testSuite = suite;
+ allClients = availableClients;
+ conversationFactory = controlConversation;
+ connection = controlConnection;
+ }
+
+ /**
+     * Broadcasts a test invitation and accepts enlisting from participating clients. The wrapped test case is then
+     * repeated for every combination of test clients (provided the wrapped test case extends {@link
+     * CoordinatingTestCase}).
+ *
+ * <p/>Any JMSExceptions during the invite/enlist conversation will be allowed to fall through as runtime
+ * exceptions, resulting in the non-completion of the test run.
+ *
+     * @param testResult The results object to monitor the test results with.
+ *
+ * @todo Better error recovery for failure of the invite/enlist conversation could be added.
+ */
+ public void run(TestResult testResult)
+ {
+ log.debug("public void run(TestResult testResult): called");
+
+ Collection<Test> tests = testSuite.getAllUnderlyingTests();
+
+ for (Test test : tests)
+ {
+ CoordinatingTestCase coordTest = (CoordinatingTestCase) test;
+
+ Set<TestClientDetails> enlists = signupClients(coordTest);
+
+ if (enlists.size() == 0)
+ {
+ throw new RuntimeException("No clients to test with");
+ }
+
+ Iterator<TestClientDetails> clients = enlists.iterator();
+ coordTest.setSender(clients.next());
+
+ while (clients.hasNext())
+ {
+ // Set the sending and receiving client details on the test case.
+ coordTest.setReceiver(clients.next());
+ }
+
+ // Pass down the connection to hold the coordination conversation over.
+ coordTest.setConversationFactory(conversationFactory);
+
+
+ if (coordTest instanceof ListeningCoordinatorTest)
+ {
+ _currentTest = coordTest;
+ }
+ // Execute the test case.
+ coordTest.run(testResult);
+
+ _currentTest = null;
+ }
+ }
+
+ private Set<TestClientDetails> signupClients(CoordinatingTestCase coordTest)
+ {
+ // Broadcast the invitation to find out what clients are available to test.
+ Set<TestClientDetails> enlists;
+ try
+ {
+ Message invite = conversationFactory.getSession().createMessage();
+ Destination controlTopic = conversationFactory.getSession().createTopic("iop.control");
+ ConversationFactory.Conversation conversation = conversationFactory.startConversation();
+
+ invite.setStringProperty("CONTROL_TYPE", "INVITE");
+ invite.setStringProperty("TEST_NAME", coordTest.getTestCaseNameForTestMethod(coordTest.getName()));
+
+ conversation.send(controlTopic, invite);
+
+ // Wait for a short time, to give test clients an opportunity to reply to the invitation.
+ Collection<Message> replies = conversation.receiveAll(allClients.size(), 5000);
+
+ log.debug("Received " + replies.size() + " enlist replies");
+
+ enlists = Coordinator.extractEnlists(replies);
+
+ //Create topic to listen on for latejoiners
+ Destination listenTopic = conversationFactory.getSession().createTopic("iop.control.test." + coordTest.getTestCaseNameForTestMethod(coordTest.getName()));
+
+ //Listen for joiners
+ conversationFactory.getSession().createConsumer(listenTopic).setMessageListener(this);
+ log.debug("Created consumer on :" + listenTopic);
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException("There was a JMSException during the invite/enlist conversation.", e);
+ }
+
+ return enlists;
+ }
+
+ /**
+ * Prints a string summarizing this test decorator, mainly for debugging purposes.
+ *
+ * @return String representation for debugging purposes.
+ */
+ public String toString()
+ {
+ return "ListeningTestDecorator: [ testSuite = " + testSuite + " ]";
+ }
+
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ if (message.getStringProperty("CONTROL_TYPE").equals("LATEJOIN"))
+ {
+ ((ListeningCoordinatorTest) _currentTest).latejoin(message);
+ }
+ }
+ catch (JMSException e)
+ {
+ log.debug("Unable to process message:" + message);
+ }
+ }
+}
Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/ListeningTestDecorator.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java?view=diff&rev=544109&r1=544108&r2=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java Mon Jun 4 02:47:53 2007
@@ -75,16 +75,17 @@
/** Holds all the test cases loaded from the classpath. */
Map<String, InteropClientTestCase> testCases = new HashMap<String, InteropClientTestCase>();
- InteropClientTestCase currentTestCase;
+ protected InteropClientTestCase currentTestCase;
- private MessageProducer producer;
- private Session session;
+ protected Connection _connection;
+ protected MessageProducer producer;
+ protected Session session;
- private String clientName = CLIENT_NAME;
+ protected String clientName = CLIENT_NAME;
/**
- * Creates a new interop test client, listenting to the specified broker and virtual host, with the specified
- * client identifying name.
+     * Creates a new interop test client, listening to the specified broker and virtual host, with the specified client
+ * identifying name.
*
* @param brokerUrl The url of the broker to connect to.
* @param virtualHost The virtual host to conect to.
@@ -93,7 +94,7 @@
public TestClient(String brokerUrl, String virtualHost, String clientName)
{
log.debug("public TestClient(String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost
- + ", String clientName = " + clientName + "): called");
+ + ", String clientName = " + clientName + "): called");
// Retain the connection parameters.
this.brokerUrl = brokerUrl;
@@ -117,13 +118,13 @@
{
// Use the command line parser to evaluate the command line.
CommandLineParser commandLine =
- new CommandLineParser(
- new String[][]
- {
- { "b", "The broker URL.", "broker", "false" },
- { "h", "The virtual host to use.", "virtual host", "false" },
- { "n", "The test client name.", "name", "false" }
- });
+ new CommandLineParser(
+ new String[][]
+ {
+ {"b", "The broker URL.", "broker", "false"},
+ {"h", "The virtual host to use.", "virtual host", "false"},
+ {"n", "The test client name.", "name", "false"}
+ });
// Capture the command line arguments or display errors and correct usage and then exit.
Properties options = null;
@@ -151,9 +152,17 @@
// Create a test client and start it running.
TestClient client = new TestClient(brokerUrl, virtualHost, (clientName == null) ? CLIENT_NAME : clientName);
+ // Use a class path scanner to find all the interop test case implementations.
+ Collection<Class<? extends InteropClientTestCase>> testCaseClasses =
+ new ArrayList<Class<? extends InteropClientTestCase>>();
+ // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
+ // Hard code the test classes till the classpath scanner is fixed.
+ Collections.addAll(testCaseClasses,
+ new Class[]{TestCase1DummyRun.class, TestCase2BasicP2P.class, TestClient.class});
+
try
{
- client.start();
+ client.start(testCaseClasses);
}
catch (Exception e)
{
@@ -165,20 +174,12 @@
/**
* Starts the interop test client running. This causes it to start listening for incoming test invites.
*
- * @throws JMSException Any underlying JMSExceptions are allowed to fall through.
+ * @throws JMSException Any underlying JMSExceptions are allowed to fall through. @param testCaseClasses
*/
- private void start() throws JMSException
+ protected void start(Collection<Class<? extends InteropClientTestCase>> testCaseClasses) throws JMSException
{
log.debug("private void start(): called");
- // Use a class path scanner to find all the interop test case implementations.
- Collection<Class<? extends InteropClientTestCase>> testCaseClasses =
- new ArrayList<Class<? extends InteropClientTestCase>>();
- // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
- // Hard code the test classes till the classpath scanner is fixed.
- Collections.addAll(testCaseClasses,
- new Class[] { TestCase1DummyRun.class, TestCase2BasicP2P.class, TestCase3BasicPubSub.class });
-
// Create all the test case implementations and index them by the test names.
for (Class<? extends InteropClientTestCase> nextClass : testCaseClasses)
{
@@ -200,9 +201,9 @@
}
// Open a connection to communicate with the coordinator on.
- Connection connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
+ _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Set this up to listen for control messages.
MessageConsumer consumer = session.createConsumer(session.createTopic("iop.control." + clientName));
@@ -215,7 +216,7 @@
producer = session.createProducer(null);
// Start listening for incoming control messages.
- connection.start();
+ _connection.start();
}
/**
@@ -232,22 +233,25 @@
* @param virtualHost The virtual host to connectio to, <tt>null</tt> to use the default.
*
* @return A JMS conneciton.
+ *
+ * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it to a
+ * Utils library class.
*/
public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost)
{
log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
- + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
+ + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
try
{
Properties connectionProps =
- PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream(
- connectionPropsResource));
+ PropertiesUtils.getProperties(TestClient.class.getClassLoader().getResourceAsStream(
+ connectionPropsResource));
if (brokerUrl != null)
{
String connectionString =
- "amqp://guest:guest/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
+ "amqp://guest:guest/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
connectionProps.setProperty(CONNECTION_PROPERTY, connectionString);
}
@@ -286,21 +290,21 @@
String controlType = message.getStringProperty("CONTROL_TYPE");
String testName = message.getStringProperty("TEST_NAME");
+ log.info("onMessage(Message message = " + message + "): for '" + controlType + "' to '" + testName + "'");
+
// Check if the message is a test invite.
if ("INVITE".equals(controlType))
{
- String testCaseName = message.getStringProperty("TEST_NAME");
-
// Flag used to indicate that an enlist should be sent. Only enlist to compulsory invites or invites
// for which test cases exist.
boolean enlist = false;
- if (testCaseName != null)
+ if (testName != null)
{
- log.debug("Got an invite to test: " + testCaseName);
+ log.debug("Got an invite to test: " + testName);
// Check if the requested test case is available.
- InteropClientTestCase testCase = testCases.get(testCaseName);
+ InteropClientTestCase testCase = testCases.get(testName);
if (testCase != null)
{
@@ -308,6 +312,10 @@
currentTestCase = testCase;
enlist = true;
}
+ else
+ {
+ log.warn("'" + testName + "' not part of this clients tests.");
+ }
}
else
{
@@ -325,6 +333,8 @@
enlistMessage.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName);
enlistMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+ log.info("Sending Message '" + enlistMessage + "'. to " + message.getJMSReplyTo());
+
producer.send(message.getJMSReplyTo(), enlistMessage);
}
}
@@ -369,9 +379,10 @@
}
else if ("TERMINATE".equals(controlType))
{
- System.out.println("Received termination instruction from coordinator.");
+ log.info("Received termination instruction from coordinator.");
// Is a cleaner shutdown needed?
+ _connection.close();
System.exit(0);
}
else
Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java?view=diff&rev=544109&r1=544108&r2=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java Mon Jun 4 02:47:53 2007
@@ -5,7 +5,6 @@
import org.apache.log4j.Logger;
import org.apache.qpid.interop.testclient.InteropClientTestCase;
-import org.apache.qpid.interop.testclient.TestClient;
/**
* Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the
@@ -120,8 +119,8 @@
session = new Session[1];
connection[0] =
- TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl,
- TestClient.virtualHost);
+ org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+ org.apache.qpid.interop.testclient.TestClient.virtualHost);
session[0] = connection[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
// Extract and retain the test parameters.
@@ -140,8 +139,8 @@
for (int i = 0; i < numReceivers; i++)
{
connection[i] =
- TestClient.createConnection(TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, TestClient.brokerUrl,
- TestClient.virtualHost);
+ org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+ org.apache.qpid.interop.testclient.TestClient.virtualHost);
session[i] = connection[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
sendDestination = session[i].createTopic(sendKey);
Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java?view=auto&rev=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java Mon Jun 4 02:47:53 2007
@@ -0,0 +1,567 @@
+package org.apache.qpid.sustained;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implements the sustained pub/sub performance test, "Perf_SustainedPubSub". In the sender role a single producer
+ * continually sends test messages to a topic. In the receiver role the specified number of receiver connections consume
+ * those messages and send batched status updates back, which the sender uses to adjust the rate at which it sends.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Supply the name
+ * of the test case that this implements. <tr><td> Accept/Reject invites based on test parameters. <tr><td> Adapt to
+ * assigned roles. <tr><td> Send required number of test messages using pub/sub. <tr><td> Generate test reports.
+ * </table>
+ */
+public class SustainedTestClient extends TestCase3BasicPubSub implements ExceptionListener
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(SustainedTestClient.class);
+
+ /** The role to be played by the test. */
+ private Roles role;
+
+ /** The number of test messages to send. */
+// private int numMessages;
+
+ /** The number of receiver connection to use. */
+ private int numReceivers;
+
+ /** The routing key to send them to on the default direct exchange. */
+ private Destination sendDestination;
+
+ /** The routing key to send updates to on the default direct exchange. */
+ private Destination sendUpdateDestination;
+
+
+ /** The connections to send/receive the test messages on. */
+ private Connection[] connection;
+
+ /** The sessions to send/receive the test messages on. */
+ private Session[] session;
+
+ /** The producer to send the test messages with. */
+ MessageProducer producer;
+
+ /** Adapter that adjusts the send rate based on the updates from clients. */
+ SustainedRateAdapter _rateAdapter;
+
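+ /** The number of messages between status updates, read from the SUSTAINED_UPDATE_INTERVAL test parameter. */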
+ int updateInterval;
+
+ private boolean _running = true;
+
+    /**
+     * Supplies the name of the test case implemented by this class.
+     *
+     * @return The test case name, "Perf_SustainedPubSub".
+     */
+    public String getName()
+    {
+        final String testCaseName = "Perf_SustainedPubSub";
+
+        log.debug("public String getName(): called");
+
+        return testCaseName;
+    }
+
+    /**
+     * Assigns the role to be played by this test case and prepares the connections, sessions, producers and consumers
+     * that the role needs. All test parameters are read from the assignment message. Once the role is set up, every
+     * test connection is started.
+     *
+     * @param role              The role to be played; sender or receiver.
+     * @param assignRoleMessage The role assignment message, contains the full test parameters.
+     *
+     * @throws JMSException Any JMSException resulting from reading the message or setting up the role is allowed to
+     *                      fall through.
+     */
+    public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
+    {
+        log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage
+            + "): called");
+
+        // Remember which role was assigned; start() and the listeners depend on it.
+        this.role = role;
+
+        // Pull the test parameters out of the assignment message.
+        numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS");
+        updateInterval = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL");
+        final String testTopicKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY");
+        final String updateTopicKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY");
+        final int acknowledgeMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE");
+
+        log.debug("numReceivers = " + numReceivers);
+        log.debug("updateInterval = " + updateInterval);
+        log.debug("ackMode = " + acknowledgeMode);
+        log.debug("sendKey = " + testTopicKey);
+        log.debug("sendUpdateKey = " + updateTopicKey);
+        log.debug("role = " + role);
+
+        switch (role)
+        {
+            case SENDER:
+                assignSenderRole(testTopicKey, updateTopicKey, acknowledgeMode);
+                break;
+
+            case RECEIVER:
+                assignReceiverRole(testTopicKey, updateTopicKey, acknowledgeMode);
+                break;
+        }
+
+        // Start message delivery on every test connection.
+        for (Connection testConnection : connection)
+        {
+            testConnection.start();
+        }
+    }
+
+    /** Opens one test connection using the broker settings held by the interop test client. */
+    private Connection openSustainedTestConnection() throws JMSException
+    {
+        return org.apache.qpid.interop.testclient.TestClient.createConnection(
+            org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE,
+            org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+            org.apache.qpid.interop.testclient.TestClient.virtualHost);
+    }
+
+    /** Sets up the single connection, the test message producer and the rate adapter used in the sender role. */
+    private void assignSenderRole(String testTopicKey, String updateTopicKey, int acknowledgeMode) throws JMSException
+    {
+        log.info("*********** Creating SENDER");
+
+        connection = new Connection[1];
+        session = new Session[1];
+
+        connection[0] = openSustainedTestConnection();
+        session[0] = connection[0].createSession(false, acknowledgeMode);
+
+        // The topic the test messages are published on.
+        sendDestination = session[0].createTopic(testTopicKey);
+
+        connection[0].setExceptionListener(this);
+
+        producer = session[0].createProducer(sendDestination);
+
+        // The topic on which the receivers publish their status updates.
+        sendUpdateDestination = session[0].createTopic(updateTopicKey);
+        final MessageConsumer updateConsumer = session[0].createConsumer(sendUpdateDestination);
+
+        _rateAdapter = new SustainedRateAdapter(this);
+        updateConsumer.setMessageListener(_rateAdapter);
+    }
+
+    /** Sets up one connection, session and listening consumer per requested receiver. */
+    private void assignReceiverRole(String testTopicKey, String updateTopicKey, int acknowledgeMode) throws JMSException
+    {
+        log.info("*********** Creating RECEIVER");
+
+        connection = new Connection[numReceivers];
+        session = new Session[numReceivers];
+
+        int index = 0;
+        while (index < numReceivers)
+        {
+            connection[index] = openSustainedTestConnection();
+            session[index] = connection[index].createSession(false, acknowledgeMode);
+
+            sendDestination = session[index].createTopic(testTopicKey);
+
+            sendUpdateDestination = session[index].createTopic(updateTopicKey);
+
+            final MessageConsumer testConsumer = session[index].createConsumer(sendDestination);
+
+            testConsumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + index, updateInterval,
+                    session[index], sendUpdateDestination));
+
+            index++;
+        }
+    }
+
+    /**
+     * Performs the test case actions. In the sender role this sends the same "test" text message over and over for as
+     * long as the running flag is set, notifying the rate adapter after every send. In the receiver role there is
+     * nothing to do here.
+     *
+     * @throws JMSException Any JMSException raised while creating or sending the test message falls through.
+     */
+    public void start() throws JMSException
+    {
+        log.debug("public void start(): called");
+
+        switch (role)
+        {
+            case SENDER:
+            {
+                final Message payload = session[0].createTextMessage("test");
+
+                boolean keepSending = _running;
+                while (keepSending)
+                {
+                    producer.send(payload);
+
+                    // The adapter counts the message and applies any pause or delay.
+                    _rateAdapter.sentMessage();
+
+                    keepSending = _running;
+                }
+
+                break;
+            }
+
+            case RECEIVER:
+            default:
+                // Nothing to do in the receiver role.
+                break;
+        }
+    }
+
+    /**
+     * Closes every test connection and then builds the report message for this test case. The report carries only the
+     * "REPORT" control type.
+     *
+     * @param session The session to create the report message in.
+     *
+     * @return The report message.
+     *
+     * @throws JMSException Any JMSExceptions resulting from closing the connections or creating the report are allowed
+     *                      to fall through.
+     */
+    public Message getReport(Session session) throws JMSException
+    {
+        log.debug("public Message getReport(Session session): called");
+
+        // The test is over, so release the test connections first.
+        for (Connection testConnection : connection)
+        {
+            testConnection.close();
+        }
+
+        final Message reportMessage = session.createMessage();
+        reportMessage.setStringProperty("CONTROL_TYPE", "REPORT");
+
+        return reportMessage;
+    }
+
+    /**
+     * Handles exceptions reported on the sender connection. When the exception carries a linked exception, it is
+     * logged and the rate adapter is told that no clients are available, which makes the sender pause. An exception
+     * without a linked cause is only logged.
+     *
+     * @param jmsException The exception reported by the connection.
+     */
+    public void onException(JMSException jmsException)
+    {
+        final Exception linked = jmsException.getLinkedException();
+
+        if (linked == null)
+        {
+            // There is no linked cause, so report the exception itself.
+            log.warn("Exception:" + jmsException, jmsException);
+
+            return;
+        }
+
+        if (linked instanceof AMQNoRouteException)
+        {
+            log.warn("No route .");
+        }
+        else if (linked instanceof AMQNoConsumersException)
+        {
+            log.warn("No clients currently available for message:"
+                + ((AMQNoConsumersException) linked).getUndeliveredMessage());
+        }
+        else
+        {
+            log.warn("LinkedException:" + linked);
+        }
+
+        // The listener is registered before the rate adapter is created, so the adapter may not exist yet.
+        if (_rateAdapter != null)
+        {
+            _rateAdapter.NO_CLIENTS = true;
+        }
+    }
+
+    /**
+     * Receiver side listener. Counts the "test" text messages it receives and, every update interval messages, sends a
+     * status update holding the running total and the time in nanoseconds since the previous update. A first update
+     * with a zero duration is sent when the first test message arrives.
+     */
+    class SustainedListener implements MessageListener
+    {
+        /** The number of test messages received so far. */
+        private int _received = 0;
+
+        /** The number of received messages between status updates. */
+        private int _updateInterval = 0;
+
+        /** The System.nanoTime() reading taken when the last status update was sent. */
+        private long _time;
+
+        /** The producer used to send status updates. */
+        MessageProducer _updater;
+
+        /** The session used to create status update messages. */
+        Session _session;
+
+        /** The identifier reported in the CLIENT_ID property of every status update. */
+        String _client;
+
+        /**
+         * Creates a listener that reports on the given destination.
+         *
+         * @param clientname      The identifier to report as CLIENT_ID.
+         * @param updateInterval  The number of received messages between status updates.
+         * @param session         The session to create update messages and the update producer with.
+         * @param sendDestination The destination to send status updates to.
+         *
+         * @throws JMSException If the update producer cannot be created.
+         */
+        public SustainedListener(String clientname, int updateInterval, Session session, Destination sendDestination)
+            throws JMSException
+        {
+            _updateInterval = updateInterval;
+            _client = clientname;
+            _session = session;
+            _updater = session.createProducer(sendDestination);
+        }
+
+        /**
+         * Changes the update interval and restarts the received message count.
+         *
+         * @param reportInterval The new number of received messages between status updates.
+         */
+        public void setReportInterval(int reportInterval)
+        {
+            _updateInterval = reportInterval;
+            _received = 0;
+        }
+
+        /**
+         * Counts a received test message and sends a status update when one is due. Messages that are not text messages
+         * with the body "test" are ignored.
+         *
+         * @param message The received message.
+         */
+        public void onMessage(Message message)
+        {
+            if (log.isDebugEnabled())
+            {
+                log.debug("Message " + _received + "received in listener");
+            }
+
+            if (!(message instanceof TextMessage))
+            {
+                return;
+            }
+
+            try
+            {
+                // Constant first, so a message with no body is ignored instead of raising a NullPointerException.
+                if (!"test".equals(((TextMessage) message).getText()))
+                {
+                    return;
+                }
+
+                if (_received == 0)
+                {
+                    // First test message: start timing and announce this client to the sender.
+                    _time = System.nanoTime();
+                    sendStatus(0, _received);
+                }
+
+                _received++;
+
+                // A non-positive interval means no periodic updates; this also avoids a division by zero.
+                if ((_updateInterval > 0) && ((_received % _updateInterval) == 0))
+                {
+                    final long currentTime = System.nanoTime();
+
+                    try
+                    {
+                        sendStatus(currentTime - _time, _received);
+
+                        // Only restart the timer once the update has actually been sent.
+                        _time = currentTime;
+                    }
+                    catch (JMSException e)
+                    {
+                        log.error("Unable to send update.", e);
+                    }
+                }
+            }
+            catch (JMSException e)
+            {
+                // Best effort: the message is skipped, but the failure is no longer hidden.
+                log.warn("Unable to process received test message.", e);
+            }
+        }
+
+        /**
+         * Sends a status update to the update destination.
+         *
+         * @param time     The duration to report, in nanoseconds.
+         * @param received The number of messages received so far.
+         *
+         * @throws JMSException If the update cannot be created or sent.
+         */
+        private void sendStatus(long time, int received) throws JMSException
+        {
+            Message updateMessage = _session.createTextMessage("update");
+            updateMessage.setStringProperty("CLIENT_ID", _client);
+            updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE");
+            updateMessage.setLongProperty("RECEIVED", received);
+            updateMessage.setLongProperty("DURATION", time);
+
+            log.info("**** SENDING **** CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+
+            _updater.send(updateMessage);
+        }
+    }
+
+    /**
+     * Sender side adapter. Listens for the status updates published by the receivers, keeps track of which clients
+     * have fallen behind, and pauses or delays the sender accordingly.
+     *
+     * <p>onMessage() is invoked as a message listener callback, while sentMessage() is called from the send loop.
+     * NOTE(review): these are presumably different threads, so the state they share is kept volatile or synchronized;
+     * confirm against the JMS provider's dispatch model.
+     */
+    class SustainedRateAdapter implements MessageListener
+    {
+        private SustainedTestClient _client;
+        private long _variance = 250; //no. messages to allow drifting
+        private volatile long _delay; //in nanos
+
+        /** The number of messages counted as sent. Written by the send loop and read by the update listener. */
+        private volatile long _sent;
+
+        /**
+         * The clients whose last report lagged behind the sender, keyed by client id. It is a synchronized map
+         * because it is updated from the update listener and polled from the send loop.
+         */
+        private final Map<String, Long> _slowClients =
+            java.util.Collections.synchronizedMap(new HashMap<String, Long>());
+        private static final long PAUSE_SLEEP = 10; // 10 ms
+        private static final long NO_CLIENT_SLEEP = 1000; // 1s
+        private static final long MAX_MESSAGE_DRIFT = 1000; // no messages drifted from producer
+        private volatile boolean NO_CLIENTS = true;
+        private int _delayShifting;
+        private static final int REPORTS_WITHOUT_CHANGE = 10;
+        private static final double MAXIMUM_DELAY_SHIFT = .02; //2%
+
+        /**
+         * Creates a rate adapter for the given test client.
+         *
+         * @param client The test client whose send rate is being adapted.
+         */
+        SustainedRateAdapter(SustainedTestClient client)
+        {
+            _client = client;
+        }
+
+        /**
+         * Handles a status update from a receiver: notes that clients are present, records whether the reporting
+         * client is slow and re-evaluates the send delay. Messages whose CONTROL_TYPE is not "UPDATE" are ignored.
+         *
+         * @param message The received message.
+         */
+        public void onMessage(Message message)
+        {
+            if (log.isDebugEnabled())
+            {
+                log.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called");
+            }
+
+            try
+            {
+                String controlType = message.getStringProperty("CONTROL_TYPE");
+
+                // Only status updates from the receivers are of interest here.
+                if ("UPDATE".equals(controlType))
+                {
+                    NO_CLIENTS = false;
+                    long duration = message.getLongProperty("DURATION");
+                    long received = message.getLongProperty("RECEIVED");
+                    String client = message.getStringProperty("CLIENT_ID");
+
+                    log.info("**** SENDING **** CLIENT_ID:" + client + " RECEIVED:" + received + " DURATION:" + duration);
+
+                    recordSlow(client, received);
+
+                    adjustDelay(client, received, duration);
+                }
+            }
+            catch (JMSException e)
+            {
+                // Best effort: the update is skipped, but the failure is no longer hidden.
+                log.warn("Unable to read status update message.", e);
+            }
+        }
+
+        /** A simple immutable-by-convention holder for two related values. */
+        class Pair<X, Y>
+        {
+            X item1;
+            Y item2;
+
+            Pair(X i1, Y i2)
+            {
+                item1 = i1;
+                item2 = i2;
+            }
+
+            X getItem1()
+            {
+                return item1;
+            }
+
+            Y getItem2()
+            {
+                return item2;
+            }
+        }
+
+        /** The most recent (received, duration) report of every client, keyed by client id. */
+        Map<String, Pair<Long, Long>> delays = new HashMap<String, Pair<Long, Long>>();
+
+        /** The sum of the most recent received counts over all clients. */
+        long totalReceived = 0L;
+
+        /** The sum of the most recent durations over all clients. */
+        long totalDuration = 0L;
+
+        /**
+         * Folds a client's report into the running totals and adjusts the send delay when the average reported
+         * duration differs from the current delay by more than the allowed variance.
+         *
+         * @param client   The id of the reporting client.
+         * @param received The number of messages the client has received.
+         * @param duration The duration reported by the client, in nanoseconds.
+         */
+        private void adjustDelay(String client, long received, long duration)
+        {
+            // Always store the latest report. Keeping only the first one made the totals drift, because every later
+            // report subtracted the first values while adding the new ones.
+            Pair<Long, Long> previous = delays.put(client, new Pair<Long, Long>(received, duration));
+
+            if (previous != null)
+            {
+                // Replace this client's earlier contribution to the totals.
+                totalReceived -= previous.getItem1();
+                totalDuration -= previous.getItem2();
+            }
+
+            totalReceived += received;
+            totalDuration += duration;
+
+            // The map holds at least the entry stored above, so its size is never zero here.
+            long averageDuration = totalDuration / delays.size();
+
+            long delay = _delay;
+            long diff = Math.abs(delay - averageDuration);
+
+            // If the averageDuration differs from the current delay by more than the specified variance then adjust
+            // the delay.
+            // NOTE(review): this moves the delay away from the average duration, and starting from zero it can only
+            // ever stay at zero. The control law is left unchanged here; confirm the intended direction.
+            if (diff > _variance)
+            {
+                if (averageDuration > delay)
+                {
+                    // we can go faster
+                    delay = Math.max(0, delay - diff);
+                }
+                else
+                {
+                    // we need to slow down
+                    delay += diff;
+                }
+
+                _delay = delay;
+                delayChanged();
+            }
+            else
+            {
+                delayStable();
+            }
+        }
+
+        /** Restarts the count of reports that must pass without a change before the delay counts as stable. */
+        private void delayChanged()
+        {
+            _delayShifting = REPORTS_WITHOUT_CHANGE;
+        }
+
+        /** Counts a report that left the delay unchanged and logs once the countdown has run out. */
+        private void delayStable()
+        {
+            _delayShifting--;
+
+            if (_delayShifting < 0)
+            {
+                _delayShifting = 0;
+                log.info("Delay stabilised:" + _delay);
+            }
+        }
+
+        /**
+         * Records the client as slow when its received count trails the sent count by more than the variance, and
+         * clears it otherwise.
+         *
+         * @param client   The id of the reporting client.
+         * @param received The number of messages the client has received.
+         */
+        private void recordSlow(String client, long received)
+        {
+            if (received < (_sent - _variance))
+            {
+                _slowClients.put(client, received);
+            }
+            else
+            {
+                _slowClients.remove(client);
+            }
+        }
+
+        /**
+         * Called by the send loop after every message sent. At every update interval boundary this pauses while
+         * clients are slow or absent; otherwise it counts the message and sleeps for the current delay.
+         */
+        public void sentMessage()
+        {
+            // A non-positive interval disables the pause check; this also avoids a division by zero.
+            if ((updateInterval > 0) && ((_sent % updateInterval) == 0))
+            {
+                // Cause test to pause when we have slow or no clients.
+                if (!_slowClients.isEmpty() || NO_CLIENTS)
+                {
+                    log.info("Pausing for slow clients");
+
+                    // Stop polling once interrupted, otherwise the restored interrupt flag turns this into a spin.
+                    while (!_slowClients.isEmpty() && !Thread.currentThread().isInterrupted())
+                    {
+                        sleep(PAUSE_SLEEP);
+                    }
+
+                    if (NO_CLIENTS)
+                    {
+                        sleep(NO_CLIENT_SLEEP);
+                    }
+
+                    log.debug("Continuing");
+
+                    // NOTE(review): the message that triggered the pause is not counted in _sent; confirm intended.
+                    return;
+                }
+                else
+                {
+                    log.info("Delay:" + _delay);
+                }
+            }
+
+            _sent++;
+
+            // Work from one snapshot, as the delay may be changed by the update listener at any time.
+            long delay = _delay;
+
+            if (delay > 0)
+            {
+                // _delay is in nano seconds
+                if (delay < 1000000)
+                {
+                    sleep(0, (int) delay);
+                }
+                else if (delay < 30000000000L)
+                {
+                    // Delays of 30 seconds or more are not slept at all.
+                    sleep(delay / 1000000, (int) (delay % 1000000));
+                }
+            }
+        }
+
+        /**
+         * Sleeps for a number of milliseconds.
+         *
+         * @param sleep The time to sleep, in milliseconds.
+         */
+        private void sleep(long sleep)
+        {
+            sleep(sleep, 0);
+        }
+
+        /**
+         * Sleeps for the given time. An interrupt ends the sleep early and is left set on the thread for the caller.
+         *
+         * @param milli The milliseconds to sleep.
+         * @param nano  The additional nanoseconds to sleep.
+         */
+        private void sleep(long milli, int nano)
+        {
+            try
+            {
+                log.debug("Sleep:" + milli + ":" + nano);
+                Thread.sleep(milli, nano);
+            }
+            catch (InterruptedException e)
+            {
+                // Restore the interrupt status rather than losing it.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+}
Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java?view=auto&rev=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java Mon Jun 4 02:47:53 2007
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.sustained;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.interop.coordinator.ListeningCoordinatorTest;
+import org.apache.qpid.interop.coordinator.TestClientDetails;
+import org.apache.qpid.interop.coordinator.testcases.CoordinatingTestCase3BasicPubSub;
+import org.apache.qpid.util.ConversationFactory;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub implements ListeningCoordinatorTest
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(SustainedTestCoordinator.class);
+ private List<TestClientDetails> _receivers;
+ private static final String SUSTAINED_KEY = "Perf_SustainedPubSub";
+ Map<String, Object> _testProperties;
+
+    /**
+     * Creates a new coordinating test case with the specified name and an empty list of receivers.
+     *
+     * @param name The test case name.
+     */
+    public SustainedTestCoordinator(String name)
+    {
+        super(name);
+
+        // Parameterised, to avoid the unchecked assignment of a raw list.
+        _receivers = new LinkedList<TestClientDetails>();
+    }
+
+    /**
+     * Registers a further receiving client for this test. Unlike a plain setter this accumulates: every call appends
+     * to the list of receivers.
+     *
+     * @param receiver The contact details of the receiving client in the test.
+     */
+    public void setReceiver(TestClientDetails receiver)
+    {
+        this._receivers.add(receiver);
+    }
+
+
+    /**
+     * Runs the sustained pub/sub test once, using a fixed configuration: two receiver connections per client, a status
+     * update every 25 messages and no-acknowledge sessions.
+     *
+     * @throws Exception if there was a problem running the test.
+     */
+    public void testBasicPubSub() throws Exception
+    {
+        log.debug("public void testSinglePubSubCycle(): called");
+
+        final String updateKey = SUSTAINED_KEY + ".UPDATE";
+
+        final Map<String, Object> sustainedTestConfig = new HashMap<String, Object>();
+        sustainedTestConfig.put("TEST_NAME", "Perf_SustainedPubSub");
+        sustainedTestConfig.put("SUSTAINED_KEY", SUSTAINED_KEY);
+        sustainedTestConfig.put("SUSTAINED_NUM_RECEIVERS", Integer.valueOf(2));
+        sustainedTestConfig.put("SUSTAINED_UPDATE_INTERVAL", Integer.valueOf(25));
+        sustainedTestConfig.put("SUSTAINED_UPDATE_KEY", updateKey);
+        sustainedTestConfig.put("ACKNOWLEDGE_MODE", AMQSession.NO_ACKNOWLEDGE);
+
+        sequenceTest(sustainedTestConfig);
+    }
+
+    /**
+     * Holds the test coordinating conversation with the test clients. The sender is assigned its role first and its
+     * confirmation is awaited. The receivers are then assigned their roles without waiting for confirmation, the
+     * sender is told to start, and finally the sender's report is awaited.
+     *
+     * @param testProperties The test case definition.
+     *
+     * @return The test results; only the sender's report is returned.
+     *
+     * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
+     */
+    protected Message[] sequenceTest(Map<String, Object> testProperties) throws JMSException
+    {
+        log.debug("protected Message[] sequenceTest(Object... testProperties = " + testProperties + "): called");
+
+        Session session = conversationFactory.getSession();
+        Destination senderControlTopic = session.createTopic(sender.privateControlKey);
+
+        ConversationFactory.Conversation senderConversation = conversationFactory.startConversation();
+
+        // Assign the sender role to the sending test client.
+        Message assignSender = conversationFactory.getSession().createMessage();
+        setPropertiesOnMessage(assignSender, testProperties);
+        assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
+        assignSender.setStringProperty("ROLE", "SENDER");
+
+        senderConversation.send(senderControlTopic, assignSender);
+
+        // Keep the test properties so that receivers, including late joiners, can be assigned the same test.
+        _testProperties = testProperties;
+
+        // Wait for the sender to confirm its role.
+        senderConversation.receive();
+
+        // Assign the receiver role to all the receiving clients known so far.
+        assignReceivers();
+
+        // Start the test.
+        Message start = session.createMessage();
+        start.setStringProperty("CONTROL_TYPE", "START");
+
+        senderConversation.send(senderControlTopic, start);
+
+        // Wait for the test sender to return its report.
+        Message senderReport = senderConversation.receive();
+
+        try
+        {
+            Thread.sleep(500);
+        }
+        catch (InterruptedException e)
+        {
+            // Restore the interrupt status rather than losing it.
+            Thread.currentThread().interrupt();
+        }
+
+        // NOTE(review): the status request is built but never sent, so no receiver reports are collected. Confirm
+        // whether it should be sent to the receivers.
+        Message statusRequest = session.createMessage();
+        statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST");
+
+        return new Message[]{senderReport};
+    }
+
+    /** Assigns the receiver role to every receiving client registered so far. */
+    private void assignReceivers()
+    {
+        final java.util.Iterator<TestClientDetails> remaining = _receivers.iterator();
+
+        while (remaining.hasNext())
+        {
+            registerReceiver(remaining.next());
+        }
+    }
+
+    /**
+     * Sends a receiver role assignment, carrying the current test properties, to the given client's private control
+     * topic. The client's confirmation is deliberately not awaited. A failure to send is logged and otherwise ignored.
+     *
+     * @param receiver The contact details of the client to assign the receiver role to.
+     */
+    private void registerReceiver(TestClientDetails receiver)
+    {
+        log.info("registerReceiver called for receiver:" + receiver);
+        try
+        {
+            Session session = conversationFactory.getSession();
+            Destination receiverControlTopic = session.createTopic(receiver.privateControlKey);
+            ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation();
+
+            // Assign the receiver role to the receiving client.
+            // NOTE(review): _testProperties is only set by sequenceTest(); confirm a late join cannot arrive earlier.
+            Message assignReceiver = session.createMessage();
+            setPropertiesOnMessage(assignReceiver, _testProperties);
+            assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
+            assignReceiver.setStringProperty("ROLE", "RECEIVER");
+
+            receiverConversation.send(receiverControlTopic, assignReceiver);
+
+            // Don't wait for the receiver to be ready. This may be running in the dispatcher thread, and the
+            // acceptance message we want is most likely sitting in the Dispatcher._queue waiting its turn to be
+            // dispatched, so blocking here would mean never getting the message.
+            // So assume the consumer is ready for action.
+            //receiverConversation.receive();
+        }
+        catch (JMSException e)
+        {
+            // Pass the exception along so that its stack trace is not lost.
+            log.warn("Unable to assign receiver:" + receiver + ". Due to:" + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Handles a late join request by assigning the receiver role to the requesting client. The client's name and
+     * private control key are read from the CLIENT_NAME and CLIENT_PRIVATE_CONTROL_KEY properties of the message.
+     *
+     * @param message The late join request.
+     */
+    public void latejoin(Message message)
+    {
+        try
+        {
+            TestClientDetails clientDetails = new TestClientDetails();
+            clientDetails.clientName = message.getStringProperty("CLIENT_NAME");
+            clientDetails.privateControlKey = message.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY");
+
+            registerReceiver(clientDetails);
+        }
+        catch (JMSException e)
+        {
+            // Best effort: the client is not joined, but the failure is no longer hidden.
+            log.warn("Unable to read the late join request, client not joined.", e);
+        }
+    }
+
+    /**
+     * Translates the name of a JUnit test method into the name of the interop test case it runs. Every test method of
+     * this class maps onto the same test case, so the method name is not consulted.
+     *
+     * @param methodName The name of the JUnit test method.
+     *
+     * @return The name of the corresponding interop test case, "Perf_SustainedPubSub".
+     */
+    public String getTestCaseNameForTestMethod(String methodName)
+    {
+        // The constant holds the same "Perf_SustainedPubSub" value that was previously spelt out here.
+        return SUSTAINED_KEY;
+    }
+}
Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java?view=auto&rev=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java Mon Jun 4 02:47:53 2007
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.sustained;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.interop.testclient.InteropClientTestCase;
+import org.apache.qpid.util.CommandLineParser;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+
+public class TestClient extends org.apache.qpid.interop.testclient.TestClient
+{
+ private static Logger log = Logger.getLogger(TestClient.class);
+
+ /**
+ * Creates a new sustained test client, listening to the specified broker and virtual host, with the specified client
+ * identifying name. All the arguments are passed straight on to the interop test client that this class extends.
+ *
+ * @param brokerUrl The url of the broker to connect to.
+ * @param virtualHost The virtual host to connect to.
+ * @param clientName The client name to use.
+ */
+ public TestClient(String brokerUrl, String virtualHost, String clientName)
+ {
+ super(brokerUrl, virtualHost, clientName);
+ }
+
+    /**
+     * The entry point for the sustained test client. This client accepts the following command line arguments:
+     *
+     * <p/><table> <tr><td> -b <td> The broker URL. <td> Optional. <tr><td> -h <td> The virtual host. <td> Optional.
+     * <tr><td> -n <td> The test client name. <td> Optional. <tr><td> -j <td> The class name of a running test to join.
+     * <tr><td> name=value <td> Trailing arguments define name/value pairs. Added to system properties. <td> Optional.
+     * </table>
+     *
+     * <p/>Exits with status 1 when the command line cannot be parsed or the client fails to start.
+     *
+     * @param args The command line arguments.
+     */
+    public static void main(String[] args)
+    {
+        // Describe the accepted options to the command line parser.
+        final String[][] optionDefinitions =
+            new String[][]
+            {
+                {"b", "The broker URL.", "broker", "false"},
+                {"h", "The virtual host to use.", "virtual host", "false"},
+                {"n", "The test client name.", "name", "false"},
+                {"j", "Join this test client to running test.", "join", ""}
+            };
+        final CommandLineParser parser = new CommandLineParser(optionDefinitions);
+
+        // Parse the command line, or report the errors and the correct usage and give up.
+        Properties parsedOptions = null;
+
+        try
+        {
+            parsedOptions = parser.parseCommandLine(args);
+        }
+        catch (IllegalArgumentException e)
+        {
+            System.out.println(parser.getErrors());
+            System.out.println(parser.getUsage());
+            System.exit(1);
+        }
+
+        final String brokerUrl = parsedOptions.getProperty("b");
+        final String virtualHost = parsedOptions.getProperty("h");
+        final String requestedName = parsedOptions.getProperty("n");
+        final String join = parsedOptions.getProperty("j");
+
+        // Trailing name=value pairs become system properties, where tests may pick up overridden values.
+        parser.addCommandLineToSysProperties();
+
+        // Fall back to the default client name when none was given.
+        final String effectiveName;
+        if (requestedName == null)
+        {
+            effectiveName = CLIENT_NAME;
+        }
+        else
+        {
+            effectiveName = requestedName;
+        }
+
+        final TestClient client = new TestClient(brokerUrl, virtualHost, effectiveName);
+
+        // The test classes are hard coded until the classpath scanner is fixed.
+        final Collection<Class<? extends InteropClientTestCase>> testCaseClasses =
+            new ArrayList<Class<? extends InteropClientTestCase>>();
+        testCaseClasses.add(SustainedTestClient.class);
+
+        try
+        {
+            client.start(testCaseClasses, join);
+        }
+        catch (Exception e)
+        {
+            log.error("The test client was unable to start.", e);
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Starts the test client and, when a test class to join is given, asks the coordinator of that running test to let
+     * this client join late. The named class is instantiated to find the test case name, and a LATEJOIN control
+     * message is sent to that test's control topic.
+     *
+     * @param testCaseClasses The test case implementations this client can run.
+     * @param join            The class name of the test case to join; may be null or empty to join nothing.
+     *
+     * @throws JMSException           Any JMSException raised while starting or sending the join request falls through.
+     * @throws ClassNotFoundException If the class named by join cannot be found.
+     * @throws RuntimeException       If the class named by join is not an InteropClientTestCase.
+     */
+    protected void start(Collection<Class<? extends InteropClientTestCase>> testCaseClasses, String join)
+        throws JMSException, ClassNotFoundException
+    {
+        super.start(testCaseClasses);
+        log.debug("private void start(): called");
+
+        if (join != null && !join.equals(""))
+        {
+            Message latejoin = session.createMessage();
+
+            try
+            {
+                Object test = Class.forName(join).newInstance();
+                if (test instanceof InteropClientTestCase)
+                {
+                    currentTestCase = (InteropClientTestCase) test;
+                }
+                else
+                {
+                    throw new RuntimeException("Requested to join class '" + join + "' but this is not a InteropClientTestCase.");
+                }
+
+                latejoin.setStringProperty("CONTROL_TYPE", "LATEJOIN");
+                latejoin.setStringProperty("CLIENT_NAME", clientName);
+                latejoin.setStringProperty("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName);
+                producer.send(session.createTopic("iop.control.test." + currentTestCase.getName()), latejoin);
+            }
+            catch (InstantiationException e)
+            {
+                // Name the requested class: currentTestCase was not updated, as the instantiation failed.
+                log.warn("Unable to request latejoining of test:" + join, e);
+            }
+            catch (IllegalAccessException e)
+            {
+                // Name the requested class: currentTestCase was not updated, as the instantiation failed.
+                log.warn("Unable to request latejoining of test:" + join, e);
+            }
+        }
+    }
+
+}
Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestClient.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java?view=auto&rev=544109
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java Mon Jun 4 02:47:53 2007
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.sustained;
+
+import org.apache.qpid.interop.coordinator.Coordinator;
+import org.apache.qpid.interop.coordinator.ListeningTestDecorator;
+import org.apache.qpid.interop.coordinator.TestClientDetails;
+import org.apache.qpid.util.CommandLineParser;
+import org.apache.qpid.util.ConversationFactory;
+import org.apache.log4j.Logger;
+
+import java.util.Properties;
+import java.util.Set;
+
+import junit.framework.TestResult;
+import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
+
+import javax.jms.Connection;
+
+public class TestCoordinator extends Coordinator
+{
+    /** Logger used by the top level exception handler in {@link #main(String[])}. */
+    private static final Logger log = Logger.getLogger(TestCoordinator.class);
+
+    /**
+     * Creates a sustained test coordinator; both arguments are handed unchanged to the {@link Coordinator} constructor.
+     *
+     * @param brokerUrl   The URL of the broker to connect to.
+     * @param virtualHost The virtual host to run all tests on. Optional, may be <tt>null</tt>.
+     */
+    TestCoordinator(String brokerUrl, String virtualHost)
+    {
+        super(brokerUrl, virtualHost);
+    }
+
+    /**
+     * Wraps the target test in a {@link ListeningTestDecorator}, passing all four arguments through unchanged.
+     *
+     * @param targetTest          The test suite to decorate.
+     * @param enlistedClients     The set of test client details given to the decorator.
+     * @param conversationFactory The conversation factory given to the decorator.
+     * @param connection          The connection given to the decorator.
+     *
+     * @return A new {@link ListeningTestDecorator} built from the supplied arguments.
+     */
+    protected WrappedSuiteTestDecorator newTestDecorator(WrappedSuiteTestDecorator targetTest, Set<TestClientDetails> enlistedClients, ConversationFactory conversationFactory, Connection connection)
+    {
+        return new ListeningTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
+    }
+
+    /**
+     * The entry point for the sustained test coordinator. Starts a coordinator with {@link SustainedTestCoordinator}
+     * as its only test class and exits with <tt>SUCCESS_EXIT</tt>, <tt>FAILURE_EXIT</tt> or <tt>EXCEPTION_EXIT</tt>.
+     *
+     * <p/>Accepted command line arguments: <table>
+     * <tr><td> -b <td> The broker URL. <td> Mandatory. NOTE(review): declared "false" in the spec below - confirm.
+     * <tr><td> -h <td> The virtual host. <td> Optional.
+     * <tr><td> -o <td> The name of the directory to output test timings to. <td> Optional, defaults to ".".
+     * <tr><td> name=value <td> Trailing argument define name/value pairs. Added to system properties. <td> Optional.
+     * </table>
+     *
+     * @param args The command line arguments.
+     */
+    public static void main(String[] args)
+    {
+        try
+        {
+            // Standard handling behaviour: print errors and usage, then exit, if the command line has errors.
+            Properties options =
+                CommandLineParser.processCommandLine(args,
+                    new CommandLineParser(
+                        new String[][]
+                        {
+                            {"b", "The broker URL.", "broker", "false"},
+                            {"h", "The virtual host to use.", "virtual host", "false"},
+                            {"o", "The name of the directory to output test timings to.", "dir", "false"}
+                        }));
+
+            String brokerUrl = options.getProperty("b");
+            String virtualHost = options.getProperty("h");
+            // Timings are reported to the current directory when -o is not supplied.
+            String reportDir = options.getProperty("o", ".");
+
+            // Create a coordinator, tell it where to report, and begin its test procedure.
+            Coordinator coordinator = new TestCoordinator(brokerUrl, virtualHost);
+            coordinator.setReportDir(reportDir);
+            TestResult testResult = coordinator.start(new String[] {SustainedTestCoordinator.class.getName()});
+
+            // wasSuccessful() is false for errors as well as failures, so a run that only errored is not a success.
+            System.exit(testResult.wasSuccessful() ? SUCCESS_EXIT : FAILURE_EXIT);
+        }
+        catch (Exception e)
+        {
+            System.err.println(e.getMessage());
+            log.error("Top level handler caught execption.", e);
+            System.exit(EXCEPTION_EXIT);
+        }
+    }
+}
Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/sustained/TestCoordinator.java
------------------------------------------------------------------------------
svn:keywords = Rev Date