You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/01/04 18:52:14 UTC
[camel] 04/07: CAMEL-12120: Better solution for routing-slip/dynamic-router to use existing producer when embedded with error handler.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 83684bcb0e605ef799d21d3d84fba90bcb86d395
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jan 4 16:37:04 2018 +0100
CAMEL-12120: Better solution for routing-slip/dynamic-router to use existing producer when embedded with error handler.
---
.../src/main/java/org/apache/camel/Exchange.java | 1 +
.../camel/model/DynamicRouterDefinition.java | 9 +----
.../apache/camel/model/RoutingSlipDefinition.java | 9 +----
.../org/apache/camel/processor/RoutingSlip.java | 39 ++++++++++++++++++++--
.../DynamicRouterEventNotifierTest.java | 1 +
.../routingslip/RoutingSlipEventNotifierTest.java | 1 +
6 files changed, 42 insertions(+), 18 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java
index 6110201..b1d232d 100644
--- a/camel-core/src/main/java/org/apache/camel/Exchange.java
+++ b/camel-core/src/main/java/org/apache/camel/Exchange.java
@@ -212,6 +212,7 @@ public interface Exchange {
String SKIP_GZIP_ENCODING = "CamelSkipGzipEncoding";
String SKIP_WWW_FORM_URLENCODED = "CamelSkipWwwFormUrlEncoding";
String SLIP_ENDPOINT = "CamelSlipEndpoint";
+ String SLIP_PRODUCER = "CamelSlipProducer";
String SPLIT_INDEX = "CamelSplitIndex";
String SPLIT_COMPLETE = "CamelSplitComplete";
String SPLIT_SIZE = "CamelSplitSize";
diff --git a/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java b/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
index e4b855d..3b2ec79 100644
--- a/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
@@ -88,18 +88,11 @@ public class DynamicRouterDefinition<Type extends ProcessorDefinition<Type>> ext
dynamicRouter.setCacheSize(getCacheSize());
}
- // use dynamic processor to send to the computed slip endpoint
- SendDynamicProcessor dynamicProcessor = new SendDynamicProcessor(headerExpression(Exchange.SLIP_ENDPOINT));
- dynamicProcessor.setCamelContext(routeContext.getCamelContext());
- if (getCacheSize() != null) {
- dynamicProcessor.setCacheSize(getCacheSize());
- }
-
// and wrap this in an error handler
ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
// create error handler (create error handler directly to keep it light weight,
// instead of using ProcessorDefinition.wrapInErrorHandler)
- AsyncProcessor errorHandler = (AsyncProcessor) builder.createErrorHandler(routeContext, dynamicProcessor);
+ AsyncProcessor errorHandler = (AsyncProcessor) builder.createErrorHandler(routeContext, dynamicRouter.newRoutingSlipProcessorForErrorHandler());
dynamicRouter.setErrorHandler(errorHandler);
return dynamicRouter;
diff --git a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
index 413d8d5..c2c029c 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
@@ -98,18 +98,11 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
routingSlip.setCacheSize(getCacheSize());
}
- // use dynamic processor to send to the computed slip endpoint
- SendDynamicProcessor dynamicProcessor = new SendDynamicProcessor(headerExpression(Exchange.SLIP_ENDPOINT));
- dynamicProcessor.setCamelContext(routeContext.getCamelContext());
- if (getCacheSize() != null) {
- dynamicProcessor.setCacheSize(getCacheSize());
- }
-
// and wrap this in an error handler
ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
// create error handler (create error handler directly to keep it light weight,
// instead of using ProcessorDefinition.wrapInErrorHandler)
- AsyncProcessor errorHandler = (AsyncProcessor) builder.createErrorHandler(routeContext, dynamicProcessor);
+ AsyncProcessor errorHandler = (AsyncProcessor) builder.createErrorHandler(routeContext, routingSlip.newRoutingSlipProcessorForErrorHandler());
routingSlip.setErrorHandler(errorHandler);
return routingSlip;
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index ccfb808..dbfb8ad 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -69,6 +69,7 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
protected String uriDelimiter;
protected final CamelContext camelContext;
protected AsyncProcessor errorHandler;
+ protected SendDynamicProcessor sendDynamicProcessor;
/**
* The iterator to be used for retrieving the next routing slip(s) to be used.
@@ -359,12 +360,16 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
AsyncProcessor target = createErrorHandler(routeContext, exchange, asyncProducer, endpoint);
- // set property which endpoint we send to
+ // set property which endpoint we send to and the producer that can do it
exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri());
+ exchange.setProperty(Exchange.SLIP_PRODUCER, asyncProducer);
boolean answer = target.process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
+ // cleanup producer after usage
+ exchange.removeProperty(Exchange.SLIP_PRODUCER);
+
// we only have to handle async completion of the routing slip
if (doneSync) {
callback.done(true);
@@ -424,7 +429,7 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
// copy results back to the original exchange
ExchangeHelper.copyResults(original, current);
- } catch (Throwable e) {
+ } catch (Throwable e) {
exchange.setException(e);
}
@@ -492,4 +497,34 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
result.getProperties().clear();
result.getProperties().putAll(source.getProperties());
}
+
+ /**
+ * Creates the embedded processor to use when wrapping this routing slip in an error handler.
+ */
+ public AsyncProcessor newRoutingSlipProcessorForErrorHandler() {
+ return new RoutingSlipProcessor();
+ }
+
+ /**
+ * Embedded processor that routes to the routing slip that has been set via the
+ * exchange property {@link Exchange#SLIP_PRODUCER}.
+ */
+ private final class RoutingSlipProcessor implements AsyncProcessor {
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
+
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ AsyncProcessor producer = exchange.getProperty(Exchange.SLIP_PRODUCER, AsyncProcessor.class);
+ return producer.process(exchange, callback);
+ }
+
+ @Override
+ public String toString() {
+ return "RoutingSlipProcessor";
+ }
+ }
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/routingslip/DynamicRouterEventNotifierTest.java b/camel-core/src/test/java/org/apache/camel/processor/routingslip/DynamicRouterEventNotifierTest.java
index f09214c..5f8426f 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/routingslip/DynamicRouterEventNotifierTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/routingslip/DynamicRouterEventNotifierTest.java
@@ -82,6 +82,7 @@ public class DynamicRouterEventNotifierTest extends ContextTestSupport {
@Override
public void notify(EventObject event) throws Exception {
if (event instanceof ExchangeSendingEvent) {
+ log.info("Sending: {}", event);
sending++;
} else {
sent++;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java b/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java
index 064cd51..aa93217 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/routingslip/RoutingSlipEventNotifierTest.java
@@ -66,6 +66,7 @@ public class RoutingSlipEventNotifierTest extends ContextTestSupport {
@Override
public void notify(EventObject event) throws Exception {
if (event instanceof ExchangeSendingEvent) {
+ log.info("Sending: {}", event);
sending++;
} else {
sent++;
--
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.