You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/05/03 17:09:19 UTC

svn commit: r534903 - in /incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid: interop/coordinator/ util/

Author: rgreig
Date: Thu May  3 08:09:18 2007
New Revision: 534903

URL: http://svn.apache.org/viewvc?view=rev&rev=534903
Log:
More interop test stuff.

Added:
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java
Removed:
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationHelper.java
Modified:
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
    incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java

Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java?view=diff&rev=534903&r1=534902&r2=534903
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java Thu May  3 08:09:18 2007
@@ -23,13 +23,13 @@
 
 import java.util.Collection;
 import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import javax.jms.JMSException;
-import javax.jms.Message;
+import javax.jms.*;
 
 import junit.framework.TestCase;
 
-import org.apache.qpid.util.ConversationHelper;
+import org.apache.qpid.util.ConversationFactory;
 
 /**
  * A CoordinatingTestCase is a JUnit test case extension that knows how to coordinate test clients that take part in a
@@ -60,7 +60,8 @@
  * <p><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * <tr><td> Accept notification of test case participants. <td> {@link InvitingTestDecorator}
- * <tr><td> Coordinate the test sequence amongst participants. <td> {@link ConversationHelper}
+ * <tr><td> Accept JMS Connection to carry out the coordination over.
+ * <tr><td> Coordinate the test sequence amongst participants. <td> {@link ConversationFactory}
  * <tr><td> Supply test properties
  * </table>
  */
@@ -72,7 +73,8 @@
     /** Holds the contact details for the receiving test client. */
     TestClientDetails receiver;
 
-    ConversationHelper conversation;
+    /** Holds the conversation factory over which to coordinate the test. */
+    ConversationFactory conversationFactory;
 
     /**
      * Creates a new coordinating test case with the specified name.
@@ -125,6 +127,16 @@
     }
 
     /**
+     * Accepts the conversation factory over which to hold the test coordinating conversation.
+     *
+     * @param conversationFactory The conversation factory to coordinate the test over.
+     */
+    public void setConversationFactory(ConversationFactory conversationFactory)
+    {
+        this.conversationFactory = conversationFactory;
+    }
+
+    /**
      * Holds a test coordinating conversation with the test clients. This is the basic implementation of the inner
      * loop of Use Case 5. It consists of assigning the test roles, beginning the test and gathering the test reports
      * from the participants.
@@ -137,41 +149,48 @@
      */
     protected Message[] sequenceTest(Properties testProperties) throws JMSException
     {
+        Session session = conversationFactory.getSession();
+        Destination senderControlTopic = session.createTopic(sender.privateControlKey);
+        Destination receiverControlTopic = session.createTopic(receiver.privateControlKey);
+
+        ConversationFactory.Conversation senderConversation = conversationFactory.startConversation();
+        ConversationFactory.Conversation receiverConversation = conversationFactory.startConversation();
+
         // Assign the sender role to the sending test client.
-        Message assignSender = conversation.getSession().createMessage();
+        Message assignSender = conversationFactory.getSession().createMessage();
         assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
         assignSender.setStringProperty("ROLE", "SENDER");
 
-        conversation.send(assignSender);
+        senderConversation.send(senderControlTopic, assignSender);
 
         // Assign the receiver role the receiving client.
-        Message assignReceiver = conversation.getSession().createMessage();
+        Message assignReceiver = session.createMessage();
         assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
         assignReceiver.setStringProperty("ROLE", "RECEIVER");
 
-        conversation.send(assignReceiver);
+        receiverConversation.send(receiverControlTopic, assignReceiver);
 
         // Wait for the senders and receivers to confirm their roles.
-        conversation.receive();
-        conversation.receive();
+        senderConversation.receive();
+        receiverConversation.receive();
 
         // Start the test.
-        Message start = conversation.getSession().createMessage();
+        Message start = session.createMessage();
         start.setStringProperty("CONTROL_TYPE", "START");
 
-        conversation.send(start);
+        senderConversation.send(senderControlTopic, start);
 
         // Wait for the test sender to return its report.
-        Message senderReport = conversation.receive();
+        Message senderReport = senderConversation.receive();
 
         // Ask the receiver for its report.
-        Message statusRequest = conversation.getSession().createMessage();
+        Message statusRequest = session.createMessage();
         statusRequest.setStringProperty("CONTROL_TYPE", "STATUS_REQUEST");
 
-        conversation.send(statusRequest);
+        receiverConversation.send(receiverControlTopic, statusRequest);
 
         // Wait for the receiver to send its report.
-        Message receiverReport = conversation.receive();
+        Message receiverReport = receiverConversation.receive();
 
         return new Message[] { senderReport, receiverReport };
     }

Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java?view=diff&rev=534903&r1=534902&r2=534903
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/Coordinator.java Thu May  3 08:09:18 2007
@@ -38,7 +38,7 @@
 import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P;
 import org.apache.qpid.util.ClasspathScanner;
 import org.apache.qpid.util.CommandLineParser;
