You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2007/07/25 14:18:03 UTC
svn commit: r559419 [3/3] - in /incubator/qpid/branches/M2/java:
broker/src/main/java/org/apache/qpid/server/queue/
common/src/main/java/org/apache/qpid/util/ etc/
integrationtests/src/main/java/org/apache/qpid/interop/coordinator/
integrationtests/src...
Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java?view=diff&rev=559419&r1=559418&r2=559419
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java Wed Jul 25 05:17:59 2007
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.server.exchange;
+import org.apache.qpid.interop.coordinator.sequencers.TestCaseSequencer;
import org.apache.qpid.test.framework.Circuit;
-import org.apache.qpid.test.framework.CircuitImpl;
import org.apache.qpid.test.framework.FrameworkBaseCase;
import org.apache.qpid.test.framework.MessagingTestConfigProperties;
import static org.apache.qpid.test.framework.MessagingTestConfigProperties.*;
@@ -67,6 +67,16 @@
/** Used to read the tests configurable properties through. */
ParsedProperties testProps;
+ /**
+ * Creates a new test case with the specified name.
+ *
+ * @param name The test case name.
+ */
+ public MandatoryMessageTest(String name)
+ {
+ super(name);
+ }
+
/** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */
public void test_QPID_508_MandatoryOkNoTxP2P()
{
@@ -74,10 +84,10 @@
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, false);
- Circuit testCircuit = CircuitImpl.createCircuit(testProps);
-
- // Send one message with no errors.
- assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getPublisher().noExceptionsAssertion())));
+ // Run the default test sequence over the test circuit checking for no errors.
+ TestCaseSequencer sequencer = getTestSequencer();
+ Circuit testCircuit = sequencer.createCircuit(testProps);
+ sequencer.sequenceTest(testCircuit, assertionList(testCircuit.getPublisher().noExceptionsAssertion()), testProps);
}
/** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */
@@ -87,10 +97,10 @@
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, false);
- Circuit testCircuit = CircuitImpl.createCircuit(testProps);
-
- // Send one message with no errors.
- assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getPublisher().noExceptionsAssertion())));
+ // Run the default test sequence over the test circuit checking for no errors.
+ TestCaseSequencer sequencer = getTestSequencer();
+ Circuit testCircuit = sequencer.createCircuit(testProps);
+ sequencer.sequenceTest(testCircuit, assertionList(testCircuit.getPublisher().noExceptionsAssertion()), testProps);
}
/**
@@ -103,13 +113,14 @@
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, false);
- Circuit testCircuit = CircuitImpl.createCircuit(testProps);
+ TestCaseSequencer sequencer = getTestSequencer();
+ Circuit testCircuit = sequencer.createCircuit(testProps);
// Disconnect the consumer.
testCircuit.getReceiver().getConsumer().close();
// Send one message with no errors.
- assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getPublisher().noExceptionsAssertion())));
+ sequencer.sequenceTest(testCircuit, assertionList(testCircuit.getPublisher().noExceptionsAssertion()), testProps);
}
/**
@@ -122,13 +133,14 @@
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, false);
- Circuit testCircuit = CircuitImpl.createCircuit(testProps);
+ TestCaseSequencer sequencer = getTestSequencer();
+ Circuit testCircuit = sequencer.createCircuit(testProps);
// Disconnect the consumer.
testCircuit.getReceiver().getConsumer().close();
// Send one message with no errors.
- assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getPublisher().noExceptionsAssertion())));
+ sequencer.sequenceTest(testCircuit, assertionList(testCircuit.getPublisher().noExceptionsAssertion()), testProps);
}
/** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
@@ -138,14 +150,14 @@
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, false);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receivers to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
- Circuit testCircuit = CircuitImpl.createCircuit(testProps);
-
- // Send one message and get a linked no consumers exception.
- assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getPublisher().noRouteAssertion())));
+ // Send one message and get a linked no route exception.
+ TestCaseSequencer sequencer = getTestSequencer();
+ Circuit testCircuit = sequencer.createCircuit(testProps);
+ sequencer.sequenceTest(testCircuit, assertionList(testCircuit.getPublisher().noRouteAssertion()), testProps);
}
/** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
@@ -155,14 +167,14 @@
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, false);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receivers to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
- Circuit testCircuit = CircuitImpl.createCircuit(testProps);
-
- // Send one message and get a linked no consumers exception.
- assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getPublisher().noRouteAssertion())));
+ // Send one message and get a linked no route exception.
+ TestCaseSequencer sequencer = getTestSequencer();
+ Circuit testCircuit = sequencer.createCircuit(testProps);
+ sequencer.sequenceTest(testCircuit, assertionList(testCircuit.getPublisher().noRouteAssertion()), testProps);
}
/** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */
@@ -172,10 +184,10 @@
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, true);
- Circuit testCircuit = CircuitImpl.createCircuit(testProps);
-
- // Send one message with no errors.
- assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getPublisher().noExceptionsAssertion())));
+ // Run the default test sequence over the test circuit checking for no errors.
+ TestCaseSequencer sequencer = getTestSequencer();
+ Circuit testCircuit = sequencer.createCircuit(testProps);
+ sequencer.sequenceTest(testCircuit, assertionList(testCircuit.getPublisher().noExceptionsAssertion()), testProps);
}
/** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */
@@ -185,10 +197,10 @@
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, true);
- Circuit testCircuit = CircuitImpl.createCircuit(testProps);
-
- // Send one message with no errors.
- assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getPublisher().noExceptionsAssertion())));
+ // Run the default test sequence over the test circuit checking for no errors.
+ TestCaseSequencer sequencer = getTestSequencer();
+ Circuit testCircuit = sequencer.createCircuit(testProps);
+ sequencer.sequenceTest(testCircuit, assertionList(testCircuit.getPublisher().noExceptionsAssertion()), testProps);
}
/**
@@ -204,13 +216,14 @@
// Use durable subscriptions, so that the route remains open with no subscribers.
testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
- Circuit testCircuit = CircuitImpl.createCircuit(testProps);
+ TestCaseSequencer sequencer = getTestSequencer();
+ Circuit testCircuit = sequencer.createCircuit(testProps);
// Disconnect the consumer.
testCircuit.getReceiver().getConsumer().close();
// Send one message with no errors.
- assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getPublisher().noExceptionsAssertion())));
+ sequencer.sequenceTest(testCircuit, assertionList(testCircuit.getPublisher().noExceptionsAssertion()), testProps);
}
/**
@@ -226,13 +239,14 @@
// Use durable subscriptions, so that the route remains open with no subscribers.
testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
- Circuit testCircuit = CircuitImpl.createCircuit(testProps);
+ TestCaseSequencer sequencer = getTestSequencer();
+ Circuit testCircuit = sequencer.createCircuit(testProps);
// Disconnect the consumer.
testCircuit.getReceiver().getConsumer().close();
// Send one message with no errors.
- assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getPublisher().noExceptionsAssertion())));
+ sequencer.sequenceTest(testCircuit, assertionList(testCircuit.getPublisher().noExceptionsAssertion()), testProps);
}
/** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
@@ -242,14 +256,14 @@
testProps.setProperty(TRANSACTED_PROPNAME, false);
testProps.setProperty(PUBSUB_PROPNAME, true);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receivers to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
- Circuit testCircuit = CircuitImpl.createCircuit(testProps);
-
- // Send one message and get a linked no consumers exception.
- assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getPublisher().noRouteAssertion())));
+ // Send one message and get a linked no route exception.
+ TestCaseSequencer sequencer = getTestSequencer();
+ Circuit testCircuit = sequencer.createCircuit(testProps);
+ sequencer.sequenceTest(testCircuit, assertionList(testCircuit.getPublisher().noRouteAssertion()), testProps);
}
/** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
@@ -259,14 +273,14 @@
testProps.setProperty(TRANSACTED_PROPNAME, true);
testProps.setProperty(PUBSUB_PROPNAME, true);
- // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receivers to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
- Circuit testCircuit = CircuitImpl.createCircuit(testProps);
-
- // Send one message and get a linked no consumers exception.
- assertNoFailures(testCircuit.test(1, assertionList(testCircuit.getPublisher().noRouteAssertion())));
+ // Send one message and get a linked no route exception.
+ TestCaseSequencer sequencer = getTestSequencer();
+ Circuit testCircuit = sequencer.createCircuit(testProps);
+ sequencer.sequenceTest(testCircuit, assertionList(testCircuit.getPublisher().noRouteAssertion()), testProps);
}
protected void setUp() throws Exception
Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?view=diff&rev=559419&r1=559418&r2=559419
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Wed Jul 25 05:17:59 2007
@@ -24,7 +24,6 @@
import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.client.*;
import org.apache.qpid.client.transport.TransportConnection;
@@ -127,8 +126,8 @@
con.start();
TextMessage tm = (TextMessage) consumer.receive(1000L);
- assertTrue("No message routed to receiver", tm != null);
- assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg3".equals(tm.getText()));
+ assertTrue("No message routed to receivers", tm != null);
+ assertTrue("Wrong message routed to receivers: " + tm.getText(), "msg3".equals(tm.getText()));
try
{
@@ -194,8 +193,8 @@
con.start();
TextMessage tm = (TextMessage) consumer.receive(1000L);
- assertTrue("No message routed to receiver", tm != null);
- assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg1".equals(tm.getText()));
+ assertTrue("No message routed to receivers", tm != null);
+ assertTrue("Wrong message routed to receivers: " + tm.getText(), "msg1".equals(tm.getText()));
try
{
@@ -260,8 +259,8 @@
con.start();
TextMessage tm = (TextMessage) consumer.receive(1000L);
- assertTrue("No message routed to receiver", tm != null);
- assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg1".equals(tm.getText()));
+ assertTrue("No message routed to receivers", tm != null);
+ assertTrue("Wrong message routed to receivers: " + tm.getText(), "msg1".equals(tm.getText()));
try
{
Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java?view=diff&rev=559419&r1=559418&r2=559419
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java Wed Jul 25 05:17:59 2007
@@ -1,39 +1,41 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
- *
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQConnectionClosedException;
-import org.apache.qpid.util.CommandLineParser;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.log4j.Logger;
+import org.apache.qpid.util.CommandLineParser;
-import javax.jms.Session;
import javax.jms.JMSException;
-import javax.jms.Queue;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
import javax.jms.TextMessage;
+
import java.io.IOException;
import java.util.Properties;
@@ -41,7 +43,6 @@
{
private static final Logger _logger = Logger.getLogger(PersistentTestManual.class);
-
private static final String QUEUE = "direct://amq.direct//PersistentTest-Queue2?durable='true',exclusive='true'";
protected AMQConnection _connection;
@@ -89,7 +90,7 @@
public void test() throws AMQException, URLSyntaxException
{
- //Create the Durable Queue
+ // Create the Durable Queue
try
{
_session.createConsumer(_session.createQueue(QUEUE)).close();
@@ -121,16 +122,17 @@
System.out.println("Continuing....");
}
- //Test queue is still there.
- AMQConnection connection = new AMQConnection(_brokerDetails, _username, _password, "DifferentClientID", _virtualpath);
+ // Test queue is still there.
+ AMQConnection connection =
+ new AMQConnection(_brokerDetails, _username, _password, "DifferentClientID", _virtualpath);
AMQSession session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try
{
session.createConsumer(session.createQueue(QUEUE));
- _logger.error("Create consumer succeeded." +
- " This shouldn't be allowed as this means the queue didn't exist when it should");
+ _logger.error("Create consumer succeeded."
+ + " This shouldn't be allowed as this means the queue didn't exist when it should");
connection.close();
@@ -189,6 +191,7 @@
{
//
}
+
System.exit(0);
}
@@ -196,7 +199,7 @@
{
String TEST_TEXT = "init";
- //Create a new session to send producer
+ // Create a new session to send producer
Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = session.createQueue(QUEUE);
@@ -204,10 +207,9 @@
producer.send(session.createTextMessage(TEST_TEXT));
- //create a new consumer on the original session
+ // create a new consumer on the original session
TextMessage m = (TextMessage) _session.createConsumer(q).receive();
-
if ((m != null) && m.getText().equals(TEST_TEXT))
{
return true;
@@ -216,6 +218,7 @@
{
_logger.error("Incorrect values returned from Queue Test:" + m);
System.exit(0);
+
return false;
}
}
@@ -259,8 +262,8 @@
{
PersistentTestManual test;
- Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][]{}));
-
+ Properties options =
+ CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());
test = new PersistentTestManual(options);
try
Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Circuit.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Circuit.java?view=diff&rev=559419&r1=559418&r2=559419
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Circuit.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Circuit.java Wed Jul 25 05:17:59 2007
@@ -24,7 +24,7 @@
/**
* A Circuit is the basic test unit against which test cases are to be written. A circuit consists of two 'ends', an
- * instigating 'publisher' end and a more passive 'receiver' end.
+ * instigating 'publisher' end and a more passive 'receivers' end.
*
* <p/>Once created, the life-cycle of a circuit may be controlled by {@link #start()}ing it, or {@link #close()}ing it.
* Once started, the circuit is ready to send messages over. Once closed the circuit can no longer be used.
@@ -97,10 +97,10 @@
*/
public List<Assertion> applyAssertions(List<Assertion> assertions);
- /**
+ /*
* Sends a message on the test circuit. The exact nature of the message sent is controlled by the test parameters.
*/
- public void send();
+ // public void send();
/**
* Runs the default test procedure against the circuit, and checks that all of the specified assertions hold.
Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitImpl.java?view=diff&rev=559419&r1=559418&r2=559419
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitImpl.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/CircuitImpl.java Wed Jul 25 05:17:59 2007
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.test.framework;
-import junit.framework.Assert;
-
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.framework.MessageMonitor;
@@ -49,6 +47,9 @@
* <tr><td> Perform the default test procedure on the circuit.
* <tr><td> Provide access to connection and session exception monitors <td> {@link ExceptionMonitor}
* </table>
+ *
+ * @todo Add ability to create routes with no consumers active on them. Immediate/Mandatory tests are closing consumers
+ * themsleves to create this scenario. Should make it part of the test configuration.
*/
public class CircuitImpl implements Circuit
{
@@ -74,12 +75,12 @@
private ExceptionMonitor exceptionMonitor;
/**
- * Creates a test circuit using the specified test parameters. The publisher, receiver, connection and
+ * Creates a test circuit using the specified test parameters. The publisher, receivers, connection and
* connection monitor must already have been created, to assemble the circuit.
*
* @param testProps The test parameters.
* @param publisher The test publisher.
- * @param receiver The test receiver.
+ * @param receiver The test receivers.
* @param connection The connection.
* @param connectionExceptionMonitor The connection exception monitor.
*/
@@ -93,7 +94,7 @@
this.connectionExceptionMonitor = connectionExceptionMonitor;
this.exceptionMonitor = new ExceptionMonitor();
- // Set this as the parent circuit on the publisher and receiver.
+ // Set this as the parent circuit on the publisher and receivers.
publisher.setCircuit(this);
receiver.setCircuit(this);
}
@@ -107,9 +108,11 @@
*/
public static Circuit createCircuit(ParsedProperties testProps)
{
- // Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
+ // Create a standard publisher/receivers test client pair on a shared connection, individual sessions.
try
{
+ // ParsedProperties testProps = new ParsedProperties(testProps);
+
// Get a unique offset to append to destination names to make them unique to the connection.
long uniqueId = uniqueDestsId.incrementAndGet();
@@ -211,7 +214,7 @@
}
catch (JMSException e)
{
- throw new RuntimeException("Could not create publisher/receiver pair due to a JMSException.", e);
+ throw new RuntimeException("Could not create publisher/receivers pair due to a JMSException.", e);
}
}
@@ -348,7 +351,7 @@
// Apply all of the requested assertions, keeping record of any that fail.
List<Assertion> failures = applyAssertions(assertions);
- // Clean up the publisher/receiver/session/connections.
+ // Clean up the publisher/receivers/session/connections.
close();
// Return any failed assertions to the caller.
Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkBaseCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkBaseCase.java?view=diff&rev=559419&r1=559418&r2=559419
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkBaseCase.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/FrameworkBaseCase.java Wed Jul 25 05:17:59 2007
@@ -25,11 +25,18 @@
import org.apache.log4j.NDC;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.interop.coordinator.sequencers.TestCaseSequencer;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.Properties;
/**
* FrameworkBaseCase provides a starting point for writing test cases against the test framework. Its main purpose is
@@ -45,6 +52,42 @@
*/
public class FrameworkBaseCase extends TestCase
{
+ /** Holds the test sequencer to create and run test circuits with. */
+ protected TestCaseSequencer testSequencer = new DefaultTestSequencer();
+
+ /**
+ * Creates a new test case with the specified name.
+ *
+ * @param name The test case name.
+ */
+ public FrameworkBaseCase(String name)
+ {
+ super(name);
+ }
+
+ /**
+ * Returns the test case sequencer that provides test circuit, and test sequence implementations. The sequencer
+ * that this base case returns by default is suitable for running a test circuit with both circuit ends colocated
+ * on the same JVM.
+ *
+ * @return The test case sequencer.
+ */
+ protected TestCaseSequencer getTestSequencer()
+ {
+ return testSequencer;
+ }
+
+ /**
+ * Overrides the default test sequencer. Test decorators can use this to supply distributed test sequencers or other
+ * test sequencer specializations.
+ *
+ * @param sequencer The new test sequencer.
+ */
+ public void setTestSequencer(TestCaseSequencer sequencer)
+ {
+ this.testSequencer = sequencer;
+ }
+
/**
* Creates a list of assertions.
*
@@ -131,6 +174,37 @@
finally
{
NDC.pop();
+ }
+ }
+
+ /**
+ * DefaultTestSequencer is a test sequencer that creates test circuits with publishing and receiving ends rooted
+ * on the same JVM.
+ */
+ public class DefaultTestSequencer implements TestCaseSequencer
+ {
+ /**
+ * Holds a test coordinating conversation with the test clients. This should consist of assigning the test roles,
+ * begining the test and gathering the test reports from the participants.
+ *
+ * @param testCircuit The test circuit.
+ * @param assertions The list of assertions to apply to the test circuit.
+ * @param testProperties The test case definition.
+ */
+ public void sequenceTest(Circuit testCircuit, List<Assertion> assertions, Properties testProperties)
+ {
+ assertNoFailures(testCircuit.test(1, assertions));
+ }
+
+ /**
+ * Creates a test circuit for the test, configered by the test parameters specified.
+ *
+ * @param testProperties The test parameters.
+ * @return A test circuit.
+ */
+ public Circuit createCircuit(ParsedProperties testProperties)
+ {
+ return CircuitImpl.createCircuit(testProperties);
}
}
}
Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessagingTestConfigProperties.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessagingTestConfigProperties.java?view=diff&rev=559419&r1=559418&r2=559419
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessagingTestConfigProperties.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/MessagingTestConfigProperties.java Wed Jul 25 05:17:59 2007
@@ -48,7 +48,7 @@
* <tr><td> destinationCount <td> 1 <td> The number of receivers listening to the pings.
* <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
* <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
- * <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all.
+ * <tr><td> uniqueDests <td> true <td> Whether each receivers only listens to one ping destination or all.
* <tr><td> durableDests <td> false <td> Whether or not durable destinations are used.
* <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
* 0 - SESSION_TRANSACTED
@@ -81,12 +81,6 @@
/** Defines the class to use as the initial context factory by default. */
public static final String INITIAL_CONTEXT_FACTORY_DEFAULT = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
- /** Holds the name of the default connection factory configuration property. */
- public static final String CONNECTION_PROPNAME = "connectionfactory.broker";
-
- /** Defeins the default connection configuration. */
- public static final String CONNECTION_DEFAULT = "amqp://guest:guest@clientid/?brokerlist='vm://:1'";
-
/** Holds the name of the property to get the test broker url from. */
public static final String BROKER_PROPNAME = "qpid.test.broker";
@@ -125,16 +119,16 @@
/** Holds the default value of the publisher consumer flag. */
public static final boolean PUBLISHER_CONSUMER_BIND_DEFAULT = false;
- /** Holds the name of the property to get the bind receiver procuder flag from. */
+ /** Holds the name of the property to get the bind receivers procuder flag from. */
public static final String RECEIVER_PRODUCER_BIND_PROPNAME = "receiverProducerBind";
- /** Holds the default value of the receiver producer flag. */
+ /** Holds the default value of the receivers producer flag. */
public static final boolean RECEIVER_PRODUCER_BIND_DEFAULT = false;
- /** Holds the name of the property to get the bind receiver procuder flag from. */
+ /** Holds the name of the property to get the bind receivers procuder flag from. */
public static final String RECEIVER_CONSUMER_BIND_PROPNAME = "receiverConsumerBind";
- /** Holds the default value of the receiver consumer flag. */
+ /** Holds the default value of the receivers consumer flag. */
public static final boolean RECEIVER_CONSUMER_BIND_DEFAULT = true;
/** Holds the name of the property to get the destination name root from. */
@@ -275,7 +269,7 @@
static
{
defaults.setPropertyIfNull(INITIAL_CONTEXT_FACTORY_PROPNAME, INITIAL_CONTEXT_FACTORY_DEFAULT);
- defaults.setPropertyIfNull(CONNECTION_PROPNAME, CONNECTION_DEFAULT);
+ // defaults.setPropertyIfNull(CONNECTION_PROPNAME, CONNECTION_DEFAULT);
defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT);
defaults.setPropertyIfNull(PUBLISHER_PRODUCER_BIND_PROPNAME, PUBLISHER_PRODUCER_BIND_DEFAULT);
defaults.setPropertyIfNull(PUBLISHER_CONSUMER_BIND_PROPNAME, PUBLISHER_CONSUMER_BIND_DEFAULT);
Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Receiver.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Receiver.java?view=diff&rev=559419&r1=559418&r2=559419
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Receiver.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/Receiver.java Wed Jul 25 05:17:59 2007
@@ -22,27 +22,27 @@
/**
* A Receiver is a {@link CircuitEnd} that represents one end of a test circuit. Its main purpose is to
- * provide assertions that can be applied to test the behaviour of the receiver.
+ * provide assertions that can be applied to test the behaviour of the receivers.
*
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities
- * <tr><td> Provide assertion that the receiver received no exceptions.
- * <tr><td> Provide assertion that the receiver received all test messages sent to it.
+ * <tr><td> Provide assertion that the receivers received no exceptions.
+ * <tr><td> Provide assertion that the receivers received all test messages sent to it.
* </table>
*/
public interface Receiver extends CircuitEnd
{
/**
- * Provides an assertion that the receiver encountered no exceptions.
+ * Provides an assertion that the receivers encountered no exceptions.
*
- * @return An assertion that the receiver encountered no exceptions.
+ * @return An assertion that the receivers encountered no exceptions.
*/
public Assertion noExceptionsAssertion();
/**
- * Provides an assertion that the receiver got all messages that were sent to it.
+ * Provides an assertion that the receivers got all messages that were sent to it.
*
- * @return An assertion that the receiver got all messages that were sent to it.
+ * @return An assertion that the receivers got all messages that were sent to it.
*/
public Assertion allMessagesAssertion();
}
Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ReceiverImpl.java?view=diff&rev=559419&r1=559418&r2=559419
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ReceiverImpl.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/ReceiverImpl.java Wed Jul 25 05:17:59 2007
@@ -32,8 +32,8 @@
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Provide a message producer for sending messages.
* <tr><td> Provide a message consumer for receiving messages.
- * <tr><td> Provide assertion that the receiver received no exceptions.
- * <tr><td> Provide assertion that the receiver received all test messages sent to it.
+ * <tr><td> Provide assertion that the receivers received no exceptions.
+ * <tr><td> Provide assertion that the receivers received all test messages sent to it.
* </table>
*/
public class ReceiverImpl extends CircuitEndBase implements Receiver
@@ -54,9 +54,9 @@
}
/**
- * Provides an assertion that the receiver encountered no exceptions.
+ * Provides an assertion that the receivers encountered no exceptions.
*
- * @return An assertion that the receiver encountered no exceptions.
+ * @return An assertion that the receivers encountered no exceptions.
*/
public Assertion noExceptionsAssertion()
{
@@ -64,9 +64,9 @@
}
/**
- * Provides an assertion that the receiver got all messages that were sent to it.
+ * Provides an assertion that the receivers got all messages that were sent to it.
*
- * @return An assertion that the receiver got all messages that were sent to it.
+ * @return An assertion that the receivers got all messages that were sent to it.
*/
public Assertion allMessagesAssertion()
{
Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/TestUtils.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/TestUtils.java?view=diff&rev=559419&r1=559418&r2=559419
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/TestUtils.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/test/framework/TestUtils.java Wed Jul 25 05:17:59 2007
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.test.framework;
+import org.apache.log4j.Logger;
+
import static org.apache.qpid.test.framework.MessagingTestConfigProperties.*;
import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
@@ -27,10 +29,14 @@
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
+import java.util.Properties;
+import java.util.Map;
+
/**
* TestUtils provides static helper methods that are usefull for writing tests against QPid.
*
@@ -42,18 +48,34 @@
*/
public class TestUtils
{
+ /** Used for debugging. */
+ private static Logger log = Logger.getLogger(TestUtils.class);
+
/**
- * Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple
- * convenience method for code that does anticipate handling connection failures. All exceptions that indicate
- * that the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure
+ * Establishes a JMS connection using a set of properties and qpids built in JNDI implementation. This is a simple
+ * convenience method for code that does not anticipate handling connection failures. All exceptions that indicate
+ * that the connection has failed, are wrapped as rutime exceptions, presumably handled by a top level failure
* handler.
*
+ * <p/>This utility makes use of the following test parameters from {@link MessagingTestConfigProperties} to control
+ * the connection creation:
+ *
+ * <p/><table>
+ * <tr><td> {@link MessagingTestConfigProperties#USERNAME_PROPNAME} <td> The username.
+ * <tr><td> {@link MessagingTestConfigProperties#PASSWORD_PROPNAME} <td> The password.
+ * <tr><td> {@link MessagingTestConfigProperties#VIRTUAL_HOST_PROPNAME} <td> The virtual host name.
+ * <tr><td> {@link MessagingTestConfigProperties#BROKER_PROPNAME} <td> The broker URL.
+ * <tr><td> {@link MessagingTestConfigProperties#CONNECTION_NAME} <td> The broker name in the initial context.
+ *
* @param messagingProps Connection properties as defined in {@link MessagingTestConfigProperties}.
*
* @return A JMS conneciton.
*/
public static Connection createConnection(ParsedProperties messagingProps)
{
+ log.debug("public static Connection createConnection(ParsedProperties messagingProps = " + messagingProps
+ + "): called");
+
try
{
// Extract the configured connection properties from the test configuration.
@@ -62,12 +84,14 @@
String virtualHost = messagingProps.getProperty(VIRTUAL_HOST_PROPNAME);
String brokerUrl = messagingProps.getProperty(BROKER_PROPNAME);
- // Set up the broker connection url.
+ // Create the broker connection url.
String connectionString =
- "amqp://" + conUsername + ":" + conPassword + "/" + ((virtualHost != null) ? virtualHost : "")
+ "amqp://" + conUsername + ":" + conPassword + "@clientid/" + ((virtualHost != null) ? virtualHost : "")
+ "?brokerlist='" + brokerUrl + "'";
- // messagingProps.setProperty(CONNECTION_PROPNAME, connectionString);
+ // Create properties to create the initial context from, and inject the connection factory configuration
+ // for the defined connection name into it.
+ messagingProps.setProperty("connectionfactory." + CONNECTION_NAME, connectionString);
Context ctx = new InitialContext(messagingProps);
@@ -106,6 +130,27 @@
Thread.currentThread().interrupt();
throw new RuntimeException("Failed to generate the requested pause length.", e);
+ }
+ }
+
+ /**
+ * Sets properties of different types on a JMS Message.
+ *
+ * @param message The message to set properties on.
+ * @param properties The property name/value pairs to set.
+ *
+ * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
+ *
+ * @todo Move this helper method somewhere else. For example, TestUtils.
+ */
+ public static void setPropertiesOnMessage(Message message, Map<Object, Object> properties) throws JMSException
+ {
+ for (Map.Entry<Object, Object> entry : properties.entrySet())
+ {
+ String name = entry.getKey().toString();
+ Object value = entry.getValue();
+
+ message.setObjectProperty(name, value);
}
}
}
Added: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/util/ClasspathScanner.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/util/ClasspathScanner.java?view=auto&rev=559419
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/util/ClasspathScanner.java (added)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/util/ClasspathScanner.java Wed Jul 25 05:17:59 2007
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.io.File;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+
+/**
+ * An ClasspathScanner scans the classpath for classes that implement an interface or extend a base class and have names
+ * that match a regular expression.
+ *
+ * <p/>In order to test whether a class implements an interface or extends a class, the class must be loaded (unless
+ * the class files were to be scanned directly). Using this collector can cause problems when it scans the classpath,
+ * because loading classes will initialize their statics, which in turn may cause undesired side effects. For this
+ * reason, the collector should always be used with a regular expression, through which the class file names are
+ * filtered, and only those that pass this filter will be tested. For example, if you define tests in classes that
+ * end with the keyword "Test" then use the regular expression "Test$" to match this.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Find all classes matching type and name pattern on the classpath.
+ * </table>
+ *
+ * @todo Add logic to scan jars as well as directories.
+ */
+public class ClasspathScanner
+{
+ private static final Logger log = Logger.getLogger(ClasspathScanner.class);
+
+ /**
+ * Scans the classpath and returns all classes that extend a specified class and match a specified name.
+ * There is an flag that can be used to indicate that only Java Beans will be matched (that is, only those classes
+ * that have a default constructor).
+ *
+ * @param matchingClass The class or interface to match.
+ * @param matchingRegexp The regular expression to match against the class name.
+ * @param beanOnly Flag to indicate that onyl classes with default constructors should be matched.
+ *
+ * @return All the classes that match this collector.
+ */
+ public static <T> Collection<Class<? extends T>> getMatches(Class<T> matchingClass, String matchingRegexp,
+ boolean beanOnly)
+ {
+ log.debug("public static <T> Collection<Class<? extends T>> getMatches(Class<T> matchingClass = " + matchingClass
+ + ", String matchingRegexp = " + matchingRegexp + ", boolean beanOnly = " + beanOnly + "): called");
+
+ // Build a compiled regular expression from the pattern to match.
+ Pattern matchPattern = Pattern.compile(matchingRegexp);
+
+ String classPath = System.getProperty("java.class.path");
+ Map<String, Class<? extends T>> result = new HashMap<String, Class<? extends T>>();
+
+ log.debug("classPath = " + classPath);
+
+ // Find matching classes starting from all roots in the classpath.
+ for (String path : splitClassPath(classPath))
+ {
+ gatherFiles(new File(path), "", result, matchPattern, matchingClass);
+ }
+
+ return result.values();
+ }
+
+ /**
+ * Finds all matching classes rooted at a given location in the file system. If location is a directory it
+ * is recursively examined.
+ *
+ * @param classRoot The root of the current point in the file system being examined.
+ * @param classFileName The name of the current file or directory to examine.
+ * @param result The accumulated mapping from class names to classes that match the scan.
+ *
+ * @todo Recursion ok as file system depth is not likely to exhaust the stack. Might be better to replace with
+ * iteration.
+ */
+ private static <T> void gatherFiles(File classRoot, String classFileName, Map<String, Class<? extends T>> result,
+ Pattern matchPattern, Class<? extends T> matchClass)
+ {
+ log.debug("private static <T> void gatherFiles(File classRoot = " + classRoot + ", String classFileName = "
+ + classFileName + ", Map<String, Class<? extends T>> result, Pattern matchPattern = " + matchPattern
+ + ", Class<? extends T> matchClass = " + matchClass + "): called");
+
+ File thisRoot = new File(classRoot, classFileName);
+
+ // If the current location is a file, check if it is a matching class.
+ if (thisRoot.isFile())
+ {
+ // Check that the file has a matching name.
+ if (matchesName(thisRoot.getName(), matchPattern))
+ {
+ String className = classNameFromFile(thisRoot.getName());
+
+ // Check that the class has matching type.
+ try
+ {
+ Class<?> candidateClass = Class.forName(className);
+
+ Class matchedClass = matchesClass(candidateClass, matchClass);
+
+ if (matchedClass != null)
+ {
+ result.put(className, matchedClass);
+ }
+ }
+ catch (ClassNotFoundException e)
+ {
+ // Ignore this. The matching class could not be loaded.
+ log.debug("Got ClassNotFoundException, ignoring.", e);
+ }
+ }
+
+ return;
+ }
+ // Otherwise the current location is a directory, so examine all of its contents.
+ else
+ {
+ String[] contents = thisRoot.list();
+
+ if (contents != null)
+ {
+ for (String content : contents)
+ {
+ gatherFiles(classRoot, classFileName + File.separatorChar + content, result, matchPattern, matchClass);
+ }
+ }
+ }
+ }
+
+ /**
+ * Checks if the specified class file name corresponds to a class with name matching the specified regular expression.
+ *
+ * @param classFileName The class file name.
+ * @param matchPattern The regular expression pattern to match.
+ *
+ * @return <tt>true</tt> if the class name matches, <tt>false</tt> otherwise.
+ */
+ private static boolean matchesName(String classFileName, Pattern matchPattern)
+ {
+ String className = classNameFromFile(classFileName);
+ Matcher matcher = matchPattern.matcher(className);
+
+ return matcher.matches();
+ }
+
+ /**
+ * Checks if the specified class to compare extends the base class being scanned for.
+ *
+ * @param matchingClass The base class to match against.
+ * @param toMatch The class to match against the base class.
+ *
+ * @return The class to check, cast as an instance of the class to match if the class extends the base class, or
+ * <tt>null</tt> otherwise.
+ */
+ private static <T> Class<? extends T> matchesClass(Class<?> matchingClass, Class<? extends T> toMatch)
+ {
+ try
+ {
+ return matchingClass.asSubclass(toMatch);
+ }
+ catch (ClassCastException e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Takes a classpath (which is a series of paths) and splits it into its component paths.
+ *
+ * @param classPath The classpath to split.
+ *
+ * @return A list of the component paths that make up the class path.
+ */
+ private static List<String> splitClassPath(String classPath)
+ {
+ List<String> result = new LinkedList<String>();
+ String separator = System.getProperty("path.separator");
+ StringTokenizer tokenizer = new StringTokenizer(classPath, separator);
+
+ while (tokenizer.hasMoreTokens())
+ {
+ result.add(tokenizer.nextToken());
+ }
+
+ return result;
+ }
+
+ /**
+ * Translates from the filename of a class to its fully qualified classname. Files are named using forward slash
+ * seperators and end in ".class", whereas fully qualified class names use "." sperators and no ".class" ending.
+ *
+ * @param classFileName The filename of the class to translate to a class name.
+ *
+ * @return The fully qualified class name.
+ */
+ private static String classNameFromFile(String classFileName)
+ {
+ log.debug("private static String classNameFromFile(String classFileName = " + classFileName + "): called");
+
+ // Remove the .class ending.
+ String s = classFileName.substring(0, classFileName.length() - ".class".length());
+
+ // Turn / seperators in . seperators.
+ String s2 = s.replace(File.separatorChar, '.');
+
+ // Knock off any leading . caused by a leading /.
+ if (s2.startsWith("."))
+ {
+ return s2.substring(1);
+ }
+
+ return s2;
+ }
+}
Added: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/util/ConversationFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/util/ConversationFactory.java?view=auto&rev=559419
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/util/ConversationFactory.java (added)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/util/ConversationFactory.java Wed Jul 25 05:17:59 2007
@@ -0,0 +1,479 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation
+ * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant
+ * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids.
+ *
+ * <p/>One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a
+ * multiplexed messaging route. This can be usefull, as JMS sessions are not multi-threaded. Setting up the conversation
+ * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order
+ * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded
+ * conversation (the conversation methods can be called many times in parallel):
+ *
+ * <p/><pre>
+ * class Initiator
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, null,
+ * java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * initiateConversation()
+ * {
+ * try {
+ * // Exchange greetings.
+ * conversation.send(sendDestination, conversation.getSession().createTextMessage("Hello."));
+ * Message greeting = conversation.receive();
+ *
+ * // Exchange goodbyes.
+ * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ * Message goodbye = conversation.receive();
+ * } finally {
+ * conversation.end();
+ * }
+ * }
+ * }
+ *
+ * class Responder
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, receiveDestination,
+ * java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * respondToConversation()
+ * {
+ * try {
+ * // Exchange greetings.
+ * Message greeting = conversation.receive();
+ * conversation.send(conversation.getSession().createTextMessage("Hello."));
+ *
+ * // Exchange goodbyes.
+ * Message goodbye = conversation.receive();
+ * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ * } finally {
+ * conversation.end();
+ * }
+ * }
+ * }
+ * </pre>
+ *
+ * <p/>Conversation correlation id's are generated on a per thread basis.
+ *
+ * <p/>The same session is shared amongst all conversations. Calls to send are therefore synchronized because JMS
+ * sessions are not multi-threaded.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><th> Associate messages to an ongoing conversation using correlation ids.
+ * <tr><td> Auto manage sessions for conversations.
+ * <tr><td> Store messages not in a conversation in dead letter box.
+ * </table>
+ */
+public class ConversationFactory
+{
+ /** Used for debugging. */
+ private static final Logger log = Logger.getLogger(ConversationFactory.class);
+
+ /** Holds a map from correlation id's to queues. */
+ private Map<Long, BlockingQueue<Message>> idsToQueues = new HashMap<Long, BlockingQueue<Message>>();
+
+ /** Holds the connection over which the conversation is conducted. */
+ private Connection connection;
+
+ /** Holds the session over which the conversation is conduxted. */
+ private Session session;
+
+ /** The message consumer for incoming messages. */
+ MessageConsumer consumer;
+
+ /** The message producer for outgoing messages. */
+ MessageProducer producer;
+
+ /** The well-known or temporary destination to receive replies on. */
+ Destination receiveDestination;
+
+ /** Holds the queue implementation class for the reply queue. */
+ Class<? extends BlockingQueue> queueClass;
+
+ /** Used to hold any replies that are received outside of the context of a conversation. */
+ BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>();
+
+ /* Used to hold conversation state on a per thread basis. */
+ /*
+ ThreadLocal<Conversation> threadLocals =
+ new ThreadLocal<Conversation>()
+ {
+ protected Conversation initialValue()
+ {
+ Conversation settings = new Conversation();
+ settings.conversationId = conversationIdGenerator.getAndIncrement();
+
+ return settings;
+ }
+ };
+ */
+
+ /** Generates new coversation id's as needed. */
+ AtomicLong conversationIdGenerator = new AtomicLong();
+
+ /**
+ * Creates a conversation helper on the specified connection with the default sending destination, and listening
+ * to the specified receiving destination.
+ *
+ * @param connection The connection to build the conversation helper on.
+ * @param receiveDestination The destination to listen to for incoming messages. This may be null to use a temporary
+ * queue.
+ * @param queueClass The queue implementation class.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public ConversationFactory(Connection connection, Destination receiveDestination,
+ Class<? extends BlockingQueue> queueClass) throws JMSException
+ {
+ log.debug("public ConversationFactory(Connection connection, Destination receiveDestination = " + receiveDestination
+ + ", Class<? extends BlockingQueue> queueClass = " + queueClass + "): called");
+
+ this.connection = connection;
+ this.queueClass = queueClass;
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Check if a well-known receive destination has been provided, or use a temporary queue if not.
+ this.receiveDestination = (receiveDestination != null) ? receiveDestination : session.createTemporaryQueue();
+
+ consumer = session.createConsumer(receiveDestination);
+ producer = session.createProducer(null);
+
+ consumer.setMessageListener(new Receiver());
+ }
+
+ /**
+ * Creates a new conversation context.
+ *
+ * @return A new conversation context.
+ */
+ public Conversation startConversation()
+ {
+ log.debug("public Conversation startConversation(): called");
+
+ Conversation conversation = new Conversation();
+ conversation.conversationId = conversationIdGenerator.getAndIncrement();
+
+ return conversation;
+ }
+
+ /**
+ * Ensures that the reply queue for a conversation exists.
+ *
+ * @param conversationId The conversation correlation id.
+ */
+ private void initQueueForId(long conversationId)
+ {
+ if (!idsToQueues.containsKey(conversationId))
+ {
+ idsToQueues.put(conversationId, ReflectionUtils.<BlockingQueue>newInstance(queueClass));
+ }
+ }
+
+ /**
+ * Clears the dead letter box, returning all messages that were in it.
+ *
+ * @return All messages in the dead letter box.
+ */
+ public Collection<Message> emptyDeadLetterBox()
+ {
+ log.debug("public Collection<Message> emptyDeadLetterBox(): called");
+
+ Collection<Message> result = new ArrayList<Message>();
+ deadLetterBox.drainTo(result);
+
+ return result;
+ }
+
+ /**
+ * Gets the session over which the conversation is conducted.
+ *
+ * @return The session over which the conversation is conducted.
+ */
+ public Session getSession()
+ {
+ // Conversation settings = threadLocals.get();
+
+ return session;
+ }
+
+ /**
+ * Used to hold a conversation context. This consists of a correlating id for the conversation, and a reply
+ * destination automatically updated to the last received reply-to destination.
+ */
+ public class Conversation
+ {
+ /** Holds the correlation id for the context. */
+ long conversationId;
+
+ /**
+ * Holds the send destination for the context. This will automatically be updated to the most recently received
+ * reply-to destination.
+ */
+ Destination sendDestination;
+
+ /**
+ * Sends a message to the default sending location. The correlation id of the message will be assigned by this
+ * method, overriding any previously set value.
+ *
+ * @param sendDestination The destination to send to. This may be null to use the last received reply-to
+ * destination.
+ * @param message The message to send.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through. This will also be thrown if no
+ * send destination is specified and there is no most recent reply-to destination available
+ * to use.
+ */
+ public void send(Destination sendDestination, Message message) throws JMSException
+ {
+ log.debug("public void send(Destination sendDestination = " + sendDestination + ", Message message = " + message
+ + "): called");
+
+ // Conversation settings = threadLocals.get();
+ // long conversationId = conversationId;
+ message.setJMSCorrelationID(Long.toString(conversationId));
+ message.setJMSReplyTo(receiveDestination);
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+
+ // Check if an overriding send to destination has been set or use the last reply-to if not.
+ Destination sendTo = null;
+
+ if (sendDestination != null)
+ {
+ sendTo = sendDestination;
+ }
+ else if (sendDestination != null)
+ {
+ sendTo = sendDestination;
+ }
+ else
+ {
+ throw new JMSException("The send destination was specified, and no most recent reply-to available to use.");
+ }
+
+ // Send the message.
+ synchronized (this)
+ {
+ producer.send(sendTo, message);
+ }
+ }
+
+ /**
+ * Gets the next message in an ongoing conversation. This method may block until such a message is received.
+ *
+ * @return The next incoming message in the conversation.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through. Thrown if the received message
+ * did not have its reply-to destination set up.
+ */
+ public Message receive() throws JMSException
+ {
+ log.debug("public Message receive(): called");
+
+ // Conversation settings = threadLocals.get();
+ // long conversationId = settings.conversationId;
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+
+ try
+ {
+ Message result = queue.take();
+
+ // Keep the reply-to destination to send replies to.
+ sendDestination = result.getJMSReplyTo();
+
+ return result;
+ }
+ catch (InterruptedException e)
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are
+ * received they will be returned. If a timeout is specified, then all messages up to the limit, received within
+ * that timespan will be returned. At least one of the message count or timeout should be set to a value of
+ * 1 or greater.
+ *
+ * @param num The number of messages to receive, or all if this is less than 1.
+ * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1.
+ *
+ * @return All messages received within the count limit and the timeout.
+ *
+ * @throws JMSException All undelying JMSExceptions are allowed to fall through.
+ */
+ public Collection<Message> receiveAll(int num, long timeout) throws JMSException
+ {
+ log.debug("public Collection<Message> receiveAll(int num = " + num + ", long timeout = " + timeout
+ + "): called");
+
+ // Check that a timeout or message count was set.
+ if ((num < 1) && (timeout < 1))
+ {
+ throw new IllegalArgumentException("At least one of message count (num) or timeout must be set.");
+ }
+
+ // Ensure that the reply queue for this conversation exists.
+ initQueueForId(conversationId);
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+
+ // Used to collect the received messages in.
+ Collection<Message> result = new ArrayList<Message>();
+
+ // Used to indicate when the timeout or message count has expired.
+ boolean receiveMore = true;
+
+ int messageCount = 0;
+
+ // Receive messages until the timeout or message count expires.
+ do
+ {
+ try
+ {
+ Message next = null;
+
+ // Try to receive the message with a timeout if one has been set.
+ if (timeout > 0)
+ {
+ next = queue.poll(timeout, TimeUnit.MILLISECONDS);
+
+ // Check if the timeout expired, and stop receiving if so.
+ if (next == null)
+ {
+ receiveMore = false;
+ }
+ }
+ // Receive the message without a timeout.
+ else
+ {
+ next = queue.take();
+ }
+
+ // Increment the message count if a message was received.
+ messageCount += (next != null) ? 1 : 0;
+
+ // Check if all the requested messages were received, and stop receiving if so.
+ if ((num > 0) && (messageCount >= num))
+ {
+ receiveMore = false;
+ }
+
+ // Keep the reply-to destination to send replies to.
+ sendDestination = (next != null) ? next.getJMSReplyTo() : sendDestination;
+
+ if (next != null)
+ {
+ result.add(next);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the threads interrupted status.
+ Thread.currentThread().interrupt();
+
+ // Stop receiving but return the messages received so far.
+ receiveMore = false;
+ }
+ }
+ while (receiveMore);
+
+ return result;
+ }
+
+ /**
+ * Completes the conversation. Any correlation id's pertaining to the conversation are no longer valid, and any
+ * incoming messages using them will go to the dead letter box.
+ */
+ public void end()
+ {
+ log.debug("public void end(): called");
+
+ // Ensure that the thread local for the current thread is cleaned up.
+ // Conversation settings = threadLocals.get();
+ // long conversationId = settings.conversationId;
+ // threadLocals.remove();
+
+ // Ensure that its queue is removed from the queue map.
+ BlockingQueue<Message> queue = idsToQueues.remove(conversationId);
+
+ // Move any outstanding messages on the threads conversation id into the dead letter box.
+ queue.drainTo(deadLetterBox);
+ }
+ }
+
+ /**
+ * Implements the message listener for this conversation handler.
+ */
+ protected class Receiver implements MessageListener
+ {
+ /**
+ * Handles all incoming messages in the ongoing conversations. These messages are split up by correaltion id
+ * and placed into queues.
+ *
+ * @param message The incoming message.
+ */
+ public void onMessage(Message message)
+ {
+ log.debug("public void onMessage(Message message = " + message + "): called");
+
+ try
+ {
+ Long conversationId = Long.parseLong(message.getJMSCorrelationID());
+
+ // Find the converstaion queue to place the message on. If there is no conversation for the message id,
+ // the the dead letter box queue is used.
+ BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+ queue = (queue == null) ? deadLetterBox : queue;
+
+ queue.put(message);
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}