You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/01/19 19:18:31 UTC

[2/3] camel git commit: CAMEL-10726 Correlation of JMS InOut exchanges ...

CAMEL-10726 Correlation of JMS InOut exchanges ...

...with custom JMS property

This commit adds a new configuration option `correlationProperty` if set
its value will be used instead of `JMSCorrelationID` JMS message
property to correlate messages in InOut (request-reply) exchange.

Also, if this `correlationProperty` is set, `JMSCorrelationID` will not
be consulted on the incoming message or set on the outgoing message.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8c4a1cc1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8c4a1cc1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8c4a1cc1

Branch: refs/heads/master
Commit: 8c4a1cc1e392327505f04bc69ae3dcfbd76ad51b
Parents: 4b61cac
Author: Zoran Regvart <zo...@regvart.com>
Authored: Thu Jan 19 14:45:01 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jan 19 19:24:30 2017 +0100

----------------------------------------------------------------------
 .../camel-jms/src/main/docs/jms-component.adoc  |  5 +-
 .../camel/component/jms/JmsConfiguration.java   | 20 +++++
 .../apache/camel/component/jms/JmsProducer.java | 42 ++++++++--
 .../camel/component/jms/reply/ReplyManager.java |  6 ++
 .../jms/reply/ReplyManagerSupport.java          | 14 +++-
 ...uestReplyCorrelatedWithCustomHeaderTest.java | 82 ++++++++++++++++++++
 .../springboot/JmsComponentConfiguration.java   | 18 +++++
 7 files changed, 176 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8c4a1cc1/components/camel-jms/src/main/docs/jms-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/docs/jms-component.adoc b/components/camel-jms/src/main/docs/jms-component.adoc
index 1596260..a59c32c 100644
--- a/components/camel-jms/src/main/docs/jms-component.adoc
+++ b/components/camel-jms/src/main/docs/jms-component.adoc
@@ -301,7 +301,7 @@ The JMS component supports 74 options which are listed below.
 
 
 // endpoint options: START
-The JMS component supports 86 endpoint options which are listed below:
+The JMS component supports 87 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -350,6 +350,7 @@ The JMS component supports 86 endpoint options which are listed below:
 | timeToLive | producer | -1 | long | When sending messages specifies the time-to-live of the message (in milliseconds).
 | allowNullBody | producer (advanced) | true | boolean | Whether to allow sending messages with no body. If this option is false and the message body is null then an JMSException is thrown.
 | alwaysCopyMessage | producer (advanced) | false | boolean | If true Camel will always make a JMS message copy of the message when it is passed to the producer for sending. Copying the message is needed in some situations such as when a replyToDestinationSelectorName is set (incidentally Camel will set the alwaysCopyMessage option to true if a replyToDestinationSelectorName is set)
+| correlationProperty | producer (advanced) |  | String | Use this JMS property to correlate messages in InOut exchange pattern (request-reply) instead of JMSCorrelationID property. This allows you to exchange messages with systems that do not correlate messages using JMSCorrelationID JMS property. If used JMSCorrelationID will not be used or set by Camel. The value of here named property will be generated if not supplied in the header of the message under the same name.
 | disableTimeToLive | producer (advanced) | false | boolean | Use this option to force disabling time to live. For example when you do request/reply over JMS then Camel will by default use the requestTimeout value as time to live on the message being sent. The problem is that the sender and receiver systems have to have their clocks synchronized so they are in sync. This is not always so easy to archive. So you can use disableTimeToLive=true to not set a time to live value on the sent message. Then the message will not expire on the receiver system. See below in section About time to live for more details.
 | forceSendOriginalMessage | producer (advanced) | false | boolean | When using mapJmsMessage=false Camel will create a new JMS message to send to a new JMS destination if you touch the headers (get or set) during the route. Set this option to true to force Camel to send the original JMS message that was received.
 | includeSentJMSMessageID | producer (advanced) | false | boolean | Only applicable when sending to JMS destination using InOnly (eg fire and forget). Enabling this option will enrich the Camel Exchange with the actual JMSMessageID that was used by the JMS client when the message was sent to the JMS destination.