-import org.apache.qpid.util.ConversationHelper;
+import org.apache.qpid.util.ConversationFactory;
 import org.apache.qpid.util.PrettyPrintingUtils;
 
 import uk.co.thebadgerset.junit.extensions.TestRunnerImprovedErrorHandling;
@@ -51,7 +51,7 @@
  *
  * <p><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Find out what test clients are available. <td> {@link ConversationHelper}
+ * <tr><td> Find out what test clients are available. <td> {@link ConversationFactory}
  * <tr><td> Decorate available tests to run all available clients. <td> {@link InvitingTestDecorator}
  * <tr><td> Attach XML test result logger.
  * <tr><td> Terminate the interop testing framework.
@@ -73,7 +73,9 @@
     Set<TestClientDetails> enlistedClients = new HashSet<TestClientDetails>();
 
     /** Holds the conversation factory for the control conversation. */
-    private ConversationHelper conversation;
+    private ConversationFactory conversationFactory;
+
+    /** Holds the connection that the coordinating messages are sent over. */
     private Connection connection;
 
     /**
@@ -185,14 +187,15 @@
         Destination controlTopic = session.createTopic("iop.control");
         Destination responseQueue = session.createQueue("coordinator");
 
-        conversation = new ConversationHelper(connection, controlTopic, responseQueue, LinkedBlockingQueue.class);
+        conversationFactory = new ConversationFactory(connection, responseQueue, LinkedBlockingQueue.class);
+        ConversationFactory.Conversation conversation = conversationFactory.startConversation();
 
         // Broadcast the compulsory invitation to find out what clients are available to test.
         Message invite = session.createMessage();
         invite.setStringProperty("CONTROL_TYPE", "INVITE");
         invite.setJMSReplyTo(responseQueue);
 
-        conversation.send(invite);
+        conversation.send(controlTopic, invite);
 
         // Wait for a short time, to give test clients an opportunity to reply to the invitation.
         Collection<Message> enlists = conversation.receiveAll(0, 10000);
@@ -206,7 +209,7 @@
         Message terminate = session.createMessage();
         terminate.setStringProperty("CONTROL_TYPE", "TERMINATE");
 
-        conversation.send(terminate);
+        conversation.send(controlTopic, terminate);
 
         return result;
     }
@@ -283,7 +286,7 @@
         }
 
         // Wrap the tests in an inviting test decorator, to perform the invite/test cycle.
-        targetTest = new InvitingTestDecorator(targetTest, enlistedClients, conversation);
+        targetTest = new InvitingTestDecorator(targetTest, enlistedClients, conversationFactory, connection);
 
         return super.doRun(targetTest, wait);
     }

Modified: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java?view=diff&rev=534903&r1=534902&r2=534903
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java (original)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/InvitingTestDecorator.java Thu May  3 08:09:18 2007
@@ -23,6 +23,8 @@
 import java.util.*;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 
@@ -32,14 +34,14 @@
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.util.ConversationHelper;
+import org.apache.qpid.util.ConversationFactory;
 
 import uk.co.thebadgerset.junit.extensions.WrappedSuiteTestDecorator;
 
 /**
  * <p><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Broadcast test invitations and collect enlists. <td> {@link ConversationHelper}.
+ * <tr><td> Broadcast test invitations and collect enlists. <td> {@link ConversationFactory}.
  * <tr><td> Output test failures for clients unwilling to run the test case. <td> {@link Coordinator}
  * <tr><td> Execute coordinated test cases. <td> {@link CoordinatingTestCase}
  * </table>
@@ -52,7 +54,10 @@
     Set<TestClientDetails> allClients;
 
     /** Holds the conversation factory for the control level conversation for coordinating the test through. */
