You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/06/09 08:52:28 UTC

[1/2] camel git commit: CAMEL-8538 Add inOut support to the camel-rabbitmq component

Repository: camel
Updated Branches:
  refs/heads/master 43b79533d -> 8fe4288f2


http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
new file mode 100644
index 0000000..6cae778
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.AMQP.Queue.DeclareOk;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Envelope;
+
+
+/**
+ * A {@link ReplyManager} when using temporary queues.
+ * <p/>
+ * The reply queue is declared without a name so that the broker picks one. That name becomes
+ * the <tt>replyTo</tt> of this manager and is also used as the routing key when the queue is
+ * bound to the endpoint's exchange.
+ *
+ * @version 
+ */
+public class TemporaryQueueReplyManager extends ReplyManagerSupport {
+
+    private RabbitConsumer consumer;
+
+    public TemporaryQueueReplyManager(CamelContext camelContext) {
+        super(camelContext);
+    }
+
+    /**
+     * Creates the handler that receives the reply for the given exchange. The handler is always
+     * attached to this manager; the <tt>replyManager</tt> argument is not used.
+     */
+    protected ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+                                              String originalCorrelationId, String correlationId, long requestTimeout) {
+        return new TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, correlationId, requestTimeout);
+    }
+
+    /**
+     * Moves the handler registered under <tt>correlationId</tt> to <tt>newCorrelationId</tt>,
+     * registering it with the given timeout. Nothing is registered when no handler is found.
+     */
+    public void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout) {
+        log.trace("Updated provisional correlationId [{}] to expected correlationId [{}]", correlationId, newCorrelationId);
+
+        ReplyHandler handler = correlation.remove(correlationId);
+        if (handler != null) {
+            correlation.put(newCorrelationId, handler, requestTimeout);
+        }
+    }
+
+    /**
+     * Dispatches a reply to the handler registered for its correlation id. Replies that cannot
+     * be correlated are logged at WARN level and dropped.
+     */
+    @Override
+    protected void handleReplyMessage(String correlationID, AMQP.BasicProperties properties, byte[] message) {
+        ReplyHandler handler = correlation.get(correlationID);
+        if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
+            handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
+        }
+        if (handler != null) {
+            correlation.remove(correlationID);
+            handler.onReply(correlationID, properties, message);
+        } else {
+            // we could not correlate the received reply message to a matching request and therefore
+            // we cannot continue routing the unknown message
+            // log a warn and then ignore the message
+            log.warn("Reply received for unknown correlationID [{}]. The message will be ignored: {}", correlationID, message);
+        }
+    }
+
+    /**
+     * Opens the connection and channel used for replies, declares and binds the temporary reply
+     * queue and starts consuming from it.
+     *
+     * @return the connection the reply consumer runs on
+     * @throws Exception if the channel, queue, binding or consumer could not be set up; the
+     *                   connection is closed again in that case so that it does not leak
+     */
+    @Override
+    protected Connection createListenerContainer() throws Exception {
+
+        log.debug("Creating connection");
+        Connection conn = endpoint.connect(executorService);
+
+        try {
+            log.debug("Creating channel");
+            Channel channel = conn.createChannel();
+            // setup the basicQos
+            if (endpoint.isPrefetchEnabled()) {
+                channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(),
+                        endpoint.isPrefetchGlobal());
+            }
+
+            //Let the server pick a random name for us
+            DeclareOk result = channel.queueDeclare();
+            log.debug("Temporary queue name {}", result.getQueue());
+            setReplyTo(result.getQueue());
+
+            //TODO check for the RabbitMQConstants.EXCHANGE_NAME header 
+            channel.queueBind(getReplyTo(), endpoint.getExchangeName(), getReplyTo());
+
+            consumer = new RabbitConsumer(this, channel);
+            consumer.start();
+
+            return conn;
+        } catch (Exception e) {
+            // the caller never gets to see the connection when setup fails, so release it here
+            consumer = null;
+            try {
+                conn.close();
+            } catch (Exception closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        // the consumer only exists once createListenerContainer() has completed successfully
+        if (consumer != null) {
+            consumer.stop();
+        }
+    }
+
+    //TODO combine with class in RabbitMQConsumer
+    class RabbitConsumer extends com.rabbitmq.client.DefaultConsumer {
+
+        private final TemporaryQueueReplyManager replyManager;
+        private final Channel channel;
+        private String tag;
+
+        /**
+         * Constructs a new instance and records its association to the
+         * passed-in channel.
+         *
+         * @param replyManager the reply manager that every delivery is handed to
+         * @param channel the channel to which this consumer is attached
+         */
+        public RabbitConsumer(TemporaryQueueReplyManager replyManager, Channel channel) {
+            super(channel);
+            this.replyManager = replyManager;
+            this.channel = channel;
+        }
+
+        @Override
+        public void handleDelivery(String consumerTag, Envelope envelope,
+                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
+
+            replyManager.onMessage(properties, body);
+        }
+
+        /**
+         * Bind consumer to channel
+         */
+        private void start() throws IOException {
+            tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);
+        }
+
+        /**
+         * Unbind consumer from channel
+         */
+        private void stop() throws IOException {
+            if (!channel.isOpen()) {
+                return;
+            }
+            try {
+                if (tag != null) {
+                    channel.basicCancel(tag);
+                }
+            } finally {
+                // close the channel even when cancelling the consumer failed
+                channel.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java
new file mode 100644
index 0000000..3521bec
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+import java.util.UUID;
+
+import com.rabbitmq.client.Connection;
+
+
+
+/**
+ * Callback to be used when using the option <tt>useMessageIDAsCorrelationID</tt>.
+ * <p/>
+ * Once the message has been sent, this callback moves the correlation registration in
+ * {@link ReplyManager} from the provisional correlation id to a newly generated one.
+ * <p/>
+ * NOTE(review): the new correlation id is a random UUID created inside {@code sent}; it is not
+ * read from the message that was sent. Confirm that replies can actually carry this value.
+ *
+ * @version 
+ */
+public class UseMessageIdAsCorrelationIdMessageSentCallback implements MessageSentCallback {
+
+    private final ReplyManager replyManager;
+    private final String correlationId;
+    private final long requestTimeout;
+
+    /**
+     * @param replyManager   the manager whose correlation registration is updated
+     * @param correlationId  the provisional correlation id the request was registered under
+     * @param requestTimeout the timeout passed on when the handler is re-registered
+     */
+    public UseMessageIdAsCorrelationIdMessageSentCallback(ReplyManager replyManager, String correlationId, long requestTimeout) {
+        this.replyManager = replyManager;
+        this.correlationId = correlationId;
+        this.requestTimeout = requestTimeout;
+    }
+
+    /**
+     * Replaces the provisional correlation id with a fresh random UUID. None of the arguments
+     * are used.
+     */
+    public void sent(Connection session, byte[] message, String destination) {
+        // UUID.toString() never returns null, so the update is done unconditionally
+        String newCorrelationID = UUID.randomUUID().toString();
+        replyManager.updateCorrelationId(correlationId, newCorrelationID, requestTimeout);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 52e76c8..19c580f 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.sql.Timestamp;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
@@ -25,17 +26,18 @@ import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Address;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.impl.LongStringHelper;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Address;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.impl.LongStringHelper;
+
 public class RabbitMQEndpointTest extends CamelTestSupport {
 
     private Envelope envelope = Mockito.mock(Envelope.class);
@@ -107,6 +109,10 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
         customHeaders.put("dateHeader", new Date(0));
         customHeaders.put("byteArrayHeader", "foo".getBytes());
         customHeaders.put("longStringHeader", LongStringHelper.asLongString("Some really long string"));
+        customHeaders.put("timestampHeader", new Timestamp(4200));
+        customHeaders.put("byteHeader", new Byte((byte) 0));
+        customHeaders.put("floatHeader", new Float(42.4242));
+        customHeaders.put("longHeader", new Long(420000000000000000L));
         Mockito.when(properties.getHeaders()).thenReturn(customHeaders);
 
         byte[] body = new byte[20];
@@ -122,6 +128,10 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
         assertEquals(new Date(0), exchange.getIn().getHeader("dateHeader"));
         assertArrayEquals("foo".getBytes(), (byte[]) exchange.getIn().getHeader("byteArrayHeader"));
         assertEquals("Some really long string", exchange.getIn().getHeader("longStringHeader"));
+        assertEquals(new Timestamp(4200), exchange.getIn().getHeader("timestampHeader"));
+        assertEquals(new Byte((byte) 0), exchange.getIn().getHeader("byteHeader"));
+        assertEquals(new Float(42.4242), exchange.getIn().getHeader("floatHeader"));
+        assertEquals(new Long(420000000000000000L), exchange.getIn().getHeader("longHeader"));
         assertEquals(body, exchange.getIn().getBody());
     }
 
@@ -219,4 +229,22 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
         assertEquals(654, connectionFactory.getNetworkRecoveryInterval());
         assertFalse(connectionFactory.isTopologyRecoveryEnabled());
     }
+
+    @Test
+    public void createEndpointWithTransferExceptionEnabled() throws Exception {
+        RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?transferException=true", RabbitMQEndpoint.class);
+        assertEquals(true, endpoint.isTransferException());
+    }
+
+    @Test
+    public void createEndpointWithReplyTimeout() throws Exception {
+        RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?requestTimeout=2000", RabbitMQEndpoint.class);
+        assertEquals(2000, endpoint.getRequestTimeout());
+    }
+
+    @Test
+    public void createEndpointWithRequestTimeoutCheckerInterval() throws Exception {
+        RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?requestTimeoutCheckerInterval=1000", RabbitMQEndpoint.class);
+        assertEquals(1000, endpoint.getRequestTimeoutCheckerInterval());
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
new file mode 100644
index 0000000..5c1223e
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.rabbitmq.testbeans.TestSerializableObject;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Integration test for request/reply (InOut) over the rabbitmq endpoint.
+ * <p/>
+ * The endpoint URI points at a broker on <tt>localhost:5672</tt> and logs in with the
+ * <tt>cameltest</tt> account, so a broker configured that way has to be running.
+ */
+public class RabbitMQInOutIntTest extends CamelTestSupport {
+
+    private static final String EXCHANGE = "ex5";
+    public static final String ROUTING_KEY = "rk5";
+    public static final long TIMEOUT_MS = 2000;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate template;
+
+    @Produce(uri = "direct:rabbitMQ")
+    protected ProducerTemplate directProducer;
+
+    @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?exchangeType=direct&username=cameltest&password=cameltest" + "&autoAck=true&queue=q4&routingKey=" + ROUTING_KEY
+                    + "&transferException=true&requestTimeout=" + TIMEOUT_MS)
+    private Endpoint rabbitMQEndpoint;
+
+    /**
+     * Builds a producing route that sends InOut to the rabbitmq endpoint and a consuming route
+     * whose processor reacts to marker words in the body: it throws for "Exception", sleeps
+     * longer than the request timeout for "TimeOut", and otherwise appends " response".
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+
+                from("direct:rabbitMQ").id("producingRoute").setHeader("routeHeader", simple("routeHeader")).inOut(rabbitMQEndpoint);
+
+                from(rabbitMQEndpoint).id("consumingRoute").log("Receiving message").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        TestSerializableObject foo = exchange.getIn().getBody(TestSerializableObject.class);
+                        if (foo != null) {
+                            foo.setDescription("foobar");
+                            return;
+                        }
+
+                        String body = exchange.getIn().getBody(String.class);
+                        if (body == null) {
+                            return;
+                        }
+
+                        if (body.contains("header")) {
+                            assertEquals("String", exchange.getIn().getHeader("String"));
+                            assertEquals("routeHeader", exchange.getIn().getHeader("routeHeader"));
+                        }
+
+                        if (body.contains("Exception")) {
+                            throw new IllegalArgumentException("Boom");
+                        }
+
+                        if (body.contains("TimeOut")) {
+                            // outlast the requestTimeout configured on the endpoint
+                            Thread.sleep(TIMEOUT_MS * 2);
+                        }
+
+                        exchange.getIn().setBody(body + " response");
+                    }
+                });
+            }
+        };
+    }
+
+    @Test
+    public void inOutRaceConditionTest1() throws InterruptedException, IOException {
+        String reply = template.requestBodyAndHeader("direct:rabbitMQ", "test1", RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, String.class);
+        assertEquals("test1 response", reply);
+    }
+
+    @Test
+    public void inOutRaceConditionTest2() throws InterruptedException, IOException {
+        String reply = template.requestBodyAndHeader("direct:rabbitMQ", "test2", RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, String.class);
+        assertEquals("test2 response", reply);
+    }
+
+    @Test
+    public void headerTest() throws InterruptedException, IOException {
+        Map<String, Object> headers = new HashMap<>();
+
+        TestSerializableObject testObject = new TestSerializableObject();
+        testObject.setName("header");
+
+        headers.put("String", "String");
+        headers.put("Boolean", Boolean.FALSE);
+
+        // This will blow up the connection if not removed before sending the message
+        headers.put("TestObject1", testObject);
+        // This will blow up the connection if not removed before sending the message
+        headers.put("class", testObject.getClass());
+        // This will mess up de-serialization if not removed before sending the message
+        headers.put("CamelSerialize", true);
+
+        // populate a map and an arrayList
+        Map<Object, Object> tmpMap = new HashMap<>();
+        List<String> tmpList = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            String name = "header" + i;
+            tmpList.add(name);
+            tmpMap.put(name, name);
+        }
+        // This will blow up the connection if not removed before sending the message
+        headers.put("arrayList", tmpList);
+        // This will blow up the connection if not removed before sending the message
+        headers.put("map", tmpMap);
+
+        String reply = template.requestBodyAndHeaders("direct:rabbitMQ", "header", headers, String.class);
+        assertEquals("header response", reply);
+    }
+
+    @Test
+    public void serializeTest() throws InterruptedException, IOException {
+        TestSerializableObject foo = new TestSerializableObject();
+        foo.setName("foobar");
+
+        TestSerializableObject reply = template.requestBodyAndHeader("direct:rabbitMQ", foo, RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, TestSerializableObject.class);
+        assertEquals("foobar", reply.getName());
+        assertEquals("foobar", reply.getDescription());
+    }
+
+    /**
+     * Round-trips a {@link TestSerializableObject} through Java serialization. Any I/O or
+     * class-loading failure is propagated so that it is reported as the cause of the failure.
+     */
+    @Test
+    public void testSerializableObject() throws IOException, ClassNotFoundException {
+        TestSerializableObject foo = new TestSerializableObject();
+        foo.setName("foobar");
+
+        byte[] body;
+        try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) {
+            o.writeObject(foo);
+            // push any buffered bytes into the byte array before taking the snapshot
+            o.flush();
+            body = b.toByteArray();
+        }
+
+        TestSerializableObject newFoo;
+        try (InputStream b = new ByteArrayInputStream(body); ObjectInputStream o = new ObjectInputStream(b)) {
+            newFoo = (TestSerializableObject) o.readObject();
+        }
+        assertEquals(foo.getName(), newFoo.getName());
+    }
+
+    @Test
+    public void inOutExceptionTest() {
+        try {
+            template.requestBodyAndHeader("direct:rabbitMQ", "Exception", RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, String.class);
+            fail("This should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            assertEquals(IllegalArgumentException.class, e.getCause().getClass());
+        } catch (Exception e) {
+            fail("This should have caught CamelExecutionException");
+        }
+    }
+
+    @Test
+    public void inOutTimeOutTest() {
+        try {
+            template.requestBodyAndHeader("direct:rabbitMQ", "TimeOut", RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, String.class);
+            fail("This should have thrown a timeOut exception");
+        } catch (CamelExecutionException e) {
+            // expected
+        } catch (Exception e) {
+            fail("This should have caught CamelExecutionException");
+        }
+    }
+
+    @Test
+    public void inOutNullTest() {
+        template.requestBodyAndHeader("direct:rabbitMQ", null, RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, Object.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
index 7b2df60..cefece5 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
@@ -51,6 +51,7 @@ public class RabbitMQProducerTest {
         Mockito.when(exchange.getIn()).thenReturn(message);
         Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
         Mockito.when(conn.createChannel()).thenReturn(null);
+        Mockito.when(endpoint.getMessageConverter()).thenReturn(new RabbitMQMessageConverter());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java
new file mode 100644
index 0000000..1fdffd0
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java
@@ -0,0 +1,26 @@
+package org.apache.camel.component.rabbitmq.testbeans;
+
+import java.io.Serializable;
+
+/**
+ * Plain serializable bean holding a name and a description.
+ */
+public class TestSerializableObject implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private String name;
+    private String description;
+
+    /** @return the name, or <tt>null</tt> when none has been set */
+    public String getName() {
+        return this.name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the description, or <tt>null</tt> when none has been set */
+    public String getDescription() {
+        return this.description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+}
\ No newline at end of file


[2/2] camel git commit: CAMEL-8538 Add inOut support to the camel-rabbitmq component

Posted by da...@apache.org.
CAMEL-8538 Add inOut support to the camel-rabbitmq component


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8fe4288f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8fe4288f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8fe4288f

Branch: refs/heads/master
Commit: 8fe4288f2a19d1b8894f65b04a7ac871983b8938
Parents: 43b7953
Author: Brad Reitmeyer <br...@cisco.com>
Authored: Tue Apr 28 13:57:39 2015 -0500
Committer: Brad Reitmeyer <gi...@bradreitmeyer.com>
Committed: Mon Jun 8 16:24:39 2015 -0500

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQComponent.java   |   4 +-
 .../component/rabbitmq/RabbitMQConstants.java   |   1 +
 .../component/rabbitmq/RabbitMQConsumer.java    |  71 +----
 .../component/rabbitmq/RabbitMQEndpoint.java    | 228 +++++++++++++-
 .../rabbitmq/RabbitMQMessageConverter.java      | 219 ++++++++++++++
 .../component/rabbitmq/RabbitMQProducer.java    | 296 ++++++++++---------
 .../camel/component/rabbitmq/ReplyToType.java   |  26 ++
 .../rabbitmq/reply/CorrelationListener.java     |  44 +++
 .../rabbitmq/reply/CorrelationTimeoutMap.java   | 120 ++++++++
 .../rabbitmq/reply/MessageSentCallback.java     |  38 +++
 .../rabbitmq/reply/QueueReplyHandler.java       |  34 +++
 .../component/rabbitmq/reply/ReplyHandler.java  |  43 +++
 .../component/rabbitmq/reply/ReplyHolder.java   | 123 ++++++++
 .../component/rabbitmq/reply/ReplyManager.java  |  76 +++++
 .../rabbitmq/reply/ReplyManagerSupport.java     | 238 +++++++++++++++
 .../reply/TemporaryQueueReplyHandler.java       |  70 +++++
 .../reply/TemporaryQueueReplyManager.java       | 156 ++++++++++
 ...ageIdAsCorrelationIdMessageSentCallback.java |  51 ++++
 .../rabbitmq/RabbitMQEndpointTest.java          |  38 ++-
 .../rabbitmq/RabbitMQInOutIntTest.java          | 200 +++++++++++++
 .../rabbitmq/RabbitMQProducerTest.java          |   1 +
 .../testbeans/TestSerializableObject.java       |  26 ++
 22 files changed, 1898 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index 54dd2bf..c125421 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -18,14 +18,16 @@ package org.apache.camel.component.rabbitmq;
 
 import java.net.URI;
 import java.util.Map;
+
 import javax.net.ssl.TrustManager;
 
-import com.rabbitmq.client.ConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.impl.UriEndpointComponent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.rabbitmq.client.ConnectionFactory;
+
 public class RabbitMQComponent extends UriEndpointComponent {
 
     private static final Logger LOG = LoggerFactory.getLogger(RabbitMQComponent.class);

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
index f2e5568..cf4ab45 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
@@ -28,6 +28,7 @@ public final class RabbitMQConstants {
     public static final String DELIVERY_MODE = "rabbitmq.DELIVERY_MODE";
     public static final String USERID = "rabbitmq.USERID";
     public static final String CLUSTERID = "rabbitmq.CLUSTERID";
+    public static final String REQUEST_TIMEOUT = "rabbitmq.REQUEST_TIMEOUT";
     public static final String REPLY_TO = "rabbitmq.REPLY_TO";
     public static final String CONTENT_ENCODING = "rabbitmq.CONTENT_ENCODING";
     public static final String TYPE = "rabbitmq.TYPE";

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index a4a6362..bf142ce 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -27,15 +27,17 @@ import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Envelope;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 
+
 public class RabbitMQConsumer extends DefaultConsumer {
-    ExecutorService executor;
-    Connection conn;
+    private ExecutorService executor;
+    private Connection conn;
     private int closeTimeout = 30 * 1000;
     private final RabbitMQEndpoint endpoint;
 
@@ -55,7 +57,6 @@ public class RabbitMQConsumer extends DefaultConsumer {
     }
 
     @Override
-
     public RabbitMQEndpoint getEndpoint() {
         return (RabbitMQEndpoint) super.getEndpoint();
     }
@@ -79,7 +80,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
         // setup the basicQos
         if (endpoint.isPrefetchEnabled()) {
             channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(),
-                    endpoint.isPrefetchGlobal());
+                            endpoint.isPrefetchGlobal());
         }
         return channel;
     }
@@ -182,10 +183,11 @@ public class RabbitMQConsumer extends DefaultConsumer {
                                    AMQP.BasicProperties properties, byte[] body) throws IOException {
 
             Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body);
-            mergeAmqpProperties(exchange, properties);
+            endpoint.getMessageConverter().mergeAmqpProperties(exchange, properties);
 
             boolean sendReply = properties.getReplyTo() != null;
             if (sendReply && !exchange.getPattern().isOutCapable()) {
+                log.debug("In an inOut capable route");
                 exchange.setPattern(ExchangePattern.InOut);
             }
 
@@ -208,17 +210,20 @@ public class RabbitMQConsumer extends DefaultConsumer {
             if (!exchange.isFailed()) {
                 // processing success
                 if (sendReply && exchange.getPattern().isOutCapable()) {
-                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
-                            .headers(msg.getHeaders())
-                            .correlationId(properties.getCorrelationId())
-                            .build();
-                    channel.basicPublish("", properties.getReplyTo(), replyProps, msg.getBody(byte[].class));
+                    endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo());
                 }
                 if (!consumer.endpoint.isAutoAck()) {
                     log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag);
                     channel.basicAck(deliveryTag, false);
                 }
-            } else {
+            }
+            else if (endpoint.isTransferException() && exchange.getPattern().isOutCapable()) {
+                // the inOut exchange failed so put the exception in the body and send back
+                msg.setBody(exchange.getException());
+                exchange.setOut(msg);
+                endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo());
+            }
+            else {
                 boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class);
                 // processing failed, then reject and handle the exception
                 if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) {
@@ -236,49 +241,6 @@ public class RabbitMQConsumer extends DefaultConsumer {
         }
 
         /**
-         * Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()}
-         */
-        private void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) {
-
-            if (properties.getType() != null) {
-                exchange.getIn().setHeader(RabbitMQConstants.TYPE, properties.getType());
-            }
-            if (properties.getAppId() != null) {
-                exchange.getIn().setHeader(RabbitMQConstants.APP_ID, properties.getAppId());
-            }
-            if (properties.getClusterId() != null) {
-                exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID, properties.getClusterId());
-            }
-            if (properties.getContentEncoding() != null) {
-                exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING, properties.getContentEncoding());
-            }
-            if (properties.getContentType() != null) {
-                exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE, properties.getContentType());
-            }
-            if (properties.getCorrelationId() != null) {
-                exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, properties.getCorrelationId());
-            }
-            if (properties.getExpiration() != null) {
-                exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION, properties.getExpiration());
-            }
-            if (properties.getMessageId() != null) {
-                exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID, properties.getMessageId());
-            }
-            if (properties.getPriority() != null) {
-                exchange.getIn().setHeader(RabbitMQConstants.PRIORITY, properties.getPriority());
-            }
-            if (properties.getReplyTo() != null) {
-                exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO, properties.getReplyTo());
-            }
-            if (properties.getTimestamp() != null) {
-                exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP, properties.getTimestamp());
-            }
-            if (properties.getUserId() != null) {
-                exchange.getIn().setHeader(RabbitMQConstants.USERID, properties.getUserId());
-            }
-        }
-
-        /**
          * Bind consumer to channel
          */
         public void start() throws IOException {
@@ -333,5 +295,4 @@ public class RabbitMQConsumer extends DefaultConsumer {
             return null;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 7638682..6cd0bca 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -16,7 +16,13 @@
  */
 package org.apache.camel.component.rabbitmq;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
 import java.net.URISyntaxException;
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
@@ -25,6 +31,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+
 import javax.net.ssl.TrustManager;
 
 import com.rabbitmq.client.AMQP;
@@ -34,11 +41,14 @@ import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.LongString;
+
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.TypeConversionException;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.DefaultMessage;
@@ -46,9 +56,12 @@ import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @UriEndpoint(scheme = "rabbitmq", title = "RabbitMQ", syntax = "rabbitmq:hostname:portNumber/exchangeName", consumerClass = RabbitMQConsumer.class, label = "messaging")
 public class RabbitMQEndpoint extends DefaultEndpoint {
+    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQEndpoint.class);
 
     @UriPath @Metadata(required = "true")
     private String hostname;
@@ -135,6 +148,24 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     @UriParam
     private ArgsConfigurer exchangeArgsConfigurer;
 
+    @UriParam
+    private long requestTimeout = 20000;
+    @UriParam
+    private long requestTimeoutCheckerInterval = 1000;
+    @UriParam
+    private boolean transferException = false;
+    // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
+    private boolean useMessageIDAsCorrelationID = true;
+    // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
+    private String replyToType = ReplyToType.Temporary.name();
+    // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
+    private String replyTo = null;
+
+    private RabbitMQMessageConverter messageConverter = new RabbitMQMessageConverter();
+
+    // header to indicate that the message body needs to be de-serialized
+    private static final String SERIALIZE_HEADER = "CamelSerialize";
+
     public RabbitMQEndpoint() {
     }
 
@@ -150,12 +181,34 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     public Exchange createRabbitExchange(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
         Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern());
 
-        Message message = new DefaultMessage();
-        exchange.setIn(message);
+        setRabbitExchange(exchange, envelope, properties, body);
+        return exchange;
+    }
 
-        message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey());
-        message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange());
-        message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag());
+    /**
+     * Gets the message converter to convert between rabbit and camel
+     * @return the message converter used by this endpoint
+     */
+    protected RabbitMQMessageConverter getMessageConverter() {
+        return messageConverter;
+    }
+
+    public void setRabbitExchange(Exchange camelExchange, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
+        Message message;
+        if (camelExchange.getIn() != null) {
+            // Use the existing message so we keep the headers
+            message = camelExchange.getIn();
+        }
+        else {
+            message = new DefaultMessage();
+            camelExchange.setIn(message);
+        }
+
+        if (envelope != null) {
+            message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey());
+            message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange());
+            message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag());
+        }
 
         Map<String, Object> headers = properties.getHeaders();
         if (headers != null) {
@@ -169,9 +222,109 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
             }
         }
 
-        message.setBody(body);
+        if (hasSerializeHeader(properties)) {
+            Object messageBody = null;
+            try (InputStream b = new ByteArrayInputStream(body); ObjectInputStream o = new ObjectInputStream(b)) {
+                messageBody = o.readObject();
+            } catch (IOException | ClassNotFoundException e) {
+                LOG.warn("Could not deserialize the object");
+            }
+            if (messageBody instanceof Throwable) {
+                LOG.debug("Reply was an Exception. Setting the Exception on the Exchange");
+                camelExchange.setException((Throwable) messageBody);
+            } else {
+                message.setBody(messageBody);
+            }
+        } else {
+            // Set the body as a byte[] and let the type converter deal with it
+            message.setBody(body);
+        }
 
-        return exchange;
+    }
+
+    /**
+     * Tells whether the AMQP headers carry the serialize marker set to true; null properties, headers or values count as not set.
+     */
+    private boolean hasSerializeHeader(AMQP.BasicProperties properties) {
+        if (properties == null || properties.getHeaders() == null) {
+            return false;
+        }
+        return Boolean.TRUE.equals(properties.getHeaders().get(SERIALIZE_HEADER));
+    }
+
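+    /**
+     * Publishes the message on the exchange (OUT if present, otherwise IN) to the given channel
+     * @param camelExchange the Camel exchange holding the message to send
+     * @param channel the RabbitMQ channel to publish on
+     * @param routingKey the routing key to publish with
+     * @throws IOException if the body cannot be converted or serialized to bytes
+     */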
+    public void publishExchangeToChannel(Exchange camelExchange, Channel channel, String routingKey) throws IOException {
+        Message msg;
+        if (camelExchange.hasOut()) {
+            msg = camelExchange.getOut();
+        } else {
+            msg = camelExchange.getIn();
+        }
+
+        // Remove the SERIALIZE_HEADER in case it was previously set
+        if (msg.getHeaders() != null && msg.getHeaders().containsKey(SERIALIZE_HEADER)) {
+            LOG.debug("Removing the {} header", SERIALIZE_HEADER);
+            msg.getHeaders().remove(SERIALIZE_HEADER);
+        }
+
+        AMQP.BasicProperties properties;
+        byte[] body;
+        try {
+            // To maintain backwards compatibility try the TypeConverter (The DefaultTypeConverter seems to only work on Strings)
+            body = camelExchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, camelExchange, msg.getBody());
+
+            properties = getMessageConverter().buildProperties(camelExchange).build();
+        } catch (NoTypeConversionAvailableException | TypeConversionException e) {
+            if (msg.getBody() instanceof Serializable) {
+                // Add the header so the reply processor knows to de-serialize it
+                msg.getHeaders().put(SERIALIZE_HEADER, true);
+
+                properties = getMessageConverter().buildProperties(camelExchange).build();
+
+                try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) {
+                    o.writeObject(msg.getBody());
+                    body = b.toByteArray();
+                }
+            }
+            else if (msg.getBody() == null) {
+                properties = getMessageConverter().buildProperties(camelExchange).build();
+                body = null;
+            }
+            else {
+                LOG.warn("Could not convert {} to byte[]", msg.getBody());
+                throw new IOException(e);
+            }
+        }
+        String rabbitExchange = getExchangeName(msg);
+
+        Boolean mandatory = camelExchange.getIn().getHeader(RabbitMQConstants.MANDATORY, isMandatory(), Boolean.class);
+        Boolean immediate = camelExchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, isImmediate(), Boolean.class);
+
+
+        LOG.debug("Sending message to exchange: {} with CorrelationId = {}", rabbitExchange, properties.getCorrelationId());
+
+        channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body);
+    }
+
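+    /**
+     * Resolves the name of the rabbitmq exchange to publish to
+     *
+     * @param msg the message whose EXCHANGE_NAME header is consulted
+     * @return the header value, or the endpoint's exchange name if the header is absent or this is a bridge endpoint
+     */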
+    protected String getExchangeName(Message msg) {
+        String exchangeName = msg.getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class);
+        // If it is BridgeEndpoint we should ignore the message header of EXCHANGE_NAME
+        if (exchangeName == null || isBridgeEndpoint()) {
+            exchangeName = getExchangeName();
+        }
+        return exchangeName;
     }
 
     @Override
@@ -712,9 +865,6 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
         return channelPoolMaxSize;
     }
 
-    /**
-     * Set maximum number of opened channel in pool
-     */
     public void setChannelPoolMaxSize(int channelPoolMaxSize) {
         this.channelPoolMaxSize = channelPoolMaxSize;
     }
@@ -763,14 +913,14 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     public ArgsConfigurer getQueueArgsConfigurer() {
         return queueArgsConfigurer;
     }
-    
+
     /**
      * Set the configurer for setting the queue args in Channel.queueDeclare
      */
     public void setQueueArgsConfigurer(ArgsConfigurer queueArgsConfigurer) {
         this.queueArgsConfigurer = queueArgsConfigurer;
     }
-    
+
     public ArgsConfigurer getExchangeArgsConfigurer() {
         return exchangeArgsConfigurer;
     }
@@ -781,4 +931,58 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     public void setExchangeArgsConfigurer(ArgsConfigurer exchangeArgsConfigurer) {
         this.exchangeArgsConfigurer = exchangeArgsConfigurer;
     }
+
+    /**
+     * Set timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds)
+     */
+    public void setRequestTimeout(long requestTimeout) {
+        this.requestTimeout = requestTimeout;
+    }
+
+    public long getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    /**
+     * Set requestTimeoutCheckerInterval for inOut exchange
+     */
+    public void setRequestTimeoutCheckerInterval(long requestTimeoutCheckerInterval) {
+        this.requestTimeoutCheckerInterval = requestTimeoutCheckerInterval;
+    }
+
+    public long getRequestTimeoutCheckerInterval() {
+        return requestTimeoutCheckerInterval;
+    }
+
+    /**
+     * Get useMessageIDAsCorrelationID for inOut exchange
+     */
+    public boolean isUseMessageIDAsCorrelationID() {
+        return useMessageIDAsCorrelationID;
+    }
+
+    /**
+     * When true and an inOut Exchange failed on the consumer side send the caused Exception back in the response 
+     */
+    public void setTransferException(boolean transferException) {
+        this.transferException = transferException;
+    }
+
+    public boolean isTransferException() {
+        return transferException;
+    }
+
+    /**
+     * Get replyToType for inOut exchange
+     */
+    public String getReplyToType() {
+        return replyToType;
+    }
+
+    /**
+     * Gets the Queue to reply to if you don't want to use temporary reply queues
+     */
+    public String getReplyTo() {
+        return replyTo;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
new file mode 100644
index 0000000..95abe81
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.LongString;
+
+public class RabbitMQMessageConverter {
+    protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQMessageConverter.class);
+
+    /**
+     * Adds each non-null AMQP property checked below as the matching {@link RabbitMQConstants} header on {@link Exchange#getIn()}
+     */
+    public void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) {
+        // a header is only set when its property is non-null, so an unset property never overwrites a header with null
+        if (properties.getType() != null) {
+            exchange.getIn().setHeader(RabbitMQConstants.TYPE, properties.getType());
+        }
+        if (properties.getAppId() != null) {
+            exchange.getIn().setHeader(RabbitMQConstants.APP_ID, properties.getAppId());
+        }
+        if (properties.getClusterId() != null) {
+            exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID, properties.getClusterId());
+        }
+        if (properties.getContentEncoding() != null) {
+            exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING, properties.getContentEncoding());
+        }
+        if (properties.getContentType() != null) {
+            exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE, properties.getContentType());
+        }
+        if (properties.getCorrelationId() != null) {
+            exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, properties.getCorrelationId());
+        }
+        if (properties.getExpiration() != null) {
+            exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION, properties.getExpiration());
+        }
+        if (properties.getMessageId() != null) {
+            exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID, properties.getMessageId());
+        }
+        if (properties.getPriority() != null) {
+            exchange.getIn().setHeader(RabbitMQConstants.PRIORITY, properties.getPriority());
+        }
+        if (properties.getReplyTo() != null) {
+            exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO, properties.getReplyTo());
+        }
+        if (properties.getTimestamp() != null) {
+            exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP, properties.getTimestamp());
+        }
+        if (properties.getUserId() != null) {
+            exchange.getIn().setHeader(RabbitMQConstants.USERID, properties.getUserId());
+        }
+    }
+
+    /**
+     * Builds the AMQP properties for the message on the {@link Exchange}
+     * (the OUT message if there is one, otherwise the IN message).
+     *
+     * @param exchange the exchange holding the message to publish
+     * @return a builder filled from the {@link RabbitMQConstants} headers plus all headers of an accepted type
+     */
+    public AMQP.BasicProperties.Builder buildProperties(Exchange exchange) {
+        AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
+        Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+
+        final Object contentType = msg.getHeader(RabbitMQConstants.CONTENT_TYPE);
+        if (contentType != null) {
+            properties.contentType(contentType.toString());
+        }
+
+        final Object priority = msg.getHeader(RabbitMQConstants.PRIORITY);
+        if (priority != null) {
+            properties.priority(Integer.parseInt(priority.toString()));
+        }
+
+        final Object messageId = msg.getHeader(RabbitMQConstants.MESSAGE_ID);
+        if (messageId != null) {
+            properties.messageId(messageId.toString());
+        }
+
+        final Object clusterId = msg.getHeader(RabbitMQConstants.CLUSTERID);
+        if (clusterId != null) {
+            properties.clusterId(clusterId.toString());
+        }
+
+        final Object replyTo = msg.getHeader(RabbitMQConstants.REPLY_TO);
+        if (replyTo != null) {
+            properties.replyTo(replyTo.toString());
+        }
+
+        final Object correlationId = msg.getHeader(RabbitMQConstants.CORRELATIONID);
+        if (correlationId != null) {
+            properties.correlationId(correlationId.toString());
+        }
+
+        final Object deliveryMode = msg.getHeader(RabbitMQConstants.DELIVERY_MODE);
+        if (deliveryMode != null) {
+            properties.deliveryMode(Integer.parseInt(deliveryMode.toString()));
+        }
+
+        final Object userId = msg.getHeader(RabbitMQConstants.USERID);
+        if (userId != null) {
+            properties.userId(userId.toString());
+        }
+
+        final Object type = msg.getHeader(RabbitMQConstants.TYPE);
+        if (type != null) {
+            properties.type(type.toString());
+        }
+
+        final Object contentEncoding = msg.getHeader(RabbitMQConstants.CONTENT_ENCODING);
+        if (contentEncoding != null) {
+            properties.contentEncoding(contentEncoding.toString());
+        }
+
+        final Object expiration = msg.getHeader(RabbitMQConstants.EXPIRATION);
+        if (expiration != null) {
+            properties.expiration(expiration.toString());
+        }
+
+        final Object appId = msg.getHeader(RabbitMQConstants.APP_ID);
+        if (appId != null) {
+            properties.appId(appId.toString());
+        }
+
+        final Object timestamp = msg.getHeader(RabbitMQConstants.TIMESTAMP);
+        if (timestamp instanceof Date) {
+            // mergeAmqpProperties stores the timestamp as a Date; parsing its toString() as a long would fail
+            properties.timestamp((Date) timestamp);
+        } else if (timestamp != null) {
+            properties.timestamp(new Date(Long.parseLong(timestamp.toString())));
+        }
+
+        final Map<String, Object> headers = msg.getHeaders();
+        Map<String, Object> filteredHeaders = new HashMap<String, Object>();
+
+        // TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader
+        for (Map.Entry<String, Object> header : headers.entrySet()) {
+            // filter header values.
+            Object value = getValidRabbitMQHeaderValue(header.getValue());
+            if (value != null) {
+                filteredHeaders.put(header.getKey(), value);
+            } else if (LOG.isDebugEnabled()) {
+                if (header.getValue() == null) {
+                    LOG.debug("Ignoring header: {} with null value", header.getKey());
+                } else {
+                    LOG.debug("Ignoring header: {} of class: {} with value: {}", new Object[] { header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue() });
+                }
+            }
+        }
+        properties.headers(filteredHeaders);
+        return properties;
+    }
+
+    /**
+     * Strategy to decide whether a header value can be sent to RabbitMQ.
+     * Without this, the com.rabbitmq.client.impl.Frame.java class will throw
+     * an IllegalArgumentException (invalid value in table) and close the
+     * connection.
+     * <p/>
+     * Accepted are {@link String}, {@link Number} (which covers
+     * {@link BigDecimal}, {@link Byte}, {@link Double}, {@link Float} and
+     * {@link Long}), {@link Boolean}, {@link Date} (which covers
+     * {@link Timestamp}), <tt>byte[]</tt> and {@link LongString}.
+     * The value itself is never converted; it is either returned unchanged
+     * or rejected.
+     *
+     * @param headerValue the header value
+     * @return the value to use, <tt>null</tt> to ignore this header
+     * @see com.rabbitmq.client.impl.Frame#fieldValueSize
+     */
+    private Object getValidRabbitMQHeaderValue(Object headerValue) {
+        // text and numeric values; Number is the common supertype of
+        // BigDecimal, Byte, Double, Float and Long
+        boolean textOrNumber = headerValue instanceof String
+                || headerValue instanceof Number;
+
+        // flags and points in time; java.sql.Timestamp extends java.util.Date
+        boolean flagOrTime = headerValue instanceof Boolean
+                || headerValue instanceof Date;
+
+        // raw bytes and RabbitMQ long strings
+        boolean binary = headerValue instanceof byte[]
+                || headerValue instanceof LongString;
+
+        if (textOrNumber || flagOrTime || binary) {
+            return headerValue;
+        }
+
+        // anything else, including null, is rejected so the caller
+        // skips the header
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 28858a6..b8c8ba2 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -17,28 +17,38 @@
 package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
+
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.component.rabbitmq.pool.PoolableChannelFactory;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.component.rabbitmq.reply.ReplyManager;
+import org.apache.camel.component.rabbitmq.reply.TemporaryQueueReplyManager;
+import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.pool.ObjectPool;
 import org.apache.commons.pool.impl.GenericObjectPool;
 
-public class RabbitMQProducer extends DefaultProducer {
+public class RabbitMQProducer extends DefaultAsyncProducer {
 
     private Connection conn;
     private ObjectPool<Channel> channelPool;
     private ExecutorService executorService;
     private int closeTimeout = 30 * 1000;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private static final String GENERATED_CORRELATION_ID_PREFIX = "Camel-";
+    private ReplyManager replyManager;
 
     public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
         super(endpoint);
@@ -115,6 +125,7 @@ public class RabbitMQProducer extends DefaultProducer {
 
     @Override
     protected void doStop() throws Exception {
+        unInitReplyManager();
         closeConnectionAndChannel();
         if (executorService != null) {
             getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
@@ -122,13 +133,76 @@ public class RabbitMQProducer extends DefaultProducer {
         }
     }
 
-    @Override
-    public void process(Exchange exchange) throws Exception {
-        String exchangeName = exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class);
+    /**
+     * Sends the exchange to RabbitMQ, using request/reply when the exchange
+     * pattern is out capable and a plain publish otherwise.
+     *
+     * @return the result of the delegate, or <tt>true</tt> when rejected or failed here
+     */
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (isRunAllowed()) {
+            try {
+                // in out requires a bit more work than in only
+                return exchange.getPattern().isOutCapable()
+                        ? processInOut(exchange, callback)
+                        : processInOnly(exchange, callback);
+            } catch (Throwable cause) {
+                // must catch everything so the callback is still invoked below
+                // and Camel error handling can deal with the failure
+                exchange.setException(cause);
+            }
+        } else if (exchange.getException() == null) {
+            // deny processing if we are not started, keeping any earlier failure
+            exchange.setException(new RejectedExecutionException());
+        }
+
+        // we could not hand the exchange over, so complete the callback here
+        callback.done(true);
+        return true;
+    }
+
+    protected boolean processInOut(final Exchange exchange, final AsyncCallback callback) throws Exception {
+        final org.apache.camel.Message in = exchange.getIn();
+
+        initReplyManager();
+
+        // the request timeout can be overruled by a header otherwise the endpoint configured value is used
+        final long timeout = exchange.getIn().getHeader(RabbitMQConstants.REQUEST_TIMEOUT, getEndpoint().getRequestTimeout(), long.class);
+
+        final String originalCorrelationId = in.getHeader(RabbitMQConstants.CORRELATIONID, String.class);
+
+        // we append the 'Camel-' prefix to know it was generated by us
+        String correlationId = GENERATED_CORRELATION_ID_PREFIX + getEndpoint().getCamelContext().getUuidGenerator().generateUuid();
+        in.setHeader(RabbitMQConstants.CORRELATIONID, correlationId);
+
+        in.setHeader(RabbitMQConstants.REPLY_TO, replyManager.getReplyTo());
+
+        String exchangeName = in.getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class);
         // If it is BridgeEndpoint we should ignore the message header of EXCHANGE_NAME
         if (exchangeName == null || getEndpoint().isBridgeEndpoint()) {
             exchangeName = getEndpoint().getExchangeName();
         }
+
+        String key = in.getHeader(RabbitMQConstants.ROUTING_KEY, null, String.class);
+        // we just need to make sure RoutingKey option take effect if it is not BridgeEndpoint
+        if (key == null || getEndpoint().isBridgeEndpoint()) {
+            key = getEndpoint().getRoutingKey() == null ? "" : getEndpoint().getRoutingKey();
+        }
+        if (ObjectHelper.isEmpty(key) && ObjectHelper.isEmpty(exchangeName)) {
+            throw new IllegalArgumentException("ExchangeName and RoutingKey is not provided in the endpoint: " + getEndpoint());
+        }
+        log.debug("Registering reply for {}", correlationId);
+
+        replyManager.registerReply(replyManager, exchange, callback, originalCorrelationId, correlationId, timeout);
+
+        basicPublish(exchange, exchangeName, key);
+        // continue routing asynchronously (reply will be processed async when its received)
+        return false;
+    }
+
+    private boolean processInOnly(Exchange exchange, AsyncCallback callback) throws Exception {
+        String exchangeName = getEndpoint().getExchangeName(exchange.getIn());
+
         String key = exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY, null, String.class);
         // we just need to make sure RoutingKey option take effect if it is not BridgeEndpoint
         if (key == null || getEndpoint().isBridgeEndpoint()) {
@@ -137,12 +211,13 @@ public class RabbitMQProducer extends DefaultProducer {
         if (ObjectHelper.isEmpty(key) && ObjectHelper.isEmpty(exchangeName)) {
             throw new IllegalArgumentException("ExchangeName and RoutingKey is not provided in the endpoint: " + getEndpoint());
         }
-        byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class);
-        AMQP.BasicProperties properties = buildProperties(exchange).build();
-        Boolean mandatory = exchange.getIn().getHeader(RabbitMQConstants.MANDATORY, getEndpoint().isMandatory(), Boolean.class);
-        Boolean immediate = exchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, getEndpoint().isImmediate(), Boolean.class);
-        
-        basicPublish(exchangeName, key, mandatory, immediate, properties, messageBodyBytes);
+
+        basicPublish(exchange, exchangeName, key);
+        if (callback != null) {
+            // we are synchronous so return true
+            callback.done(true);
+        }
+        return true;
     }
 
     /**
@@ -150,13 +225,10 @@ public class RabbitMQProducer extends DefaultProducer {
      *
      * @param exchange   Target exchange
      * @param routingKey Routing key
-     * @param mandatory  This flag tells the server how to react if the message cannot be routed to a queue.
-     * @param immediate  This flag tells the server how to react if the message cannot be routed to a queue consumer immediately.
      * @param properties Header properties
      * @param body       Body content
      */
-    private void basicPublish(final String exchange, final String routingKey, final boolean mandatory, final boolean immediate,  
-                              final AMQP.BasicProperties properties, final byte[] body) throws Exception {
+    private void basicPublish(final Exchange camelExchange, final String rabbitExchange, final String routingKey) throws Exception {
         if (channelPool == null) {
             // Open connection and channel lazily
             openConnectionAndChannelPool();
@@ -164,135 +236,95 @@ public class RabbitMQProducer extends DefaultProducer {
         execute(new ChannelCallback<Void>() {
             @Override
             public Void doWithChannel(Channel channel) throws Exception {
-                channel.basicPublish(exchange, routingKey, mandatory, immediate, properties, body);
+                getEndpoint().publishExchangeToChannel(camelExchange, channel, routingKey);
                 return null;
             }
         });
     }
 
     AMQP.BasicProperties.Builder buildProperties(Exchange exchange) {
-        AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
-
-        final Object contentType = exchange.getIn().getHeader(RabbitMQConstants.CONTENT_TYPE);
-        if (contentType != null) {
-            properties.contentType(contentType.toString());
-        }
-
-        final Object priority = exchange.getIn().getHeader(RabbitMQConstants.PRIORITY);
-        if (priority != null) {
-            properties.priority(Integer.parseInt(priority.toString()));
-        }
-
-        final Object messageId = exchange.getIn().getHeader(RabbitMQConstants.MESSAGE_ID);
-        if (messageId != null) {
-            properties.messageId(messageId.toString());
-        }
-
-        final Object clusterId = exchange.getIn().getHeader(RabbitMQConstants.CLUSTERID);
-        if (clusterId != null) {
-            properties.clusterId(clusterId.toString());
-        }
-
-        final Object replyTo = exchange.getIn().getHeader(RabbitMQConstants.REPLY_TO);
-        if (replyTo != null) {
-            properties.replyTo(replyTo.toString());
-        }
-
-        final Object correlationId = exchange.getIn().getHeader(RabbitMQConstants.CORRELATIONID);
-        if (correlationId != null) {
-            properties.correlationId(correlationId.toString());
-        }
-
-        final Object deliveryMode = exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_MODE);
-        if (deliveryMode != null) {
-            properties.deliveryMode(Integer.parseInt(deliveryMode.toString()));
-        }
-
-        final Object userId = exchange.getIn().getHeader(RabbitMQConstants.USERID);
-        if (userId != null) {
-            properties.userId(userId.toString());
-        }
-
-        final Object type = exchange.getIn().getHeader(RabbitMQConstants.TYPE);
-        if (type != null) {
-            properties.type(type.toString());
-        }
-
-        final Object contentEncoding = exchange.getIn().getHeader(RabbitMQConstants.CONTENT_ENCODING);
-        if (contentEncoding != null) {
-            properties.contentEncoding(contentEncoding.toString());
-        }
-
-        final Object expiration = exchange.getIn().getHeader(RabbitMQConstants.EXPIRATION);
-        if (expiration != null) {
-            properties.expiration(expiration.toString());
-        }
+        return getEndpoint().getMessageConverter().buildProperties(exchange);
+    }
 
-        final Object appId = exchange.getIn().getHeader(RabbitMQConstants.APP_ID);
-        if (appId != null) {
-            properties.appId(appId.toString());
-        }
+    public int getCloseTimeout() {
+        return closeTimeout;
+    }
 
-        final Object timestamp = exchange.getIn().getHeader(RabbitMQConstants.TIMESTAMP);
-        if (timestamp != null) {
-            properties.timestamp(new Date(Long.parseLong(timestamp.toString())));
-        }
+    public void setCloseTimeout(int closeTimeout) {
+        this.closeTimeout = closeTimeout;
+    }
 
-        final Map<String, Object> headers = exchange.getIn().getHeaders();
-        Map<String, Object> filteredHeaders = new HashMap<String, Object>();
-
-        // TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader
-        for (Map.Entry<String, Object> header : headers.entrySet()) {
-
-            // filter header values.
-            Object value = getValidRabbitMQHeaderValue(header.getValue());
-            if (value != null) {
-                filteredHeaders.put(header.getKey(), header.getValue());
-            } else if (log.isDebugEnabled()) {
-                if (header.getValue() == null) {
-                    log.debug("Ignoring header: {} with null value", header.getKey());
-                } else {
-                    log.debug("Ignoring header: {} of class: {} with value: {}",
-                            new Object[]{header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue()});
+    protected void initReplyManager() {
+        if (!started.get()) {
+            synchronized (this) {
+                if (started.get()) {
+                    return;
                 }
+                log.debug("Starting reply manager");
+                // must use the classloader from the application context when creating reply manager,
+                // as it should inherit the classloader from app context and not the current which may be
+                // a different classloader
+                ClassLoader current = Thread.currentThread().getContextClassLoader();
+                ClassLoader ac = getEndpoint().getCamelContext().getApplicationContextClassLoader();
+                try {
+                    if (ac != null) {
+                        Thread.currentThread().setContextClassLoader(ac);
+                    }
+                    // validate that replyToType and replyTo are configured consistently
+                    if (getEndpoint().getReplyToType() != null) {
+                        // setting temporary with a fixed replyTo is not supported
+                        if (getEndpoint().getReplyTo() != null && getEndpoint().getReplyToType().equals(ReplyToType.Temporary.name())) {
+                            throw new IllegalArgumentException("ReplyToType " + ReplyToType.Temporary
+                                            + " is not supported when replyTo " + getEndpoint().getReplyTo() + " is also configured.");
+                        }
+                    }
+
+                    if (getEndpoint().getReplyTo() != null) {
+                        // specifying reply queues is not currently supported
+                        throw new IllegalArgumentException("Specifying replyTo " + getEndpoint().getReplyTo() + " is currently not supported.");
+                    } else {
+                        replyManager = createReplyManager();
+                        log.debug("Using RabbitMQReplyManager: {} to process replies from temporary queue", replyManager);
+                    }
+                } catch (Exception e) {
+                    throw new FailedToCreateProducerException(getEndpoint(), e);
+                } finally {
+                    if (ac != null) {
+                        Thread.currentThread().setContextClassLoader(current);
+                    }
+                }
+                started.set(true);
             }
         }
-
-        properties.headers(filteredHeaders);
-
-        return properties;
     }
 
-    /**
-     * Strategy to test if the given header is valid
-     *
-     * @param headerValue the header value
-     * @return the value to use, <tt>null</tt> to ignore this header
-     * @see com.rabbitmq.client.impl.Frame#fieldValueSize
-     */
-    private Object getValidRabbitMQHeaderValue(Object headerValue) {
-        if (headerValue instanceof String) {
-            return headerValue;
-        } else if (headerValue instanceof BigDecimal) {
-            return headerValue;
-        } else if (headerValue instanceof Number) {
-            return headerValue;
-        } else if (headerValue instanceof Boolean) {
-            return headerValue;
-        } else if (headerValue instanceof Date) {
-            return headerValue;
-        } else if (headerValue instanceof byte[]) {
-            return headerValue;
+    protected void unInitReplyManager() {
+        try {
+            if (replyManager != null) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Stopping JmsReplyManager: {} from processing replies from: {}", replyManager,
+                                    getEndpoint().getReplyTo() != null ? getEndpoint().getReplyTo() : "temporary queue");
+                }
+                ServiceHelper.stopService(replyManager);
+            }
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } finally {
+            started.set(false);
         }
-        return null;
     }
 
-    public int getCloseTimeout() {
-        return closeTimeout;
-    }
+    protected ReplyManager createReplyManager() throws Exception {
+        // use a temporary queue
+        ReplyManager replyManager = new TemporaryQueueReplyManager(getEndpoint().getCamelContext());
+        replyManager.setEndpoint(getEndpoint());
 
-    public void setCloseTimeout(int closeTimeout) {
-        this.closeTimeout = closeTimeout;
-    }
+        String name = "RabbitMQReplyManagerTimeoutChecker[" + getEndpoint().getExchangeName() + "]";
+        ScheduledExecutorService replyManagerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name);
+        replyManager.setScheduledExecutorService(replyManagerExecutorService);
+        log.info("Starting reply manager service " + name);
+        ServiceHelper.startService(replyManager);
 
-}
+        return replyManager;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ReplyToType.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ReplyToType.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ReplyToType.java
new file mode 100644
index 0000000..dc805d5
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ReplyToType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+/**
+ * Types for replyTo queues
+ *
+ * @version 
+ */
+public enum ReplyToType {
+    Temporary, Shared, Exclusive
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationListener.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationListener.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationListener.java
new file mode 100644
index 0000000..c87381f
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationListener.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+/**
+ * Listener for events when correlation ids change.
+ */
+public interface CorrelationListener {
+
+    /**
+     * Callback when a new correlation id is added
+     *
+     * @param key the correlation id
+     */
+    void onPut(String key);
+
+    /**
+     * Callback when a correlation id is removed
+     *
+     * @param key the correlation id
+     */
+    void onRemove(String key);
+
+    /**
+     * Callback when a correlation id is evicted due to a timeout
+     *
+     * @param key the correlation id
+     */
+    void onEviction(String key);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
new file mode 100644
index 0000000..fad4fc0
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
@@ -0,0 +1,120 @@
+package org.apache.camel.component.rabbitmq.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.support.DefaultTimeoutMap;
+
+/**
+ * A {@link org.apache.camel.TimeoutMap} which is used to track replies which
+ * have timed out, and thus should trigger the waiting {@link org.apache.camel.Exchange} to
+ * time out as well.
+ *
+ * @version 
+ */
+public class CorrelationTimeoutMap extends DefaultTimeoutMap<String, ReplyHandler> {
+
+    private CorrelationListener listener;
+
+    public CorrelationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
+        super(executor, requestMapPollTimeMillis);
+    }
+
+    public void setListener(CorrelationListener listener) {
+        // there is only one listener needed
+        this.listener = listener;
+    }
+
+    public boolean onEviction(String key, ReplyHandler value) {
+        try {
+            if (listener != null) {
+                listener.onEviction(key);
+            }
+        } catch (Throwable e) {
+            // ignore
+        }
+
+        // trigger timeout
+        try {
+            value.onTimeout(key);
+        } catch (Throwable e) {
+            // must ignore so we ensure we evict the element
+            log.warn("Error processing onTimeout for correlationID: " + key + " due: " + e.getMessage() + ". This exception is ignored.", e);
+        }
+
+        // return true to remove the element
+        log.trace("Evicted correlationID: {}", key);
+        return true;
+    }
+
+    @Override
+    public ReplyHandler get(String key) {
+        ReplyHandler answer = super.get(key);
+        log.trace("Get correlationID: {} -> {}", key, answer != null);
+        return answer;
+    }
+
+    @Override
+    public ReplyHandler put(String key, ReplyHandler value, long timeoutMillis) {
+        try {
+            if (listener != null) {
+                listener.onPut(key);
+            }
+        } catch (Throwable e) {
+            // ignore
+        }
+
+        ReplyHandler result;
+        if (timeoutMillis <= 0) {
+            // no timeout (must use Integer.MAX_VALUE)
+            result = super.put(key, value, Integer.MAX_VALUE);
+        } else {
+            result = super.put(key, value, timeoutMillis);
+        }
+        log.info("Added correlationID: {} to timeout after: {} millis", key, timeoutMillis);
+        return result;
+    }
+
+    @Override
+    public ReplyHandler putIfAbsent(String key, ReplyHandler value, long timeoutMillis) {
+    	log.info("in putIfAbsent with key {}", key);
+    	
+        try {
+            if (listener != null) {
+                listener.onPut(key);
+            }
+        } catch (Throwable e) {
+            // ignore
+        }
+
+        ReplyHandler result;
+        if (timeoutMillis <= 0) {
+            // no timeout (must use Integer.MAX_VALUE)
+            result = super.putIfAbsent(key, value, Integer.MAX_VALUE);
+        } else {
+            result = super.putIfAbsent(key, value, timeoutMillis);
+        }
+        if (result == null) {
+            log.trace("Added correlationID: {} to timeout after: {} millis", key, timeoutMillis);
+        } else {
+            log.trace("Duplicate correlationID: {} detected", key);
+        }
+        return result;
+    }
+
+    @Override
+    public ReplyHandler remove(String key) {
+        try {
+            if (listener != null) {
+                listener.onRemove(key);
+            }
+        } catch (Throwable e) {
+            // ignore
+        }
+
+        ReplyHandler answer = super.remove(key);
+        log.trace("Removed correlationID: {} -> {}", key, answer != null);
+        return answer;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java
new file mode 100644
index 0000000..09c2130
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+import com.rabbitmq.client.Connection;
+
+
+
+/**
+ * Callback when a message has been sent.
+ *
+ * @version 
+ */
+public interface MessageSentCallback {
+
+    /**
+     * Callback when the message has been sent.
+     *
+     * @param session     the RabbitMQ {@link Connection}
+     * @param message     the message as bytes
+     * @param destination the destination
+     */
+    void sent(Connection session, byte[] message, String destination);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java
new file mode 100644
index 0000000..b4f5804
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * {@link ReplyHandler} to handle processing replies when using regular queues.
+ *
+ * @version 
+ */
+public class QueueReplyHandler extends TemporaryQueueReplyHandler {
+
+    public QueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+                             String originalCorrelationId, String correlationId, long timeout) {
+        super(replyManager, exchange, callback, originalCorrelationId, correlationId, timeout);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java
new file mode 100644
index 0000000..a8fe319
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+import com.rabbitmq.client.AMQP;
+
+
+/**
+ * Handles a reply.
+ *
+ * @version 
+ */
+public interface ReplyHandler {
+
+    /**
+     * The reply message was received
+     *
+     * @param correlationId  the correlation id
+     * @param reply  the reply message
+     */
+    void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply);
+
+    /**
+     * The reply message was not received and a timeout triggered
+     *
+     * @param correlationId  the correlation id
+     */
+    void onTimeout(String correlationId);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java
new file mode 100644
index 0000000..d2890d0
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+import com.rabbitmq.client.AMQP;
+
+/**
+ * Holder which contains the {@link Exchange} and {@link org.apache.camel.AsyncCallback} to be used
+ * when the reply arrives, so we can set the reply on the {@link Exchange} and continue routing using the callback.
+ *
+ * @version 
+ */
+public class ReplyHolder {
+
+    private final Exchange exchange;
+    private final AsyncCallback callback;
+    private final byte[] message;
+    private final String originalCorrelationId;
+    private final String correlationId;
+    private long timeout;
+    private AMQP.BasicProperties properties;
+
+    /**
+     * Constructor to use when a reply message was received
+     */
+    public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId,
+                       String correlationId,AMQP.BasicProperties properties, byte[] message) {
+        this.exchange = exchange;
+        this.callback = callback;
+        this.originalCorrelationId = originalCorrelationId;
+        this.correlationId = correlationId;
+        this.properties = properties;
+        this.message = message;
+    }
+
+    /**
+     * Constructor to use when a timeout occurred
+     */
+    public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId,
+                       String correlationId, long timeout) {
+        this(exchange, callback, originalCorrelationId, correlationId, null, null);
+        this.timeout = timeout;
+    }
+
+    public Exchange getExchange() {
+        return exchange;
+    }
+
+    public AsyncCallback getCallback() {
+        return callback;
+    }
+
+    /**
+     * Gets the original correlation id, if one was set when sending the message.
+     * <p/>
+     * Some brokers may alter the correlation id and send back a different/empty correlation id,
+     * so we need to remember the original in order to restore it.
+     */
+    public String getOriginalCorrelationId() {
+        return originalCorrelationId;
+    }
+
+    /**
+     * Gets the correlation id
+     *
+     * @see #getOriginalCorrelationId()
+     */
+    public String getCorrelationId() {
+        return correlationId;
+    }
+
+    /**
+     * Gets the received message
+     *
+     * @return  the received message, or <tt>null</tt> if timeout occurred and no message has been received
+     * @see #isTimeout()
+     */
+    public byte[] getMessage() {
+        return message;
+    }
+
+    /**
+     * Whether timeout triggered or not.
+     * <p/>
+     * A timeout is triggered if <tt>requestTimeout</tt> option has been configured, and a reply message has <b>not</b> been
+     * received within that time frame.
+     */
+    public boolean isTimeout() {
+        return message == null;
+    }
+
+    /**
+     * The timeout value
+     */
+    public long getRequestTimeout() {
+        return timeout;
+    }
+    
+    /**
+     * Gets the message properties
+     * @return the properties, or <tt>null</tt> if none were set (as with the timeout constructor)
+     */
+    public AMQP.BasicProperties getProperties(){
+    	return properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
new file mode 100644
index 0000000..7c1c015
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
@@ -0,0 +1,76 @@
+package org.apache.camel.component.rabbitmq.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.rabbitmq.RabbitMQEndpoint;
+
+
+/**
+ * The {@link ReplyManager} is responsible for handling <a href="http://camel.apache.org/request-reply.html">request-reply</a>
+ * over RabbitMQ.
+ *
+ * @version 
+ */
+public interface ReplyManager {
+
+	/**
+     * Sets the {@link RabbitMQEndpoint} this reply manager belongs to.
+     */
+    void setEndpoint(RabbitMQEndpoint endpoint);
+    
+    /**
+     * Sets the name of the reply to queue on which the manager should listen for replies.
+     * <p/>
+     * The queue is either a temporary or a persistent queue.
+     */
+    void setReplyTo(String replyTo);
+    
+    /**
+     * Gets the name of the reply to queue being used.
+     */
+    String getReplyTo();
+    
+	/**
+     * Registers an expected reply so it can be correlated when it arrives.
+     *
+     * @param replyManager    the reply manager being used
+     * @param exchange        the exchange
+     * @param callback        the callback
+     * @param originalCorrelationId  an optional original correlation id
+     * @param correlationId   the correlation id to expect being used
+     * @param requestTimeout  the timeout
+     * @return the correlation id used
+     */
+    String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+                         String originalCorrelationId, String correlationId, long requestTimeout);
+    
+    /**
+     * Sets the scheduled executor service to use when checking for timeouts (no reply received within a given time period).
+     */
+    void setScheduledExecutorService(ScheduledExecutorService executorService);
+
+
+    /**
+     * Updates the correlation id to the new correlation id.
+     * <p/>
+     * This is only used when <tt>useMessageIDasCorrelationID</tt> option is used, which means a
+     * provisional correlation id is first used, then after the message has been sent, the real
+     * correlation id is known. This allows us then to update the internal mapping to expect the
+     * real correlation id.
+     *
+     * @param correlationId     the provisional correlation id
+     * @param newCorrelationId  the real correlation id
+     * @param requestTimeout    the timeout
+     */
+    void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout);
+    
+
+    /**
+     * Processes the reply, or the timeout if the holder carries no reply.
+     *
+     * @param holder  containing needed data to process the reply and continue routing
+     */
+    void processReply(ReplyHolder holder);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
new file mode 100644
index 0000000..6e41f99
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.component.rabbitmq.RabbitMQConstants;
+import org.apache.camel.component.rabbitmq.RabbitMQEndpoint;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Connection;
+
+
+/**
+ * Base class for {@link ReplyManager} implementations handling request-reply over RabbitMQ.
+ * <p/>
+ * Pending replies are kept in a {@link CorrelationTimeoutMap} keyed by correlation id; the waiting
+ * exchange is completed from {@link #processReply(ReplyHolder)} when a reply or a timeout is reported.
+ */
+public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager {
+
+    protected final Logger log = LoggerFactory.getLogger(ReplyManagerSupport.class);
+    protected final CamelContext camelContext;
+
+    protected ScheduledExecutorService executorService;
+    protected RabbitMQEndpoint endpoint;
+    protected String replyTo;
+    // NOTE(review): despite the name this holds the Connection returned by createListenerContainer()
+    protected Connection listenerContainer;
+    // timeout in millis passed to Connection#close(int) when stopping
+    private final int closeTimeout = 30 * 1000;
+    // counted down by setReplyTo(String) so getReplyTo() can wait for the reply to queue to be set
+    protected final CountDownLatch replyToLatch = new CountDownLatch(1);
+    // max time in millis getReplyTo() waits on the latch
+    protected final long replyToTimeout = 1000;
+    protected CorrelationTimeoutMap correlation;
+
+    public ReplyManagerSupport(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public void setScheduledExecutorService(ScheduledExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    @Override
+    public void setEndpoint(RabbitMQEndpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public void setReplyTo(String replyTo) {
+        log.debug("ReplyTo destination: {}", replyTo);
+        this.replyTo = replyTo;
+        // trigger latch as the reply to has been resolved and set
+        replyToLatch.countDown();
+    }
+
+    /**
+     * Gets the reply to queue, waiting up to {@link #replyToTimeout} millis for it to be set.
+     *
+     * @return the reply to queue, or <tt>null</tt> if it is still not set after the wait
+     */
+    @Override
+    public String getReplyTo() {
+        if (replyTo != null) {
+            return replyTo;
+        }
+        try {
+            // the reply to may be set later through setReplyTo, so wait for the latch to be released
+            log.trace("Waiting for replyTo to be set");
+            boolean done = replyToLatch.await(replyToTimeout, TimeUnit.MILLISECONDS);
+            if (!done) {
+                log.warn("ReplyTo destination was not set and timeout occurred");
+            } else {
+                log.trace("Waiting for replyTo to be set done");
+            }
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so the caller can still observe the interruption
+            Thread.currentThread().interrupt();
+        }
+        return replyTo;
+    }
+
+    /**
+     * Registers a handler for the expected reply in the correlation map.
+     *
+     * @return the given correlation id
+     * @throws IllegalArgumentException if a handler is already registered for the correlation id
+     */
+    @Override
+    public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+                                String originalCorrelationId, String correlationId, long requestTimeout) {
+        log.debug("in registerReply");
+        // NOTE(review): createReplyHandler(..) is not used here, a QueueReplyHandler is always created
+        QueueReplyHandler handler = new QueueReplyHandler(replyManager, exchange, callback,
+                originalCorrelationId, correlationId, requestTimeout);
+        // Just make sure we don't override the old value of the correlationId
+        ReplyHandler result = correlation.putIfAbsent(correlationId, handler, requestTimeout);
+        if (result != null) {
+            String logMessage = String.format("The correlationId [%s] is not unique.", correlationId);
+            throw new IllegalArgumentException(logMessage);
+        }
+        return correlationId;
+    }
+
+    protected abstract ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+                                                       String originalCorrelationId, String correlationId, long requestTimeout);
+
+    /**
+     * Passes a message on to {@link #handleReplyMessage}; a message without correlation id is ignored with a WARN log.
+     */
+    public void onMessage(AMQP.BasicProperties properties, byte[] message) {
+        String correlationID = properties.getCorrelationId();
+
+        if (correlationID == null) {
+            log.warn("Ignoring message with no correlationID: {}", message);
+            return;
+        }
+
+        log.debug("Received reply message with correlationID [{}] -> {}", correlationID, message);
+
+        // handle the reply message
+        handleReplyMessage(correlationID, properties, message);
+    }
+
+    /**
+     * Completes the exchange of the holder: sets an {@link ExchangeTimedOutException} on timeout,
+     * otherwise populates the exchange from the reply and restores the original correlation id.
+     * <p/>
+     * Does nothing if the holder is <tt>null</tt> or this service is not allowed to run; otherwise the callback is always notified.
+     */
+    @Override
+    public void processReply(ReplyHolder holder) {
+        log.debug("in processReply");
+        if (holder != null && isRunAllowed()) {
+            try {
+                Exchange exchange = holder.getExchange();
+
+                boolean timeout = holder.isTimeout();
+                if (timeout) {
+                    // timeout occurred do a WARN log so its easier to spot in the logs
+                    if (log.isWarnEnabled()) {
+                        log.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}] on destination {}."
+                                + " Setting ExchangeTimedOutException on {} and continue routing.",
+                                new Object[]{holder.getRequestTimeout(), holder.getCorrelationId(), replyTo, ExchangeHelper.logIds(exchange)});
+                    }
+
+                    // no response, so lets set a timed out exception
+                    String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + replyTo;
+                    exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg));
+                } else {
+                    endpoint.setRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage());
+
+                    // restore correlation id in case the remote server messed with it
+                    if (holder.getOriginalCorrelationId() != null) {
+                        // must test hasOut(): getOut() lazily creates an OUT message and never returns null
+                        if (exchange.hasOut()) {
+                            exchange.getOut().setHeader(RabbitMQConstants.CORRELATIONID, holder.getOriginalCorrelationId());
+                        } else {
+                            exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, holder.getOriginalCorrelationId());
+                        }
+                    }
+                }
+            } finally {
+                // notify callback
+                AsyncCallback callback = holder.getCallback();
+                callback.done(false);
+            }
+        }
+    }
+
+    /**
+     * Handles a message that {@link #onMessage} received with the given (non-null) correlation id.
+     */
+    protected abstract void handleReplyMessage(String correlationID, AMQP.BasicProperties properties, byte[] message);
+
+    /**
+     * Creates the connection that {@link #doStart()} stores and {@link #doStop()} closes.
+     */
+    protected abstract Connection createListenerContainer() throws Exception;
+
+    /**
+     * Polls the correlation map (up to 50 times, 100 millis apart) for the handler of a reply that
+     * arrived before the correlation map was updated with its correlation id.
+     * <p/>
+     * Polling stops early if the thread is interrupted; the interrupt flag is then restored.
+     *
+     * @return the handler, or <tt>null</tt> if none was found
+     */
+    protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlationID, byte[] message) {
+        // race condition: a provisional correlation id may be stored first and only be replaced after the
+        // message has been sent; a really fast reply can arrive before that, so wait a bit and lookup again
+        if (log.isWarnEnabled()) {
+            log.warn("Early reply received with correlationID [{}] -> {}", correlationID, message);
+        }
+
+        ReplyHandler answer = null;
+
+        // wait up till 5 seconds
+        int counter = 0;
+        while (answer == null && counter++ < 50) {
+            log.trace("Early reply not found handler at attempt {}. Waiting a bit longer.", counter);
+            boolean interrupted = false;
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                interrupted = true;
+            }
+
+            // try again
+            answer = correlation.get(correlationID);
+            if (answer != null) {
+                if (log.isTraceEnabled()) {
+                    log.trace("Early reply with correlationID [{}] has been matched after {} attempts and can be processed using handler: {}",
+                            new Object[]{correlationID, counter, answer});
+                }
+            } else if (interrupted) {
+                // give up: with the interrupt flag set every further sleep would fail immediately
+                break;
+            }
+        }
+
+        return answer;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(executorService, "executorService", this);
+        ObjectHelper.notNull(endpoint, "endpoint", this);
+
+        // timeout map to use for purging messages which have timed out, while waiting for an expected reply
+        log.debug("Using timeout checker interval with {} millis", endpoint.getRequestTimeoutCheckerInterval());
+        correlation = new CorrelationTimeoutMap(executorService, endpoint.getRequestTimeoutCheckerInterval());
+        ServiceHelper.startService(correlation);
+
+        // create the connection, see createListenerContainer()
+        listenerContainer = createListenerContainer();
+
+        log.debug("Using executor {}", executorService);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(correlation);
+
+        try {
+            if (listenerContainer != null) {
+                log.debug("Closing connection: {} with timeout: {} ms.", listenerContainer, closeTimeout);
+                listenerContainer.close(closeTimeout);
+            }
+        } finally {
+            // runs even if closing the connection failed, so the executor is never left running
+            listenerContainer = null;
+            // must also stop executor service
+            if (executorService != null) {
+                camelContext.getExecutorServiceManager().shutdownGraceful(executorService);
+                executorService = null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
new file mode 100644
index 0000000..c7fcf41
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.rabbitmq.RabbitMQConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.AMQP;
+
+/**
+ * {@link ReplyHandler} to handle processing replies when using temporary queues.
+ *
+ * @version 
+ */
+public class TemporaryQueueReplyHandler implements ReplyHandler {
+
+    protected final Logger log = LoggerFactory.getLogger(TemporaryQueueReplyHandler.class);
+
+    // reply manager the holder is handed to, so the reply (or the timeout) can be processed
+    protected final ReplyManager replyManager;
+    protected final Exchange exchange;
+    protected final AsyncCallback callback;
+    // remember the original correlation id, in case the server returns back a reply with a messed up correlation id
+    protected final String originalCorrelationId;
+    protected final String correlationId;
+    protected final long timeout;
+
+    public TemporaryQueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+                                      String originalCorrelationId, String correlationId, long timeout) {
+        this.replyManager = replyManager;
+        this.exchange = exchange;
+        this.originalCorrelationId = originalCorrelationId;
+        this.correlationId = correlationId;
+        this.callback = callback;
+        this.timeout = timeout;
+    }
+
+    /**
+     * Wraps the reply in a {@link ReplyHolder} and hands it to the reply manager for processing.
+     */
+    public void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply) {
+        // create holder object with the reply
+        log.debug("in onReply with correlationId= {}", correlationId);
+        ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, properties, reply);
+        // process the reply
+        replyManager.processReply(holder);
+    }
+
+    /**
+     * Creates a {@link ReplyHolder} without a reply, which means a timeout, and hands it to the reply manager.
+     */
+    public void onTimeout(String correlationId) {
+        // create holder object without the reply which means a timeout occurred
+        log.debug("in onTimeout with correlationId= {}", correlationId);
+        ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, timeout);
+        // process timeout
+        replyManager.processReply(holder);
+    }
+}