@@ -1354,4 +1355,4 @@ wmq.setDestinationResolver(new DestinationResolver() {
 * link:bean-integration.html[Bean Integration]
 * link:tutorial-jmsremoting.html[Tutorial-JmsRemoting]
 * http://activemq.apache.org/jmstemplate-gotchas.html[JMSTemplate
-gotchas]
\ No newline at end of file
+gotchas]

http://git-wip-us.apache.org/repos/asf/camel/blob/8c4a1cc1/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
index 8d297b8..8fc2afd 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
@@ -446,6 +446,11 @@ public class JmsConfiguration implements Cloneable {
     @UriParam(label = "advanced",
             description = "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message.")
     private MessageCreatedStrategy messageCreatedStrategy;
+    @UriParam(label = "producer,advanced",
+            description = "When using InOut exchange pattern use this JMS property instead of JMSCorrelationID"
+                    + " JMS property to correlate messages. If set messages will be correlated solely on the"
+                    + " value of this property JMSCorrelationID property will be ignored and not set by Camel.")
+    private String correlationProperty;
 
     public JmsConfiguration() {
     }
@@ -2082,4 +2087,19 @@ public class JmsConfiguration implements Cloneable {
     public void setSelector(String selector) {
         this.selector = selector;
     }
+
+    /**
+     * Use this JMS property to correlate messages in InOut exchange pattern (request-reply)
+     * instead of JMSCorrelationID property. This allows you to exchange messages with 
+     * systems that do not correlate messages using JMSCorrelationID JMS property. If used
+     * JMSCorrelationID will not be used or set by Camel. The value of here named property
+     * will be generated if not supplied in the header of the message under the same name.
+     */
+    public void setCorrelationProperty(final String correlationProperty) {
+        this.correlationProperty = correlationProperty;
+    }
+
+    public String getCorrelationProperty() {
+        return correlationProperty;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8c4a1cc1/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
index 251e55f..d752ffa 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
@@ -20,6 +20,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.util.Optional.ofNullable;
+
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -188,30 +191,37 @@ public class JmsProducer extends DefaultAsyncProducer {
         // the request timeout can be overruled by a header otherwise the endpoint configured value is used
         final long timeout = exchange.getIn().getHeader(JmsConstants.JMS_REQUEST_TIMEOUT, endpoint.getRequestTimeout(), long.class);
 
+        final JmsConfiguration configuration = endpoint.getConfiguration();
+
         // when using message id as correlation id, we need at first to use a provisional correlation id
         // which we then update to the real JMSMessageID when the message has been sent
         // this is done with the help of the MessageSentCallback
-        final boolean msgIdAsCorrId = endpoint.getConfiguration().isUseMessageIDAsCorrelationID();
+        final boolean msgIdAsCorrId = configuration.isUseMessageIDAsCorrelationID();
         final String provisionalCorrelationId = msgIdAsCorrId ? getUuidGenerator().generateUuid() : null;
         MessageSentCallback messageSentCallback = null;
         if (msgIdAsCorrId) {
             messageSentCallback = new UseMessageIdAsCorrelationIdMessageSentCallback(replyManager, provisionalCorrelationId, timeout);
         }
 
-        final String originalCorrelationId = in.getHeader("JMSCorrelationID", String.class);
+        final String correlationProperty = configuration.getCorrelationProperty();
+
+        final String correlationPropertyToUse = ofNullable(correlationProperty).orElse("JMSCorrelationID");
+
+        final String originalCorrelationId = in.getHeader(correlationPropertyToUse, String.class);
+
         boolean generateFreshCorrId = (ObjectHelper.isEmpty(originalCorrelationId) && !msgIdAsCorrId) 
                 || (originalCorrelationId != null && originalCorrelationId.startsWith(GENERATED_CORRELATION_ID_PREFIX));
         if (generateFreshCorrId) {
             // we append the 'Camel-' prefix to know it was generated by us
-            in.setHeader("JMSCorrelationID", GENERATED_CORRELATION_ID_PREFIX + getUuidGenerator().generateUuid());
+            in.setHeader(correlationPropertyToUse, GENERATED_CORRELATION_ID_PREFIX + getUuidGenerator().generateUuid());
         }
-        
+
         MessageCreator messageCreator = new MessageCreator() {
             public Message createMessage(Session session) throws JMSException {
                 Message answer = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
 
                 Destination replyTo = null;
-                String replyToOverride = endpoint.getConfiguration().getReplyToOverride();
+                String replyToOverride = configuration.getReplyToOverride();
                 if (replyToOverride != null) {
                     replyTo = resolveOrCreateDestination(replyToOverride, session);
                 } else {
@@ -227,9 +237,13 @@ public class JmsProducer extends DefaultAsyncProducer {
                 String correlationId = determineCorrelationId(answer, provisionalCorrelationId);
                 replyManager.registerReply(replyManager, exchange, callback, originalCorrelationId, correlationId, timeout);
 
+                if (correlationProperty != null) {
+                    replyManager.setCorrelationProperty(correlationProperty);
+                }
+
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Using JMSCorrelationID: {}, JMSReplyTo destination: {}, with request timeout: {} ms.",
-                           new Object[]{correlationId, replyTo, timeout});
+                    LOG.debug("Using {}: {}, JMSReplyTo destination: {}, with request timeout: {} ms.",
+                           new Object[]{correlationPropertyToUse, correlationId, replyTo, timeout});
                 }
 
                 LOG.trace("Created javax.jms.Message: {}", answer);
@@ -256,9 +270,21 @@ public class JmsProducer extends DefaultAsyncProducer {
             return provisionalCorrelationId;
         }
 
+        final JmsConfiguration configuration = endpoint.getConfiguration();
+        final String correlationProperty = configuration.getCorrelationProperty();
+
         final String messageId = message.getJMSMessageID();
         final String correlationId = message.getJMSCorrelationID();
-        if (endpoint.getConfiguration().isUseMessageIDAsCorrelationID()) {
+        final String correlationPropertyValue;
+        if (correlationProperty == null) {
+            correlationPropertyValue = null;
+        } else {
+            correlationPropertyValue = message.getStringProperty(correlationProperty);
+        }
+
+        if (!ObjectHelper.isEmpty(correlationPropertyValue)) {
+            return correlationPropertyValue;
+        } else if (configuration.isUseMessageIDAsCorrelationID()) {
             return messageId;
         } else if (ObjectHelper.isEmpty(correlationId)) {
             // correlation id is empty so fallback to message id

http://git-wip-us.apache.org/repos/asf/camel/blob/8c4a1cc1/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
index f5d3beb..2c316a8 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
@@ -59,6 +59,12 @@ public interface ReplyManager extends SessionAwareMessageListener {
     void setOnTimeoutExecutorService(ExecutorService executorService);
 
     /**
+     * Sets the JMS message property used for message correlation. If set message correlation will be performed on the
+     * value of this JMS property, JMSCorrelationID will be ignored.
+     */
+    void setCorrelationProperty(String correlationProperty);
+
+    /**
      * Gets the reply to queue being used
      */
     Destination getReplyTo();

http://git-wip-us.apache.org/repos/asf/camel/blob/8c4a1cc1/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index bb81399..0e4231a 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -58,6 +58,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
     protected final CountDownLatch replyToLatch = new CountDownLatch(1);
     protected final long replyToTimeout = 10000;
     protected CorrelationTimeoutMap correlation;
+    protected String correlationProperty;
 
     public ReplyManagerSupport(CamelContext camelContext) {
         this.camelContext = camelContext;
@@ -82,6 +83,11 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
         replyToLatch.countDown();
     }
 
+    @Override
+    public void setCorrelationProperty(final String correlationProperty) {
+        this.correlationProperty = correlationProperty;
+    }
+
     public Destination getReplyTo() {
         if (replyTo != null) {
             return replyTo;
@@ -123,11 +129,17 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
 
     public void onMessage(Message message, Session session) throws JMSException {
         String correlationID = null;
+
         try {
-            correlationID = message.getJMSCorrelationID();
+            if (correlationProperty == null) {
+                correlationID = message.getJMSCorrelationID();
+            } else {
+                correlationID = message.getStringProperty(correlationProperty);
+            }
         } catch (JMSException e) {
             // ignore
         }
+
         if (correlationID == null) {
             log.warn("Ignoring message with no correlationID: {}", message);
             return;

http://git-wip-us.apache.org/repos/asf/camel/blob/8c4a1cc1/components/camel-jms/src/test/java/org/apache/camel/component/jms/RequestReplyCorrelatedWithCustomHeaderTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/RequestReplyCorrelatedWithCustomHeaderTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/RequestReplyCorrelatedWithCustomHeaderTest.java
new file mode 100644
index 0000000..d09a202
--- /dev/null
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/RequestReplyCorrelatedWithCustomHeaderTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.Body;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Header;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+public class RequestReplyCorrelatedWithCustomHeaderTest extends CamelTestSupport {
+
+    private ConnectionFactory connectionFactory;
+
+    public static void processRequest(@Body final String body,
+            @Header("CustomCorrelation") final String customCorrelation,
+            @Header("JMSCorrelationId") final String jmsCorrelationId, final Exchange exchange) throws Exception {
+        assertNotNull(customCorrelation);
+        assertNull(jmsCorrelationId);
+        exchange.getIn().setBody("Hi, " + body + ", " + customCorrelation);
+    }
+
+    @Test
+    public void shouldCorrelateRepliesWithCustomCorrelationProperty() throws Exception {
+        final String reply = template.requestBody("activemq:queue:request?correlationProperty=CustomCorrelation",
+                "Bobby", String.class);
+
+        assertTrue(reply.matches("Hi, Bobby, Camel-.*"));
+    }
+
+    @Test
+    public void shouldCorrelateRepliesWithCustomCorrelationPropertyAndValue() throws Exception {
+        final String reply = template.requestBodyAndHeader(
+                "activemq:queue:request?correlationProperty=CustomCorrelation", "Bobby", "CustomCorrelation",
+                "custom-id", String.class);
+
+        assertEquals("Hi, Bobby, custom-id", reply);
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        final CamelContext camelContext = super.createCamelContext();
+
+        connectionFactory = CamelJmsTestHelper.createConnectionFactory();
+
+        final JmsComponent activeMq = jmsComponentAutoAcknowledge(connectionFactory);
+
+        camelContext.addComponent("activemq", activeMq);
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("activemq:queue:request").bean(RequestReplyCorrelatedWithCustomHeaderTest.class, "processRequest");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8c4a1cc1/platforms/spring-boot/components-starter/camel-jms-starter/src/main/java/org/apache/camel/component/jms/springboot/JmsComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-jms-starter/src/main/java/org/apache/camel/component/jms/springboot/JmsComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-jms-starter/src/main/java/org/apache/camel/component/jms/springboot/JmsComponentConfiguration.java
index d367b3a..18086d2 100644
--- a/platforms/spring-boot/components-starter/camel-jms-starter/src/main/java/org/apache/camel/component/jms/springboot/JmsComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-jms-starter/src/main/java/org/apache/camel/component/jms/springboot/JmsComponentConfiguration.java
@@ -1775,6 +1775,16 @@ public class JmsComponentConfiguration {
          * Sets the JMS selector to use
          */
         private String selector;
+        /**
+         * Use this JMS property to correlate messages in InOut exchange pattern
+         * (request-reply) instead of JMSCorrelationID property. This allows you
+         * to exchange messages with systems that do not correlate messages
+         * using JMSCorrelationID JMS property. If used JMSCorrelationID will
+         * not be used or set by Camel. The value of here named property will be
+         * generated if not supplied in the header of the message under the same
+         * name.
+         */
+        private String correlationProperty;
 
         public ConsumerType getConsumerType() {
             return consumerType;
@@ -2492,5 +2502,13 @@ public class JmsComponentConfiguration {
         public void setSelector(String selector) {
             this.selector = selector;
         }
+
+        public String getCorrelationProperty() {
+            return correlationProperty;
+        }
+
+        public void setCorrelationProperty(String correlationProperty) {
+            this.correlationProperty = correlationProperty;
+        }
     }
 }
\ No newline at end of file