You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/01/04 18:52:12 UTC
[camel] 02/07: CAMEL-12120: routingSlip and dynamicRouter should
use an embedded dynamicProcessor to send to endpoints instead of error
handler cache.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 021026be1effad794fa32a0f0d2c7a29596dc633
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jan 4 15:18:02 2018 +0100
CAMEL-12120: routingSlip and dynamicRouter should use an embedded dynamicProcessor to send to endpoints instead of error handler cache.
---
.../camel/model/DynamicRouterDefinition.java | 21 ++++++
.../apache/camel/model/RoutingSlipDefinition.java | 21 ++++++
.../org/apache/camel/processor/RoutingSlip.java | 68 ++++-------------
.../camel/issues/RoutingSlipMemoryLeakTwoTest.java | 84 ---------------------
.../RoutingSlipMemoryLeakUniqueSlipsTest.java | 87 ----------------------
5 files changed, 57 insertions(+), 224 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java b/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
index 9d39830..505fb01 100644
--- a/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
@@ -23,13 +23,19 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.ErrorHandlerFactory;
+import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.DynamicRouter;
+import org.apache.camel.processor.SendDynamicProcessor;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.RouteContext;
+import static org.apache.camel.builder.ExpressionBuilder.headerExpression;
+
/**
* Routes messages based on dynamic rules
*/
@@ -81,6 +87,21 @@ public class DynamicRouterDefinition<Type extends ProcessorDefinition<Type>> ext
if (getCacheSize() != null) {
dynamicRouter.setCacheSize(getCacheSize());
}
+
+ // use dynamic processor to send to the computed slip endpoint
+ SendDynamicProcessor dynamicProcessor = new SendDynamicProcessor(headerExpression(Exchange.SLIP_ENDPOINT));
+ dynamicProcessor.setCamelContext(routeContext.getCamelContext());
+ if (getCacheSize() != null) {
+ dynamicProcessor.setCacheSize(getCacheSize());
+ }
+
+ // and wrap this in an error handler
+ ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
+ // create error handler (create error handler directly to keep it light weight,
+ // instead of using ProcessorDefinition.wrapInErrorHandler)
+ AsyncProcessor errorHandler = (AsyncProcessor) builder.createErrorHandler(routeContext, dynamicProcessor);
+ dynamicRouter.setErrorHandler(errorHandler);
+
return dynamicRouter;
}
diff --git a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
index 5d02a36..8815640 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
@@ -23,14 +23,20 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.ErrorHandlerFactory;
+import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.model.language.HeaderExpression;
import org.apache.camel.processor.RoutingSlip;
+import org.apache.camel.processor.SendDynamicProcessor;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.RouteContext;
+import static org.apache.camel.builder.ExpressionBuilder.headerExpression;
+
/**
* Routes a message through a series of steps that are pre-determined (the slip)
*/
@@ -91,6 +97,21 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
if (getCacheSize() != null) {
routingSlip.setCacheSize(getCacheSize());
}
+
+ // use dynamic processor to send to the computed slip endpoint
+ SendDynamicProcessor dynamicProcessor = new SendDynamicProcessor(headerExpression(Exchange.SLIP_ENDPOINT));
+ dynamicProcessor.setCamelContext(routeContext.getCamelContext());
+ if (getCacheSize() != null) {
+ dynamicProcessor.setCacheSize(getCacheSize());
+ }
+
+ // and wrap this in an error handler
+ ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
+ // create error handler (create error handler directly to keep it light weight,
+ // instead of using ProcessorDefinition.wrapInErrorHandler)
+ AsyncProcessor errorHandler = (AsyncProcessor) builder.createErrorHandler(routeContext, dynamicProcessor);
+ routingSlip.setErrorHandler(errorHandler);
+
return routingSlip;
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index 4c4e574..ccfb808 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -17,21 +17,17 @@
package org.apache.camel.processor;
import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.AsyncProducerCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
-import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.Message;
-import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Traceable;
import org.apache.camel.builder.ExpressionBuilder;
@@ -44,7 +40,6 @@ import org.apache.camel.spi.RouteContext;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
@@ -73,20 +68,7 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
protected Expression expression;
protected String uriDelimiter;
protected final CamelContext camelContext;
- private final ConcurrentMap<PreparedErrorHandler, AsyncProcessor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, AsyncProcessor>();
-
- /**
- * Class that represents prepared fine grained error handlers when processing routingslip/dynamic-router exchanges
- * <p/>
- * This is similar to how multicast processor does.
- */
- static final class PreparedErrorHandler extends KeyValueHolder<String, Processor> {
-
- PreparedErrorHandler(String key, Processor value) {
- super(key, value);
- }
-
- }
+ protected AsyncProcessor errorHandler;
/**
* The iterator to be used for retrieving the next routing slip(s) to be used.
@@ -162,6 +144,14 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
this.cacheSize = cacheSize;
}
+ public AsyncProcessor getErrorHandler() {
+ return errorHandler;
+ }
+
+ public void setErrorHandler(AsyncProcessor errorHandler) {
+ this.errorHandler = errorHandler;
+ }
+
@Override
public String toString() {
return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]";
@@ -332,38 +322,12 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class);
// do not wrap in error handler if we are inside a try block
- if (!tryBlock && routeContext != null) {
+ if (!tryBlock && routeContext != null && errorHandler != null) {
// wrap the producer in error handler so we have fine grained error handling on
// the output side instead of the input side
// this is needed to support redelivery on that output alone and not doing redelivery
// for the entire routingslip/dynamic-router block again which will start from scratch again
-
- // create key for cache
- final PreparedErrorHandler key = new PreparedErrorHandler(endpoint.getEndpointUri(), processor);
-
- // lookup cached first to reuse and preserve memory
- answer = errorHandlers.get(key);
- if (answer != null) {
- log.trace("Using existing error handler for: {}", processor);
- return answer;
- }
-
- log.trace("Creating error handler for: {}", processor);
- ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
- // create error handler (create error handler directly to keep it light weight,
- // instead of using ProcessorDefinition.wrapInErrorHandler)
- try {
- answer = (AsyncProcessor) builder.createErrorHandler(routeContext, processor);
-
- // must start the error handler
- ServiceHelper.startServices(answer);
-
- // add to cache
- errorHandlers.putIfAbsent(key, answer);
-
- } catch (Exception e) {
- throw ObjectHelper.wrapRuntimeCamelException(e);
- }
+ answer = errorHandler;
}
return answer;
@@ -490,18 +454,16 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
log.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, cacheSize);
}
}
- ServiceHelper.startService(producerCache);
+
+ ServiceHelper.startServices(producerCache, errorHandler);
}
protected void doStop() throws Exception {
- ServiceHelper.stopServices(producerCache);
+ ServiceHelper.stopServices(producerCache, errorHandler);
}
protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(producerCache, errorHandlers);
-
- // only clear error handlers when shutting down
- errorHandlers.clear();
+ ServiceHelper.stopAndShutdownServices(producerCache, errorHandler);
}
public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTwoTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTwoTest.java
deleted file mode 100644
index 34b174c..0000000
--- a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTwoTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.issues;
-
-import java.lang.reflect.Field;
-import java.util.Map;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.processor.RoutingSlip;
-
-public class RoutingSlipMemoryLeakTwoTest extends ContextTestSupport {
-
- @Override
- protected void setUp() throws Exception {
- deleteDirectory("target/output");
- super.setUp();
- }
-
- /**
- * Reproducer for the memory leak: CAMEL-10048
- */
- public void testMemoryLeakInExceptionHandlerCaching() throws Exception {
- int messageCount = 100;
- for (int i = 0; i < messageCount; i++) {
- template.sendBody("direct:start", "message " + i);
- }
- RoutingSlip routingSlip = context.getProcessor("memory-leak", RoutingSlip.class);
- assertNotNull(routingSlip);
-
- Map<?, ?> errorHandlers = getRoutingSlipErrorHandlers(routingSlip);
- assertEquals("Error handlers cache must contain only one value", 1, errorHandlers.size());
- }
-
- private Map<?, ?> getRoutingSlipErrorHandlers(RoutingSlip routingSlip) throws Exception {
- Field errorHandlersField = routingSlip.getClass().getDeclaredField("errorHandlers");
- errorHandlersField.setAccessible(true);
- Map<?, ?> errorHandlers = (Map<?, ?>) errorHandlersField.get(routingSlip);
- return errorHandlers;
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- errorHandler(deadLetterChannel("mock:dead"));
-
- from("direct:start")
- .routingSlip(method(SlipProvider.class)).id("memory-leak");
- }
- };
- }
-
- public static class SlipProvider {
-
- public String computeSlip(String body) {
- /*
- * It is important to have a processor here, that does not extend
- * AsyncProcessor. Only in this case
- * AsyncProcessorConverterHelper.convert() creates a new object,
- * thus leading to a memory leak. For example, if you replace file
- * endpoint with mock endpoint, then everything goes fine, because
- * MockEndpoint.createProducer() creates an implementation of
- * AsyncProcessor.
- */
- return "file:target/output";
- }
- }
-}
\ No newline at end of file
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakUniqueSlipsTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakUniqueSlipsTest.java
deleted file mode 100644
index 39db36c..0000000
--- a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakUniqueSlipsTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.issues;
-
-import java.lang.reflect.Field;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.processor.RoutingSlip;
-
-public class RoutingSlipMemoryLeakUniqueSlipsTest extends ContextTestSupport {
-
- private static final AtomicInteger counter = new AtomicInteger(0);
-
- @Override
- protected void setUp() throws Exception {
- deleteDirectory("target/output");
- super.setUp();
- }
-
- /**
- * Reproducer for the memory leak: CAMEL-10048/CAMEL-10050
- */
- public void testMemoryLeakInExceptionHandlerCaching() throws Exception {
- int messageCount = 100;
- for (int i = 0; i < messageCount; i++) {
- template.sendBody("direct:start", "message " + i);
- }
- RoutingSlip routingSlip = context.getProcessor("memory-leak", RoutingSlip.class);
- assertNotNull(routingSlip);
-
- Map<?, ?> errorHandlers = getRoutingSlipErrorHandlers(routingSlip);
- assertEquals("Error handlers cache must contain only one value", 1, errorHandlers.size());
- }
-
- private Map<?, ?> getRoutingSlipErrorHandlers(RoutingSlip routingSlip) throws Exception {
- Field errorHandlersField = routingSlip.getClass().getDeclaredField("errorHandlers");
- errorHandlersField.setAccessible(true);
- Map<?, ?> errorHandlers = (Map<?, ?>) errorHandlersField.get(routingSlip);
- return errorHandlers;
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- errorHandler(deadLetterChannel("mock:dead"));
-
- from("direct:start")
- .routingSlip(method(SlipProvider.class)).id("memory-leak");
- }
- };
- }
-
- public static class SlipProvider {
-
- public String computeSlip(String body) {
- /*
- * It is important to have a processor here, that does not extend
- * AsyncProcessor. Only in this case
- * AsyncProcessorConverterHelper.convert() creates a new object,
- * thus leading to a memory leak. For example, if you replace file
- * endpoint with mock endpoint, then everything goes fine, because
- * MockEndpoint.createProducer() creates an implementation of
- * AsyncProcessor.
- */
- return "mock:" + counter.incrementAndGet();
- }
- }
-}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.