You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dm...@apache.org on 2018/11/16 09:04:25 UTC

[camel] branch master updated: CAMEL-12869: ReplyTo destination must match endpoint type (topic or queue) that the message is sent on

This is an automated email from the ASF dual-hosted git repository.

dmvolod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new f3a1cb5  CAMEL-12869: ReplyTo destination must match endpoint type (topic or queue) that the message is sent on
f3a1cb5 is described below

commit f3a1cb5492d897b24b45c8e9bd583f5d4259313b
Author: Dmitry Volodin <dm...@gmail.com>
AuthorDate: Thu Nov 15 18:51:21 2018 +0300

    CAMEL-12869: ReplyTo destination must match endpoint type (topic or
    queue) that the message is sent on
---
 components/camel-sjms/src/main/docs/sjms-component.adoc     |  2 +-
 .../java/org/apache/camel/component/sjms/SjmsEndpoint.java  |  2 ++
 .../camel/component/sjms/jms/DestinationNameParser.java     | 13 +++++++++++++
 .../apache/camel/component/sjms/producer/InOutProducer.java | 11 ++++++++---
 4 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/components/camel-sjms/src/main/docs/sjms-component.adoc b/components/camel-sjms/src/main/docs/sjms-component.adoc
index 6e9ad12..bff4665 100644
--- a/components/camel-sjms/src/main/docs/sjms-component.adoc
+++ b/components/camel-sjms/src/main/docs/sjms-component.adoc
@@ -150,7 +150,7 @@ with the following path and query parameters:
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | *messageSelector* (consumer) | Sets the JMS Message selector syntax. |  | String
-| *namedReplyTo* (producer) | Sets the reply to destination name used for InOut producer endpoints. |  | String
+| *namedReplyTo* (producer) | Sets the reply to destination name used for InOut producer endpoints. The type of the reply to destination can be determined by the starting prefix (topic: or queue:) in its name. |  | String
 | *persistent* (producer) | Flag used to enable/disable message persistence. | true | boolean
 | *producerCount* (producer) | Sets the number of producers used for this endpoint. | 1 | int
 | *ttl* (producer) | Flag used to adjust the Time To Live value of produced messages. | -1 | long
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index c73ebb0..faa01d0 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -563,6 +563,8 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult
 
     /**
      * Sets the reply to destination name used for InOut producer endpoints.
+     * The type of the reply to destination can be determined by the starting 
+     * prefix (topic: or queue:) in its name. 
      */
     public void setNamedReplyTo(String namedReplyTo) {
         this.namedReplyTo = namedReplyTo;
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
index 41eab2d..095d1c9 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
@@ -27,6 +27,19 @@ public class DestinationNameParser {
         }
         return destinationName.startsWith("topic:");
     }
+    
+    public boolean isNamedReplyToTopic(String namedReplyTo, boolean isDestinationTopic) {
+        if (namedReplyTo == null) {
+            throw new IllegalArgumentException("namedReplyTo is null");
+        }
+        if (namedReplyTo.startsWith("topic:")) {
+            return true;
+        } else if (namedReplyTo.startsWith("queue:")) {
+            return false;
+        } else {
+            return isDestinationTopic;
+        }
+    }
 
     public String getShortName(String destinationName) {
         if (destinationName == null) {
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index 1be3630..53be09e 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -38,6 +38,7 @@ import org.apache.camel.component.sjms.SjmsEndpoint;
 import org.apache.camel.component.sjms.SjmsMessage;
 import org.apache.camel.component.sjms.SjmsProducer;
 import org.apache.camel.component.sjms.jms.ConnectionResource;
+import org.apache.camel.component.sjms.jms.DestinationNameParser;
 import org.apache.camel.component.sjms.jms.JmsConstants;
 import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.spi.UuidGenerator;
@@ -87,12 +88,16 @@ public class InOutProducer extends SjmsProducer {
                 }
 
                 Destination replyToDestination;
+                boolean isReplyToTopic = false;
                 if (ObjectHelper.isEmpty(getNamedReplyTo())) {
-                    replyToDestination = getEndpoint().getDestinationCreationStrategy().createTemporaryDestination(session, isTopic());
+                    isReplyToTopic = isTopic();
+                    replyToDestination = getEndpoint().getDestinationCreationStrategy().createTemporaryDestination(session, isReplyToTopic);
                 } else {
-                    replyToDestination = getEndpoint().getDestinationCreationStrategy().createDestination(session, getNamedReplyTo(), isTopic());
+                    DestinationNameParser parser = new DestinationNameParser();
+                    isReplyToTopic = parser.isNamedReplyToTopic(getNamedReplyTo(), isTopic());
+                    replyToDestination = getEndpoint().getDestinationCreationStrategy().createDestination(session, getNamedReplyTo(), isReplyToTopic);
                 }
-                MessageConsumer messageConsumer = getEndpoint().getJmsObjectFactory().createMessageConsumer(session, replyToDestination, null, isTopic(), null, true, false, false);
+                MessageConsumer messageConsumer = getEndpoint().getJmsObjectFactory().createMessageConsumer(session, replyToDestination, null, isReplyToTopic, null, true, false, false);
                 messageConsumer.setMessageListener(new MessageListener() {
                     @Override
                     public void onMessage(final Message message) {