You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/01/24 12:18:36 UTC

(camel) branch main updated: CAMEL-20363: camel-jms - Make getting JMSCorrelationID more roboust f… (#12892)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 0423ead503c CAMEL-20363: camel-jms - Make getting JMSCorrelationID more roboust f… (#12892)
0423ead503c is described below

commit 0423ead503c2743dcb1e5ff3fd66f6a6775c569f
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jan 24 13:18:30 2024 +0100

    CAMEL-20363: camel-jms - Make getting JMSCorrelationID more roboust f… (#12892)
    
    * CAMEL-20363: camel-jms - Make getting JMSCorrelationID more roboust for JMS brokers that has bugs in this regard.
    
    * camel-jms - Flaky test on CI
---
 .../camel/component/jms/EndpointMessageListener.java      |  9 +++++----
 .../java/org/apache/camel/component/jms/JmsBinding.java   |  2 +-
 .../org/apache/camel/component/jms/JmsMessageHelper.java  | 15 +++++++++++++++
 .../java/org/apache/camel/component/jms/JmsProducer.java  |  2 +-
 .../camel/component/jms/reply/ReplyManagerSupport.java    |  4 ++--
 .../apache/camel/component/jms/JmsDeliveryDelayTest.java  |  8 +++++---
 6 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
index be704310287..8dbbe7b9458 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
@@ -107,7 +107,7 @@ public class EndpointMessageListener implements SessionAwareMessageListener {
                 exchange.getIn().getHeaders();
             }
 
-            String correlationId = message.getJMSCorrelationID();
+            String correlationId = JmsMessageHelper.getJMSCorrelationID(message);
             if (correlationId != null) {
                 LOG.debug("Received Message has JMSCorrelationID [{}]", correlationId);
             }
@@ -378,11 +378,12 @@ public class EndpointMessageListener implements SessionAwareMessageListener {
      */
     protected String determineCorrelationId(final Message message) throws JMSException {
         final String messageId = message.getJMSMessageID();
-        final String correlationId = message.getJMSCorrelationID();
-
         if (endpoint.getConfiguration().isUseMessageIDAsCorrelationID()) {
             return messageId;
-        } else if (ObjectHelper.isEmpty(correlationId)) {
+        }
+
+        final String correlationId = JmsMessageHelper.getJMSCorrelationID(message);
+        if (ObjectHelper.isEmpty(correlationId)) {
             // correlation id is empty so fallback to message id
             return messageId;
         } else {
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
index 2a874f77abe..b488e62e9d7 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
@@ -178,7 +178,7 @@ public class JmsBinding {
         if (jmsMessage != null) {
             // lets populate the standard JMS message headers
             try {
-                map.put(JmsConstants.JMS_HEADER_CORRELATION_ID, jmsMessage.getJMSCorrelationID());
+                map.put(JmsConstants.JMS_HEADER_CORRELATION_ID, JmsMessageHelper.getJMSCorrelationID(jmsMessage));
                 map.put(JmsConstants.JMS_HEADER_CORRELATION_ID_AS_BYTES,
                         JmsMessageHelper.getJMSCorrelationIDAsBytes(jmsMessage));
                 map.put(JmsConstants.JMS_HEADER_DELIVERY_MODE, jmsMessage.getJMSDeliveryMode());
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
index 0557ec55ab6..c5342c200f6 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
@@ -469,4 +469,19 @@ public final class JmsMessageHelper {
         }
         return null;
     }
+
+    /**
+     * Gets the JMSCorrelationID from the message.
+     *
+     * @param  message the message
+     * @return         the JMSCorrelationID, or <tt>null</tt> if not able to get
+     */
+    public static String getJMSCorrelationID(Message message) {
+        try {
+            return message.getJMSCorrelationID();
+        } catch (Exception e) {
+            // ignore if JMS broker do not support this
+        }
+        return null;
+    }
 }
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
index 1fcb6ad032c..5e5b60bf510 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
@@ -275,7 +275,7 @@ public class JmsProducer extends DefaultAsyncProducer {
         final String correlationProperty = configuration.getCorrelationProperty();
 
         final String messageId = message.getJMSMessageID();
-        final String correlationId = message.getJMSCorrelationID();
+        final String correlationId = JmsMessageHelper.getJMSCorrelationID(message);
         final String correlationPropertyValue;
         if (correlationProperty == null) {
             correlationPropertyValue = null;
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index 88877970b99..dbdf3f3d95a 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -145,11 +145,11 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
 
         try {
             if (correlationProperty == null) {
-                correlationID = message.getJMSCorrelationID();
+                correlationID = JmsMessageHelper.getJMSCorrelationID(message);
             } else {
                 correlationID = message.getStringProperty(correlationProperty);
             }
-        } catch (JMSException e) {
+        } catch (Exception e) {
             // ignore
         }
 
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeliveryDelayTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeliveryDelayTest.java
index 132b5d7f08e..fb70af7ed23 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeliveryDelayTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeliveryDelayTest.java
@@ -51,12 +51,13 @@ public class JmsDeliveryDelayTest extends AbstractPersistentJMSTest {
 
         routeWatch.restart();
         template.sendBody("activemq:topic:JmsDeliveryDelayTest?deliveryDelay=1000", "Hello World");
-        if (!routeComplete.await(2000, TimeUnit.MILLISECONDS)) {
+        if (!routeComplete.await(5000, TimeUnit.MILLISECONDS)) {
             fail("Message was not received from Artemis topic for too long");
         }
 
         MockEndpoint.assertIsSatisfied(context);
-        assertTrue(routeWatch.taken() >= 1000, "Should take at least 1000 millis");
+        // give some slack
+        assertTrue(routeWatch.taken() >= 900, "Should take at least 1000 millis");
     }
 
     @Test
@@ -69,7 +70,8 @@ public class JmsDeliveryDelayTest extends AbstractPersistentJMSTest {
 
         MockEndpoint.assertIsSatisfied(context);
         assertEquals(response, "Hello World");
-        assertTrue(routeWatch.taken() >= 1000, "Should take at least 1000 millis");
+        // give some slack
+        assertTrue(routeWatch.taken() >= 900, "Should take at least 1000 millis");
     }
 
     @BeforeEach