You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/01/04 14:18:49 UTC
[1/3] git commit: CAMEL-7049: Fixed jms JMSReplyTo header with a
topic did not work correctly.
Updated Branches:
refs/heads/camel-2.11.x 12f1adf20 -> 145322664
refs/heads/camel-2.12.x 0d429cdc0 -> 0e4b1458f
refs/heads/master f6715cb0d -> 10a5f1b3a
CAMEL-7049: Fixed jms JMSReplyTo header with a topic did not work correctly.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/10a5f1b3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/10a5f1b3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/10a5f1b3
Branch: refs/heads/master
Commit: 10a5f1b3aab6bf9b73fb68de616235f75de8aefc
Parents: f6715cb
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Jan 4 14:15:33 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Jan 4 14:18:46 2014 +0100
----------------------------------------------------------------------
.../apache/camel/component/jms/JmsBinding.java | 5 +-
.../camel/component/jms/JmsMessageHelper.java | 46 +++++++++++-
.../JmsInOnlyWithReplyToHeaderTopicTest.java | 74 ++++++++++++++++++++
3 files changed, 120 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/10a5f1b3/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
index 9cb393a..b86e362 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
@@ -319,8 +319,9 @@ public class JmsBinding {
jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
} else if (headerName.equals("JMSReplyTo") && headerValue != null) {
if (headerValue instanceof String) {
- // if the value is a String we must normalize it first
- headerValue = normalizeDestinationName((String) headerValue);
+ // if the value is a String we must normalize it first, and must include the prefix
+ // as ActiveMQ requires that when converting the String to a javax.jms.Destination type
+ headerValue = normalizeDestinationName((String) headerValue, true);
}
Destination replyTo = ExchangeHelper.convertToType(exchange, Destination.class, headerValue);
JmsMessageHelper.setJMSReplyTo(jmsMessage, replyTo);
http://git-wip-us.apache.org/repos/asf/camel/blob/10a5f1b3/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
index 8b8f15b..52f6af5 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
@@ -30,6 +30,8 @@ import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import static org.apache.camel.component.jms.JmsConfiguration.QUEUE_PREFIX;
+import static org.apache.camel.component.jms.JmsConfiguration.TEMP_QUEUE_PREFIX;
+import static org.apache.camel.component.jms.JmsConfiguration.TEMP_TOPIC_PREFIX;
import static org.apache.camel.component.jms.JmsConfiguration.TOPIC_PREFIX;
import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
@@ -169,19 +171,57 @@ public final class JmsMessageHelper {
}
/**
- * Normalizes the destination name, by removing any leading queue or topic prefixes.
+ * Normalizes the destination name.
+ * <p/>
+ * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+ * was intended as <tt>queue://foo</tt>.
*
* @param destination the destination
* @return the normalized destination
*/
public static String normalizeDestinationName(String destination) {
+ // do not include prefix which is the current behavior when using this method.
+ return normalizeDestinationName(destination, false);
+ }
+
+ /**
+ * Normalizes the destination name.
+ * <p/>
+ * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+ * was intended as <tt>queue://foo</tt>.
+ *
+ * @param destination the destination
+ * @param includePrefix whether to include the destination type prefix (queue, topic, temp queue or temp topic) followed by <tt>//</tt> in the normalized destination name
+ * @return the normalized destination
+ */
+ public static String normalizeDestinationName(String destination, boolean includePrefix) {
if (ObjectHelper.isEmpty(destination)) {
return destination;
}
if (destination.startsWith(QUEUE_PREFIX)) {
- return removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+ String s = removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = QUEUE_PREFIX + "//" + s;
+ }
+ return s;
+ } else if (destination.startsWith(TEMP_QUEUE_PREFIX)) {
+ String s = removeStartingCharacters(destination.substring(TEMP_QUEUE_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = TEMP_QUEUE_PREFIX + "//" + s;
+ }
+ return s;
} else if (destination.startsWith(TOPIC_PREFIX)) {
- return removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+ String s = removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = TOPIC_PREFIX + "//" + s;
+ }
+ return s;
+ } else if (destination.startsWith(TEMP_TOPIC_PREFIX)) {
+ String s = removeStartingCharacters(destination.substring(TEMP_TOPIC_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = TEMP_TOPIC_PREFIX + "//" + s;
+ }
+ return s;
} else {
return destination;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/10a5f1b3/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
new file mode 100644
index 0000000..bb30ab1
--- /dev/null
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ * @version
+ */
+public class JmsInOnlyWithReplyToHeaderTopicTest extends CamelTestSupport {
+
+ @Test
+ public void testJmsInOnlyWithReplyToHeader() throws Exception {
+ getMockEndpoint("mock:bar").expectedMessageCount(1);
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+ mock.expectedHeaderReceived("JMSReplyTo", "topic://bar");
+
+ template.send("activemq:queue:foo?preserveMessageQos=true", new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setBody("World");
+ exchange.getIn().setHeader("JMSReplyTo", "topic:bar");
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+ ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
+ camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
+ return camelContext;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("activemq:queue:foo")
+ .transform(body().prepend("Hello "))
+ .to("mock:result");
+
+ from("activemq:topic:bar").to("mock:bar");
+ }
+ };
+ }
+}
\ No newline at end of file
[2/3] git commit: CAMEL-7049: Fixed jms JMSReplyTo header with a
topic did not work correctly.
Posted by da...@apache.org.
CAMEL-7049: Fixed jms JMSReplyTo header with a topic did not work correctly.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0e4b1458
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0e4b1458
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0e4b1458
Branch: refs/heads/camel-2.12.x
Commit: 0e4b1458f2d0e341d07c37c1d480a501b251a6dc
Parents: 0d429cd
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Jan 4 14:15:33 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Jan 4 14:22:04 2014 +0100
----------------------------------------------------------------------
.../apache/camel/component/jms/JmsBinding.java | 5 +-
.../camel/component/jms/JmsMessageHelper.java | 46 +++++++++++-
.../JmsInOnlyWithReplyToHeaderTopicTest.java | 74 ++++++++++++++++++++
3 files changed, 120 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0e4b1458/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
index 9cb393a..b86e362 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
@@ -319,8 +319,9 @@ public class JmsBinding {
jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
} else if (headerName.equals("JMSReplyTo") && headerValue != null) {
if (headerValue instanceof String) {
- // if the value is a String we must normalize it first
- headerValue = normalizeDestinationName((String) headerValue);
+ // if the value is a String we must normalize it first, and must include the prefix
+ // as ActiveMQ requires that when converting the String to a javax.jms.Destination type
+ headerValue = normalizeDestinationName((String) headerValue, true);
}
Destination replyTo = ExchangeHelper.convertToType(exchange, Destination.class, headerValue);
JmsMessageHelper.setJMSReplyTo(jmsMessage, replyTo);
http://git-wip-us.apache.org/repos/asf/camel/blob/0e4b1458/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
index 8b8f15b..52f6af5 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
@@ -30,6 +30,8 @@ import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import static org.apache.camel.component.jms.JmsConfiguration.QUEUE_PREFIX;
+import static org.apache.camel.component.jms.JmsConfiguration.TEMP_QUEUE_PREFIX;
+import static org.apache.camel.component.jms.JmsConfiguration.TEMP_TOPIC_PREFIX;
import static org.apache.camel.component.jms.JmsConfiguration.TOPIC_PREFIX;
import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
@@ -169,19 +171,57 @@ public final class JmsMessageHelper {
}
/**
- * Normalizes the destination name, by removing any leading queue or topic prefixes.
+ * Normalizes the destination name.
+ * <p/>
+ * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+ * was intended as <tt>queue://foo</tt>.
*
* @param destination the destination
* @return the normalized destination
*/
public static String normalizeDestinationName(String destination) {
+ // do not include prefix which is the current behavior when using this method.
+ return normalizeDestinationName(destination, false);
+ }
+
+ /**
+ * Normalizes the destination name.
+ * <p/>
+ * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+ * was intended as <tt>queue://foo</tt>.
+ *
+ * @param destination the destination
+ * @param includePrefix whether to include the destination type prefix (queue, topic, temp queue or temp topic) followed by <tt>//</tt> in the normalized destination name
+ * @return the normalized destination
+ */
+ public static String normalizeDestinationName(String destination, boolean includePrefix) {
if (ObjectHelper.isEmpty(destination)) {
return destination;
}
if (destination.startsWith(QUEUE_PREFIX)) {
- return removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+ String s = removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = QUEUE_PREFIX + "//" + s;
+ }
+ return s;
+ } else if (destination.startsWith(TEMP_QUEUE_PREFIX)) {
+ String s = removeStartingCharacters(destination.substring(TEMP_QUEUE_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = TEMP_QUEUE_PREFIX + "//" + s;
+ }
+ return s;
} else if (destination.startsWith(TOPIC_PREFIX)) {
- return removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+ String s = removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = TOPIC_PREFIX + "//" + s;
+ }
+ return s;
+ } else if (destination.startsWith(TEMP_TOPIC_PREFIX)) {
+ String s = removeStartingCharacters(destination.substring(TEMP_TOPIC_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = TEMP_TOPIC_PREFIX + "//" + s;
+ }
+ return s;
} else {
return destination;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0e4b1458/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
new file mode 100644
index 0000000..bb30ab1
--- /dev/null
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ * @version
+ */
+public class JmsInOnlyWithReplyToHeaderTopicTest extends CamelTestSupport {
+
+ @Test
+ public void testJmsInOnlyWithReplyToHeader() throws Exception {
+ getMockEndpoint("mock:bar").expectedMessageCount(1);
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+ mock.expectedHeaderReceived("JMSReplyTo", "topic://bar");
+
+ template.send("activemq:queue:foo?preserveMessageQos=true", new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setBody("World");
+ exchange.getIn().setHeader("JMSReplyTo", "topic:bar");
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+ ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
+ camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
+ return camelContext;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("activemq:queue:foo")
+ .transform(body().prepend("Hello "))
+ .to("mock:result");
+
+ from("activemq:topic:bar").to("mock:bar");
+ }
+ };
+ }
+}
\ No newline at end of file
[3/3] git commit: CAMEL-7049: Fixed jms JMSReplyTo header with a
topic did not work correctly.
Posted by da...@apache.org.
CAMEL-7049: Fixed jms JMSReplyTo header with a topic did not work correctly.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/14532266
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/14532266
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/14532266
Branch: refs/heads/camel-2.11.x
Commit: 1453226640489436d4f15c051ab3a6a4ad537aff
Parents: 12f1adf
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Jan 4 14:15:33 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Jan 4 14:22:21 2014 +0100
----------------------------------------------------------------------
.../apache/camel/component/jms/JmsBinding.java | 5 +-
.../camel/component/jms/JmsMessageHelper.java | 46 +++++++++++-
.../JmsInOnlyWithReplyToHeaderTopicTest.java | 74 ++++++++++++++++++++
3 files changed, 120 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/14532266/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
index 22fa523..a6fe7f2 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
@@ -319,8 +319,9 @@ public class JmsBinding {
jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
} else if (headerName.equals("JMSReplyTo") && headerValue != null) {
if (headerValue instanceof String) {
- // if the value is a String we must normalize it first
- headerValue = normalizeDestinationName((String) headerValue);
+ // if the value is a String we must normalize it first, and must include the prefix
+ // as ActiveMQ requires that when converting the String to a javax.jms.Destination type
+ headerValue = normalizeDestinationName((String) headerValue, true);
}
Destination replyTo = ExchangeHelper.convertToType(exchange, Destination.class, headerValue);
JmsMessageHelper.setJMSReplyTo(jmsMessage, replyTo);
http://git-wip-us.apache.org/repos/asf/camel/blob/14532266/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
index 8b8f15b..52f6af5 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
@@ -30,6 +30,8 @@ import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import static org.apache.camel.component.jms.JmsConfiguration.QUEUE_PREFIX;
+import static org.apache.camel.component.jms.JmsConfiguration.TEMP_QUEUE_PREFIX;
+import static org.apache.camel.component.jms.JmsConfiguration.TEMP_TOPIC_PREFIX;
import static org.apache.camel.component.jms.JmsConfiguration.TOPIC_PREFIX;
import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
@@ -169,19 +171,57 @@ public final class JmsMessageHelper {
}
/**
- * Normalizes the destination name, by removing any leading queue or topic prefixes.
+ * Normalizes the destination name.
+ * <p/>
+ * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+ * was intended as <tt>queue://foo</tt>.
*
* @param destination the destination
* @return the normalized destination
*/
public static String normalizeDestinationName(String destination) {
+ // do not include prefix which is the current behavior when using this method.
+ return normalizeDestinationName(destination, false);
+ }
+
+ /**
+ * Normalizes the destination name.
+ * <p/>
+ * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+ * was intended as <tt>queue://foo</tt>.
+ *
+ * @param destination the destination
+ * @param includePrefix whether to include the destination type prefix (queue, topic, temp queue or temp topic) followed by <tt>//</tt> in the normalized destination name
+ * @return the normalized destination
+ */
+ public static String normalizeDestinationName(String destination, boolean includePrefix) {
if (ObjectHelper.isEmpty(destination)) {
return destination;
}
if (destination.startsWith(QUEUE_PREFIX)) {
- return removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+ String s = removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = QUEUE_PREFIX + "//" + s;
+ }
+ return s;
+ } else if (destination.startsWith(TEMP_QUEUE_PREFIX)) {
+ String s = removeStartingCharacters(destination.substring(TEMP_QUEUE_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = TEMP_QUEUE_PREFIX + "//" + s;
+ }
+ return s;
} else if (destination.startsWith(TOPIC_PREFIX)) {
- return removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+ String s = removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = TOPIC_PREFIX + "//" + s;
+ }
+ return s;
+ } else if (destination.startsWith(TEMP_TOPIC_PREFIX)) {
+ String s = removeStartingCharacters(destination.substring(TEMP_TOPIC_PREFIX.length()), '/');
+ if (includePrefix) {
+ s = TEMP_TOPIC_PREFIX + "//" + s;
+ }
+ return s;
} else {
return destination;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/14532266/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
new file mode 100644
index 0000000..bb30ab1
--- /dev/null
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyWithReplyToHeaderTopicTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ * @version
+ */
+public class JmsInOnlyWithReplyToHeaderTopicTest extends CamelTestSupport {
+
+ @Test
+ public void testJmsInOnlyWithReplyToHeader() throws Exception {
+ getMockEndpoint("mock:bar").expectedMessageCount(1);
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+ mock.expectedHeaderReceived("JMSReplyTo", "topic://bar");
+
+ template.send("activemq:queue:foo?preserveMessageQos=true", new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setBody("World");
+ exchange.getIn().setHeader("JMSReplyTo", "topic:bar");
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+ ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
+ camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
+ return camelContext;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("activemq:queue:foo")
+ .transform(body().prepend("Hello "))
+ .to("mock:result");
+
+ from("activemq:topic:bar").to("mock:bar");
+ }
+ };
+ }
+}
\ No newline at end of file