You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/11/10 10:53:24 UTC
[camel] branch main updated: CAMEL-17121: converted camel-rabbitmq reply manager to repeatable tasks
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 0f4515f8 CAMEL-17121: converted camel-rabbitmq reply manager to repeatable tasks
0f4515f8 is described below
commit 0f4515f896f0a2425077b2ec9f4aca852277b6d9
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Nov 9 18:11:03 2021 +0100
CAMEL-17121: converted camel-rabbitmq reply manager to repeatable tasks
---
.../rabbitmq/reply/ReplyManagerSupport.java | 36 ++++++----------------
1 file changed, 10 insertions(+), 26 deletions(-)
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
index 6b39178..e8f7972 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.rabbitmq.reply;
+import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -33,6 +34,9 @@ import org.apache.camel.component.rabbitmq.RabbitMQMessageConverter;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.task.ForegroundTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -215,33 +219,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
LOG.warn("Early reply received with correlationID [{}] -> {}", correlationID, message);
}
- ReplyHandler answer = null;
+ ForegroundTask task = Tasks.foregroundTask().withBudget(Budgets.iterationBudget()
+ .withMaxIterations(50)
+ .withInterval(Duration.ofMillis(100))
+ .build())
+ .build();
- // wait up till 5 seconds
- boolean done = false;
- int counter = 0;
- while (!done && counter++ < 50) {
- LOG.trace("Early reply not found handler at attempt {}. Waiting a bit longer.", counter);
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // ignore
- }
-
- // try again
- answer = correlation.get(correlationID);
- done = answer != null;
-
- if (answer != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(
- "Early reply with correlationID [{}] has been matched after {} attempts and can be processed using handler: {}",
- correlationID, counter, answer);
- }
- }
- }
-
- return answer;
+ return task.run(() -> correlation.get(correlationID), answer -> answer != null).orElse(null);
}
@Override