You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2018/10/12 07:55:41 UTC

[camel] 27/43: Make InterceptSendToEndpointProcessor asynchronous

This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 48553958390b8cfc9dbe667723641442e81b9c33
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Mon Oct 1 17:21:23 2018 +0200

    Make InterceptSendToEndpointProcessor asynchronous
---
 .../apache/camel/impl/InterceptSendToEndpoint.java |  2 +-
 .../impl/InterceptSendToEndpointProcessor.java     | 41 ++++++++++------------
 2 files changed, 19 insertions(+), 24 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
index 3b8a0af..044d538 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
@@ -94,7 +94,7 @@ public class InterceptSendToEndpoint implements Endpoint, ShutdownableService {
 
     @Override
     public AsyncProducer createAsyncProducer() throws Exception {
-        Producer producer = delegate.createProducer();
+        AsyncProducer producer = delegate.createAsyncProducer();
         return new InterceptSendToEndpointProcessor(this, delegate, producer, skip);
     }
 
diff --git a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java
index 5ed03e5..07f245f 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java
@@ -18,9 +18,11 @@ package org.apache.camel.impl;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.AsyncProducer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Producer;
+import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,10 +37,10 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer {
 
     private final InterceptSendToEndpoint endpoint;
     private final Endpoint delegate;
-    private final Producer producer;
+    private final AsyncProducer producer;
     private final boolean skip;
 
-    public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, Endpoint delegate, Producer producer, boolean skip) throws Exception {
+    public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer, boolean skip) throws Exception {
         super(delegate);
         this.endpoint = endpoint;
         this.delegate = delegate;
@@ -61,18 +63,19 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer {
 
         if (endpoint.getDetour() != null) {
             // detour the exchange using synchronous processing
-            try {
-                endpoint.getDetour().process(exchange);
-            } catch (Exception e) {
-                exchange.setException(e);
-            }
+            AsyncProcessor detour = AsyncProcessorConverterHelper.convert(endpoint.getDetour());
+            return detour.process(exchange, s -> callback(exchange, callback, s));
         }
 
+        return callback(exchange, callback, true);
+    }
+
+    private boolean callback(Exchange exchange, AsyncCallback callback, boolean doneSync) {
         // Decide whether to continue or not; similar logic to the Pipeline
         // check for error if so we should break out
         if (!continueProcessing(exchange, "skip sending to original intended destination: " + getEndpoint(), log)) {
-            callback.done(true);
-            return true;
+            callback.done(doneSync);
+            return doneSync;
         }
 
         // determine if we should skip or not
@@ -92,24 +95,16 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer {
             }
 
             // route to original destination leveraging the asynchronous routing engine if possible
-            if (producer instanceof AsyncProcessor) {
-                AsyncProcessor async = (AsyncProcessor) producer;
-                return async.process(exchange, callback);
-            } else {
-                try {
-                    producer.process(exchange);
-                } catch (Exception e) {
-                    exchange.setException(e);
-                }
-                callback.done(true);
-                return true;
-            }
+            boolean s = producer.process(exchange, ds -> {
+                callback.done(doneSync && ds);
+            });
+            return doneSync && s;
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("Stop() means skip sending exchange to original intended destination: {} for exchange: {}", getEndpoint(), exchange);
             }
-            callback.done(true);
-            return true;
+            callback.done(doneSync);
+            return doneSync;
         }
     }