You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/12/25 09:49:19 UTC

svn commit: r1052759 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/ test/java/org/apache/camel/component/jms/

Author: davsclaus
Date: Sat Dec 25 08:49:19 2010
New Revision: 1052759

URL: http://svn.apache.org/viewvc?rev=1052759&view=rev
Log:
CAMEL-3462: JmsProducer in InOnly mode should support JMSReplyTo if provided as a header or in the endpoint configuration.
            Now it sends JMSReplyTo information, but will of course not wait for any reply.
            You can use this to still provide a reply-to destination, and have some other consumer pick up and process the replies.
CAMEL-3463: JMSReplyTo provided in header should be normalized to remove any queue or topic prefix.

Added:
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToAsHeaderTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToDisabledTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToTest.java
Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=1052759&r1=1052758&r2=1052759&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Sat Dec 25 08:49:19 2010
@@ -54,6 +54,7 @@ import org.apache.camel.util.ObjectHelpe
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName;
 import static org.apache.camel.component.jms.JmsMessageType.Bytes;
 import static org.apache.camel.component.jms.JmsMessageType.Map;
 import static org.apache.camel.component.jms.JmsMessageType.Object;
@@ -318,6 +319,10 @@ public class JmsBinding {
             if (headerName.equals("JMSCorrelationID")) {
                 jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
             } else if (headerName.equals("JMSReplyTo") && headerValue != null) {
+                if (headerValue instanceof String) {
+                    // if the value is a String we must normalize it first
+                    headerValue = normalizeDestinationName((String) headerValue);
+                }
                 jmsMessage.setJMSReplyTo(ExchangeHelper.convertToType(exchange, Destination.class, headerValue));
             } else if (headerName.equals("JMSType")) {
                 jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue));

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1052759&r1=1052758&r2=1052759&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Sat Dec 25 08:49:19 2010
@@ -45,6 +45,7 @@ import org.springframework.jms.support.d
 import org.springframework.transaction.PlatformTransactionManager;
 import org.springframework.util.Assert;
 
+import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName;
 import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
 
 /**
@@ -1075,13 +1076,7 @@ public class JmsConfiguration implements
     }
 
     public void setReplyTo(String replyToDestination) {
-        if (replyToDestination.startsWith(QUEUE_PREFIX)) {
-            this.replyToDestination = removeStartingCharacters(replyToDestination.substring(QUEUE_PREFIX.length()), '/');
-        } else if (replyToDestination.startsWith(TOPIC_PREFIX)) {
-            this.replyToDestination = removeStartingCharacters(replyToDestination.substring(TOPIC_PREFIX.length()), '/');
-        } else {
-            this.replyToDestination = replyToDestination;
-        }
+        this.replyToDestination = normalizeDestinationName(replyToDestination);
     }
 
     public String getReplyToDestinationSelectorName() {

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java?rev=1052759&r1=1052758&r2=1052759&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java Sat Dec 25 08:49:19 2010
@@ -22,6 +22,12 @@ import java.util.Map;
 import javax.jms.JMSException;
 import javax.jms.Message;
 
+import org.apache.camel.util.ObjectHelper;
+
+import static org.apache.camel.component.jms.JmsConfiguration.QUEUE_PREFIX;
+import static org.apache.camel.component.jms.JmsConfiguration.TOPIC_PREFIX;
+import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
+
 /**
  * Utility class for {@link javax.jms.Message}.
  *
@@ -140,4 +146,23 @@ public final class JmsMessageHelper {
             // ignore
         }
     }
+
+    /**
+     * Normalizes the destination name, by removing any leading queue or topic prefixes.
+     *
+     * @param destination the destination
+     * @return the normalized destination
+     */
+    public static String normalizeDestinationName(String destination) {
+        if (ObjectHelper.isEmpty(destination)) {
+            return destination;
+        }
+        if (destination.startsWith(QUEUE_PREFIX)) {
+            return removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+        } else if (destination.startsWith(TOPIC_PREFIX)) {
+            return removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+        } else {
+            return destination;
+        }
+    }
 }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=1052759&r1=1052758&r2=1052759&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Sat Dec 25 08:49:19 2010
@@ -40,6 +40,8 @@ import org.apache.commons.logging.LogFac
 import org.springframework.jms.core.JmsOperations;
 import org.springframework.jms.core.MessageCreator;
 
+import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName;
+
 /**
  * @version $Revision$
  */
@@ -229,12 +231,11 @@ public class JmsProducer extends Default
             if (replyTo != null) {
                 // we are routing an existing JmsMessage, origin from another JMS endpoint
                 // then we need to remove the existing JMSReplyTo
-                // as we are not out capable and thus do not expect a reply, and therefore
-                // the consumer of this message we send should not return a reply
+                // as we are not OUT capable and thus do not expect a reply, and therefore
+                // the consumer of this message should not return a reply
                 String to = destinationName != null ? destinationName : "" + destination;
                 LOG.warn("Disabling JMSReplyTo as this Exchange is not OUT capable with JMSReplyTo: " + replyTo
-                        + " for destination: " + to + ". Use preserveMessageQos=true to force Camel to keep the JMSReplyTo."
-                        + " Exchange: " + exchange);
+                        + " for destination: " + to + ". Use preserveMessageQos=true to force Camel to keep the JMSReplyTo on: " + exchange);
                 exchange.getIn().setHeader("JMSReplyTo", null);
             }
         }
