You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/06/09 08:52:28 UTC
[1/2] camel git commit: CAMEL-8538 Add inOut support to the
camel-rabbitmq component
Repository: camel
Updated Branches:
refs/heads/master 43b79533d -> 8fe4288f2
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
new file mode 100644
index 0000000..6cae778
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.AMQP.Queue.DeclareOk;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Envelope;
+
+
+/**
+ * A {@link ReplyManager} when using temporary queues.
+ * <p/>
+ * The listener created by {@link #createListenerContainer()} asks the broker to declare a
+ * server-named queue, registers that name as the reply-to address and attaches a
+ * consumer that forwards every delivery to {@code onMessage(properties, body)}.
+ *
+ * @version
+ */
+public class TemporaryQueueReplyManager extends ReplyManagerSupport {
+
+    /** Consumer attached to the temporary reply queue; {@code null} until the listener has been created. */
+    private RabbitConsumer consumer;
+
+    public TemporaryQueueReplyManager(CamelContext camelContext) {
+        super(camelContext);
+    }
+
+    /**
+     * Creates a {@link TemporaryQueueReplyHandler} bound to this manager for the given
+     * exchange, callback, correlation ids and timeout.
+     */
+    protected ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+                                              String originalCorrelationId, String correlationId, long requestTimeout) {
+        return new TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, correlationId, requestTimeout);
+    }
+
+    /**
+     * Moves the handler registered under {@code correlationId} to {@code newCorrelationId},
+     * re-registering it with the given timeout. Does nothing when no handler is registered
+     * under the old id.
+     */
+    public void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout) {
+        log.trace("Updated provisional correlationId [{}] to expected correlationId [{}]", correlationId, newCorrelationId);
+
+        ReplyHandler handler = correlation.remove(correlationId);
+        if (handler != null) {
+            correlation.put(newCorrelationId, handler, requestTimeout);
+        }
+    }
+
+    /**
+     * Looks up the handler for the given correlation id and passes the reply to it.
+     * Replies that cannot be correlated are logged at WARN level and dropped.
+     */
+    @Override
+    protected void handleReplyMessage(String correlationID, AMQP.BasicProperties properties, byte[] message) {
+        ReplyHandler handler = correlation.get(correlationID);
+        if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
+            // the provisional id may not have been swapped for the final one yet
+            handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
+        }
+        if (handler != null) {
+            correlation.remove(correlationID);
+            handler.onReply(correlationID, properties, message);
+        } else {
+            // we could not correlate the received reply message to a matching request and therefore
+            // we cannot continue routing the unknown message
+            // log a warn and then ignore the message
+            log.warn("Reply received for unknown correlationID [{}]. The message will be ignored: {}", correlationID, message);
+        }
+    }
+
+    /**
+     * Opens a connection and channel, declares the temporary reply queue, binds it to the
+     * endpoint's exchange and starts consuming from it.
+     *
+     * @return the connection the reply consumer is running on
+     * @throws Exception if the connection, channel, queue or consumer cannot be set up; in that
+     *                   case the connection opened here is aborted before the exception is rethrown
+     */
+    @Override
+    protected Connection createListenerContainer() throws Exception {
+
+        log.debug("Creating connection");
+        Connection conn = endpoint.connect(executorService);
+
+        try {
+            log.debug("Creating channel");
+            Channel channel = conn.createChannel();
+            // setup the basicQos
+            if (endpoint.isPrefetchEnabled()) {
+                channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(),
+                        endpoint.isPrefetchGlobal());
+            }
+
+            //Let the server pick a random name for us
+            DeclareOk result = channel.queueDeclare();
+            log.debug("Temporary queue name {}", result.getQueue());
+            setReplyTo(result.getQueue());
+
+            //TODO check for the RabbitMQConstants.EXCHANGE_NAME header
+            channel.queueBind(getReplyTo(), endpoint.getExchangeName(), getReplyTo());
+
+            consumer = new RabbitConsumer(this, channel);
+            consumer.start();
+
+            return conn;
+        } catch (Exception e) {
+            // nobody else holds a reference to the connection yet, so release it here
+            // instead of leaking it; abort() also closes the channels opened on it
+            conn.abort();
+            throw e;
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        // the consumer only exists once createListenerContainer() has completed successfully
+        if (consumer != null) {
+            consumer.stop();
+        }
+    }
+
+    //TODO combine with class in RabbitMQConsumer
+    class RabbitConsumer extends com.rabbitmq.client.DefaultConsumer {
+
+        private final TemporaryQueueReplyManager replyManager;
+        private final Channel channel;
+        private String tag;
+
+        /**
+         * Constructs a new instance and records its association to the
+         * passed-in channel.
+         *
+         * @param replyManager the reply manager that receives every delivered message
+         * @param channel the channel to which this consumer is attached
+         */
+        public RabbitConsumer(TemporaryQueueReplyManager replyManager, Channel channel) {
+            super(channel);
+            this.replyManager = replyManager;
+            this.channel = channel;
+        }
+
+        @Override
+        public void handleDelivery(String consumerTag, Envelope envelope,
+                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
+
+            replyManager.onMessage(properties, body);
+        }
+
+        /**
+         * Bind consumer to channel
+         */
+        private void start() throws IOException {
+            tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);
+        }
+
+        /**
+         * Unbind consumer from channel
+         */
+        private void stop() throws IOException {
+            if (channel.isOpen()) {
+                if (tag != null) {
+                    channel.basicCancel(tag);
+                }
+                channel.close();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java
new file mode 100644
index 0000000..3521bec
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+import java.util.UUID;
+
+import com.rabbitmq.client.Connection;
+
+
+
+/**
+ * Callback to be used when using the option <tt>useMessageIDAsCorrelationID</tt>.
+ * <p/>
+ * When notified that a message was sent, this callback replaces the provisional
+ * correlation id registered in the {@link ReplyManager} with a newly generated random
+ * {@link UUID}.
+ *
+ * @version
+ */
+public class UseMessageIdAsCorrelationIdMessageSentCallback implements MessageSentCallback {
+
+    private final ReplyManager replyManager;
+    private final String correlationId;
+    private final long requestTimeout;
+
+    /**
+     * @param replyManager   the manager whose correlation registration is updated
+     * @param correlationId  the provisional correlation id to replace
+     * @param requestTimeout the timeout passed on when the new id is registered
+     */
+    public UseMessageIdAsCorrelationIdMessageSentCallback(ReplyManager replyManager, String correlationId, long requestTimeout) {
+        this.replyManager = replyManager;
+        this.correlationId = correlationId;
+        this.requestTimeout = requestTimeout;
+    }
+
+    /**
+     * Registers a fresh random UUID in place of the provisional correlation id.
+     * The {@code session}, {@code message} and {@code destination} arguments are not used.
+     */
+    public void sent(Connection session, byte[] message, String destination) {
+        // NOTE(review): the new id is generated here rather than read from the message that
+        // was sent - confirm this is the id the replying side will actually echo back
+        String newCorrelationID = UUID.randomUUID().toString();
+        // UUID.toString() never returns null, so no null guard is required
+        replyManager.updateCorrelationId(correlationId, newCorrelationID, requestTimeout);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 52e76c8..19c580f 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.rabbitmq;
import java.io.IOException;
import java.math.BigDecimal;
+import java.sql.Timestamp;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -25,17 +26,18 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Address;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.impl.LongStringHelper;
import org.apache.camel.Exchange;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.mockito.Mockito;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Address;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.impl.LongStringHelper;
+
public class RabbitMQEndpointTest extends CamelTestSupport {
private Envelope envelope = Mockito.mock(Envelope.class);
@@ -107,6 +109,10 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
customHeaders.put("dateHeader", new Date(0));
customHeaders.put("byteArrayHeader", "foo".getBytes());
customHeaders.put("longStringHeader", LongStringHelper.asLongString("Some really long string"));
+ customHeaders.put("timestampHeader", new Timestamp(4200));
+ customHeaders.put("byteHeader", new Byte((byte) 0));
+ customHeaders.put("floatHeader", new Float(42.4242));
+ customHeaders.put("longHeader", new Long(420000000000000000L));
Mockito.when(properties.getHeaders()).thenReturn(customHeaders);
byte[] body = new byte[20];
@@ -122,6 +128,10 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
assertEquals(new Date(0), exchange.getIn().getHeader("dateHeader"));
assertArrayEquals("foo".getBytes(), (byte[]) exchange.getIn().getHeader("byteArrayHeader"));
assertEquals("Some really long string", exchange.getIn().getHeader("longStringHeader"));
+ assertEquals(new Timestamp(4200), exchange.getIn().getHeader("timestampHeader"));
+ assertEquals(new Byte((byte) 0), exchange.getIn().getHeader("byteHeader"));
+ assertEquals(new Float(42.4242), exchange.getIn().getHeader("floatHeader"));
+ assertEquals(new Long(420000000000000000L), exchange.getIn().getHeader("longHeader"));
assertEquals(body, exchange.getIn().getBody());
}
@@ -219,4 +229,22 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
assertEquals(654, connectionFactory.getNetworkRecoveryInterval());
assertFalse(connectionFactory.isTopologyRecoveryEnabled());
}
+
+    /**
+     * The <tt>transferException=true</tt> URI option must be reflected by the endpoint.
+     */
+    @Test
+    public void createEndpointWithTransferExceptionEnabled() throws Exception {
+        RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?transferException=true", RabbitMQEndpoint.class);
+        assertTrue(endpoint.isTransferException());
+    }
+
+    /**
+     * The <tt>requestTimeout</tt> URI option must be reflected by the endpoint.
+     */
+    @Test
+    public void createEndpointWithReplyTimeout() throws Exception {
+        String uri = "rabbitmq:localhost/exchange?requestTimeout=2000";
+        RabbitMQEndpoint configured = context.getEndpoint(uri, RabbitMQEndpoint.class);
+        assertEquals(2000, configured.getRequestTimeout());
+    }
+
+    /**
+     * The <tt>requestTimeoutCheckerInterval</tt> URI option must be reflected by the endpoint.
+     */
+    @Test
+    public void createEndpointWithRequestTimeoutCheckerInterval() throws Exception {
+        String uri = "rabbitmq:localhost/exchange?requestTimeoutCheckerInterval=1000";
+        RabbitMQEndpoint configured = context.getEndpoint(uri, RabbitMQEndpoint.class);
+        assertEquals(1000, configured.getRequestTimeoutCheckerInterval());
+    }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
new file mode 100644
index 0000000..5c1223e
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.rabbitmq.testbeans.TestSerializableObject;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class RabbitMQInOutIntTest extends CamelTestSupport {
+
+ private static final String EXCHANGE = "ex5";
+ public static final String ROUTING_KEY = "rk5";
+ public static final long TIMEOUT_MS = 2000;
+
+ @Produce(uri = "direct:start")
+ protected ProducerTemplate template;
+
+ @Produce(uri = "direct:rabbitMQ")
+ protected ProducerTemplate directProducer;
+
+ @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?exchangeType=direct&username=cameltest&password=cameltest" + "&autoAck=true&queue=q4&routingKey=" + ROUTING_KEY
+ + "&transferException=true&requestTimeout=" + TIMEOUT_MS)
+ private Endpoint rabbitMQEndpoint;
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() throws Exception {
+
+ from("direct:rabbitMQ").id("producingRoute").setHeader("routeHeader", simple("routeHeader")).inOut(rabbitMQEndpoint);
+
+ from(rabbitMQEndpoint).id("consumingRoute").log("Receiving message").process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ if (exchange.getIn().getBody(TestSerializableObject.class) != null) {
+ TestSerializableObject foo = exchange.getIn().getBody(TestSerializableObject.class);
+ foo.setDescription("foobar");
+ }
+
+ else if (exchange.getIn().getBody(String.class) != null) {
+ if (exchange.getIn().getBody(String.class).contains("header")) {
+ assertEquals(exchange.getIn().getHeader("String"), "String");
+ assertEquals(exchange.getIn().getHeader("routeHeader"), "routeHeader");
+ }
+
+ if (exchange.getIn().getBody(String.class).contains("Exception")) {
+ throw new IllegalArgumentException("Boom");
+ }
+
+ if (exchange.getIn().getBody(String.class).contains("TimeOut")) {
+ Thread.sleep(TIMEOUT_MS * 2);
+ }
+
+ exchange.getIn().setBody(exchange.getIn().getBody(String.class) + " response");
+ }
+
+ }
+ });
+ }
+ };
+ }
+
+ @Test
+ public void inOutRaceConditionTest1() throws InterruptedException, IOException {
+ String reply = template.requestBodyAndHeader("direct:rabbitMQ", "test1", RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, String.class);
+ assertEquals("test1 response", reply);
+ }
+
+ @Test
+ public void inOutRaceConditionTest2() throws InterruptedException, IOException {
+ String reply = template.requestBodyAndHeader("direct:rabbitMQ", "test2", RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, String.class);
+ assertEquals("test2 response", reply);
+ }
+
+ @Test
+ public void headerTest() throws InterruptedException, IOException {
+ Map<String, Object> headers = new HashMap<>();
+
+ TestSerializableObject testObject = new TestSerializableObject();
+ testObject.setName("header");
+
+ headers.put("String", "String");
+ headers.put("Boolean", new Boolean(false));
+
+ // This will blow up the connection if not removed before sending the message
+ headers.put("TestObject1", testObject);
+ // This will blow up the connection if not removed before sending the message
+ headers.put("class", testObject.getClass());
+ // This will mess up de-serialization if not removed before sending the message
+ headers.put("CamelSerialize", true);
+
+ // populate a map and an arrayList
+ Map<Object, Object> tmpMap = new HashMap<>();
+ List<String> tmpList = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ String name = "header" + i;
+ tmpList.add(name);
+ tmpMap.put(name, name);
+ }
+ // This will blow up the connection if not removed before sending the message
+ headers.put("arrayList", tmpList);
+ // This will blow up the connection if not removed before sending the message
+ headers.put("map", tmpMap);
+
+ String reply = template.requestBodyAndHeaders("direct:rabbitMQ", "header", headers, String.class);
+ assertEquals("header response", reply);
+ }
+
+ @Test
+ public void serializeTest() throws InterruptedException, IOException {
+ TestSerializableObject foo = new TestSerializableObject();
+ foo.setName("foobar");
+
+ TestSerializableObject reply = template.requestBodyAndHeader("direct:rabbitMQ", foo, RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, TestSerializableObject.class);
+ assertEquals("foobar", reply.getName());
+ assertEquals("foobar", reply.getDescription());
+ }
+
+ @Test
+ public void testSerializableObject() throws IOException {
+ TestSerializableObject foo = new TestSerializableObject();
+ foo.setName("foobar");
+
+ byte[] body = null;
+ try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) {
+ o.writeObject(foo);
+ body = b.toByteArray();
+ }
+
+ TestSerializableObject newFoo = null;
+ try (InputStream b = new ByteArrayInputStream(body); ObjectInputStream o = new ObjectInputStream(b)) {
+ newFoo = (TestSerializableObject) o.readObject();
+ } catch (IOException | ClassNotFoundException e) {
+ }
+ assertEquals(foo.getName(), newFoo.getName());
+ }
+
+ @Test
+ public void inOutExceptionTest() {
+ try {
+ template.requestBodyAndHeader("direct:rabbitMQ", "Exception", RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, String.class);
+ fail("This should have thrown an exception");
+ } catch (CamelExecutionException e) {
+ assertEquals(e.getCause().getClass(), IllegalArgumentException.class);
+ } catch (Exception e) {
+ fail("This should have caught CamelExecutionException");
+ }
+ }
+
+ @Test
+ public void inOutTimeOutTest() {
+ try {
+ template.requestBodyAndHeader("direct:rabbitMQ", "TimeOut", RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, String.class);
+ fail("This should have thrown a timeOut exception");
+ } catch (CamelExecutionException e) {
+ // expected
+ } catch (Exception e) {
+ fail("This should have caught CamelExecutionException");
+ }
+ }
+
+ @Test
+ public void inOutNullTest() {
+ template.requestBodyAndHeader("direct:rabbitMQ", null, RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, Object.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
index 7b2df60..cefece5 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
@@ -51,6 +51,7 @@ public class RabbitMQProducerTest {
Mockito.when(exchange.getIn()).thenReturn(message);
Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
Mockito.when(conn.createChannel()).thenReturn(null);
+ Mockito.when(endpoint.getMessageConverter()).thenReturn(new RabbitMQMessageConverter());
}
@Test
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java
new file mode 100644
index 0000000..1fdffd0
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java
@@ -0,0 +1,26 @@
+package org.apache.camel.component.rabbitmq.testbeans;
+
+import java.io.Serializable;
+
+/**
+ * Minimal serializable bean with a name and a description, used as a message body and
+ * header value by the rabbitmq tests.
+ */
+public class TestSerializableObject implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private String name;
+    private String description;
+
+    /** @return the name, or {@code null} if none has been set */
+    public String getName() {
+        return this.name;
+    }
+
+    /** @param value the name to store */
+    public void setName(String value) {
+        this.name = value;
+    }
+
+    /** @return the description, or {@code null} if none has been set */
+    public String getDescription() {
+        return this.description;
+    }
+
+    /** @param value the description to store */
+    public void setDescription(String value) {
+        this.description = value;
+    }
+}
\ No newline at end of file
[2/2] camel git commit: CAMEL-8538 Add inOut support to the
camel-rabbitmq component
Posted by da...@apache.org.
CAMEL-8538 Add inOut support to the camel-rabbitmq component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8fe4288f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8fe4288f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8fe4288f
Branch: refs/heads/master
Commit: 8fe4288f2a19d1b8894f65b04a7ac871983b8938
Parents: 43b7953
Author: Brad Reitmeyer <br...@cisco.com>
Authored: Tue Apr 28 13:57:39 2015 -0500
Committer: Brad Reitmeyer <gi...@bradreitmeyer.com>
Committed: Mon Jun 8 16:24:39 2015 -0500
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQComponent.java | 4 +-
.../component/rabbitmq/RabbitMQConstants.java | 1 +
.../component/rabbitmq/RabbitMQConsumer.java | 71 +----
.../component/rabbitmq/RabbitMQEndpoint.java | 228 +++++++++++++-
.../rabbitmq/RabbitMQMessageConverter.java | 219 ++++++++++++++
.../component/rabbitmq/RabbitMQProducer.java | 296 ++++++++++---------
.../camel/component/rabbitmq/ReplyToType.java | 26 ++
.../rabbitmq/reply/CorrelationListener.java | 44 +++
.../rabbitmq/reply/CorrelationTimeoutMap.java | 120 ++++++++
.../rabbitmq/reply/MessageSentCallback.java | 38 +++
.../rabbitmq/reply/QueueReplyHandler.java | 34 +++
.../component/rabbitmq/reply/ReplyHandler.java | 43 +++
.../component/rabbitmq/reply/ReplyHolder.java | 123 ++++++++
.../component/rabbitmq/reply/ReplyManager.java | 76 +++++
.../rabbitmq/reply/ReplyManagerSupport.java | 238 +++++++++++++++
.../reply/TemporaryQueueReplyHandler.java | 70 +++++
.../reply/TemporaryQueueReplyManager.java | 156 ++++++++++
...ageIdAsCorrelationIdMessageSentCallback.java | 51 ++++
.../rabbitmq/RabbitMQEndpointTest.java | 38 ++-
.../rabbitmq/RabbitMQInOutIntTest.java | 200 +++++++++++++
.../rabbitmq/RabbitMQProducerTest.java | 1 +
.../testbeans/TestSerializableObject.java | 26 ++
22 files changed, 1898 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index 54dd2bf..c125421 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -18,14 +18,16 @@ package org.apache.camel.component.rabbitmq;
import java.net.URI;
import java.util.Map;
+
import javax.net.ssl.TrustManager;
-import com.rabbitmq.client.ConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.UriEndpointComponent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.rabbitmq.client.ConnectionFactory;
+
public class RabbitMQComponent extends UriEndpointComponent {
private static final Logger LOG = LoggerFactory.getLogger(RabbitMQComponent.class);
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
index f2e5568..cf4ab45 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
@@ -28,6 +28,7 @@ public final class RabbitMQConstants {
public static final String DELIVERY_MODE = "rabbitmq.DELIVERY_MODE";
public static final String USERID = "rabbitmq.USERID";
public static final String CLUSTERID = "rabbitmq.CLUSTERID";
+ public static final String REQUEST_TIMEOUT = "rabbitmq.REQUEST_TIMEOUT";
public static final String REPLY_TO = "rabbitmq.REPLY_TO";
public static final String CONTENT_ENCODING = "rabbitmq.CONTENT_ENCODING";
public static final String TYPE = "rabbitmq.TYPE";
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index a4a6362..bf142ce 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -27,15 +27,17 @@ import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
+
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
+
public class RabbitMQConsumer extends DefaultConsumer {
- ExecutorService executor;
- Connection conn;
+ private ExecutorService executor;
+ private Connection conn;
private int closeTimeout = 30 * 1000;
private final RabbitMQEndpoint endpoint;
@@ -55,7 +57,6 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
@Override
-
public RabbitMQEndpoint getEndpoint() {
return (RabbitMQEndpoint) super.getEndpoint();
}
@@ -79,7 +80,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
// setup the basicQos
if (endpoint.isPrefetchEnabled()) {
channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(),
- endpoint.isPrefetchGlobal());
+ endpoint.isPrefetchGlobal());
}
return channel;
}
@@ -182,10 +183,11 @@ public class RabbitMQConsumer extends DefaultConsumer {
AMQP.BasicProperties properties, byte[] body) throws IOException {
Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body);
- mergeAmqpProperties(exchange, properties);
+ endpoint.getMessageConverter().mergeAmqpProperties(exchange, properties);
boolean sendReply = properties.getReplyTo() != null;
if (sendReply && !exchange.getPattern().isOutCapable()) {
+ log.debug("In an inOut capable route");
exchange.setPattern(ExchangePattern.InOut);
}
@@ -208,17 +210,20 @@ public class RabbitMQConsumer extends DefaultConsumer {
if (!exchange.isFailed()) {
// processing success
if (sendReply && exchange.getPattern().isOutCapable()) {
- AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
- .headers(msg.getHeaders())
- .correlationId(properties.getCorrelationId())
- .build();
- channel.basicPublish("", properties.getReplyTo(), replyProps, msg.getBody(byte[].class));
+ endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo());
}
if (!consumer.endpoint.isAutoAck()) {
log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag);
channel.basicAck(deliveryTag, false);
}
- } else {
+ }
+ else if (endpoint.isTransferException() && exchange.getPattern().isOutCapable()) {
+ // the inOut exchange failed so put the exception in the body and send back
+ msg.setBody(exchange.getException());
+ exchange.setOut(msg);
+ endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo());
+ }
+ else {
boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class);
// processing failed, then reject and handle the exception
if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) {
@@ -236,49 +241,6 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
/**
- * Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()}
- */
- private void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) {
-
- if (properties.getType() != null) {
- exchange.getIn().setHeader(RabbitMQConstants.TYPE, properties.getType());
- }
- if (properties.getAppId() != null) {
- exchange.getIn().setHeader(RabbitMQConstants.APP_ID, properties.getAppId());
- }
- if (properties.getClusterId() != null) {
- exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID, properties.getClusterId());
- }
- if (properties.getContentEncoding() != null) {
- exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING, properties.getContentEncoding());
- }
- if (properties.getContentType() != null) {
- exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE, properties.getContentType());
- }
- if (properties.getCorrelationId() != null) {
- exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, properties.getCorrelationId());
- }
- if (properties.getExpiration() != null) {
- exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION, properties.getExpiration());
- }
- if (properties.getMessageId() != null) {
- exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID, properties.getMessageId());
- }
- if (properties.getPriority() != null) {
- exchange.getIn().setHeader(RabbitMQConstants.PRIORITY, properties.getPriority());
- }
- if (properties.getReplyTo() != null) {
- exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO, properties.getReplyTo());
- }
- if (properties.getTimestamp() != null) {
- exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP, properties.getTimestamp());
- }
- if (properties.getUserId() != null) {
- exchange.getIn().setHeader(RabbitMQConstants.USERID, properties.getUserId());
- }
- }
-
- /**
* Bind consumer to channel
*/
public void start() throws IOException {
@@ -333,5 +295,4 @@ public class RabbitMQConsumer extends DefaultConsumer {
return null;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 7638682..6cd0bca 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -16,7 +16,13 @@
*/
package org.apache.camel.component.rabbitmq;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
@@ -25,6 +31,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+
import javax.net.ssl.TrustManager;
import com.rabbitmq.client.AMQP;
@@ -34,11 +41,14 @@ import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.LongString;
+
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
+import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.TypeConversionException;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultMessage;
@@ -46,9 +56,12 @@ import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@UriEndpoint(scheme = "rabbitmq", title = "RabbitMQ", syntax = "rabbitmq:hostname:portNumber/exchangeName", consumerClass = RabbitMQConsumer.class, label = "messaging")
public class RabbitMQEndpoint extends DefaultEndpoint {
+ private static final Logger LOG = LoggerFactory.getLogger(RabbitMQEndpoint.class);
@UriPath @Metadata(required = "true")
private String hostname;
@@ -135,6 +148,24 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
@UriParam
private ArgsConfigurer exchangeArgsConfigurer;
+ @UriParam
+ private long requestTimeout = 20000;
+ @UriParam
+ private long requestTimeoutCheckerInterval = 1000;
+ @UriParam
+ private boolean transferException = false;
+ // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
+ private boolean useMessageIDAsCorrelationID = true;
+ // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
+ private String replyToType = ReplyToType.Temporary.name();
+ // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
+ private String replyTo = null;
+
+ private RabbitMQMessageConverter messageConverter = new RabbitMQMessageConverter();
+
+ // header to indicate that the message body needs to be de-serialized
+ private static final String SERIALIZE_HEADER = "CamelSerialize";
+
public RabbitMQEndpoint() {
}
@@ -150,12 +181,34 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
public Exchange createRabbitExchange(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern());
- Message message = new DefaultMessage();
- exchange.setIn(message);
+ setRabbitExchange(exchange, envelope, properties, body);
+ return exchange;
+ }
- message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey());
- message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange());
- message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag());
+ /**
+ * Gets the message converter to convert between rabbit and camel
+ * @return
+ */
+ protected RabbitMQMessageConverter getMessageConverter() {
+ return messageConverter;
+ }
+
+ public void setRabbitExchange(Exchange camelExchange, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
+ Message message;
+ if (camelExchange.getIn() != null) {
+ // Use the existing message so we keep the headers
+ message = camelExchange.getIn();
+ }
+ else {
+ message = new DefaultMessage();
+ camelExchange.setIn(message);
+ }
+
+ if (envelope != null) {
+ message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey());
+ message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange());
+ message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag());
+ }
Map<String, Object> headers = properties.getHeaders();
if (headers != null) {
@@ -169,9 +222,109 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
}
}
- message.setBody(body);
+ if (hasSerializeHeader(properties)) {
+ Object messageBody = null;
+ try (InputStream b = new ByteArrayInputStream(body); ObjectInputStream o = new ObjectInputStream(b)) {
+ messageBody = o.readObject();
+ } catch (IOException | ClassNotFoundException e) {
+ LOG.warn("Could not deserialize the object");
+ }
+ if (messageBody instanceof Throwable) {
+ LOG.debug("Reply was an Exception. Setting the Exception on the Exchange");
+ camelExchange.setException((Throwable) messageBody);
+ } else {
+ message.setBody(messageBody);
+ }
+ } else {
+ // Set the body as a byte[] and let the type converter deal with it
+ message.setBody(body);
+ }
- return exchange;
+ }
+
+ private boolean hasSerializeHeader(AMQP.BasicProperties properties) {
+ if (properties == null || properties.getHeaders() == null) {
+ return false;
+ }
+ if (properties.getHeaders().containsKey(SERIALIZE_HEADER) && properties.getHeaders().get(SERIALIZE_HEADER).equals(true)) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Sends the body that is on the exchange
+ * @param camelExchange
+ * @param channel
+ * @param routingKey the routing key the message is published with
+ * @throws IOException
+ */
+ public void publishExchangeToChannel(Exchange camelExchange, Channel channel, String routingKey) throws IOException {
+ Message msg;
+ if (camelExchange.hasOut()) {
+ msg = camelExchange.getOut();
+ } else {
+ msg = camelExchange.getIn();
+ }
+
+ // Remove the SERIALIZE_HEADER in case it was previously set
+ if (msg.getHeaders() != null && msg.getHeaders().containsKey(SERIALIZE_HEADER)) {
+ LOG.debug("Removing the {} header", SERIALIZE_HEADER);
+ msg.getHeaders().remove(SERIALIZE_HEADER);
+ }
+
+ AMQP.BasicProperties properties;
+ byte[] body;
+ try {
+ // To maintain backwards compatibility try the TypeConverter (The DefaultTypeConverter seems to only work on Strings)
+ body = camelExchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, camelExchange, msg.getBody());
+
+ properties = getMessageConverter().buildProperties(camelExchange).build();
+ } catch (NoTypeConversionAvailableException | TypeConversionException e) {
+ if (msg.getBody() instanceof Serializable) {
+ // Add the header so the reply processor knows to de-serialize it
+ msg.getHeaders().put(SERIALIZE_HEADER, true);
+
+ properties = getMessageConverter().buildProperties(camelExchange).build();
+
+ try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) {
+ o.writeObject(msg.getBody());
+ body = b.toByteArray();
+ }
+ }
+ else if (msg.getBody() == null) {
+ properties = getMessageConverter().buildProperties(camelExchange).build();
+ body = null;
+ }
+ else {
+ LOG.warn("Could not convert {} to byte[]", msg.getBody());
+ throw new IOException(e);
+ }
+ }
+ String rabbitExchange = getExchangeName(msg);
+
+ Boolean mandatory = camelExchange.getIn().getHeader(RabbitMQConstants.MANDATORY, isMandatory(), Boolean.class);
+ Boolean immediate = camelExchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, isImmediate(), Boolean.class);
+
+
+ LOG.debug("Sending message to exchange: {} with CorrelationId = {}", rabbitExchange, properties.getCorrelationId());
+
+ channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body);
+ }
+
+ /**
+ * Extracts name of the rabbitmq exchange
+ *
+ * @param msg
+ * @return
+ */
+ protected String getExchangeName(Message msg) {
+ String exchangeName = msg.getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class);
+ // If it is BridgeEndpoint we should ignore the message header of EXCHANGE_NAME
+ if (exchangeName == null || isBridgeEndpoint()) {
+ exchangeName = getExchangeName();
+ }
+ return exchangeName;
}
@Override
@@ -712,9 +865,6 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
return channelPoolMaxSize;
}
- /**
- * Set maximum number of opened channel in pool
- */
public void setChannelPoolMaxSize(int channelPoolMaxSize) {
this.channelPoolMaxSize = channelPoolMaxSize;
}
@@ -763,14 +913,14 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
public ArgsConfigurer getQueueArgsConfigurer() {
return queueArgsConfigurer;
}
-
+
/**
* Set the configurer for setting the queue args in Channel.queueDeclare
*/
public void setQueueArgsConfigurer(ArgsConfigurer queueArgsConfigurer) {
this.queueArgsConfigurer = queueArgsConfigurer;
}
-
+
public ArgsConfigurer getExchangeArgsConfigurer() {
return exchangeArgsConfigurer;
}
@@ -781,4 +931,58 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
public void setExchangeArgsConfigurer(ArgsConfigurer exchangeArgsConfigurer) {
this.exchangeArgsConfigurer = exchangeArgsConfigurer;
}
+
+ /**
+ * Set timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds)
+ */
+ public void setRequestTimeout(long requestTimeout) {
+ this.requestTimeout = requestTimeout;
+ }
+
+ public long getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ /**
+ * Set requestTimeoutCheckerInterval for inOut exchange
+ */
+ public void setRequestTimeoutCheckerInterval(long requestTimeoutCheckerInterval) {
+ this.requestTimeoutCheckerInterval = requestTimeoutCheckerInterval;
+ }
+
+ public long getRequestTimeoutCheckerInterval() {
+ return requestTimeoutCheckerInterval;
+ }
+
+ /**
+ * Get useMessageIDAsCorrelationID for inOut exchange
+ */
+ public boolean isUseMessageIDAsCorrelationID() {
+ return useMessageIDAsCorrelationID;
+ }
+
+ /**
+ * When true and an inOut Exchange failed on the consumer side, send the causing Exception back in the response
+ */
+ public void setTransferException(boolean transferException) {
+ this.transferException = transferException;
+ }
+
+ public boolean isTransferException() {
+ return transferException;
+ }
+
+ /**
+ * Get replyToType for inOut exchange
+ */
+ public String getReplyToType() {
+ return replyToType;
+ }
+
+ /**
+ * Gets the Queue to reply to if you don't want to use temporary reply queues
+ */
+ public String getReplyTo() {
+ return replyTo;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
new file mode 100644
index 0000000..95abe81
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.LongString;
+
+public class RabbitMQMessageConverter {
+ protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQMessageConverter.class);
+
+ /**
+ * Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()}
+ */
+ public void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) {
+
+ if (properties.getType() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.TYPE, properties.getType());
+ }
+ if (properties.getAppId() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.APP_ID, properties.getAppId());
+ }
+ if (properties.getClusterId() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID, properties.getClusterId());
+ }
+ if (properties.getContentEncoding() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING, properties.getContentEncoding());
+ }
+ if (properties.getContentType() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE, properties.getContentType());
+ }
+ if (properties.getCorrelationId() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, properties.getCorrelationId());
+ }
+ if (properties.getExpiration() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION, properties.getExpiration());
+ }
+ if (properties.getMessageId() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID, properties.getMessageId());
+ }
+ if (properties.getPriority() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.PRIORITY, properties.getPriority());
+ }
+ if (properties.getReplyTo() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO, properties.getReplyTo());
+ }
+ if (properties.getTimestamp() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP, properties.getTimestamp());
+ }
+ if (properties.getUserId() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.USERID, properties.getUserId());
+ }
+ }
+
+ public AMQP.BasicProperties.Builder buildProperties(Exchange exchange) {
+ AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
+
+ Message msg;
+ if (exchange.hasOut()) {
+ msg = exchange.getOut();
+ } else {
+ msg = exchange.getIn();
+ }
+
+ final Object contentType = msg.getHeader(RabbitMQConstants.CONTENT_TYPE);
+ if (contentType != null) {
+ properties.contentType(contentType.toString());
+ }
+
+ final Object priority = msg.getHeader(RabbitMQConstants.PRIORITY);
+ if (priority != null) {
+ properties.priority(Integer.parseInt(priority.toString()));
+ }
+
+ final Object messageId = msg.getHeader(RabbitMQConstants.MESSAGE_ID);
+ if (messageId != null) {
+ properties.messageId(messageId.toString());
+ }
+
+ final Object clusterId = msg.getHeader(RabbitMQConstants.CLUSTERID);
+ if (clusterId != null) {
+ properties.clusterId(clusterId.toString());
+ }
+
+ final Object replyTo = msg.getHeader(RabbitMQConstants.REPLY_TO);
+ if (replyTo != null) {
+ properties.replyTo(replyTo.toString());
+ }
+
+ final Object correlationId = msg.getHeader(RabbitMQConstants.CORRELATIONID);
+ if (correlationId != null) {
+ properties.correlationId(correlationId.toString());
+ }
+
+ final Object deliveryMode = msg.getHeader(RabbitMQConstants.DELIVERY_MODE);
+ if (deliveryMode != null) {
+ properties.deliveryMode(Integer.parseInt(deliveryMode.toString()));
+ }
+
+ final Object userId = msg.getHeader(RabbitMQConstants.USERID);
+ if (userId != null) {
+ properties.userId(userId.toString());
+ }
+
+ final Object type = msg.getHeader(RabbitMQConstants.TYPE);
+ if (type != null) {
+ properties.type(type.toString());
+ }
+
+ final Object contentEncoding = msg.getHeader(RabbitMQConstants.CONTENT_ENCODING);
+ if (contentEncoding != null) {
+ properties.contentEncoding(contentEncoding.toString());
+ }
+
+ final Object expiration = msg.getHeader(RabbitMQConstants.EXPIRATION);
+ if (expiration != null) {
+ properties.expiration(expiration.toString());
+ }
+
+ final Object appId = msg.getHeader(RabbitMQConstants.APP_ID);
+ if (appId != null) {
+ properties.appId(appId.toString());
+ }
+
+ final Object timestamp = msg.getHeader(RabbitMQConstants.TIMESTAMP);
+ if (timestamp != null) {
+ properties.timestamp(new Date(Long.parseLong(timestamp.toString())));
+ }
+
+ final Map<String, Object> headers = msg.getHeaders();
+ Map<String, Object> filteredHeaders = new HashMap<String, Object>();
+
+ // TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader
+ for (Map.Entry<String, Object> header : headers.entrySet()) {
+
+ // filter header values.
+ Object value = getValidRabbitMQHeaderValue(header.getValue());
+ if (value != null) {
+ filteredHeaders.put(header.getKey(), header.getValue());
+ } else if (LOG.isDebugEnabled()) {
+ if (header.getValue() == null) {
+ LOG.debug("Ignoring header: {} with null value", header.getKey());
+ } else {
+ LOG.debug("Ignoring header: {} of class: {} with value: {}",
+ new Object[] { header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue() });
+ }
+ }
+ }
+
+ properties.headers(filteredHeaders);
+
+ return properties;
+ }
+
+ /**
+ * Strategy to test if the given header is valid. Without this, the
+ * com.rabbitmq.client.impl.Frame.java class will throw an
+ * IllegalArgumentException (invalid value in table) and close the
+ * connection.
+ *
+ * @param headerValue the header value
+ * @return the value to use, <tt>null</tt> to ignore this header
+ * @see com.rabbitmq.client.impl.Frame#fieldValueSize
+ */
+ private Object getValidRabbitMQHeaderValue(Object headerValue) {
+ if (headerValue instanceof String) {
+ return headerValue;
+ } else if (headerValue instanceof BigDecimal) {
+ return headerValue;
+ } else if (headerValue instanceof Number) {
+ return headerValue;
+ } else if (headerValue instanceof Boolean) {
+ return headerValue;
+ } else if (headerValue instanceof Date) {
+ return headerValue;
+ } else if (headerValue instanceof byte[]) {
+ return headerValue;
+ } else if (headerValue instanceof LongString) {
+ return headerValue;
+ } else if (headerValue instanceof Timestamp) {
+ return headerValue;
+ } else if (headerValue instanceof Byte) {
+ return headerValue;
+ } else if (headerValue instanceof Double) {
+ return headerValue;
+ } else if (headerValue instanceof Float) {
+ return headerValue;
+ } else if (headerValue instanceof Long) {
+ return headerValue;
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 28858a6..b8c8ba2 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -17,28 +17,38 @@
package org.apache.camel.component.rabbitmq;
import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
+
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.component.rabbitmq.pool.PoolableChannelFactory;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.component.rabbitmq.reply.ReplyManager;
+import org.apache.camel.component.rabbitmq.reply.TemporaryQueueReplyManager;
+import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
-public class RabbitMQProducer extends DefaultProducer {
+public class RabbitMQProducer extends DefaultAsyncProducer {
private Connection conn;
private ObjectPool<Channel> channelPool;
private ExecutorService executorService;
private int closeTimeout = 30 * 1000;
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ private static final String GENERATED_CORRELATION_ID_PREFIX = "Camel-";
+ private ReplyManager replyManager;
public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
super(endpoint);
@@ -115,6 +125,7 @@ public class RabbitMQProducer extends DefaultProducer {
@Override
protected void doStop() throws Exception {
+ unInitReplyManager();
closeConnectionAndChannel();
if (executorService != null) {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
@@ -122,13 +133,76 @@ public class RabbitMQProducer extends DefaultProducer {
}
}
- @Override
- public void process(Exchange exchange) throws Exception {
- String exchangeName = exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class);
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ // deny processing if we are not started
+ if (!isRunAllowed()) {
+ if (exchange.getException() == null) {
+ exchange.setException(new RejectedExecutionException());
+ }
+ // we cannot process so invoke callback
+ callback.done(true);
+ return true;
+ }
+
+ try {
+ if (exchange.getPattern().isOutCapable()) {
+ // in out requires a bit more work than in only
+ return processInOut(exchange, callback);
+ } else {
+ // in only
+ return processInOnly(exchange, callback);
+ }
+ } catch (Throwable e) {
+ // must catch exception to ensure callback is invoked as expected
+ // to let Camel error handling deal with this
+ exchange.setException(e);
+ callback.done(true);
+ return true;
+ }
+ }
+
+ protected boolean processInOut(final Exchange exchange, final AsyncCallback callback) throws Exception {
+ final org.apache.camel.Message in = exchange.getIn();
+
+ initReplyManager();
+
+ // the request timeout can be overruled by a header otherwise the endpoint configured value is used
+ final long timeout = exchange.getIn().getHeader(RabbitMQConstants.REQUEST_TIMEOUT, getEndpoint().getRequestTimeout(), long.class);
+
+ final String originalCorrelationId = in.getHeader(RabbitMQConstants.CORRELATIONID, String.class);
+
+ // we prepend the 'Camel-' prefix to know it was generated by us
+ String correlationId = GENERATED_CORRELATION_ID_PREFIX + getEndpoint().getCamelContext().getUuidGenerator().generateUuid();
+ in.setHeader(RabbitMQConstants.CORRELATIONID, correlationId);
+
+ in.setHeader(RabbitMQConstants.REPLY_TO, replyManager.getReplyTo());
+
+ String exchangeName = in.getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class);
// If it is BridgeEndpoint we should ignore the message header of EXCHANGE_NAME
if (exchangeName == null || getEndpoint().isBridgeEndpoint()) {
exchangeName = getEndpoint().getExchangeName();
}
+
+ String key = in.getHeader(RabbitMQConstants.ROUTING_KEY, null, String.class);
+ // we just need to make sure RoutingKey option take effect if it is not BridgeEndpoint
+ if (key == null || getEndpoint().isBridgeEndpoint()) {
+ key = getEndpoint().getRoutingKey() == null ? "" : getEndpoint().getRoutingKey();
+ }
+ if (ObjectHelper.isEmpty(key) && ObjectHelper.isEmpty(exchangeName)) {
+ throw new IllegalArgumentException("ExchangeName and RoutingKey is not provided in the endpoint: " + getEndpoint());
+ }
+ log.debug("Registering reply for {}", correlationId);
+
+ replyManager.registerReply(replyManager, exchange, callback, originalCorrelationId, correlationId, timeout);
+
+ basicPublish(exchange, exchangeName, key);
+ // continue routing asynchronously (reply will be processed async when it is received)
+ return false;
+ }
+
+ private boolean processInOnly(Exchange exchange, AsyncCallback callback) throws Exception {
+ String exchangeName = getEndpoint().getExchangeName(exchange.getIn());
+
String key = exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY, null, String.class);
// we just need to make sure RoutingKey option take effect if it is not BridgeEndpoint
if (key == null || getEndpoint().isBridgeEndpoint()) {
@@ -137,12 +211,13 @@ public class RabbitMQProducer extends DefaultProducer {
if (ObjectHelper.isEmpty(key) && ObjectHelper.isEmpty(exchangeName)) {
throw new IllegalArgumentException("ExchangeName and RoutingKey is not provided in the endpoint: " + getEndpoint());
}
- byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class);
- AMQP.BasicProperties properties = buildProperties(exchange).build();
- Boolean mandatory = exchange.getIn().getHeader(RabbitMQConstants.MANDATORY, getEndpoint().isMandatory(), Boolean.class);
- Boolean immediate = exchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, getEndpoint().isImmediate(), Boolean.class);
-
- basicPublish(exchangeName, key, mandatory, immediate, properties, messageBodyBytes);
+
+ basicPublish(exchange, exchangeName, key);
+ if (callback != null) {
+ // we are synchronous so return true
+ callback.done(true);
+ }
+ return true;
}
/**
@@ -150,13 +225,10 @@ public class RabbitMQProducer extends DefaultProducer {
*
* @param exchange Target exchange
* @param routingKey Routing key
- * @param mandatory This flag tells the server how to react if the message cannot be routed to a queue.
- * @param immediate This flag tells the server how to react if the message cannot be routed to a queue consumer immediately.
* @param properties Header properties
* @param body Body content
*/
- private void basicPublish(final String exchange, final String routingKey, final boolean mandatory, final boolean immediate,
- final AMQP.BasicProperties properties, final byte[] body) throws Exception {
+ private void basicPublish(final Exchange camelExchange, final String rabbitExchange, final String routingKey) throws Exception {
if (channelPool == null) {
// Open connection and channel lazily
openConnectionAndChannelPool();
@@ -164,135 +236,95 @@ public class RabbitMQProducer extends DefaultProducer {
execute(new ChannelCallback<Void>() {
@Override
public Void doWithChannel(Channel channel) throws Exception {
- channel.basicPublish(exchange, routingKey, mandatory, immediate, properties, body);
+ getEndpoint().publishExchangeToChannel(camelExchange, channel, routingKey);
return null;
}
});
}
AMQP.BasicProperties.Builder buildProperties(Exchange exchange) {
- AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
-
- final Object contentType = exchange.getIn().getHeader(RabbitMQConstants.CONTENT_TYPE);
- if (contentType != null) {
- properties.contentType(contentType.toString());
- }
-
- final Object priority = exchange.getIn().getHeader(RabbitMQConstants.PRIORITY);
- if (priority != null) {
- properties.priority(Integer.parseInt(priority.toString()));
- }
-
- final Object messageId = exchange.getIn().getHeader(RabbitMQConstants.MESSAGE_ID);
- if (messageId != null) {
- properties.messageId(messageId.toString());
- }
-
- final Object clusterId = exchange.getIn().getHeader(RabbitMQConstants.CLUSTERID);
- if (clusterId != null) {
- properties.clusterId(clusterId.toString());
- }
-
- final Object replyTo = exchange.getIn().getHeader(RabbitMQConstants.REPLY_TO);
- if (replyTo != null) {
- properties.replyTo(replyTo.toString());
- }
-
- final Object correlationId = exchange.getIn().getHeader(RabbitMQConstants.CORRELATIONID);
- if (correlationId != null) {
- properties.correlationId(correlationId.toString());
- }
-
- final Object deliveryMode = exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_MODE);
- if (deliveryMode != null) {
- properties.deliveryMode(Integer.parseInt(deliveryMode.toString()));
- }
-
- final Object userId = exchange.getIn().getHeader(RabbitMQConstants.USERID);
- if (userId != null) {
- properties.userId(userId.toString());
- }
-
- final Object type = exchange.getIn().getHeader(RabbitMQConstants.TYPE);
- if (type != null) {
- properties.type(type.toString());
- }
-
- final Object contentEncoding = exchange.getIn().getHeader(RabbitMQConstants.CONTENT_ENCODING);
- if (contentEncoding != null) {
- properties.contentEncoding(contentEncoding.toString());
- }
-
- final Object expiration = exchange.getIn().getHeader(RabbitMQConstants.EXPIRATION);
- if (expiration != null) {
- properties.expiration(expiration.toString());
- }
+ return getEndpoint().getMessageConverter().buildProperties(exchange);
+ }
- final Object appId = exchange.getIn().getHeader(RabbitMQConstants.APP_ID);
- if (appId != null) {
- properties.appId(appId.toString());
- }
+ public int getCloseTimeout() {
+ return closeTimeout;
+ }
- final Object timestamp = exchange.getIn().getHeader(RabbitMQConstants.TIMESTAMP);
- if (timestamp != null) {
- properties.timestamp(new Date(Long.parseLong(timestamp.toString())));
- }
+ public void setCloseTimeout(int closeTimeout) {
+ this.closeTimeout = closeTimeout;
+ }
- final Map<String, Object> headers = exchange.getIn().getHeaders();
- Map<String, Object> filteredHeaders = new HashMap<String, Object>();
-
- // TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader
- for (Map.Entry<String, Object> header : headers.entrySet()) {
-
- // filter header values.
- Object value = getValidRabbitMQHeaderValue(header.getValue());
- if (value != null) {
- filteredHeaders.put(header.getKey(), header.getValue());
- } else if (log.isDebugEnabled()) {
- if (header.getValue() == null) {
- log.debug("Ignoring header: {} with null value", header.getKey());
- } else {
- log.debug("Ignoring header: {} of class: {} with value: {}",
- new Object[]{header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue()});
+ protected void initReplyManager() {
+ if (!started.get()) {
+ synchronized (this) {
+ if (started.get()) {
+ return;
}
+ log.debug("Starting reply manager");
+ // must use the classloader from the application context when creating reply manager,
+ // as it should inherit the classloader from app context and not the current which may be
+ // a different classloader
+ ClassLoader current = Thread.currentThread().getContextClassLoader();
+ ClassLoader ac = getEndpoint().getCamelContext().getApplicationContextClassLoader();
+ try {
+ if (ac != null) {
+ Thread.currentThread().setContextClassLoader(ac);
+ }
+ // validate that replyToType and replyTo is configured accordingly
+ if (getEndpoint().getReplyToType() != null) {
+ // setting temporary with a fixed replyTo is not supported
+ if (getEndpoint().getReplyTo() != null && getEndpoint().getReplyToType().equals(ReplyToType.Temporary.name())) {
+ throw new IllegalArgumentException("ReplyToType " + ReplyToType.Temporary
+ + " is not supported when replyTo " + getEndpoint().getReplyTo() + " is also configured.");
+ }
+ }
+
+ if (getEndpoint().getReplyTo() != null) {
+ // specifying reply queues is not currently supported
+ throw new IllegalArgumentException("Specifying replyTo " + getEndpoint().getReplyTo() + " is currently not supported.");
+ } else {
+ replyManager = createReplyManager();
+ log.debug("Using RabbitMQReplyManager: {} to process replies from temporary queue", replyManager);
+ }
+ } catch (Exception e) {
+ throw new FailedToCreateProducerException(getEndpoint(), e);
+ } finally {
+ if (ac != null) {
+ Thread.currentThread().setContextClassLoader(current);
+ }
+ }
+ started.set(true);
}
}
-
- properties.headers(filteredHeaders);
-
- return properties;
}
- /**
- * Strategy to test if the given header is valid
- *
- * @param headerValue the header value
- * @return the value to use, <tt>null</tt> to ignore this header
- * @see com.rabbitmq.client.impl.Frame#fieldValueSize
- */
- private Object getValidRabbitMQHeaderValue(Object headerValue) {
- if (headerValue instanceof String) {
- return headerValue;
- } else if (headerValue instanceof BigDecimal) {
- return headerValue;
- } else if (headerValue instanceof Number) {
- return headerValue;
- } else if (headerValue instanceof Boolean) {
- return headerValue;
- } else if (headerValue instanceof Date) {
- return headerValue;
- } else if (headerValue instanceof byte[]) {
- return headerValue;
+ /**
+ * Stops the reply manager, if one has been created.
+ * <p/>
+ * The started flag is always reset, even when stopping fails, so that
+ * {@link #initReplyManager()} creates a fresh reply manager on its next call.
+ * Any failure while stopping is rethrown wrapped by
+ * {@link ObjectHelper#wrapRuntimeCamelException(Throwable)}.
+ */
+ protected void unInitReplyManager() {
+ try {
+ if (replyManager != null) {
+ if (log.isDebugEnabled()) {
+ // name the RabbitMQ reply manager (the message used to say JmsReplyManager)
+ log.debug("Stopping RabbitMQReplyManager: {} from processing replies from: {}", replyManager,
+ getEndpoint().getReplyTo() != null ? getEndpoint().getReplyTo() : "temporary queue");
+ }
+ ServiceHelper.stopService(replyManager);
+ }
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ } finally {
+ started.set(false);
}
- return null;
}
- public int getCloseTimeout() {
- return closeTimeout;
- }
+ protected ReplyManager createReplyManager() throws Exception {
+ // use a temporary queue
+ ReplyManager replyManager = new TemporaryQueueReplyManager(getEndpoint().getCamelContext());
+ replyManager.setEndpoint(getEndpoint());
- public void setCloseTimeout(int closeTimeout) {
- this.closeTimeout = closeTimeout;
- }
+ String name = "RabbitMQReplyManagerTimeoutChecker[" + getEndpoint().getExchangeName() + "]";
+ ScheduledExecutorService replyManagerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name);
+ replyManager.setScheduledExecutorService(replyManagerExecutorService);
+ log.info("Starting reply manager service " + name);
+ ServiceHelper.startService(replyManager);
-}
+ return replyManager;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ReplyToType.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ReplyToType.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ReplyToType.java
new file mode 100644
index 0000000..dc805d5
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ReplyToType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+/**
+ * Types for replyTo queues
+ * <p/>
+ * NOTE(review): the producer validation added in this change only compares against
+ * {@link #Temporary}; how Shared and Exclusive are handled is not visible here - confirm.
+ *
+ * @version
+ */
+public enum ReplyToType {
+ Temporary, Shared, Exclusive
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationListener.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationListener.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationListener.java
new file mode 100644
index 0000000..c87381f
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationListener.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+/**
+ * Listener for events when correlation ids change.
+ */
+public interface CorrelationListener {
+
+ /**
+ * Callback when a new correlation id is added
+ *
+ * @param key the correlation id
+ */
+ void onPut(String key);
+
+ /**
+ * Callback when a correlation id is removed
+ *
+ * @param key the correlation id
+ */
+ void onRemove(String key);
+
+ /**
+ * Callback when a correlation id is evicted due to a timeout
+ *
+ * @param key the correlation id
+ */
+ void onEviction(String key);
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
new file mode 100644
index 0000000..fad4fc0
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
@@ -0,0 +1,120 @@
+package org.apache.camel.component.rabbitmq.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.support.DefaultTimeoutMap;
+
+/**
+ * A {@link org.apache.camel.TimeoutMap} which is used to track reply messages which
+ * have timed out, and thus should trigger the waiting {@link org.apache.camel.Exchange} to
+ * time out as well.
+ * <p/>
+ * An optional {@link CorrelationListener} is notified before every put, remove and eviction.
+ * A failing listener never prevents the map operation; the failure is only logged.
+ *
+ * @version
+ */
+public class CorrelationTimeoutMap extends DefaultTimeoutMap<String, ReplyHandler> {
+
+ private CorrelationListener listener;
+
+ public CorrelationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
+ super(executor, requestMapPollTimeMillis);
+ }
+
+ public void setListener(CorrelationListener listener) {
+ // there is only one listener needed
+ this.listener = listener;
+ }
+
+ /**
+ * Notifies the listener and then the evicted handler that the correlation id timed out.
+ *
+ * @param key the correlation id being evicted
+ * @param value the handler that was waiting for the reply
+ * @return always <tt>true</tt> so the entry is removed
+ */
+ @Override
+ public boolean onEviction(String key, ReplyHandler value) {
+ try {
+ if (listener != null) {
+ listener.onEviction(key);
+ }
+ } catch (Throwable e) {
+ // best effort: a failing listener must not stop the timeout from being triggered
+ log.debug("Listener failed in onEviction for correlationID: {}. This exception is ignored.", key, e);
+ }
+
+ // trigger timeout
+ try {
+ value.onTimeout(key);
+ } catch (Throwable e) {
+ // must ignore so we ensure we evict the element
+ log.warn("Error processing onTimeout for correlationID: " + key + " due: " + e.getMessage() + ". This exception is ignored.", e);
+ }
+
+ // return true to remove the element
+ log.trace("Evicted correlationID: {}", key);
+ return true;
+ }
+
+ @Override
+ public ReplyHandler get(String key) {
+ ReplyHandler answer = super.get(key);
+ log.trace("Get correlationID: {} -> {}", key, answer != null);
+ return answer;
+ }
+
+ /**
+ * Adds the handler under the correlation id, notifying the listener first.
+ *
+ * @param timeoutMillis timeout in millis; zero or negative is stored with Integer.MAX_VALUE millis
+ */
+ @Override
+ public ReplyHandler put(String key, ReplyHandler value, long timeoutMillis) {
+ try {
+ if (listener != null) {
+ listener.onPut(key);
+ }
+ } catch (Throwable e) {
+ // best effort: a failing listener must not prevent the put
+ log.debug("Listener failed in onPut for correlationID: {}. This exception is ignored.", key, e);
+ }
+
+ ReplyHandler result;
+ if (timeoutMillis <= 0) {
+ // no timeout (must use Integer.MAX_VALUE)
+ result = super.put(key, value, Integer.MAX_VALUE);
+ } else {
+ result = super.put(key, value, timeoutMillis);
+ }
+ // per-message bookkeeping belongs at trace level, same as putIfAbsent and remove
+ log.trace("Added correlationID: {} to timeout after: {} millis", key, timeoutMillis);
+ return result;
+ }
+
+ /**
+ * Adds the handler only if the correlation id is not already present. The listener is
+ * notified before the map is consulted, so it is called for duplicates as well.
+ *
+ * @param timeoutMillis timeout in millis; zero or negative is stored with Integer.MAX_VALUE millis
+ * @return <tt>null</tt> if the handler was added, otherwise the handler already registered
+ */
+ @Override
+ public ReplyHandler putIfAbsent(String key, ReplyHandler value, long timeoutMillis) {
+ log.trace("in putIfAbsent with key {}", key);
+
+ try {
+ if (listener != null) {
+ listener.onPut(key);
+ }
+ } catch (Throwable e) {
+ // best effort: a failing listener must not prevent the put
+ log.debug("Listener failed in onPut for correlationID: {}. This exception is ignored.", key, e);
+ }
+
+ ReplyHandler result;
+ if (timeoutMillis <= 0) {
+ // no timeout (must use Integer.MAX_VALUE)
+ result = super.putIfAbsent(key, value, Integer.MAX_VALUE);
+ } else {
+ result = super.putIfAbsent(key, value, timeoutMillis);
+ }
+ if (result == null) {
+ log.trace("Added correlationID: {} to timeout after: {} millis", key, timeoutMillis);
+ } else {
+ log.trace("Duplicate correlationID: {} detected", key);
+ }
+ return result;
+ }
+
+ @Override
+ public ReplyHandler remove(String key) {
+ try {
+ if (listener != null) {
+ listener.onRemove(key);
+ }
+ } catch (Throwable e) {
+ // best effort: a failing listener must not prevent the removal
+ log.debug("Listener failed in onRemove for correlationID: {}. This exception is ignored.", key, e);
+ }
+
+ ReplyHandler answer = super.remove(key);
+ log.trace("Removed correlationID: {} -> {}", key, answer != null);
+ return answer;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java
new file mode 100644
index 0000000..09c2130
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+import com.rabbitmq.client.Connection;
+
+
+
+/**
+ * Callback when a message has been sent.
+ *
+ * @version
+ */
+public interface MessageSentCallback {
+
+ /**
+ * Callback when the message has been sent.
+ *
+ * @param session the RabbitMQ {@link Connection}
+ * @param message the message, as raw bytes
+ * @param destination the destination
+ */
+ void sent(Connection session, byte[] message, String destination);
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java
new file mode 100644
index 0000000..b4f5804
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * {@link ReplyHandler} to handle processing replies when using regular queues.
+ * <p/>
+ * Adds no behaviour of its own: the constructor only delegates to
+ * {@link TemporaryQueueReplyHandler}.
+ *
+ * @version
+ */
+public class QueueReplyHandler extends TemporaryQueueReplyHandler {
+
+ public QueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+ String originalCorrelationId, String correlationId, long timeout) {
+ super(replyManager, exchange, callback, originalCorrelationId, correlationId, timeout);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java
new file mode 100644
index 0000000..a8fe319
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+import com.rabbitmq.client.AMQP;
+
+
+/**
+ * Handles a reply.
+ *
+ * @version
+ */
+public interface ReplyHandler {
+
+ /**
+ * The reply message was received
+ *
+ * @param correlationId the correlation id
+ * @param properties the AMQP basic properties passed along with the reply
+ * @param reply the reply message
+ */
+ void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply);
+
+ /**
+ * The reply message was not received and a timeout triggered
+ *
+ * @param correlationId the correlation id
+ */
+ void onTimeout(String correlationId);
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java
new file mode 100644
index 0000000..d2890d0
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+import com.rabbitmq.client.AMQP;
+
+/**
+ * Holder which contains the {@link Exchange} and {@link org.apache.camel.AsyncCallback} to be used
+ * when the reply arrives, so we can set the reply on the {@link Exchange} and continue routing using the callback.
+ *
+ * @version
+ */
+public class ReplyHolder {
+
+ private final Exchange exchange;
+ private final AsyncCallback callback;
+ private final byte[] message;
+ private final String originalCorrelationId;
+ private final String correlationId;
+ private long timeout;
+ private AMQP.BasicProperties properties;
+
+ /**
+ * Constructor to use when a reply message was received
+ */
+ public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId,
+ String correlationId,AMQP.BasicProperties properties, byte[] message) {
+ this.exchange = exchange;
+ this.callback = callback;
+ this.originalCorrelationId = originalCorrelationId;
+ this.correlationId = correlationId;
+ this.properties = properties;
+ this.message = message;
+ }
+
+ /**
+ * Constructor to use when a timeout occurred
+ * <p/>
+ * Both the message and the properties are left as <tt>null</tt>.
+ */
+ public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId,
+ String correlationId, long timeout) {
+ this(exchange, callback, originalCorrelationId, correlationId, null, null);
+ this.timeout = timeout;
+ }
+
+ public Exchange getExchange() {
+ return exchange;
+ }
+
+ public AsyncCallback getCallback() {
+ return callback;
+ }
+
+ /**
+ * Gets the original correlation id, if one was set when sending the message.
+ * <p/>
+ * Some brokers will mess with the correlation id and send back a different/empty correlation id.
+ * So we need to remember it so we can restore the correlation id.
+ */
+ public String getOriginalCorrelationId() {
+ return originalCorrelationId;
+ }
+
+ /**
+ * Gets the correlation id
+ *
+ * @see #getOriginalCorrelationId()
+ */
+ public String getCorrelationId() {
+ return correlationId;
+ }
+
+ /**
+ * Gets the received message
+ *
+ * @return the received message, or <tt>null</tt> if timeout occurred and no message has been received
+ * @see #isTimeout()
+ */
+ public byte[] getMessage() {
+ return message;
+ }
+
+ /**
+ * Whether timeout triggered or not.
+ * <p/>
+ * A timeout is triggered if <tt>requestTimeout</tt> option has been configured, and a reply message has <b>not</b> been
+ * received within that time frame.
+ * <p/>
+ * Implemented as a <tt>null</tt> check on the message, so a holder created with a <tt>null</tt> reply body
+ * also reports a timeout.
+ */
+ public boolean isTimeout() {
+ return message == null;
+ }
+
+ /**
+ * The timeout value
+ * <p/>
+ * Only assigned by the timeout constructor; otherwise it keeps the default of 0.
+ */
+ public long getRequestTimeout() {
+ return timeout;
+ }
+
+ /**
+ * The message properties
+ * @return the properties given to the reply constructor, or <tt>null</tt> if the timeout constructor was used
+ */
+ public AMQP.BasicProperties getProperties(){
+ return properties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
new file mode 100644
index 0000000..7c1c015
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
@@ -0,0 +1,76 @@
+package org.apache.camel.component.rabbitmq.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.rabbitmq.RabbitMQEndpoint;
+
+
+/**
+ * The {@link ReplyManager} is responsible for handling <a href="http://camel.apache.org/request-reply.html">request-reply</a>
+ * over RabbitMQ.
+ *
+ * @version
+ */
+public interface ReplyManager {
+
+ /**
+ * Sets the belonging {@link RabbitMQEndpoint}.
+ */
+ void setEndpoint(RabbitMQEndpoint endpoint);
+
+ /**
+ * Sets the reply to queue the manager should listen for replies.
+ * <p/>
+ * The queue is either a temporary or a persistent queue.
+ */
+ void setReplyTo(String replyTo);
+
+ /**
+ * Gets the reply to queue being used
+ */
+ String getReplyTo();
+
+ /**
+ * Register a reply
+ *
+ * @param replyManager the reply manager being used
+ * @param exchange the exchange
+ * @param callback the callback
+ * @param originalCorrelationId an optional original correlation id
+ * @param correlationId the correlation id to expect being used
+ * @param requestTimeout the timeout
+ * @return the correlation id used
+ */
+ String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+ String originalCorrelationId, String correlationId, long requestTimeout);
+
+ /**
+ * Sets the scheduled executor service to use when checking for timeouts (no reply received within a given time period)
+ */
+ void setScheduledExecutorService(ScheduledExecutorService executorService);
+
+
+ /**
+ * Updates the correlation id to the new correlation id.
+ * <p/>
+ * This is only used when <tt>useMessageIDasCorrelationID</tt> option is used, which means a
+ * provisional correlation id is first used, then after the message has been sent, the real
+ * correlation id is known. This allows us then to update the internal mapping to expect the
+ * real correlation id.
+ *
+ * @param correlationId the provisional correlation id
+ * @param newCorrelationId the real correlation id
+ * @param requestTimeout the timeout
+ */
+ void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout);
+
+
+ /**
+ * Process the reply
+ *
+ * @param holder containing needed data to process the reply and continue routing
+ */
+ void processReply(ReplyHolder holder);
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
new file mode 100644
index 0000000..6e41f99
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
@@ -0,0 +1,238 @@
+package org.apache.camel.component.rabbitmq.reply;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.component.rabbitmq.RabbitMQConstants;
+import org.apache.camel.component.rabbitmq.RabbitMQEndpoint;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Connection;
+
+
+public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager{
+
+ // logger, also visible to subclasses
+ protected final Logger log = LoggerFactory.getLogger(ReplyManagerSupport.class);
+ // used in doStop to shut down the executor service
+ protected final CamelContext camelContext;
+
+ // must be set before start (checked in doStart); shut down and cleared in doStop
+ protected ScheduledExecutorService executorService;
+ // must be set before start (checked in doStart)
+ protected RabbitMQEndpoint endpoint;
+ // the reply to destination; getReplyTo() waits a short while for it when it is not yet set
+ protected String replyTo;
+ // RabbitMQ connection obtained from createListenerContainer() in doStart and closed in doStop
+ protected Connection listenerContainer;
+ // timeout in millis passed to the connection close in doStop
+ private int closeTimeout = 30 * 1000;
+ // released by setReplyTo(...) so a waiting getReplyTo() can continue
+ protected final CountDownLatch replyToLatch = new CountDownLatch(1);
+ // how long getReplyTo() waits on the latch, in millis
+ protected final long replyToTimeout = 1000;
+ // pending replies keyed by correlation id; created and started in doStart, stopped in doStop
+ protected CorrelationTimeoutMap correlation;
+
+ /**
+ * Creates the reply manager.
+ *
+ * @param camelContext the camel context, kept for shutting down the executor service on stop
+ */
+ public ReplyManagerSupport(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ /**
+ * Sets the scheduled executor service which is handed to the correlation timeout map on start.
+ * <p/>
+ * Must be set before this manager is started, and is shut down again when it is stopped.
+ *
+ * @param executorService the scheduled executor service
+ */
+ public void setScheduledExecutorService(ScheduledExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ /**
+ * Sets the endpoint this reply manager works for.
+ * <p/>
+ * Must be set before this manager is started.
+ *
+ * @param endpoint the RabbitMQ endpoint
+ */
+ public void setEndpoint(RabbitMQEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ /**
+ * Sets the reply to destination and releases any caller waiting in {@link #getReplyTo()}.
+ *
+ * @param replyTo the reply to destination
+ */
+ public void setReplyTo(String replyTo) {
+ log.debug("ReplyTo destination: {}", replyTo);
+ this.replyTo = replyTo;
+ // trigger latch as the reply to has been resolved and set
+ replyToLatch.countDown();
+ }
+
+ /**
+  * Gets the reply to destination, waiting a short while for it when it has not been set yet.
+  * <p/>
+  * The wait is bounded by <tt>replyToTimeout</tt> millis. If the calling thread is interrupted
+  * while waiting, the wait ends early and the interrupt status of the thread is preserved.
+  *
+  * @return the reply to destination, or <tt>null</tt> if it was not set in time
+  */
+ public String getReplyTo() {
+     if (replyTo != null) {
+         return replyTo;
+     }
+     try {
+         // the reply to destination is resolved asynchronously and handed over via setReplyTo,
+         // so we have to wait for that to happen before we can return the destination to be used
+         log.trace("Waiting for replyTo to be set");
+         boolean done = replyToLatch.await(replyToTimeout, TimeUnit.MILLISECONDS);
+         if (!done) {
+             log.warn("ReplyTo destination was not set and timeout occurred");
+         } else {
+             log.trace("Waiting for replyTo to be set done");
+         }
+     } catch (InterruptedException e) {
+         // do not swallow the interruption: restore the flag so the caller can react to it
+         Thread.currentThread().interrupt();
+     }
+     return replyTo;
+ }
+
+ /**
+  * Registers a handler for the reply that is expected under the given correlation id.
+  *
+  * @param replyManager          the reply manager the handler reports back to
+  * @param exchange              the exchange waiting for the reply
+  * @param callback              the callback to signal once the reply (or a timeout) was processed
+  * @param originalCorrelationId an optional original correlation id
+  * @param correlationId         the correlation id to expect being used
+  * @param requestTimeout        the timeout
+  * @return the correlation id used
+  * @throws IllegalArgumentException if a handler is already registered for the correlation id
+  */
+ public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+                             String originalCorrelationId, String correlationId, long requestTimeout) {
+     log.debug("in registerReply");
+     // putIfAbsent keeps an already registered handler untouched and hands it back to us
+     ReplyHandler existing = correlation.putIfAbsent(
+             correlationId,
+             new QueueReplyHandler(replyManager, exchange, callback, originalCorrelationId, correlationId, requestTimeout),
+             requestTimeout);
+     if (existing == null) {
+         return correlationId;
+     }
+     throw new IllegalArgumentException(String.format("The correlationId [%s] is not unique.", correlationId));
+ }
+
+
+ /**
+ * Creates the handler that processes the reply (or the timeout) for one request.
+ * <p/>
+ * NOTE(review): {@link #registerReply} in this class instantiates a <tt>QueueReplyHandler</tt>
+ * directly and does not call this factory method - confirm whether that is intended.
+ *
+ * @param replyManager the reply manager the handler reports back to
+ * @param exchange the exchange waiting for the reply
+ * @param callback the callback to signal once the reply was processed
+ * @param originalCorrelationId an optional original correlation id
+ * @param correlationId the correlation id to expect being used
+ * @param requestTimeout the timeout
+ * @return the reply handler
+ */
+ protected abstract ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+ String originalCorrelationId, String correlationId, long requestTimeout);
+
+ /**
+  * Entry point for an incoming reply message.
+  * <p/>
+  * Messages without a correlation id cannot be matched to a request and are dropped with a
+  * WARN log; all others are passed on to {@link #handleReplyMessage}.
+  *
+  * @param properties the AMQP properties of the reply, carrying the correlation id
+  * @param message    the raw reply body
+  */
+ public void onMessage(AMQP.BasicProperties properties, byte[] message) {
+     final String id = properties.getCorrelationId();
+     if (id != null) {
+         log.debug("Received reply message with correlationID [{}] -> {}", id, message);
+         // handle the reply message
+         handleReplyMessage(id, properties, message);
+     } else {
+         log.warn("Ignoring message with no correlationID: {}", message);
+     }
+ }
+
+ public void processReply(ReplyHolder holder) {
+ log.info("in processReply");
+ if (holder != null && isRunAllowed()) {
+ try {
+ Exchange exchange = holder.getExchange();
+
+ boolean timeout = holder.isTimeout();
+ if (timeout) {
+ // timeout occurred do a WARN log so its easier to spot in the logs
+ if (log.isWarnEnabled()) {
+ log.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}] on destination {}."
+ + " Setting ExchangeTimedOutException on {} and continue routing.",
+ new Object[]{holder.getRequestTimeout(), holder.getCorrelationId(), replyTo, ExchangeHelper.logIds(exchange)});
+ }
+
+ // no response, so lets set a timed out exception
+ String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + replyTo;
+ exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg));
+ } else {
+
+ endpoint.setRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage());
+
+ // restore correlation id in case the remote server messed with it
+ if (holder.getOriginalCorrelationId() != null) {
+ if(exchange.getOut() != null){
+ exchange.getOut().setHeader(RabbitMQConstants.CORRELATIONID, holder.getOriginalCorrelationId());
+ }
+ else{
+ exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, holder.getOriginalCorrelationId());
+ }
+ }
+
+ }
+ } finally {
+ // notify callback
+ AsyncCallback callback = holder.getCallback();
+ callback.done(false);
+ }
+ }
+ }
+
+
+
+ /**
+ * Handles a reply message which carries a correlation id; called from {@link #onMessage}.
+ *
+ * @param correlationID the correlation id taken from the message properties (never null here)
+ * @param properties the AMQP properties of the reply
+ * @param message the raw reply body
+ */
+ protected abstract void handleReplyMessage(String correlationID, AMQP.BasicProperties properties, byte[] message);
+
+ /**
+ * Creates the RabbitMQ connection that is kept while this manager is started.
+ * <p/>
+ * Called once from <tt>doStart</tt>; the returned connection is closed in <tt>doStop</tt>.
+ *
+ * @return the connection
+ * @throws Exception if the connection could not be created
+ */
+ protected abstract Connection createListenerContainer() throws Exception;
+
+ /**
+  * Waits for the correlation map to contain a handler for a reply that arrived before the
+  * provisional correlation id was replaced with the real one.
+  * <p/>
+  * <b>IMPORTANT:</b> This logic is only expected to matter with very fast (eg in-memory) brokers.
+  * Its unlikely to happen in a real life situation with communication to a remote broker, which
+  * always will be slower to send back reply, before Camel had a chance to update it's internal
+  * correlation map.
+  * <p/>
+  * The lookup is retried every 100 millis for at most 50 attempts. If the thread is interrupted
+  * while waiting, one last lookup is done, the interrupt status is restored and the wait ends.
+  *
+  * @param correlationID the correlation id of the early reply
+  * @param message       the reply body, only used for logging
+  * @return the handler for the reply, or <tt>null</tt> if none was registered in time
+  */
+ protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlationID, byte[] message) {
+     // race condition, when using messageID as correlationID then we store a provisional correlation id
+     // at first, which gets updated with the real id after the message has been sent. And in the unlikely
+     // event that the reply comes back really really fast, and the correlation map hasn't yet been updated
+     // from the provisional id to the real id. If so we have to wait a bit and lookup again.
+     log.warn("Early reply received with correlationID [{}] -> {}", correlationID, message);
+
+     ReplyHandler answer = null;
+
+     // wait up till 5 seconds
+     boolean interrupted = false;
+     int counter = 0;
+     while (answer == null && !interrupted && counter++ < 50) {
+         log.trace("Early reply not found handler at attempt {}. Waiting a bit longer.", counter);
+         try {
+             Thread.sleep(100);
+         } catch (InterruptedException e) {
+             // restore the flag and stop polling; with the flag set every further sleep
+             // would fail immediately anyway
+             Thread.currentThread().interrupt();
+             interrupted = true;
+         }
+
+         // try again
+         answer = correlation.get(correlationID);
+
+         if (answer != null && log.isTraceEnabled()) {
+             log.trace("Early reply with correlationID [{}] has been matched after {} attempts and can be processed using handler: {}",
+                     new Object[]{correlationID, counter, answer});
+         }
+     }
+
+     return answer;
+ }
+
+ /**
+ * Starts the correlation timeout map and creates the connection.
+ * <p/>
+ * Both the executor service and the endpoint must have been set before, otherwise the
+ * not-null checks at the top fail.
+ */
+ @Override
+ protected void doStart() throws Exception {
+ ObjectHelper.notNull(executorService, "executorService", this);
+ ObjectHelper.notNull(endpoint, "endpoint", this);
+
+ // timeout map to use for purging messages which have timed out, while waiting for an expected reply
+ // when doing request/reply over RabbitMQ
+ log.debug("Using timeout checker interval with {} millis", endpoint.getRequestTimeoutCheckerInterval());
+ correlation = new CorrelationTimeoutMap(executorService, endpoint.getRequestTimeoutCheckerInterval());
+ ServiceHelper.startService(correlation);
+
+ // let the subclass create the RabbitMQ connection; it is kept so doStop can close it
+
+ listenerContainer = createListenerContainer();
+
+ log.debug("Using executor {}", executorService);
+
+ }
+
+
+ /**
+  * Stops the correlation timeout map, closes the connection and shuts down the executor service.
+  * <p/>
+  * Each step runs even if an earlier one failed, so a failure while stopping the map or closing
+  * the connection does not leave the connection open or the executor service running.
+  */
+ @Override
+ protected void doStop() throws Exception {
+     try {
+         ServiceHelper.stopService(correlation);
+     } finally {
+         try {
+             if (listenerContainer != null) {
+                 log.debug("Closing connection: {} with timeout: {} ms.", listenerContainer, closeTimeout);
+                 listenerContainer.close(closeTimeout);
+             }
+         } finally {
+             listenerContainer = null;
+
+             // must also stop executor service
+             if (executorService != null) {
+                 camelContext.getExecutorServiceManager().shutdownGraceful(executorService);
+                 executorService = null;
+             }
+         }
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
new file mode 100644
index 0000000..c7fcf41
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.reply;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.rabbitmq.RabbitMQConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.AMQP;
+
+/**
+ * {@link ReplyHandler} to handle processing replies when using temporary queues.
+ *
+ * @version
+ */
+public class TemporaryQueueReplyHandler implements ReplyHandler {
+
+ protected final Logger log = LoggerFactory.getLogger(TemporaryQueueReplyHandler.class);
+
+ // task queue to add the holder so we can process the reply
+ protected final ReplyManager replyManager;
+ protected final Exchange exchange;
+ protected final AsyncCallback callback;
+ // remember the original correlation id, in case the server returns back a reply with a messed up correlation id
+ protected final String originalCorrelationId;
+ protected final String correlationId;
+ protected final long timeout;
+
+ public TemporaryQueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+ String originalCorrelationId, String correlationId, long timeout) {
+ this.replyManager = replyManager;
+ this.exchange = exchange;
+ this.originalCorrelationId = originalCorrelationId;
+ this.correlationId = correlationId;
+ this.callback = callback;
+ this.timeout = timeout;
+ }
+
+ public void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply) {
+ // create holder object with the the reply
+ log.info("in onReply with correlationId= {}", correlationId);
+ ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, properties, reply);
+ // process the reply
+ replyManager.processReply(holder);
+ }
+
+ public void onTimeout(String correlationId) {
+ // create holder object without the reply which means a timeout occurred
+ log.info("in onTimeout with correlationId= {}", correlationId);
+ ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, timeout);
+ // process timeout
+ replyManager.processReply(holder);
+ }
+}