You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/03/23 16:41:59 UTC

(camel) 01/01: camel-jms - Using in/out should use a single atomic operation with the timeout map for the correlation id. Could help with org.apache.camel.component.jms.JmsProducerConcurrentWithReplyTest.testConcurrentProducers

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch jms-inout
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 63b317e7731510742a679f409449489656219da6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Mar 23 17:41:45 2024 +0100

    camel-jms - Using in/out should use a single atomic operation with the timeout map for the correlation id. Could help with org.apache.camel.component.jms.JmsProducerConcurrentWithReplyTest.testConcurrentProducers
---
 .../java/org/apache/camel/component/jms/reply/QueueReplyManager.java  | 3 +--
 .../org/apache/camel/component/jms/reply/ReplyManagerSupport.java     | 4 ++--
 .../apache/camel/component/jms/reply/TemporaryQueueReplyManager.java  | 3 +--
 3 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
index c0697cf329d..000350941d2 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
@@ -70,13 +70,12 @@ public class QueueReplyManager extends ReplyManagerSupport {
 
     @Override
     protected void handleReplyMessage(String correlationID, Message message, Session session) {
-        ReplyHandler handler = correlation.get(correlationID);
+        ReplyHandler handler = correlation.remove(correlationID);
         if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
             handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
         }
 
         if (handler != null) {
-            correlation.remove(correlationID);
             handler.onReply(correlationID, message, session);
         } else {
             // we could not correlate the received reply message to a matching request and therefore
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index 8118c78ce27..b8e12e72078 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jms.reply;
 
 import java.time.Duration;
+import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -250,12 +251,11 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
                 .build())
                 .build();
 
-        return task.run(() -> getReplyHandler(correlationID), answer -> answer != null).orElse(null);
+        return task.run(() -> getReplyHandler(correlationID), Objects::nonNull).orElse(null);
     }
 
     private ReplyHandler getReplyHandler(String correlationID) {
         log.trace("Early reply not found handler. Waiting a bit longer.");
-
         return correlation.get(correlationID);
     }
 
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
index fc715521c26..181463055d9 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
@@ -88,13 +88,12 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
 
     @Override
     protected void handleReplyMessage(String correlationID, Message message, Session session) {
-        ReplyHandler handler = correlation.get(correlationID);
+        ReplyHandler handler = correlation.remove(correlationID);
         if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
             handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
         }
 
         if (handler != null) {
-            correlation.remove(correlationID);
             handler.onReply(correlationID, message, session);
         } else {
             // we could not correlate the received reply message to a matching request and therefore