You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/01/04 14:18:49 UTC

[1/3] git commit: CAMEL-7049: Fixed jms JMSReplyTo header with a topic did not work correctly.

Updated Branches:
  refs/heads/camel-2.11.x 12f1adf20 -> 145322664
  refs/heads/camel-2.12.x 0d429cdc0 -> 0e4b1458f
  refs/heads/master f6715cb0d -> 10a5f1b3a


CAMEL-7049: Fixed jms JMSReplyTo header with a topic did not work correctly.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/10a5f1b3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/10a5f1b3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/10a5f1b3

Branch: refs/heads/master
Commit: 10a5f1b3aab6bf9b73fb68de616235f75de8aefc
Parents: f6715cb
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Jan 4 14:15:33 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Jan 4 14:18:46 2014 +0100

----------------------------------------------------------------------
 .../apache/camel/component/jms/JmsBinding.java  |  5 +-
 .../camel/component/jms/JmsMessageHelper.java   | 46 +++++++++++-
 .../JmsInOnlyWithReplyToHeaderTopicTest.java    | 74 ++++++++++++++++++++
 3 files changed, 120 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/10a5f1b3/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
index 9cb393a..b86e362 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
@@ -319,8 +319,9 @@ public class JmsBinding {
                 jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
             } else if (headerName.equals("JMSReplyTo") && headerValue != null) {
                 if (headerValue instanceof String) {
-                    // if the value is a String we must normalize it first
-                    headerValue = normalizeDestinationName((String) headerValue);
+                    // if the value is a String we must normalize it first, and must include the prefix
+                    // as ActiveMQ requires that when converting the String to a javax.jms.Destination type
+                    headerValue = normalizeDestinationName((String) headerValue, true);
                 }
                 Destination replyTo = ExchangeHelper.convertToType(exchange, Destination.class, headerValue);
                 JmsMessageHelper.setJMSReplyTo(jmsMessage, replyTo);

http://git-wip-us.apache.org/repos/asf/camel/blob/10a5f1b3/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
index 8b8f15b..52f6af5 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
@@ -30,6 +30,8 @@ import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 
 import static org.apache.camel.component.jms.JmsConfiguration.QUEUE_PREFIX;
+import static org.apache.camel.component.jms.JmsConfiguration.TEMP_QUEUE_PREFIX;
+import static org.apache.camel.component.jms.JmsConfiguration.TEMP_TOPIC_PREFIX;
 import static org.apache.camel.component.jms.JmsConfiguration.TOPIC_PREFIX;
 import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
 
@@ -169,19 +171,57 @@ public final class JmsMessageHelper {
     }
 
     /**
-     * Normalizes the destination name, by removing any leading queue or topic prefixes.
+     * Normalizes the destination name.
+     * <p/>
+     * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+     * was intended as <tt>queue://foo</tt>.
      *
      * @param destination the destination
      * @return the normalized destination
      */
     public static String normalizeDestinationName(String destination) {
+        // do not include prefix which is the current behavior when using this method.
+        return normalizeDestinationName(destination, false);
+    }
+
+    /**
+     * Normalizes the destination name.
+     * <p/>
+     * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+     * was intended as <tt>queue://foo</tt>.
+     *
+     * @param destination the destination
+     * @param includePrefix whether to include <tt>queue://</tt>, or <tt>topic://</tt> prefix in the normalized destination name
+     * @return the normalized destination
+     */
+    public static String normalizeDestinationName(String destination, boolean includePrefix) {
         if (ObjectHelper.isEmpty(destination)) {
             return destination;
         }
         if (destination.startsWith(QUEUE_PREFIX)) {
-            return removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+            String s = removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = QUEUE_PREFIX + "//" + s;
+            }
+            return s;
+        } else if (destination.startsWith(TEMP_QUEUE_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(TEMP_QUEUE_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = TEMP_QUEUE_PREFIX + "//" + s;
+            }
+            return s;
         } else if (destination.startsWith(TOPIC_PREFIX)) {
-            return removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+            String s = removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = TOPIC_PREFIX + "//" + s;
+            }
+            return s;
+        } else if (destination.startsWith(TEMP_TOPIC_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(TEMP_TOPIC_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = TEMP_TOPIC_PREFIX + "//" + s;
+            }
+            return s;
         } else {
             return destination;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/10a5f1b3/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
new file mode 100644
index 0000000..bb30ab1
--- /dev/null
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ * @version 
+ */
+public class JmsInOnlyWithReplyToHeaderTopicTest extends CamelTestSupport {
+
+    @Test
+    public void testJmsInOnlyWithReplyToHeader() throws Exception {
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+        mock.expectedHeaderReceived("JMSReplyTo", "topic://bar");
+
+        template.send("activemq:queue:foo?preserveMessageQos=true", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("World");
+                exchange.getIn().setHeader("JMSReplyTo", "topic:bar");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
+        camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("activemq:queue:foo")
+                    .transform(body().prepend("Hello "))
+                    .to("mock:result");
+
+                from("activemq:topic:bar").to("mock:bar");
+            }
+        };
+    }
+}
\ No newline at end of file


[2/3] git commit: CAMEL-7049: Fixed jms JMSReplyTo header with a topic did not work correctly.

Posted by da...@apache.org.
CAMEL-7049: Fixed jms JMSReplyTo header with a topic did not work correctly.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0e4b1458
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0e4b1458
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0e4b1458

Branch: refs/heads/camel-2.12.x
Commit: 0e4b1458f2d0e341d07c37c1d480a501b251a6dc
Parents: 0d429cd
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Jan 4 14:15:33 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Jan 4 14:22:04 2014 +0100

----------------------------------------------------------------------
 .../apache/camel/component/jms/JmsBinding.java  |  5 +-
 .../camel/component/jms/JmsMessageHelper.java   | 46 +++++++++++-
 .../JmsInOnlyWithReplyToHeaderTopicTest.java    | 74 ++++++++++++++++++++
 3 files changed, 120 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0e4b1458/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
index 9cb393a..b86e362 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
@@ -319,8 +319,9 @@ public class JmsBinding {
                 jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
             } else if (headerName.equals("JMSReplyTo") && headerValue != null) {
                 if (headerValue instanceof String) {
-                    // if the value is a String we must normalize it first
-                    headerValue = normalizeDestinationName((String) headerValue);
+                    // if the value is a String we must normalize it first, and must include the prefix
+                    // as ActiveMQ requires that when converting the String to a javax.jms.Destination type
+                    headerValue = normalizeDestinationName((String) headerValue, true);
                 }
                 Destination replyTo = ExchangeHelper.convertToType(exchange, Destination.class, headerValue);
                 JmsMessageHelper.setJMSReplyTo(jmsMessage, replyTo);

http://git-wip-us.apache.org/repos/asf/camel/blob/0e4b1458/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
index 8b8f15b..52f6af5 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
@@ -30,6 +30,8 @@ import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 
 import static org.apache.camel.component.jms.JmsConfiguration.QUEUE_PREFIX;
+import static org.apache.camel.component.jms.JmsConfiguration.TEMP_QUEUE_PREFIX;
+import static org.apache.camel.component.jms.JmsConfiguration.TEMP_TOPIC_PREFIX;
 import static org.apache.camel.component.jms.JmsConfiguration.TOPIC_PREFIX;
 import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
 
@@ -169,19 +171,57 @@ public final class JmsMessageHelper {
     }
 
     /**
-     * Normalizes the destination name, by removing any leading queue or topic prefixes.
+     * Normalizes the destination name.
+     * <p/>
+     * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+     * was intended as <tt>queue://foo</tt>.
      *
      * @param destination the destination
      * @return the normalized destination
      */
     public static String normalizeDestinationName(String destination) {
+        // do not include prefix which is the current behavior when using this method.
+        return normalizeDestinationName(destination, false);
+    }
+
+    /**
+     * Normalizes the destination name.
+     * <p/>
+     * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+     * was intended as <tt>queue://foo</tt>.
+     *
+     * @param destination the destination
+     * @param includePrefix whether to include <tt>queue://</tt>, or <tt>topic://</tt> prefix in the normalized destination name
+     * @return the normalized destination
+     */
+    public static String normalizeDestinationName(String destination, boolean includePrefix) {
         if (ObjectHelper.isEmpty(destination)) {
             return destination;
         }
         if (destination.startsWith(QUEUE_PREFIX)) {
-            return removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+            String s = removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = QUEUE_PREFIX + "//" + s;
+            }
+            return s;
+        } else if (destination.startsWith(TEMP_QUEUE_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(TEMP_QUEUE_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = TEMP_QUEUE_PREFIX + "//" + s;
+            }
+            return s;
         } else if (destination.startsWith(TOPIC_PREFIX)) {
-            return removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+            String s = removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = TOPIC_PREFIX + "//" + s;
+            }
+            return s;
+        } else if (destination.startsWith(TEMP_TOPIC_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(TEMP_TOPIC_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = TEMP_TOPIC_PREFIX + "//" + s;
+            }
+            return s;
         } else {
             return destination;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/0e4b1458/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
new file mode 100644
index 0000000..bb30ab1
--- /dev/null
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ * @version 
+ */
+public class JmsInOnlyWithReplyToHeaderTopicTest extends CamelTestSupport {
+
+    @Test
+    public void testJmsInOnlyWithReplyToHeader() throws Exception {
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+        mock.expectedHeaderReceived("JMSReplyTo", "topic://bar");
+
+        template.send("activemq:queue:foo?preserveMessageQos=true", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("World");
+                exchange.getIn().setHeader("JMSReplyTo", "topic:bar");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
+        camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("activemq:queue:foo")
+                    .transform(body().prepend("Hello "))
+                    .to("mock:result");
+
+                from("activemq:topic:bar").to("mock:bar");
+            }
+        };
+    }
+}
\ No newline at end of file


[3/3] git commit: CAMEL-7049: Fixed jms JMSReplyTo header with a topic did not work correctly.

Posted by da...@apache.org.
CAMEL-7049: Fixed jms JMSReplyTo header with a topic did not work correctly.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/14532266
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/14532266
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/14532266

Branch: refs/heads/camel-2.11.x
Commit: 1453226640489436d4f15c051ab3a6a4ad537aff
Parents: 12f1adf
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Jan 4 14:15:33 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Jan 4 14:22:21 2014 +0100

----------------------------------------------------------------------
 .../apache/camel/component/jms/JmsBinding.java  |  5 +-
 .../camel/component/jms/JmsMessageHelper.java   | 46 +++++++++++-
 .../JmsInOnlyWithReplyToHeaderTopicTest.java    | 74 ++++++++++++++++++++
 3 files changed, 120 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/14532266/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
index 22fa523..a6fe7f2 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
@@ -319,8 +319,9 @@ public class JmsBinding {
                 jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
             } else if (headerName.equals("JMSReplyTo") && headerValue != null) {
                 if (headerValue instanceof String) {
-                    // if the value is a String we must normalize it first
-                    headerValue = normalizeDestinationName((String) headerValue);
+                    // if the value is a String we must normalize it first, and must include the prefix
+                    // as ActiveMQ requires that when converting the String to a javax.jms.Destination type
+                    headerValue = normalizeDestinationName((String) headerValue, true);
                 }
                 Destination replyTo = ExchangeHelper.convertToType(exchange, Destination.class, headerValue);
                 JmsMessageHelper.setJMSReplyTo(jmsMessage, replyTo);

http://git-wip-us.apache.org/repos/asf/camel/blob/14532266/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
index 8b8f15b..52f6af5 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
@@ -30,6 +30,8 @@ import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 
 import static org.apache.camel.component.jms.JmsConfiguration.QUEUE_PREFIX;
+import static org.apache.camel.component.jms.JmsConfiguration.TEMP_QUEUE_PREFIX;
+import static org.apache.camel.component.jms.JmsConfiguration.TEMP_TOPIC_PREFIX;
 import static org.apache.camel.component.jms.JmsConfiguration.TOPIC_PREFIX;
 import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
 
@@ -169,19 +171,57 @@ public final class JmsMessageHelper {
     }
 
     /**
-     * Normalizes the destination name, by removing any leading queue or topic prefixes.
+     * Normalizes the destination name.
+     * <p/>
+     * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+     * was intended as <tt>queue://foo</tt>.
      *
      * @param destination the destination
      * @return the normalized destination
      */
     public static String normalizeDestinationName(String destination) {
+        // do not include prefix which is the current behavior when using this method.
+        return normalizeDestinationName(destination, false);
+    }
+
+    /**
+     * Normalizes the destination name.
+     * <p/>
+     * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+     * was intended as <tt>queue://foo</tt>.
+     *
+     * @param destination the destination
+     * @param includePrefix whether to include <tt>queue://</tt>, or <tt>topic://</tt> prefix in the normalized destination name
+     * @return the normalized destination
+     */
+    public static String normalizeDestinationName(String destination, boolean includePrefix) {
         if (ObjectHelper.isEmpty(destination)) {
             return destination;
         }
         if (destination.startsWith(QUEUE_PREFIX)) {
-            return removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+            String s = removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = QUEUE_PREFIX + "//" + s;
+            }
+            return s;
+        } else if (destination.startsWith(TEMP_QUEUE_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(TEMP_QUEUE_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = TEMP_QUEUE_PREFIX + "//" + s;
+            }
+            return s;
         } else if (destination.startsWith(TOPIC_PREFIX)) {
-            return removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+            String s = removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = TOPIC_PREFIX + "//" + s;
+            }
+            return s;
+        } else if (destination.startsWith(TEMP_TOPIC_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(TEMP_TOPIC_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = TEMP_TOPIC_PREFIX + "//" + s;
+            }
+            return s;
         } else {
             return destination;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/14532266/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
new file mode 100644
index 0000000..bb30ab1
--- /dev/null
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ * @version 
+ */
+public class JmsInOnlyWithReplyToHeaderTopicTest extends CamelTestSupport {
+
+    @Test
+    public void testJmsInOnlyWithReplyToHeader() throws Exception {
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+        mock.expectedHeaderReceived("JMSReplyTo", "topic://bar");
+
+        template.send("activemq:queue:foo?preserveMessageQos=true", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("World");
+                exchange.getIn().setHeader("JMSReplyTo", "topic:bar");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
+        camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("activemq:queue:foo")
+                    .transform(body().prepend("Hello "))
+                    .to("mock:result");
+
+                from("activemq:topic:bar").to("mock:bar");
+            }
+        };
+    }
+}
\ No newline at end of file