You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/11/10 10:53:24 UTC

[camel] branch main updated: CAMEL-17121: converted camel-rabbitmq reply manager to repeatable tasks

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f4515f8 CAMEL-17121: converted camel-rabbitmq reply manager to repeatable tasks
0f4515f8 is described below

commit 0f4515f896f0a2425077b2ec9f4aca852277b6d9
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Nov 9 18:11:03 2021 +0100

    CAMEL-17121: converted camel-rabbitmq reply manager to repeatable tasks
---
 .../rabbitmq/reply/ReplyManagerSupport.java        | 36 ++++++----------------
 1 file changed, 10 insertions(+), 26 deletions(-)

diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
index 6b39178..e8f7972 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.rabbitmq.reply;
 
+import java.time.Duration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -33,6 +34,9 @@ import org.apache.camel.component.rabbitmq.RabbitMQMessageConverter;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.task.ForegroundTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -215,33 +219,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
             LOG.warn("Early reply received with correlationID [{}] -> {}", correlationID, message);
         }
 
-        ReplyHandler answer = null;
+        ForegroundTask task = Tasks.foregroundTask().withBudget(Budgets.iterationBudget()
+                .withMaxIterations(50)
+                .withInterval(Duration.ofMillis(100))
+                .build())
+                .build();
 
-        // wait up till 5 seconds
-        boolean done = false;
-        int counter = 0;
-        while (!done && counter++ < 50) {
-            LOG.trace("Early reply not found handler at attempt {}. Waiting a bit longer.", counter);
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-
-            // try again
-            answer = correlation.get(correlationID);
-            done = answer != null;
-
-            if (answer != null) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace(
-                            "Early reply with correlationID [{}] has been matched after {} attempts and can be processed using handler: {}",
-                            correlationID, counter, answer);
-                }
-            }
-        }
-
-        return answer;
+        return task.run(() -> correlation.get(correlationID), answer -> answer != null).orElse(null);
     }
 
     @Override