-    ConversationHelper conversation;
+    ConversationFactory conversationFactory;
+
+    /** Holds the connection that the control conversation is held over. */
+    Connection connection;
 
     /** Holds the underlying {@link CoordinatingTestCase}s that this decorator wraps. */
     WrappedSuiteTestDecorator testSuite;
@@ -61,11 +66,12 @@
      * Creates a wrapped suite test decorator from another one.
      *
      * @param suite               The test suite.
-     * @param availableClients          The list of all clients that responded to the compulsory invite.
+     * @param availableClients    The list of all clients that responded to the compulsory invite.
      * @param controlConversation The conversation factory for the control level, test coordination conversation.
+     * @param controlConnection   The connection that the coordination messages are sent over.
      */
     public InvitingTestDecorator(WrappedSuiteTestDecorator suite, Set<TestClientDetails> availableClients,
-        ConversationHelper controlConversation)
+        ConversationFactory controlConversation, Connection controlConnection)
     {
         super(suite);
 
@@ -74,7 +80,8 @@
 
         testSuite = suite;
         allClients = availableClients;
-        conversation = controlConversation;
+        conversationFactory = controlConversation;
+        connection = controlConnection;
     }
 
     /**
@@ -103,11 +110,14 @@
             Set<TestClientDetails> enlists = null;
             try
             {
-                Message invite = conversation.getSession().createMessage();
+                Message invite = conversationFactory.getSession().createMessage();
+                Destination controlTopic = conversationFactory.getSession().createTopic("iop.control");
+                ConversationFactory.Conversation conversation = conversationFactory.startConversation();
+
                 invite.setStringProperty("CONTROL_TYPE", "INVITE");
                 invite.setStringProperty("TEST_NAME", coordTest.getName());
 
-                conversation.send(invite);
+                conversation.send(controlTopic, invite);
 
                 // Wait for a short time, to give test clients an opportunity to reply to the invitation.
                 Collection<Message> replies = conversation.receiveAll(allClients.size(), 10000);
@@ -142,6 +152,9 @@
                 // Set the sending and receiving client details on the test case.
                 coordTest.setSender(enlistedPair.get(0));
                 coordTest.setReceiver(enlistedPair.get(1));
+
+                // Pass down the conversation factory to hold the coordination conversation over.
+                coordTest.setConversationFactory(conversationFactory);
 
                 // Execute the test case.
                 coordTest.run(testResult);

Added: incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java?view=auto&rev=534903
==============================================================================
--- incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java (added)
+++ incubator/qpid/branches/M2/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java Thu May  3 08:09:18 2007
@@ -0,0 +1,390 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.*;
+
+/**
+ * A conversation factory, uses a message correlation id pattern to match up sent and received messages as a conversation
+ * over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant
+ * on the queue implementation). Clients of this de-multiplexer can wait on messages, defined by message correlation ids.
+ *
+ * <p/>One use of this is as a conversation synchronizer where multiple threads are carrying out conversations over a
+ * multiplexed messaging route. This can be useful, as JMS sessions are not multi-threaded. Setting up the conversation
+ * with synchronous queues will allow these threads to be written in a synchronous style, but with their execution order
+ * governed by the asynchronous message flow. For example, something like the following code could run one side of a
+ * conversation (the conversation methods can be called many times in parallel, on different conversations):
+ *
+ * <p/><pre>
+ * class Initiator
+ * {
+ * ConversationFactory factory = new ConversationFactory(connection, null,
+ *                                                       java.util.concurrent.LinkedBlockingQueue.class);
+ *
+ * initiateConversation()
+ * {
+ *  ConversationFactory.Conversation conversation = factory.startConversation();
+ *
+ *  try {
+ *   // Exchange greetings.
+ *   conversation.send(sendDestination, factory.getSession().createTextMessage("Hello."));
+ *   Message greeting = conversation.receive();
+ *
+ *   // Exchange goodbyes. A null destination re-uses the reply-to of the last message received.
+ *   conversation.send(null, factory.getSession().createTextMessage("Goodbye."));
+ *   Message goodbye = conversation.receive();
+ *  } finally {
+ *   conversation.end();
+ *  }
+ * }
+ * }
+ * </pre>
+ *
+ * <p/>A new correlation id is generated for every conversation created by {@link #startConversation()}. Replies are
+ * routed to a conversation only when they carry that conversation's correlation id; anything else ends up in the dead
+ * letter box.
+ *
+ * <p/>The same session is shared amongst all conversations. Calls to send are therefore synchronized on a lock shared
+ * by all conversations of this factory, because JMS sessions are not multi-threaded.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Associate messages to an ongoing conversation using correlation ids.
+ * <tr><td> Auto manage sessions for conversations.
+ * <tr><td> Store messages not in a conversation in dead letter box.
+ * </table>
+ */
+public class ConversationFactory
+{
+    /**
+     * Holds a map from correlation id's to queues. The map is touched both by the JMS listener thread and by the
+     * threads running conversations, so it is wrapped in a synchronized view; compound check-then-act sequences
+     * additionally lock on the map itself.
+     */
+    private final Map<Long, BlockingQueue<Message>> idsToQueues =
+        Collections.synchronizedMap(new HashMap<Long, BlockingQueue<Message>>());
+
+    /** Guards use of the shared producer, so that sends from different conversations never overlap. */
+    private final Object sendLock = new Object();
+
+    /** Holds the connection over which the conversation is conducted. */
+    private Connection connection;
+
+    /** Holds the session over which the conversation is conducted. */
+    private Session session;
+
+    /** The message consumer for incoming messages. */
+    MessageConsumer consumer;
+
+    /** The message producer for outgoing messages. */
+    MessageProducer producer;
+
+    /** The well-known or temporary destination to receive replies on. */
+    Destination receiveDestination;
+
+    /** Holds the queue implementation class for the reply queue. */
+    Class<? extends BlockingQueue> queueClass;
+
+    /** Used to hold any replies that are received outside of the context of a conversation. */
+    BlockingQueue<Message> deadLetterBox = new LinkedBlockingQueue<Message>();
+
+    /** Generates new conversation id's as needed. */
+    AtomicLong conversationIdGenerator = new AtomicLong();
+
+    /**
+     * Creates a conversation factory on the specified connection, listening to the specified receiving destination.
+     *
+     * @param connection         The connection to build the conversation factory on.
+     * @param receiveDestination The destination to listen to for incoming messages. This may be null to use a temporary
+     *                           queue.
+     * @param queueClass         The queue implementation class.
+     *
+     * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+     */
+    public ConversationFactory(Connection connection, Destination receiveDestination,
+        Class<? extends BlockingQueue> queueClass) throws JMSException
+    {
+        this.connection = connection;
+        this.queueClass = queueClass;
+
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Check if a well-known receive destination has been provided, or use a temporary queue if not.
+        this.receiveDestination = (receiveDestination != null) ? receiveDestination : session.createTemporaryQueue();
+
+        // Consume from the resolved destination held in the field; the raw parameter may be null.
+        consumer = session.createConsumer(this.receiveDestination);
+        producer = session.createProducer(null);
+
+        consumer.setMessageListener(new Receiver());
+    }
+
+    /**
+     * Creates a new conversation context, with a freshly generated correlation id.
+     *
+     * @return A new conversation context.
+     */
+    public Conversation startConversation()
+    {
+        Conversation conversation = new Conversation();
+        conversation.conversationId = conversationIdGenerator.getAndIncrement();
+
+        return conversation;
+    }
+
+    /**
+     * Ensures that the reply queue for a conversation exists, creating it if necessary.
+     *
+     * @param conversationId The conversation correlation id.
+     *
+     * @return The reply queue for the conversation.
+     */
+    @SuppressWarnings("unchecked")
+    private BlockingQueue<Message> initQueueForId(long conversationId)
+    {
+        // The lookup and the insertion must happen atomically, otherwise two threads could each create a queue for the
+        // same id and messages placed on the losing queue would never be seen.
+        synchronized (idsToQueues)
+        {
+            BlockingQueue<Message> queue = idsToQueues.get(conversationId);
+
+            if (queue == null)
+            {
+                queue = ReflectionUtils.<BlockingQueue>newInstance(queueClass);
+                idsToQueues.put(conversationId, queue);
+            }
+
+            return queue;
+        }
+    }
+
+    /**
+     * Clears the dead letter box, returning all messages that were in it.
+     *
+     * @return All messages in the dead letter box.
+     */
+    public Collection<Message> emptyDeadLetterBox()
+    {
+        Collection<Message> result = new ArrayList<Message>();
+        deadLetterBox.drainTo(result);
+
+        return result;
+    }
+
+    /**
+     * Gets the session over which the conversation is conducted.
+     *
+     * @return The session over which the conversation is conducted.
+     */
+    public Session getSession()
+    {
+        return session;
+    }
+
+    /**
+     * Used to hold a conversation context. This consists of a correlating id for the conversation, and a reply
+     * destination automatically updated to the last received reply-to destination.
+     */
+    public class Conversation
+    {
+        /** Holds the correlation id for the context. */
+        long conversationId;
+
+        /**
+         * Holds the send destination for the context. This will automatically be updated to the most recently received
+         * reply-to destination.
+         */
+        Destination sendDestination;
+
+        /**
+         * Sends a message as part of this conversation. The correlation id and the reply-to destination of the message
+         * will be assigned by this method, overriding any previously set values.
+         *
+         * @param sendDestination The destination to send to. This may be null to use the last received reply-to
+         *                        destination.
+         * @param message         The message to send.
+         *
+         * @throws JMSException All underlying JMSExceptions are allowed to fall through. This will also be thrown if no
+         *                      send destination is specified and there is no most recent reply-to destination available
+         *                      to use.
+         */
+        public void send(Destination sendDestination, Message message) throws JMSException
+        {
+            // The parameter shadows the field of the same name, so the fall-back must be qualified with 'this'.
+            Destination sendTo = (sendDestination != null) ? sendDestination : this.sendDestination;
+
+            if (sendTo == null)
+            {
+                throw new JMSException(
+                    "The send destination was not specified, and no most recent reply-to available to use.");
+            }
+
+            message.setJMSCorrelationID(Long.toString(conversationId));
+            message.setJMSReplyTo(receiveDestination);
+
+            // Ensure that the reply queue for this conversation exists before any reply can arrive.
+            initQueueForId(conversationId);
+
+            // The producer belongs to the session shared by every conversation, so the lock has to be shared too;
+            // locking on this conversation alone would not stop two conversations sending at once.
+            synchronized (sendLock)
+            {
+                producer.send(sendTo, message);
+            }
+        }
+
+        /**
+         * Gets the next message in an ongoing conversation. This method may block until such a message is received.
+         *
+         * @return The next incoming message in the conversation, or null if the calling thread was interrupted whilst
+         *         waiting. In that case the interrupt status of the thread is restored.
+         *
+         * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+         */
+        public Message receive() throws JMSException
+        {
+            BlockingQueue<Message> queue = initQueueForId(conversationId);
+
+            try
+            {
+                Message result = queue.take();
+
+                // Keep the reply-to destination to send replies to.
+                sendDestination = result.getJMSReplyTo();
+
+                return result;
+            }
+            catch (InterruptedException e)
+            {
+                // Keep the null return for existing callers, but do not lose the fact that an interrupt happened.
+                Thread.currentThread().interrupt();
+
+                return null;
+            }
+        }
+
+        /**
+         * Gets many messages in an ongoing conversation. If a limit is specified, then once that many messages are
+         * received they will be returned. If a timeout is specified, then all messages up to the limit, received within
+         * that timespan will be returned. If neither a limit nor a timeout is specified there is nothing to wait for,
+         * so only the messages already queued for the conversation are returned, without blocking.
+         *
+         * <p/>If the calling thread is interrupted whilst waiting, the messages gathered so far are returned and the
+         * interrupt status of the thread is restored.
+         *
+         * @param num     The number of messages to receive, or all if this is less than 1.
+         * @param timeout The timeout in milliseconds to receive the messages in, or forever if this is less than 1.
+         *
+         * @return All messages received within the count limit and the timeout.
+         *
+         * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+         */
+        public Collection<Message> receiveAll(int num, long timeout) throws JMSException
+        {
+            Collection<Message> result = new ArrayList<Message>();
+            BlockingQueue<Message> queue = initQueueForId(conversationId);
+
+            boolean limited = num > 0;
+            boolean timed = timeout > 0;
+            long start = System.currentTimeMillis();
+
+            try
+            {
+                while (!limited || (result.size() < num))
+                {
+                    Message next;
+
+                    if (timed)
+                    {
+                        // Wait only for whatever is left of the overall timeout, never a negative amount.
+                        long remaining = timeout - (System.currentTimeMillis() - start);
+                        next = queue.poll(Math.max(remaining, 0L), java.util.concurrent.TimeUnit.MILLISECONDS);
+                    }
+                    else if (limited)
+                    {
+                        next = queue.take();
+                    }
+                    else
+                    {
+                        next = queue.poll();
+                    }
+
+                    // A null means the timeout expired, or that nothing more is immediately available.
+                    if (next == null)
+                    {
+                        break;
+                    }
+
+                    // Keep the reply-to destination to send replies to.
+                    sendDestination = next.getJMSReplyTo();
+                    result.add(next);
+                }
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+            }
+
+            return result;
+        }
+
+        /**
+         * Completes the conversation. Any correlation id's pertaining to the conversation are no longer valid, and any
+         * incoming messages using them will go to the dead letter box.
+         */
+        public void end()
+        {
+            // Ensure that its queue is removed from the queue map.
+            BlockingQueue<Message> queue = idsToQueues.remove(conversationId);
+
+            // There is no queue if the conversation never sent or received anything, or has already been ended.
+            if (queue != null)
+            {
+                // Move any outstanding messages on the conversation id into the dead letter box.
+                queue.drainTo(deadLetterBox);
+            }
+        }
+    }
+
+    /**
+     * Implements the message listener for this conversation factory.
+     */
+    protected class Receiver implements MessageListener
+    {
+        /**
+         * Handles all incoming messages in the ongoing conversations. These messages are split up by correlation id
+         * and placed into queues. Messages with a missing or non-numeric correlation id, or with an id for which there
+         * is no conversation, are placed in the dead letter box.
+         *
+         * @param message The incoming message.
+         */
+        public void onMessage(Message message)
+        {
+            try
+            {
+                BlockingQueue<Message> queue = null;
+                String correlationId = message.getJMSCorrelationID();
+
+                if (correlationId != null)
+                {
+                    try
+                    {
+                        queue = idsToQueues.get(Long.valueOf(correlationId));
+                    }
+                    catch (NumberFormatException e)
+                    {
+                        // Not an id issued by this factory; fall through so that the message is dead-lettered.
+                        queue = null;
+                    }
+                }
+
+                // If there is no conversation for the message id, the dead letter box queue is used.
+                if (queue == null)
+                {
+                    queue = deadLetterBox;
+                }
+
+                queue.put(message);
+            }
+            catch (JMSException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}