You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/03/28 06:43:42 UTC

(camel) branch main updated: jms inout concurrency issue (#13598)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 630b74d1f85 jms inout concurrency issue (#13598)
630b74d1f85 is described below

commit 630b74d1f85a4c53a7b7a48f0219e72c6b0dcfcc
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Mar 28 07:43:36 2024 +0100

    jms inout concurrency issue (#13598)
    
    CAMEL-20493: camel-jms - When using in/out, use a single atomic operation on the timeout map to look up and remove the handler for the correlation id. This could help with org.apache.camel.component.jms.JmsProducerConcurrentWithReplyTest.testConcurrentProducers
---
 .../camel/component/jms/reply/QueueReplyManager.java       |  3 +--
 .../camel/component/jms/reply/ReplyManagerSupport.java     | 14 +++++++-------
 .../component/jms/reply/TemporaryQueueReplyManager.java    |  3 +--
 .../camel/component/sjms/reply/QueueReplyManager.java      |  5 +----
 .../component/sjms/reply/TemporaryQueueReplyManager.java   |  3 +--
 .../java/org/apache/camel/support/DefaultTimeoutMap.java   |  5 +++++
 6 files changed, 16 insertions(+), 17 deletions(-)

diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
index c0697cf329d..000350941d2 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
@@ -70,13 +70,12 @@ public class QueueReplyManager extends ReplyManagerSupport {
 
     @Override
     protected void handleReplyMessage(String correlationID, Message message, Session session) {
-        ReplyHandler handler = correlation.get(correlationID);
+        ReplyHandler handler = correlation.remove(correlationID);
         if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
             handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
         }
 
         if (handler != null) {
-            correlation.remove(correlationID);
             handler.onReply(correlationID, message, session);
         } else {
             // we could not correlate the received reply message to a matching request and therefore
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index 8118c78ce27..31234500eee 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jms.reply;
 
 import java.time.Duration;
+import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -230,13 +231,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
 
     /**
      * <b>IMPORTANT:</b> This logic is only being used due to high performance in-memory only testing using InOut over
-     * JMS. Its unlikely to happen in a real life situation with communication to a remote broker, which always will be
-     * slower to send back reply, before Camel had a chance to update it's internal correlation map.
+     * JMS. It is unlikely to happen in a real life situation with communication to a remote broker, which always will
+     * be slower to send back reply, before Camel had a chance to update the internal correlation map.
      */
     protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlationID, Message message) {
         // race condition, when using messageID as correlationID then we store a provisional correlation id
         // at first, which gets updated with the JMSMessageID after the message has been sent. And in the unlikely
-        // event that the reply comes back really really fast, and the correlation map hasn't yet been updated
+        // event that the reply comes back really fast, and the correlation map hasn't yet been updated
         // from the provisional id to the JMSMessageID. If so we have to wait a bit and lookup again.
         if (log.isWarnEnabled()) {
             log.warn("Early reply received with correlationID [{}] -> {}", correlationID, message);
@@ -250,13 +251,12 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
                 .build())
                 .build();
 
-        return task.run(() -> getReplyHandler(correlationID), answer -> answer != null).orElse(null);
+        return task.run(() -> getReplyHandler(correlationID), Objects::nonNull).orElse(null);
     }
 
     private ReplyHandler getReplyHandler(String correlationID) {
-        log.trace("Early reply not found handler. Waiting a bit longer.");
-
-        return correlation.get(correlationID);
+        log.trace("Early reply not found. Waiting a bit longer.");
+        return correlation.remove(correlationID); // get and remove
     }
 
     @Override
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
index fc715521c26..181463055d9 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
@@ -88,13 +88,12 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
 
     @Override
     protected void handleReplyMessage(String correlationID, Message message, Session session) {
-        ReplyHandler handler = correlation.get(correlationID);
+        ReplyHandler handler = correlation.remove(correlationID);
         if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
             handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
         }
 
         if (handler != null) {
-            correlation.remove(correlationID);
             handler.onReply(correlationID, message, session);
         } else {
             // we could not correlate the received reply message to a matching request and therefore
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
index fb67ba02739..662f40fa6cc 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
@@ -45,16 +45,13 @@ public class QueueReplyManager extends ReplyManagerSupport {
             // should not happen that we can't find the handler
             return;
         }
-
         correlation.put(newCorrelationId, handler, requestTimeout);
     }
 
     @Override
     protected void handleReplyMessage(String correlationID, Message message, Session session) {
-        ReplyHandler handler = correlation.get(correlationID);
-
+        ReplyHandler handler = correlation.remove(correlationID);
         if (handler != null) {
-            correlation.remove(correlationID);
             handler.onReply(correlationID, message, session);
         } else {
             // we could not correlate the received reply message to a matching request and therefore
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java
index 6c3ce420c1e..98553e9a23a 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/TemporaryQueueReplyManager.java
@@ -63,9 +63,8 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
 
     @Override
     protected void handleReplyMessage(String correlationID, Message message, Session session) {
-        ReplyHandler handler = correlation.get(correlationID);
+        ReplyHandler handler = correlation.remove(correlationID);
         if (handler != null) {
-            correlation.remove(correlationID);
             handler.onReply(correlationID, message, session);
         } else {
             // we could not correlate the received reply message to a matching request and therefore
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java
index 5be87e40ef0..b6cda4f3ecc 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultTimeoutMap.java
@@ -134,6 +134,11 @@ public class DefaultTimeoutMap<K, V> extends ServiceSupport implements TimeoutMa
 
     @Override
     public V remove(K key) {
+        // if the map does not contain the key, the lock is not necessary
+        if (!map.containsKey(key)) {
+            return null;
+        }
+
         V value = null;
         lock.lock();
         try {