You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/09/08 11:11:33 UTC

camel git commit: CAMEL-7813: camel-jms should not allow sending reply message to the same queue as consumed the message from as it causes an endless loop. Added option to turn off check in case people really know what they are doing.

Repository: camel
Updated Branches:
  refs/heads/master 6e18ca3ee -> 812fa060b


CAMEL-7813: camel-jms should not allow sending reply message to the same queue as consumed the message from as it causes an endless loop. Added option to turn off check in case people really know what they are doing.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/812fa060
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/812fa060
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/812fa060

Branch: refs/heads/master
Commit: 812fa060bfeac5b320624b5d6d4833ac441d42c9
Parents: 6e18ca3
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Sep 8 10:49:33 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Sep 8 11:12:28 2015 +0200

----------------------------------------------------------------------
 .../component/jms/EndpointMessageListener.java  |  7 ++
 .../camel/component/jms/JmsConfiguration.java   | 18 ++++-
 .../apache/camel/component/jms/JmsEndpoint.java | 20 +++++
 .../jms/issues/JmsReplyToLoopIssueTest.java     | 79 ++++++++++++++++++++
 4 files changed, 122 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/812fa060/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
index 3980128..b21b0a7 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
@@ -77,6 +77,13 @@ public class EndpointMessageListener implements SessionAwareMessageListener {
             // and disableReplyTo hasn't been explicit enabled
             sendReply = replyDestination != null && !disableReplyTo;
 
+            // we should also not send back reply to ourself if this destination and replyDestination is the same
+            Destination destination = message.getJMSDestination();
+            if (sendReply && !endpoint.isReplyToSameDestinationAllowed() && destination.equals(replyDestination)) {
+                LOG.debug("JMSDestination and JMSReplyTo is the same, will skip sending a reply message to itself: {}", destination);
+                sendReply = false;
+            }
+
             final Exchange exchange = createExchange(message, session, replyDestination);
             if (eagerLoadingOfProperties) {
                 exchange.getIn().getHeaders();

http://git-wip-us.apache.org/repos/asf/camel/blob/812fa060/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
index fa9d38e..9ba497b 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
@@ -171,6 +171,8 @@ public class JmsConfiguration implements Cloneable {
     private String replyToDestinationSelectorName;
     @UriParam
     private String replyToOverride;
+    @UriParam(label = "consumer")
+    private boolean replyToSameDestinationAllowed;
     @UriParam(enums = "Bytes,Map,Object,Stream,Text")
     private JmsMessageType jmsMessageType;
     @UriParam
@@ -1438,13 +1440,25 @@ public class JmsConfiguration implements Cloneable {
     }
 
     /**
-     *  Provides an explicit ReplyTo destination in the JMS message, which overrides the setting of replyTo.
-     *  It is useful if you want to forward the message to a remote Queue and receive the reply message from the ReplyTo destination.
+     * Provides an explicit ReplyTo destination in the JMS message, which overrides the setting of replyTo.
+     * It is useful if you want to forward the message to a remote Queue and receive the reply message from the ReplyTo destination.
      */
     public void setReplyToOverride(String replyToDestination) {
         this.replyToOverride = normalizeDestinationName(replyToDestination);
     }
 
+    public boolean isReplyToSameDestinationAllowed() {
+        return replyToSameDestinationAllowed;
+    }
+
+    /**
+     * Whether a JMS consumer is allowed to send a reply message to the same destination that the consumer is using to
+     * consume from. This prevents an endless loop by consuming and sending back the same message to itself.
+     */
+    public void setReplyToSameDestinationAllowed(boolean replyToSameDestinationAllowed) {
+        this.replyToSameDestinationAllowed = replyToSameDestinationAllowed;
+    }
+
     public JmsMessageType getJmsMessageType() {
         return jmsMessageType;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/812fa060/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
index aa7f530..c29e081 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
@@ -649,6 +649,16 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
     }
 
     @ManagedAttribute
+    public String getReplyToOverride() {
+        return getConfiguration().getReplyToOverride();
+    }
+
+    @ManagedAttribute
+    public boolean isReplyToSameDestinationAllowed() {
+        return getConfiguration().isReplyToSameDestinationAllowed();
+    }
+
+    @ManagedAttribute
     public String getReplyToDestinationSelectorName() {
         return getConfiguration().getReplyToDestinationSelectorName();
     }
@@ -979,6 +989,16 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
     }
 
     @ManagedAttribute
+    public void setReplyToOverride(String replyToDestination) {
+        getConfiguration().setReplyToOverride(replyToDestination);
+    }
+
+    @ManagedAttribute
+    public void setReplyToSameDestinationAllowed(boolean replyToSameDestinationAllowed) {
+        getConfiguration().setReplyToSameDestinationAllowed(replyToSameDestinationAllowed);
+    }
+
+    @ManagedAttribute
     public void setReplyToDeliveryPersistent(boolean replyToDeliveryPersistent) {
         getConfiguration().setReplyToDeliveryPersistent(replyToDeliveryPersistent);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/812fa060/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsReplyToLoopIssueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsReplyToLoopIssueTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsReplyToLoopIssueTest.java
new file mode 100644
index 0000000..059c1cf
--- /dev/null
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsReplyToLoopIssueTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.issues;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jms.CamelJmsTestHelper;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ * @version 
+ */
+public class JmsReplyToLoopIssueTest extends CamelTestSupport {
+
+    @Test
+    public void testReplyToLoopIssue() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("World");
+        getMockEndpoint("mock:bar").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:done").expectedBodiesReceived("World");
+
+        template.sendBodyAndHeader("direct:start", "World", "JMSReplyTo", "queue:bar");
+
+        // sleep a little to ensure we do not do endless loop
+        Thread.sleep(250);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
+        camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        // must enable preserveMessageQos to force Camel to use the JMSReplyTo header
+                        .to("activemq:queue:foo?preserveMessageQos=true")
+                        .to("mock:done");
+
+                from("activemq:queue:foo")
+                        .to("log:foo?showAll=true", "mock:foo")
+                        .transform(body().prepend("Bye "));
+
+                from("activemq:queue:bar")
+                        .to("log:bar?showAll=true", "mock:bar");
+            }
+        };
+    }
+
+}
\ No newline at end of file