@@ -243,33 +244,51 @@ public class JmsProducer extends Default
             public Message createMessage(Session session) throws JMSException {
                 Message answer = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
 
-                // if the binding did not create the reply to then we have to try to create it here
-                String replyTo = exchange.getIn().getHeader("JMSReplyTo", String.class);
-                if (replyTo != null && answer.getJMSReplyTo() == null) {
-                    Destination destination = null;
-                    // try using destination resolver to lookup the destination
-                    if (endpoint.getDestinationResolver() != null) {
-                        destination = endpoint.getDestinationResolver().resolveDestinationName(session, replyTo, endpoint.isPubSubDomain());
+                if (endpoint.isDisableReplyTo()) {
+                    // honor disable reply to configuration
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("ReplyTo is disabled on endpoint: " + endpoint);
                     }
-                    if (destination == null) {
-                        // okay then fallback and create the queue
-                        if (endpoint.isPubSubDomain()) {
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Creating JMSReplyTo topic: " + replyTo);
+                    answer.setJMSReplyTo(null);
+                } else {
+                    // if the binding did not create the reply to then we have to try to create it here
+                    if (answer.getJMSReplyTo() == null) {
+                        // prefer reply to from header over endpoint configured
+                        String replyTo = exchange.getIn().getHeader("JMSReplyTo", String.class);
+                        if (replyTo == null) {
+                            replyTo = endpoint.getReplyTo();
+                        }
+
+                        if (replyTo != null) {
+                            // must normalize the destination name
+                            replyTo = normalizeDestinationName(replyTo);
+
+                            Destination destination = null;
+                            // try using destination resolver to lookup the destination
+                            if (endpoint.getDestinationResolver() != null) {
+                                destination = endpoint.getDestinationResolver().resolveDestinationName(session, replyTo, endpoint.isPubSubDomain());
                             }
-                            destination = session.createTopic(replyTo);
-                        } else {
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Creating JMSReplyTo queue: " + replyTo);
+                            if (destination == null) {
+                                // okay then fallback and create the queue
+                                if (endpoint.isPubSubDomain()) {
+                                    if (LOG.isDebugEnabled()) {
+                                        LOG.debug("Creating JMSReplyTo topic: " + replyTo);
+                                    }
+                                    destination = session.createTopic(replyTo);
+                                } else {
+                                    if (LOG.isDebugEnabled()) {
+                                        LOG.debug("Creating JMSReplyTo queue: " + replyTo);
+                                    }
+                                    destination = session.createQueue(replyTo);
+                                }
+                            }
+                            if (destination != null) {
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("Using JMSReplyTo destination: " + destination);
+                                }
+                                answer.setJMSReplyTo(destination);
                             }
-                            destination = session.createQueue(replyTo);
-                        }
-                    }
-                    if (destination != null) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Using JMSReplyTo destination: " + destination);
                         }
-                        answer.setJMSReplyTo(destination);
                     }
                 }
 

Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToAsHeaderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToAsHeaderTest.java?rev=1052759&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToAsHeaderTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToAsHeaderTest.java Sat Dec 25 08:49:19 2010
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+
+/**
+ * @version $Revision$
+ */
+public class JmsInOnlyWithReplyToAsHeaderTest extends CamelTestSupport {
+
+    @Test
+    public void testSendInOnlyWithReplyTo() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("World");
+        getMockEndpoint("mock:bar").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:done").expectedBodiesReceived("World");
+
+        template.sendBodyAndHeader("direct:start", "World", "JMSReplyTo", "queue:bar");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    // must enable preserveMessageQos to force Camel to use the JMSReplyTo header
+                    .to("activemq:queue:foo?preserveMessageQos=true")
+                    .to("mock:done");
+
+                from("activemq:queue:foo")
+                    .to("log:foo?showAll=true", "mock:foo")
+                    .transform(body().prepend("Bye "));
+
+                from("activemq:queue:bar")
+                    .to("log:bar?showAll=true", "mock:bar");
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToDisabledTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToDisabledTest.java?rev=1052759&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToDisabledTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToDisabledTest.java Sat Dec 25 08:49:19 2010
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+
+/**
+ * @version $Revision$
+ */
+public class JmsInOnlyWithReplyToDisabledTest extends CamelTestSupport {
+
+    @Test
+    public void testSendInOnlyWithReplyTo() throws Exception {
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(0);
+        getMockEndpoint("mock:done").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("activemq:queue:foo?replyTo=queue:bar&disableReplyTo=true")
+                    .to("mock:done");
+
+                from("activemq:queue:foo")
+                    .to("mock:foo")
+                    .transform(body().prepend("Bye "));
+
+                from("activemq:queue:bar")
+                    .to("mock:bar");
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToTest.java?rev=1052759&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToTest.java Sat Dec 25 08:49:19 2010
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+
+/**
+ * @version $Revision$
+ */
+public class JmsInOnlyWithReplyToTest extends CamelTestSupport {
+
+    @Test
+    public void testSendInOnlyWithReplyTo() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("World");
+        getMockEndpoint("mock:bar").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:done").expectedBodiesReceived("World");
+
+        template.sendBody("direct:start", "World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("activemq:queue:foo?replyTo=queue:bar")
+                    .to("mock:done");
+
+                from("activemq:queue:foo")
+                    .to("log:foo?showAll=true", "mock:foo")
+                    .transform(body().prepend("Bye "));
+
+                from("activemq:queue:bar")
+                    .to("log:bar?showAll=true", "mock:bar");
+            }
+        };
+    }
+}