You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/10/23 05:10:54 UTC

git commit: CAMEL-4494 Supports the ReplyToOverride option in camel-jms with thanks to Jens

Repository: camel
Updated Branches:
  refs/heads/master 128da0928 -> 3107a09f0


CAMEL-4494 Supports the ReplyToOverride option in camel-jms with thanks to Jens


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3107a09f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3107a09f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3107a09f

Branch: refs/heads/master
Commit: 3107a09f0fae52a5df345f4f36d34629f49810e2
Parents: 128da09
Author: Willem Jiang <wi...@gmail.com>
Authored: Thu Oct 23 11:09:38 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Oct 23 11:09:38 2014 +0800

----------------------------------------------------------------------
 .../camel/component/jms/JmsConfiguration.java   |   9 ++
 .../apache/camel/component/jms/JmsProducer.java |  73 ++++++++------
 .../jms/JmsRequestReplyReplyToOverrideTest.java | 101 +++++++++++++++++++
 3 files changed, 154 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3107a09f/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
index 04c1306..ab3c6fe 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
@@ -162,6 +162,7 @@ public class JmsConfiguration implements Cloneable {
     private String replyToDestination;
     @UriParam
     private String replyToDestinationSelectorName;
+    private String replyToOverride;
     private JmsMessageType jmsMessageType;
     private JmsKeyFormatStrategy jmsKeyFormatStrategy;
     @UriParam
@@ -1228,6 +1229,14 @@ public class JmsConfiguration implements Cloneable {
         }
     }
 
