You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/10/23 05:10:54 UTC
git commit: CAMEL-4494 Supports the ReplyToOverride option in
camel-jms with thanks to Jens
Repository: camel
Updated Branches:
refs/heads/master 128da0928 -> 3107a09f0
CAMEL-4494 Supports the ReplyToOverride option in camel-jms with thanks to Jens
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3107a09f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3107a09f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3107a09f
Branch: refs/heads/master
Commit: 3107a09f0fae52a5df345f4f36d34629f49810e2
Parents: 128da09
Author: Willem Jiang <wi...@gmail.com>
Authored: Thu Oct 23 11:09:38 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Oct 23 11:09:38 2014 +0800
----------------------------------------------------------------------
.../camel/component/jms/JmsConfiguration.java | 9 ++
.../apache/camel/component/jms/JmsProducer.java | 73 ++++++++------
.../jms/JmsRequestReplyReplyToOverrideTest.java | 101 +++++++++++++++++++
3 files changed, 154 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3107a09f/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
index 04c1306..ab3c6fe 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
@@ -162,6 +162,7 @@ public class JmsConfiguration implements Cloneable {
private String replyToDestination;
@UriParam
private String replyToDestinationSelectorName;
+ private String replyToOverride;
private JmsMessageType jmsMessageType;
private JmsKeyFormatStrategy jmsKeyFormatStrategy;
@UriParam
@@ -1228,6 +1229,14 @@ public class JmsConfiguration implements Cloneable {
}
}
+ public String getReplyToOverride() {
+ return replyToOverride;
+ }
+
+ public void setReplyToOverride(String replyToDestination) {
+ this.replyToOverride = normalizeDestinationName(replyToDestination);
+ }
+
public JmsMessageType getJmsMessageType() {
return jmsMessageType;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3107a09f/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
index a30c4f3..e1afdb1 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
@@ -209,8 +209,14 @@ public class JmsProducer extends DefaultAsyncProducer {
public Message createMessage(Session session) throws JMSException {
Message answer = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
- // get the reply to destination to be used from the reply manager
- Destination replyTo = replyManager.getReplyTo();
+ Destination replyTo = null;
+ String replyToOverride = endpoint.getConfiguration().getReplyToOverride();
+ if (replyToOverride != null) {
+ replyTo = resolveOrCreateDestination(replyToOverride, session);
+ } else {
+ // get the reply to destination to be used from the reply manager
+ replyTo = replyManager.getReplyTo();
+ }
if (replyTo == null) {
throw new RuntimeExchangeException("Failed to resolve replyTo destination", exchange);
}
@@ -336,37 +342,16 @@ public class JmsProducer extends DefaultAsyncProducer {
if (jmsReplyTo != null && jmsReplyTo instanceof String) {
String replyTo = (String) jmsReplyTo;
// we need to null it as we use the String to resolve it as a Destination instance
- jmsReplyTo = null;
- boolean isPubSub = isTopicPrefix(replyTo) || (!isQueuePrefix(replyTo) && endpoint.isPubSubDomain());
- // try using destination resolver to lookup the destination
- if (endpoint.getDestinationResolver() != null) {
- jmsReplyTo = endpoint.getDestinationResolver().resolveDestinationName(session, replyTo, isPubSub);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Resolved JMSReplyTo destination {} using DestinationResolver {} as PubSubDomain {} -> {}",
- new Object[]{replyTo, endpoint.getDestinationResolver(), isPubSub, jmsReplyTo});
- }
- }
- if (jmsReplyTo == null) {
- // must normalize the destination name
- String before = replyTo;
- replyTo = normalizeDestinationName(replyTo);
- LOG.trace("Normalized JMSReplyTo destination name {} -> {}", before, replyTo);
-
- // okay then fallback and create the queue/topic
- if (isPubSub) {
- LOG.debug("Creating JMSReplyTo topic: {}", replyTo);
- jmsReplyTo = session.createTopic(replyTo);
- } else {
- LOG.debug("Creating JMSReplyTo queue: {}", replyTo);
- jmsReplyTo = session.createQueue(replyTo);
- }
- }
+ jmsReplyTo = resolveOrCreateDestination(replyTo, session);
}
// set the JMSReplyTo on the answer if we are to use it
Destination replyTo = null;
- if (jmsReplyTo instanceof Destination) {
- replyTo = (Destination) jmsReplyTo;
+ String replyToOverride = endpoint.getConfiguration().getReplyToOverride();
+ if (replyToOverride != null) {
+ replyTo = resolveOrCreateDestination(replyToOverride, session);
+ } else if (jmsReplyTo instanceof Destination) {
+ replyTo = (Destination)jmsReplyTo;
}
if (replyTo != null) {
LOG.debug("Using JMSReplyTo destination: {}", replyTo);
@@ -436,6 +421,36 @@ public class JmsProducer extends DefaultAsyncProducer {
}
}
+ protected Destination resolveOrCreateDestination(String destinationName, Session session) throws JMSException {
+ Destination dest = null;
+
+ boolean isPubSub = isTopicPrefix(destinationName) || (!isQueuePrefix(destinationName) && endpoint.isPubSubDomain());
+ // try using destination resolver to lookup the destination
+ if (endpoint.getDestinationResolver() != null) {
+ dest = endpoint.getDestinationResolver().resolveDestinationName(session, destinationName, isPubSub);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Resolved JMSReplyTo destination {} using DestinationResolver {} as PubSubDomain {} -> {}",
+ new Object[]{destinationName, endpoint.getDestinationResolver(), isPubSub, dest});
+ }
+ }
+ if (dest == null) {
+ // must normalize the destination name
+ String before = destinationName;
+ destinationName = normalizeDestinationName(destinationName);
+ LOG.trace("Normalized JMSReplyTo destination name {} -> {}", before, destinationName);
+
+ // okay then fallback and create the queue/topic
+ if (isPubSub) {
+ LOG.debug("Creating JMSReplyTo topic: {}", destinationName);
+ dest = session.createTopic(destinationName);
+ } else {
+ LOG.debug("Creating JMSReplyTo queue: {}", destinationName);
+ dest = session.createQueue(destinationName);
+ }
+ }
+ return dest;
+ }
+
protected void setMessageId(Exchange exchange) {
if (exchange.hasOut()) {
JmsMessage out = exchange.getOut(JmsMessage.class);
http://git-wip-us.apache.org/repos/asf/camel/blob/3107a09f/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyReplyToOverrideTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyReplyToOverrideTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyReplyToOverrideTest.java
new file mode 100644
index 0000000..89f1fc2
--- /dev/null
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyReplyToOverrideTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+public class JmsRequestReplyReplyToOverrideTest extends CamelTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JmsRequestReplyReplyToOverrideTest.class);
+
+ private static final String REQUEST_BODY = "Something";
+ private static final String EXPECTED_REPLY_BODY = "Re: " + REQUEST_BODY;
+ private static final String EXPECTED_REPLY_HEADER = "queue://bar";
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ @Test
+ public void testJmsRequestReplyReplyToAndReplyToHeader() throws Exception {
+ // send request to foo, set replyTo to bar, but actually expect reply at baz
+ Thread sender = new Thread(new Responder());
+ sender.start();
+
+ Exchange reply = template.request("jms:queue:foo", new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setBody(REQUEST_BODY);
+ }
+ });
+ assertEquals(EXPECTED_REPLY_BODY, reply.getOut().getBody());
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+ ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
+ JmsComponent jmsComponent = jmsComponentAutoAcknowledge(connectionFactory);
+ jmsComponent.getConfiguration().setReplyTo("baz");
+ jmsComponent.getConfiguration().setReplyToOverride("bar");
+ camelContext.addComponent("jms", jmsComponent);
+ return camelContext;
+ }
+
+ private class Responder implements Runnable {
+
+ public void run() {
+ try {
+ LOG.debug("Waiting for request");
+ Exchange request = consumer.receive("jms:queue:foo", 5000);
+
+ LOG.debug("Got request, sending reply");
+ final String body = request.getIn().getBody(String.class);
+ final String cid = request.getIn().getHeader("JMSCorrelationID", String.class);
+ final Destination replyTo = request.getIn().getHeader("JMSReplyTo", Destination.class);
+
+ assertEquals(EXPECTED_REPLY_HEADER, replyTo.toString());
+
+ // send reply
+ template.send("jms:dummy", ExchangePattern.InOnly, new Processor() {
+ public void process(Exchange exchange) throws Exception {
+
+ Message in = exchange.getIn();
+ in.setBody("Re: " + body);
+ in.setHeader(JmsConstants.JMS_DESTINATION_NAME, "baz");
+ in.setHeader("JMSCorrelationID", cid);
+ }
+ });
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+}