You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/03/23 16:41:58 UTC

(camel) branch jms-inout created (now 63b317e7731)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch jms-inout
in repository https://gitbox.apache.org/repos/asf/camel.git


      at 63b317e7731 camel-jms - Using in/out should use single atomic operation with the timeout map for the correlation id. Could help with org.apache.camel.component.jms.JmsProducerConcurrentWithReplyTest.testConcurrentProducers

This branch includes the following new commits:

     new 63b317e7731 camel-jms - Using in/out should use single atomic operation with the timeout map for the correlation id. Could help with org.apache.camel.component.jms.JmsProducerConcurrentWithReplyTest.testConcurrentProducers

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



(camel) 01/01: camel-jms - Using in/out should use single atomic operation with the timeout map for the correlation id. Could help with org.apache.camel.component.jms.JmsProducerConcurrentWithReplyTest.testConcurrentProducers

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch jms-inout
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 63b317e7731510742a679f409449489656219da6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Mar 23 17:41:45 2024 +0100

    camel-jms - Using in/out should use single atomic operation with the timeout map for the correlation id. Could help with org.apache.camel.component.jms.JmsProducerConcurrentWithReplyTest.testConcurrentProducers
---
 .../java/org/apache/camel/component/jms/reply/QueueReplyManager.java  | 3 +--
 .../org/apache/camel/component/jms/reply/ReplyManagerSupport.java     | 4 ++--
 .../apache/camel/component/jms/reply/TemporaryQueueReplyManager.java  | 3 +--
 3 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
index c0697cf329d..000350941d2 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
@@ -70,13 +70,12 @@ public class QueueReplyManager extends ReplyManagerSupport {
 
     @Override
     protected void handleReplyMessage(String correlationID, Message message, Session session) {
-        ReplyHandler handler = correlation.get(correlationID);
+        ReplyHandler handler = correlation.remove(correlationID);
         if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
             handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
         }
 
         if (handler != null) {
-            correlation.remove(correlationID);
             handler.onReply(correlationID, message, session);
         } else {
             // we could not correlate the received reply message to a matching request and therefore
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index 8118c78ce27..b8e12e72078 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jms.reply;
 
 import java.time.Duration;
+import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -250,12 +251,11 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
                 .build())
                 .build();
 
-        return task.run(() -> getReplyHandler(correlationID), answer -> answer != null).orElse(null);
+        return task.run(() -> getReplyHandler(correlationID), Objects::nonNull).orElse(null);
     }
 
     private ReplyHandler getReplyHandler(String correlationID) {
         log.trace("Early reply not found handler. Waiting a bit longer.");
-
         return correlation.get(correlationID);
     }
 
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
index fc715521c26..181463055d9 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
@@ -88,13 +88,12 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
 
     @Override
     protected void handleReplyMessage(String correlationID, Message message, Session session) {
-        ReplyHandler handler = correlation.get(correlationID);
+        ReplyHandler handler = correlation.remove(correlationID);
         if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
             handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
         }
 
         if (handler != null) {
-            correlation.remove(correlationID);
             handler.onReply(correlationID, message, session);
         } else {
             // we could not correlate the received reply message to a matching request and therefore