You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2018/10/12 07:55:41 UTC
[camel] 27/43: Make InterceptSendToEndpointProcessor asynchronous
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 48553958390b8cfc9dbe667723641442e81b9c33
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Mon Oct 1 17:21:23 2018 +0200
Make InterceptSendToEndpointProcessor asynchronous
---
.../apache/camel/impl/InterceptSendToEndpoint.java | 2 +-
.../impl/InterceptSendToEndpointProcessor.java | 41 ++++++++++------------
2 files changed, 19 insertions(+), 24 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
index 3b8a0af..044d538 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
@@ -94,7 +94,7 @@ public class InterceptSendToEndpoint implements Endpoint, ShutdownableService {
@Override
public AsyncProducer createAsyncProducer() throws Exception {
- Producer producer = delegate.createProducer();
+ AsyncProducer producer = delegate.createAsyncProducer();
return new InterceptSendToEndpointProcessor(this, delegate, producer, skip);
}
diff --git a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java
index 5ed03e5..07f245f 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java
@@ -18,9 +18,11 @@ package org.apache.camel.impl;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
+import org.apache.camel.AsyncProducer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
+import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,10 +37,10 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer {
private final InterceptSendToEndpoint endpoint;
private final Endpoint delegate;
- private final Producer producer;
+ private final AsyncProducer producer;
private final boolean skip;
- public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, Endpoint delegate, Producer producer, boolean skip) throws Exception {
+ public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer, boolean skip) throws Exception {
super(delegate);
this.endpoint = endpoint;
this.delegate = delegate;
@@ -61,18 +63,19 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer {
if (endpoint.getDetour() != null) {
// detour the exchange via the detour processor, adapted to the AsyncProcessor API
- try {
- endpoint.getDetour().process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
+ AsyncProcessor detour = AsyncProcessorConverterHelper.convert(endpoint.getDetour());
+ return detour.process(exchange, s -> callback(exchange, callback, s));
}
+ return callback(exchange, callback, true);
+ }
+
+ private boolean callback(Exchange exchange, AsyncCallback callback, boolean doneSync) {
// Decide whether to continue or not; similar logic to the Pipeline
// check for error if so we should break out
if (!continueProcessing(exchange, "skip sending to original intended destination: " + getEndpoint(), log)) {
- callback.done(true);
- return true;
+ callback.done(doneSync);
+ return doneSync;
}
// determine if we should skip or not
@@ -92,24 +95,16 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer {
}
// route to original destination leveraging the asynchronous routing engine if possible
- if (producer instanceof AsyncProcessor) {
- AsyncProcessor async = (AsyncProcessor) producer;
- return async.process(exchange, callback);
- } else {
- try {
- producer.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
- callback.done(true);
- return true;
- }
+ boolean s = producer.process(exchange, ds -> {
+ callback.done(doneSync && ds);
+ });
+ return doneSync && s;
} else {
if (log.isDebugEnabled()) {
log.debug("Stop() means skip sending exchange to original intended destination: {} for exchange: {}", getEndpoint(), exchange);
}
- callback.done(true);
- return true;
+ callback.done(doneSync);
+ return doneSync;
}
}