You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/05/03 17:09:19 UTC
svn commit: r534903 - in
/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid:
interop/coordinator/ util/
Author: rgreig
Date: Thu May 3 08:09:18 2007
New Revision: 534903
URL: http://svn.apache.org/viewvc?view=rev&rev=534903
Log:
More interop test stuff.
Added:
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java
Removed:
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java
Modified:
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java
Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java?view=diff&rev=534903&r1=534902&r2=534903
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java Thu May 3 08:09:18 2007
@@ -23,13 +23,13 @@
import java.util.Collection;
import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
-import javax.jms.JMSException;
-import javax.jms.Message;
+import javax.jms.*;
import junit.framework.TestCase;
-import org.apache.qpid.util.ConversationHelper;
+import org.apache.qpid.util.ConversationFactory;
/**
* An CoordinatingTestCase is a JUnit test case extension that knows how to coordinate test clients that take part in a
@@ -60,7 +60,8 @@
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Accept notification of test case participants. <td> {@link InvitingTestDecorator}
- * <tr><td> Coordinate the test sequence amongst participants. <td> {@link ConversationHelper}
+ * <tr><td> Accept the conversation factory to carry out the coordination over.
+ * <tr><td> Coordinate the test sequence amongst participants. <td> {@link ConversationFactory}
* <tr><td> Supply test properties
* </table>
*/
@@ -72,7 +73,8 @@
/** Holds the contact details for the receving test client. */
TestClientDetails receiver;
- ConversationHelper conversation;
+ /** Holds the conversation factory over which to coordinate the test. */
+ ConversationFactory conversationFactory;
/**
* Creates a new coordinating test case with the specified name.
@@ -125,6 +127,16 @@
}
/**
+ * Accepts the conversation factory over which to hold the test coordinating conversation.
+ *
+ * @param conversationFactory The conversation factory to coordinate the test over.
+ */
+ public void setConversationFactory(ConversationFactory conversationFactory)
+ {
+ this.conversationFactory = conversationFactory;
+ }
+
+ /**
* Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner
* loop of Use Case 5. It consists of assigning the test roles, begining the test and gathering the test reports
* from the participants.
@@ -137,41 +149,48 @@
*/
protected Message[] sequenceTest(Properties testProperties) throws JMSException
{
+ Session session = conversationFactory.getSession();
+ Destination senderControlTopic = session.createTopic(sender.privateControlKey);
+ Destination receiverControlTopic = session.createTopic(receiver.privateControlKey);
+
+ ConversationFactory.Conversation senderConversation = conversationFactory.startConversation();
+ ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation();
+
// Assign the sender role to the sending test client.
- Message assignSender = conversation.getSession().createMessage();
+ Message assignSender = conversationFactory.getSession().createMessage();
assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
assignSender.setStringProperty("ROLE", "SENDER");
- conversation.send(assignSender);
+ senderConversation.send(senderControlTopic, assignSender);
// Assign the receiver role the receiving client.
- Message assignReceiver = conversation.getSession().createMessage();
+ Message assignReceiver = session.createMessage();
assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
assignReceiver.setStringProperty("ROLE", "RECEIVER");
- conversation.send(assignReceiver);
+ receiverConversation.send(receiverControlTopic, assignReceiver);
// Wait for the senders and receivers to confirm their roles.
- conversation.receive();
- conversation.receive();
+ senderConversation.receive();
+ receiverConversation.receive();
// Start the test.
- Message start = conversation.getSession().createMessage();
+ Message start = session.createMessage();
start.setStringProperty("CONTROL_TYPE", "START");
- conversation.send(start);
+ senderConversation.send(senderControlTopic, start);
// Wait for the test sender to return its report.
- Message senderReport = conversation.receive();
+ Message senderReport = senderConversation.receive();
// Ask the receiver for its report.
- Message statusRequest = conversation.getSession().createMessage();
+ Message statusRequest = session.createMessage();
statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST");
- conversation.send(statusRequest);
+ receiverConversation.send(receiverControlTopic, statusRequest);
// Wait for the receiver to send its report.
- Message receiverReport = conversation.receive();
+ Message receiverReport = receiverConversation.receive();
return new Message[] { senderReport, receiverReport };
}
Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java?view=diff&rev=534903&r1=534902&r2=534903
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java Thu May 3 08:09:18 2007
@@ -38,7 +38,7 @@
import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P;
import org.apache.qpid.util.ClasspathScanner;
import org.apache.qpid.util.CommandLineParser;
-import org.apache.qpid.util.ConversationHelper;
+import org.apache.qpid.util.ConversationFactory;
import org.apache.qpid.util.PrettyPrintingUtils;
import uk.co.thebadgerset.junit.extensions.TestRunnerImprovedErrorHandling;
@@ -51,7 +51,7 @@
*
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Find out what test clients are available. <td> {@link ConversationHelper}
+ * <tr><td> Find out what test clients are available. <td> {@link ConversationFactory}
* <tr><td> Decorate available tests to run all available clients. <td> {@link InvitingTestDecorator}
* <tr><td> Attach XML test result logger.
* <tr><td> Terminate the interop testing framework.
@@ -73,7 +73,9 @@
Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
/** Holds the conversation helper for the control conversation. */
- private ConversationHelper conversation;
+ private ConversationFactory conversationFactory;
+
+ /** Holds the connection that the coordinating messages are sent over. */
private Connection connection;
/**
@@ -185,14 +187,15 @@
Destination controlTopic = session.createTopic("iop.control");
Destination responseQueue = session.createQueue("coordinator");
- conversation = new ConversationHelper(connection, controlTopic, responseQueue, LinkedBlockingQueue.class);
+ conversationFactory = new ConversationFactory(connection, responseQueue, LinkedBlockingQueue.class);
+ ConversationFactory.Conversation conversation = conversationFactory.startConversation();
// Broadcast the compulsory invitation to find out what clients are available to test.
Message invite = session.createMessage();
invite.setStringProperty("CONTROL_TYPE", "INVITE");
invite.setJMSReplyTo(responseQueue);
- conversation.send(invite);
+ conversation.send(controlTopic, invite);
// Wait for a short time, to give test clients an opportunity to reply to the invitation.
Collection<Message> enlists = conversation.receiveAll(0, 10000);
@@ -206,7 +209,7 @@
Message terminate = session.createMessage();
terminate.setStringProperty("CONTROL_TYPE", "TERMINATE");
- conversation.send(terminate);
+ conversation.send(controlTopic, terminate);
return result;
}
@@ -283,7 +286,7 @@
}
// Wrap the tests in an inviting test decorator, to perform the invite/test cycle.
- targetTest = new InvitingTestDecorator(targetTest, enlistedClients, conversation);
+ targetTest = new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
return super.doRun(targetTest, wait);
}
Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java?view=diff&rev=534903&r1=534902&r2=534903
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java Thu May 3 08:09:18 2007
@@ -23,6 +23,8 @@
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
+import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -32,14 +34,14 @@
import org.apache.log4j.Logger;
-import org.apache.qpid.util.ConversationHelper;
+import org.apache.qpid.util.ConversationFactory;
import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
/**
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Broadcast test invitations and collect enlists. <td> {@link ConversationHelper}.
+ * <tr><td> Broadcast test invitations and collect enlists. <td> {@link ConversationFactory}.
* <tr><td> Output test failures for clients unwilling to run the test case. <td> {@link Coordinator}
* <tr><td> Execute coordinated test cases. <td> {@link CoordinatingTestCase}
* </table>
@@ -52,7 +54,10 @@
Set<TestClientDetails> allClients;
/** Holds the conversation helper for the control level conversation for coordinating the test through. */
- ConversationHelper conversation;
+ ConversationFactory conversationFactory;
+
+ /** Holds the connection that the control conversation is held over. */
+ Connection connection;
/** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */
WrappedSuiteTestDecorator testSuite;
@@ -61,11 +66,12 @@
* Creates a wrapped suite test decorator from another one.
*
* @param suite The test suite.
- * @param availableClients The list of all clients that responded to the compulsory invite.
+ * @param availableClients The list of all clients that responded to the compulsory invite.
* @param controlConversation The conversation helper for the control level, test coordination conversation.
+ * @param controlConnection The connection that the coordination messages are sent over.
*/
public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> availableClients,
- ConversationHelper controlConversation)
+ ConversationFactory controlConversation, Connection controlConnection)
{
super(suite);
@@ -74,7 +80,8 @@
testSuite = suite;
allClients = availableClients;
- conversation = controlConversation;
+ conversationFactory = controlConversation;
+ connection = controlConnection;
}
/**
@@ -103,11 +110,14 @@
Set<TestClientDetails> enlists = null;
try
{
- Message invite = conversation.getSession().createMessage();
+ Message invite = conversationFactory.getSession().createMessage();
+ Destination controlTopic = conversationFactory.getSession().createTopic("iop.control");
+ ConversationFactory.Conversation conversation = conversationFactory.startConversation();
+
invite.setStringProperty("CONTROL_TYPE", "INVITE");
invite.setStringProperty("TEST_NAME", coordTest.getName());
- conversation.send(invite);
+ conversation.send(controlTopic, invite);
// Wait for a short time, to give test clients an opportunity to reply to the invitation.
Collection<Message> replies = conversation.receiveAll(allClients.size(), 10000);
@@ -142,6 +152,9 @@
// Set the sending and receiving client details on the test case.
coordTest.setSender(enlistedPair.get(0));
coordTest.setReceiver(enlistedPair.get(1));
+
+ // Pass down the conversation factory to hold the coordination conversation over.
+ coordTest.setConversationFactory(conversationFactory);
// Execute the test case.
coordTest.run(testResult);
Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java?view=auto&rev=534903
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java Thu May 3 08:09:18 2007
@@ -0,0 +1,390 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.*;
+
+/**
+ * A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation
+ * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependent
+ * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids.
+ *
+ * <p/>One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a
+ * multiplexed messaging route. This can be useful, as JMS sessions are not multi-threaded. Setting up the conversation
+ * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order
+ * governed by the asynchronous message flow. For example, something like the following code could run a multi-threaded
+ * conversation (the conversation methods can be called many times in parallel):
+ *
+ * <p/><pre>
+ * class Initiator
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, null,
+ * java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * initiateConversation()
+ * {
+ * try {
+ * // Exchange greetings.
+ * conversation.send(sendDestination, conversation.getSession().createTextMessage("Hello."));
+ * Message greeting = conversation.receive();
+ *
+ * // Exchange goodbyes.
+ * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ * Message goodbye = conversation.receive();
+ * } finally {
+ * conversation.end();
+ * }
+ * }
+ * }
+ *
+ * class Responder
+ * {
+ * ConversationHelper conversation = new ConversationHelper(connection, receiveDestination,
+ * java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * respondToConversation()
+ * {
+ * try {
+ * // Exchange greetings.
+ * Message greeting = conversation.receive();
+ * conversation.send(conversation.getSession().createTextMessage("Hello."));
+ *
+ * // Exchange goodbyes.
+ * Message goodbye = conversation.receive();
+ * conversation.send(conversation.getSession().createTextMessage("Goodbye."));
+ * } finally {
+ * conversation.end();
+ * }
+ * }
+ * }
+ * </pre>
+ *
+ * <p/>Conversation correlation ids are generated once per conversation, each time startConversation is called.
+ *
+ * <p/>The same session is shared amongst all conversations. Calls to send are therefore synchronized because JMS
+ * sessions are not multi-threaded.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Associate messages to an ongoing conversation using correlation ids.
+ * <tr><td> Auto manage sessions for conversations.
+ * <tr><td> Store messages not in a conversation in dead letter box.
+ * </table>
+ */
+public class ConversationFactory
+{
+    /** Holds a map from correlation id's to queues. Synchronized, as conversations and the listener both use it. */
+    private Map<Long, BlockingQueue<Message>> idsToQueues =
+        Collections.synchronizedMap(new HashMap<Long, BlockingQueue<Message>>());
+
+    /** Holds the connection over which the conversation is conducted. */
+    private Connection connection;
+
+    /** Holds the session over which the conversation is conducted. */
+    private Session session;
+
+    /** The message consumer for incoming messages. */
+    MessageConsumer consumer;
+
+    /** The message producer for outgoing messages. */
+    MessageProducer producer;
+
+    /** The well-known or temporary destination to receive replies on. */
+    Destination receiveDestination;
+
+    /** Holds the queue implementation class for the reply queue. */
+    Class<? extends BlockingQueue> queueClass;
+
+    /** Used to hold any replies that are received outside of the context of a conversation. */
+    BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>();
+
+    /** Generates new conversation id's as needed. */
+    AtomicLong conversationIdGenerator = new AtomicLong();
+
+    /**
+     * Creates a conversation factory on the specified connection, listening to the specified receiving destination
+     * for replies.
+     *
+     * @param connection         The connection to build the conversation factory on.
+     * @param receiveDestination The destination to listen to for incoming messages. This may be null to use a temporary
+     *                           queue.
+     * @param queueClass         The queue implementation class.
+     *
+     * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+     */
+    public ConversationFactory(Connection connection, Destination receiveDestination,
+        Class<? extends BlockingQueue> queueClass) throws JMSException
+    {
+        this.connection = connection;
+        this.queueClass = queueClass;
+
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Check if a well-known receive destination has been provided, or use a temporary queue if not.
+        this.receiveDestination = (receiveDestination != null) ? receiveDestination : session.createTemporaryQueue();
+
+        // Listen on the resolved destination, as the parameter is null when a temporary queue is in use.
+        consumer = session.createConsumer(this.receiveDestination);
+        producer = session.createProducer(null);
+        consumer.setMessageListener(new Receiver());
+    }
+
+    /**
+     * Creates a new conversation context, with a newly generated correlation id.
+     *
+     * @return A new conversation context.
+     */
+    public Conversation startConversation()
+    {
+        Conversation conversation = new Conversation();
+        conversation.conversationId = conversationIdGenerator.getAndIncrement();
+
+        return conversation;
+    }
+
+    /**
+     * Ensures that the reply queue for a conversation exists. Synchronized so that the check and the put are atomic.
+     *
+     * @param conversationId The conversation correlation id.
+     */
+    private synchronized void initQueueForId(long conversationId)
+    {
+        if (!idsToQueues.containsKey(conversationId))
+        {
+            idsToQueues.put(conversationId, ReflectionUtils.<BlockingQueue>newInstance(queueClass));
+        }
+    }
+
+    /**
+     * Clears the dead letter box, returning all messages that were in it.
+     *
+     * @return All messages in the dead letter box.
+     */
+    public Collection<Message> emptyDeadLetterBox()
+    {
+        Collection<Message> result = new ArrayList<Message>();
+        deadLetterBox.drainTo(result);
+
+        return result;
+    }
+
+    /**
+     * Gets the session over which the conversation is conducted.
+     *
+     * @return The session over which the conversation is conducted.
+     */
+    public Session getSession()
+    {
+        return session;
+    }
+
+    /**
+     * Used to hold a conversation context. This consists of a correlating id for the conversation, and a reply
+     * destination automatically updated to the last received reply-to destination.
+     */
+    public class Conversation
+    {
+        /** Holds the correlation id for the context. */
+        long conversationId;
+
+        /**
+         * Holds the send destination for the context. This will automatically be updated to the most recently received
+         * reply-to destination.
+         */
+        Destination sendDestination;
+
+        /**
+         * Sends a message to the given destination, or to the last received reply-to destination if none is given.
+         * The correlation id and reply-to of the message are assigned by this method, overriding any previous values.
+         *
+         * @param sendDestination The destination to send to. This may be null to use the last received reply-to
+         *                        destination.
+         * @param message         The message to send.
+         *
+         * @throws JMSException All underlying JMSExceptions are allowed to fall through. This will also be thrown if no
+         *                      send destination is specified and there is no most recent reply-to destination available
+         *                      to use.
+         */
+        public void send(Destination sendDestination, Message message) throws JMSException
+        {
+            message.setJMSCorrelationID(Long.toString(conversationId));
+            message.setJMSReplyTo(receiveDestination);
+
+            // Ensure that the reply queue for this conversation exists.
+            initQueueForId(conversationId);
+
+            // Use the given destination, or else the last reply-to held in the field that the parameter shadows.
+            Destination sendTo = (sendDestination != null) ? sendDestination : this.sendDestination;
+
+            if (sendTo == null)
+            {
+                throw new JMSException(
+                    "The send destination was not specified, and no most recent reply-to available to use.");
+            }
+
+            // Lock on the session, not this conversation, as all conversations of the factory share it.
+            synchronized (session)
+            {
+                producer.send(sendTo, message);
+            }
+        }
+
+        /**
+         * Gets the next message in an ongoing conversation. This method may block until such a message is received.
+         *
+         * @return The next incoming message in the conversation, or null if the wait for it was interrupted, in which
+         *         case the interrupted status of the calling thread is set again.
+         *
+         * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+         */
+        public Message receive() throws JMSException
+        {
+            // Ensure that the reply queue for this conversation exists.
+            initQueueForId(conversationId);
+
+            BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+
+            try
+            {
+                Message result = queue.take();
+
+                // Keep the reply-to destination to send replies to.
+                sendDestination = result.getJMSReplyTo();
+
+                return result;
+            }
+            catch (InterruptedException e)
+            {
+                // Set the interrupted status again, so that the caller can tell why there is no message.
+                Thread.currentThread().interrupt();
+
+                return null;
+            }
+        }
+
+        /**
+         * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are
+         * received they will be returned. If a timeout is specified, then all messages up to the limit, received within
+         * that timespan will be returned. An interrupt ends the wait early, returning what was received so far.
+         *
+         * @param num     The number of messages to receive, or all if this is less than 1.
+         * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1.
+         *
+         * @return All messages received within the count limit and the timeout.
+         *
+         * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+         */
+        public Collection<Message> receiveAll(int num, long timeout) throws JMSException
+        {
+            initQueueForId(conversationId);
+
+            BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+            Collection<Message> result = new ArrayList<Message>();
+            long start = System.currentTimeMillis();
+
+            try
+            {
+                while ((num < 1) || (result.size() < num))
+                {
+                    // Wait for what is left of the timeout, if there is one. A null means that it has run out.
+                    long remaining = Math.max(timeout - (System.currentTimeMillis() - start), 0L);
+                    Message next = (timeout < 1) ? queue.take()
+                        : queue.poll(remaining, java.util.concurrent.TimeUnit.MILLISECONDS);
+
+                    if (next == null)
+                    {
+                        break;
+                    }
+
+                    // Keep the reply-to destination to send replies to.
+                    sendDestination = next.getJMSReplyTo();
+                    result.add(next);
+                }
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+            }
+
+            return result;
+        }
+
+        /**
+         * Completes the conversation. Any correlation id's pertaining to the conversation are no longer valid, and any
+         * incoming messages using them will go to the dead letter box.
+         */
+        public void end()
+        {
+            // Ensure that its queue is removed from the queue map.
+            BlockingQueue<Message> queue = idsToQueues.remove(conversationId);
+
+            // Move any outstanding messages into the dead letter box. No queue exists if nothing was sent or received.
+            if (queue != null)
+            {
+                queue.drainTo(deadLetterBox);
+            }
+        }
+    }
+
+    /** Implements the message listener for this conversation handler. */
+    protected class Receiver implements MessageListener
+    {
+        /**
+         * Handles all incoming messages in the ongoing conversations. These messages are split up by correlation id
+         * and placed into queues.
+         *
+         * @param message The incoming message.
+         */
+        public void onMessage(Message message)
+        {
+            try
+            {
+                BlockingQueue<Message> queue = null;
+                try
+                {
+                    queue = idsToQueues.get(Long.valueOf(message.getJMSCorrelationID()));
+                }
+                catch (NumberFormatException e)
+                {
+                    // A missing or non-numeric correlation id cannot belong to a conversation; queue stays null.
+                }
+
+                // If there is no conversation for the message id, then the dead letter box queue is used.
+                queue = (queue == null) ? deadLetterBox : queue;
+                queue.put(message);
+            }
+            catch (JMSException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}