+    public String getReplyToOverride() {
+        return replyToOverride;
+    }
+
+    public void setReplyToOverride(String replyToDestination) {
+        this.replyToOverride = normalizeDestinationName(replyToDestination);
+    }
+
     public JmsMessageType getJmsMessageType() {
         return jmsMessageType;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3107a09f/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
index a30c4f3..e1afdb1 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
@@ -209,8 +209,14 @@ public class JmsProducer extends DefaultAsyncProducer {
             public Message createMessage(Session session) throws JMSException {
                 Message answer = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
 
-                // get the reply to destination to be used from the reply manager
-                Destination replyTo = replyManager.getReplyTo();
+                Destination replyTo = null;
+                String replyToOverride = endpoint.getConfiguration().getReplyToOverride();
+                if (replyToOverride != null) {
+                	replyTo = resolveOrCreateDestination(replyToOverride, session);
+                } else {
+	                // get the reply to destination to be used from the reply manager
+	                replyTo = replyManager.getReplyTo();
+                }
                 if (replyTo == null) {
                     throw new RuntimeExchangeException("Failed to resolve replyTo destination", exchange);
                 }
@@ -336,37 +342,16 @@ public class JmsProducer extends DefaultAsyncProducer {
                 if (jmsReplyTo != null && jmsReplyTo instanceof String) {
                     String replyTo = (String) jmsReplyTo;
                     // we need to null it as we use the String to resolve it as a Destination instance
-                    jmsReplyTo = null;
-                    boolean isPubSub = isTopicPrefix(replyTo) || (!isQueuePrefix(replyTo) && endpoint.isPubSubDomain());
-                    // try using destination resolver to lookup the destination
-                    if (endpoint.getDestinationResolver() != null) {
-                        jmsReplyTo = endpoint.getDestinationResolver().resolveDestinationName(session, replyTo, isPubSub);
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Resolved JMSReplyTo destination {} using DestinationResolver {} as PubSubDomain {} -> {}",
-                                    new Object[]{replyTo, endpoint.getDestinationResolver(), isPubSub, jmsReplyTo});
-                        }
-                    }
-                    if (jmsReplyTo == null) {
-                        // must normalize the destination name
-                        String before = replyTo;
-                        replyTo = normalizeDestinationName(replyTo);
-                        LOG.trace("Normalized JMSReplyTo destination name {} -> {}", before, replyTo);
-
-                        // okay then fallback and create the queue/topic
-                        if (isPubSub) {
-                            LOG.debug("Creating JMSReplyTo topic: {}", replyTo);
-                            jmsReplyTo = session.createTopic(replyTo);
-                        } else {
-                            LOG.debug("Creating JMSReplyTo queue: {}", replyTo);
-                            jmsReplyTo = session.createQueue(replyTo);
-                        }
-                    }
+                    jmsReplyTo = resolveOrCreateDestination(replyTo, session);
                 }
 
                 // set the JMSReplyTo on the answer if we are to use it
                 Destination replyTo = null;
-                if (jmsReplyTo instanceof Destination) {
-                    replyTo = (Destination) jmsReplyTo;
+                String replyToOverride = endpoint.getConfiguration().getReplyToOverride();
+                if (replyToOverride != null) {
+                    replyTo = resolveOrCreateDestination(replyToOverride, session);
+                } else if (jmsReplyTo instanceof Destination) {
+                    replyTo = (Destination)jmsReplyTo;
                 }
                 if (replyTo != null) {
                     LOG.debug("Using JMSReplyTo destination: {}", replyTo);
@@ -436,6 +421,36 @@ public class JmsProducer extends DefaultAsyncProducer {
         }
     }
 
+    protected Destination resolveOrCreateDestination(String destinationName, Session session) throws JMSException {
+    	Destination dest = null;
+    	
+    	boolean isPubSub = isTopicPrefix(destinationName) || (!isQueuePrefix(destinationName) && endpoint.isPubSubDomain());
+        // try using destination resolver to lookup the destination
+        if (endpoint.getDestinationResolver() != null) {
+            dest = endpoint.getDestinationResolver().resolveDestinationName(session, destinationName, isPubSub);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Resolved JMSReplyTo destination {} using DestinationResolver {} as PubSubDomain {} -> {}",
+                        new Object[]{destinationName, endpoint.getDestinationResolver(), isPubSub, dest});
+            }
+        }
+        if (dest == null) {
+            // must normalize the destination name
+            String before = destinationName;
+            destinationName = normalizeDestinationName(destinationName);
+            LOG.trace("Normalized JMSReplyTo destination name {} -> {}", before, destinationName);
+
+            // okay then fallback and create the queue/topic
+            if (isPubSub) {
+                LOG.debug("Creating JMSReplyTo topic: {}", destinationName);
+                dest = session.createTopic(destinationName);
+            } else {
+                LOG.debug("Creating JMSReplyTo queue: {}", destinationName);
+                dest = session.createQueue(destinationName);
+            }
+        }
+    	return dest;
+    }
+
     protected void setMessageId(Exchange exchange) {
         if (exchange.hasOut()) {
             JmsMessage out = exchange.getOut(JmsMessage.class);

http://git-wip-us.apache.org/repos/asf/camel/blob/3107a09f/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyReplyToOverrideTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyReplyToOverrideTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyReplyToOverrideTest.java
new file mode 100644
index 0000000..89f1fc2
--- /dev/null
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyReplyToOverrideTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+public class JmsRequestReplyReplyToOverrideTest extends CamelTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsRequestReplyReplyToOverrideTest.class);
+
+    private static final String REQUEST_BODY = "Something";
+    private static final String EXPECTED_REPLY_BODY = "Re: " + REQUEST_BODY;
+    private static final String EXPECTED_REPLY_HEADER = "queue://bar";
+    
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    public void testJmsRequestReplyReplyToAndReplyToHeader() throws Exception {
+    	// send request to foo, set replyTo to bar, but actually expect reply at baz
+        Thread sender = new Thread(new Responder());
+        sender.start();
+
+        Exchange reply = template.request("jms:queue:foo", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody(REQUEST_BODY);
+            }
+        });
+        assertEquals(EXPECTED_REPLY_BODY, reply.getOut().getBody());
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
+        JmsComponent jmsComponent = jmsComponentAutoAcknowledge(connectionFactory);
+        jmsComponent.getConfiguration().setReplyTo("baz");
+        jmsComponent.getConfiguration().setReplyToOverride("bar");
+        camelContext.addComponent("jms", jmsComponent);
+        return camelContext;
+    }
+
+    private class Responder implements Runnable {
+
+        public void run() {
+            try {
+                LOG.debug("Waiting for request");
+                Exchange request = consumer.receive("jms:queue:foo", 5000);
+
+                LOG.debug("Got request, sending reply");
+                final String body = request.getIn().getBody(String.class);
+                final String cid = request.getIn().getHeader("JMSCorrelationID", String.class);
+                final Destination replyTo = request.getIn().getHeader("JMSReplyTo", Destination.class);
+                
+                assertEquals(EXPECTED_REPLY_HEADER, replyTo.toString());
+                
+                // send reply
+                template.send("jms:dummy", ExchangePattern.InOnly, new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+
+                        Message in = exchange.getIn();
+                        in.setBody("Re: " + body);
+                        in.setHeader(JmsConstants.JMS_DESTINATION_NAME, "baz");
+                        in.setHeader("JMSCorrelationID", cid);
+                    }
+                });
+            } catch (Exception e) {
+                // ignore
+            }
+        }
+    }
+}