You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/12/04 14:10:01 UTC

[camel] 01/08: CAMEL-12869: ReplyTo destination must match endpoint type (topic or queue) that the message is sent on

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b1e30af08e4a6ee56e04a124e5c281da307b948e
Author: Dmitry Volodin <dm...@gmail.com>
AuthorDate: Thu Nov 15 18:51:21 2018 +0300

    CAMEL-12869: ReplyTo destination must match endpoint type (topic or
    queue) that the message is sent on
---
 components/camel-sjms/src/main/docs/sjms-component.adoc     |  2 +-
 .../java/org/apache/camel/component/sjms/SjmsEndpoint.java  |  2 ++
 .../camel/component/sjms/jms/DestinationNameParser.java     | 13 +++++++++++++
 .../apache/camel/component/sjms/producer/InOutProducer.java | 11 ++++++++---
 4 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/components/camel-sjms/src/main/docs/sjms-component.adoc b/components/camel-sjms/src/main/docs/sjms-component.adoc
index 6e9ad12..bff4665 100644
--- a/components/camel-sjms/src/main/docs/sjms-component.adoc
+++ b/components/camel-sjms/src/main/docs/sjms-component.adoc
@@ -150,7 +150,7 @@ with the following path and query parameters:
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | *messageSelector* (consumer) | Sets the JMS Message selector syntax. |  | String
-| *namedReplyTo* (producer) | Sets the reply to destination name used for InOut producer endpoints. |  | String
+| *namedReplyTo* (producer) | Sets the reply to destination name used for InOut producer endpoints. The type of the reply to destination can be determined by the starting prefix (topic: or queue:) in its name. |  | String
 | *persistent* (producer) | Flag used to enable/disable message persistence. | true | boolean
 | *producerCount* (producer) | Sets the number of producers used for this endpoint. | 1 | int
 | *ttl* (producer) | Flag used to adjust the Time To Live value of produced messages. | -1 | long
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 42d35fd..b593f9c 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -563,6 +563,8 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult
 
     /**
      * Sets the reply to destination name used for InOut producer endpoints.
+     * The type of the reply to destination can be determined by the starting 
+     * prefix (topic: or queue:) in its name. 
      */
     public void setNamedReplyTo(String namedReplyTo) {
         this.namedReplyTo = namedReplyTo;
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
index 41eab2d..095d1c9 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
@@ -27,6 +27,19 @@ public class DestinationNameParser {
         }
         return destinationName.startsWith("topic:");
     }
+    
+    public boolean isNamedReplyToTopic(String namedReplyTo, boolean isDestinationTopic) {
+        if (namedReplyTo == null) {
+            throw new IllegalArgumentException("namedReplyTo is null");
+        }
+        if (namedReplyTo.startsWith("topic:")) {
+            return true;
+        } else if (namedReplyTo.startsWith("queue:")) {
+            return false;
+        } else {
+            return isDestinationTopic;
+        }
+    }
 
     public String getShortName(String destinationName) {
         if (destinationName == null) {
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index 36fc732..3f09738 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -38,6 +38,7 @@ import org.apache.camel.component.sjms.SjmsEndpoint;
 import org.apache.camel.component.sjms.SjmsMessage;
 import org.apache.camel.component.sjms.SjmsProducer;
 import org.apache.camel.component.sjms.jms.ConnectionResource;
+import org.apache.camel.component.sjms.jms.DestinationNameParser;
 import org.apache.camel.component.sjms.jms.JmsConstants;
 import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.spi.UuidGenerator;
@@ -87,12 +88,16 @@ public class InOutProducer extends SjmsProducer {
                 }
 
                 Destination replyToDestination;
+                boolean isReplyToTopic = false;
                 if (ObjectHelper.isEmpty(getNamedReplyTo())) {
-                    replyToDestination = getEndpoint().getDestinationCreationStrategy().createTemporaryDestination(session, isTopic());
+                    isReplyToTopic = isTopic();
+                    replyToDestination = getEndpoint().getDestinationCreationStrategy().createTemporaryDestination(session, isReplyToTopic);
                 } else {
-                    replyToDestination = getEndpoint().getDestinationCreationStrategy().createDestination(session, getNamedReplyTo(), isTopic());
+                    DestinationNameParser parser = new DestinationNameParser();
+                    isReplyToTopic = parser.isNamedReplyToTopic(getNamedReplyTo(), isTopic());
+                    replyToDestination = getEndpoint().getDestinationCreationStrategy().createDestination(session, getNamedReplyTo(), isReplyToTopic);
                 }
-                MessageConsumer messageConsumer = getEndpoint().getJmsObjectFactory().createMessageConsumer(session, replyToDestination, null, isTopic(), null, true, false, false);
+                MessageConsumer messageConsumer = getEndpoint().getJmsObjectFactory().createMessageConsumer(session, replyToDestination, null, isReplyToTopic, null, true, false, false);
                 messageConsumer.setMessageListener(new MessageListener() {
                     @Override
                     public void onMessage(final Message message) {