You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/01/04 18:52:14 UTC

[camel] 04/07: CAMEL-12120: Better solution for routing-slip/dynamic-router to use existing producer when embedded with error handler.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 83684bcb0e605ef799d21d3d84fba90bcb86d395
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jan 4 16:37:04 2018 +0100

    CAMEL-12120: Better solution for routing-slip/dynamic-router to use existing producer when embedded with error handler.
---
 .../src/main/java/org/apache/camel/Exchange.java   |  1 +
 .../camel/model/DynamicRouterDefinition.java       |  9 +----
 .../apache/camel/model/RoutingSlipDefinition.java  |  9 +----
 .../org/apache/camel/processor/RoutingSlip.java    | 39 ++++++++++++++++++++--
 .../DynamicRouterEventNotifierTest.java            |  1 +
 .../routingslip/RoutingSlipEventNotifierTest.java  |  1 +
 6 files changed, 42 insertions(+), 18 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java
index 6110201..b1d232d 100644
--- a/camel-core/src/main/java/org/apache/camel/Exchange.java
+++ b/camel-core/src/main/java/org/apache/camel/Exchange.java
@@ -212,6 +212,7 @@ public interface Exchange {
     String SKIP_GZIP_ENCODING = "CamelSkipGzipEncoding";
     String SKIP_WWW_FORM_URLENCODED = "CamelSkipWwwFormUrlEncoding"; 
     String SLIP_ENDPOINT      = "CamelSlipEndpoint";
+    String SLIP_PRODUCER      = "CamelSlipProducer";
     String SPLIT_INDEX        = "CamelSplitIndex";
     String SPLIT_COMPLETE     = "CamelSplitComplete";
     String SPLIT_SIZE         = "CamelSplitSize";
diff --git a/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java b/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
index e4b855d..3b2ec79 100644
--- a/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
@@ -88,18 +88,11 @@ public class DynamicRouterDefinition<Type extends ProcessorDefinition<Type>> ext
             dynamicRouter.setCacheSize(getCacheSize());
         }
 
-        // use dynamic processor to send to the computed slip endpoint
-        SendDynamicProcessor dynamicProcessor = new SendDynamicProcessor(headerExpression(Exchange.SLIP_ENDPOINT));
-        dynamicProcessor.setCamelContext(routeContext.getCamelContext());
-        if (getCacheSize() != null) {
-            dynamicProcessor.setCacheSize(getCacheSize());
-        }
-
         // and wrap this in an error handler
         ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
         // create error handler (create error handler directly to keep it light weight,
         // instead of using ProcessorDefinition.wrapInErrorHandler)
-        AsyncProcessor errorHandler = (AsyncProcessor) builder.createErrorHandler(routeContext, dynamicProcessor);
+        AsyncProcessor errorHandler = (AsyncProcessor) builder.createErrorHandler(routeContext, dynamicRouter.newRoutingSlipProcessorForErrorHandler());
         dynamicRouter.setErrorHandler(errorHandler);
 
         return dynamicRouter;
diff --git a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
index 413d8d5..c2c029c 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
@@ -98,18 +98,11 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
             routingSlip.setCacheSize(getCacheSize());
         }
 
-        // use dynamic processor to send to the computed slip endpoint
-        SendDynamicProcessor dynamicProcessor = new SendDynamicProcessor(headerExpression(Exchange.SLIP_ENDPOINT));
-        dynamicProcessor.setCamelContext(routeContext.getCamelContext());
-        if (getCacheSize() != null) {
-            dynamicProcessor.setCacheSize(getCacheSize());
-        }
-
         // and wrap this in an error handler
         ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
         // create error handler (create error handler directly to keep it light weight,
         // instead of using ProcessorDefinition.wrapInErrorHandler)
-        AsyncProcessor errorHandler = (AsyncProcessor) builder.createErrorHandler(routeContext, dynamicProcessor);
+        AsyncProcessor errorHandler = (AsyncProcessor) builder.createErrorHandler(routeContext, routingSlip.newRoutingSlipProcessorForErrorHandler());
         routingSlip.setErrorHandler(errorHandler);
 
         return routingSlip;
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index ccfb808..dbfb8ad 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -69,6 +69,7 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
     protected String uriDelimiter;
     protected final CamelContext camelContext;
     protected AsyncProcessor errorHandler;
+    protected SendDynamicProcessor sendDynamicProcessor;
 
     /**
      * The iterator to be used for retrieving the next routing slip(s) to be used.
@@ -359,12 +360,16 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
                 RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
                 AsyncProcessor target = createErrorHandler(routeContext, exchange, asyncProducer, endpoint);
 
-                // set property which endpoint we send to
+                // set property which endpoint we send to and the producer that can do it
                 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
                 exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri());
+                exchange.setProperty(Exchange.SLIP_PRODUCER, asyncProducer);
 
                 boolean answer = target.process(exchange, new AsyncCallback() {
                     public void done(boolean doneSync) {
+                        // cleanup producer after usage
+                        exchange.removeProperty(Exchange.SLIP_PRODUCER);
+
                         // we only have to handle async completion of the routing slip
                         if (doneSync) {
                             callback.done(true);
@@ -424,7 +429,7 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
 
                             // copy results back to the original exchange
                             ExchangeHelper.copyResults(original, current);
-                       } catch (Throwable e) {
+                        } catch (Throwable e) {
                             exchange.setException(e);
                         }
 
@@ -492,4 +497,34 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
         result.getProperties().clear();
         result.getProperties().putAll(source.getProperties());
     }
+
+    /**
+     * Creates the embedded processor to use when wrapping this routing slip in an error handler.
+     */
+    public AsyncProcessor newRoutingSlipProcessorForErrorHandler() {
+        return new RoutingSlipProcessor();
+    }
+
+    /**
+     * Embedded processor that delegates to the producer which the routing slip has stored
+     * in the exchange property {@link Exchange#SLIP_PRODUCER}.
+     */
+    private final class RoutingSlipProcessor implements AsyncProcessor {
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            AsyncProcessorHelper.process(this, exchange);
+        }
+
+        @Override
+        public boolean process(Exchange exchange, AsyncCallback callback) {
+            AsyncProcessor producer = exchange.getProperty(Exchange.SLIP_PRODUCER, AsyncProcessor.class);
+            return producer.process(exchange, callback);
+        }
+
+        @Override
+        public String toString() {
+            return "RoutingSlipProcessor";
+        }
+    }
 }
diff --git a/camel-core/src/test/java/org/apache/camel/processor/routingslip/DynamicRouterEventNotifierTest.java b/camel-core/src/test/java/org/apache/camel/processor/routingslip/DynamicRouterEventNotifierTest.java
index f09214c..5f8426f 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/routingslip/DynamicRouterEventNotifierTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/routingslip/DynamicRouterEventNotifierTest.java
@@ -82,6 +82,7 @@ public class DynamicRouterEventNotifierTest extends ContextTestSupport {
         @Override
         public void notify(EventObject event) throws Exception {
             if (event instanceof ExchangeSendingEvent) {
+                log.info("Sending: {}", event);
                 sending++;
             } else {
                 sent++;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java b/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java
index 064cd51..aa93217 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java
@@ -66,6 +66,7 @@ public class RoutingSlipEventNotifierTest extends ContextTestSupport {
         @Override
         public void notify(EventObject event) throws Exception {
             if (event instanceof ExchangeSendingEvent) {
+                log.info("Sending: {}", event);
                 sending++;
             } else {
                 sent++;

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.