You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/01/04 18:52:11 UTC
[camel] 01/07: CAMEL-12120: Routingslip/Dynamic-Router EIPs can
cause error handlers to be stopped for shared error handlers. Backported
older working code. CAMEL-10050 is re-introduced again and we need to come
up with a better solution.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9687a0ca2e9572c0970bc227c7755592e8652dc1
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jan 4 10:12:57 2018 +0100
CAMEL-12120: Routingslip/Dynamic-Router EIPs can cause error handlers to be stopped for shared error handlers. Backported older working code. CAMEL-10050 is re-introduced again and we need to come up with a better solution.
---
.../org/apache/camel/processor/RoutingSlip.java | 58 +++++++++------
.../camel/issues/RoutingSlipMemoryLeakTwoTest.java | 84 +++++++++++++++++++++
.../RoutingSlipMemoryLeakUniqueSlipsTest.java | 87 ++++++++++++++++++++++
.../issues/RoutingSlipNotStopErrorHandlerTest.java | 69 +++++++++++++++++
4 files changed, 275 insertions(+), 23 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index 88aae16..4c4e574 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -17,6 +17,8 @@
package org.apache.camel.processor;
import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
@@ -42,6 +44,7 @@ import org.apache.camel.spi.RouteContext;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
@@ -70,6 +73,20 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
protected Expression expression;
protected String uriDelimiter;
protected final CamelContext camelContext;
+ private final ConcurrentMap<PreparedErrorHandler, AsyncProcessor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, AsyncProcessor>();
+
+ /**
+ * Class that represents prepared fine grained error handlers when processing routingslip/dynamic-router exchanges
+ * <p/>
+ * This is similar to how multicast processor does.
+ */
+ static final class PreparedErrorHandler extends KeyValueHolder<String, Processor> {
+
+ PreparedErrorHandler(String key, Processor value) {
+ super(key, value);
+ }
+
+ }
/**
* The iterator to be used for retrieving the next routing slip(s) to be used.
@@ -321,6 +338,16 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
// this is needed to support redelivery on that output alone and not doing redelivery
// for the entire routingslip/dynamic-router block again which will start from scratch again
+ // create key for cache
+ final PreparedErrorHandler key = new PreparedErrorHandler(endpoint.getEndpointUri(), processor);
+
+ // lookup cached first to reuse and preserve memory
+ answer = errorHandlers.get(key);
+ if (answer != null) {
+ log.trace("Using existing error handler for: {}", processor);
+ return answer;
+ }
+
log.trace("Creating error handler for: {}", processor);
ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
// create error handler (create error handler directly to keep it light weight,
@@ -331,6 +358,9 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
// must start the error handler
ServiceHelper.startServices(answer);
+ // add to cache
+ errorHandlers.putIfAbsent(key, answer);
+
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
@@ -430,16 +460,7 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
// copy results back to the original exchange
ExchangeHelper.copyResults(original, current);
-
- if (target instanceof DeadLetterChannel) {
- Processor deadLetter = ((DeadLetterChannel) target).getDeadLetter();
- try {
- ServiceHelper.stopService(deadLetter);
- } catch (Exception e) {
- log.warn("Error stopping DeadLetterChannel error handler on routing slip. This exception is ignored.", e);
- }
- }
- } catch (Throwable e) {
+ } catch (Throwable e) {
exchange.setException(e);
}
@@ -449,18 +470,6 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
}
});
- // stop error handler if we completed synchronously
- if (answer) {
- if (target instanceof DeadLetterChannel) {
- Processor deadLetter = ((DeadLetterChannel) target).getDeadLetter();
- try {
- ServiceHelper.stopService(deadLetter);
- } catch (Exception e) {
- log.warn("Error stopping DeadLetterChannel error handler on routing slip. This exception is ignored.", e);
- }
- }
- }
-
return answer;
}
});
@@ -489,7 +498,10 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
}
protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(producerCache);
+ ServiceHelper.stopAndShutdownServices(producerCache, errorHandlers);
+
+ // only clear error handlers when shutting down
+ errorHandlers.clear();
}
public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTwoTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTwoTest.java
new file mode 100644
index 0000000..34b174c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTwoTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.RoutingSlip;
+
+public class RoutingSlipMemoryLeakTwoTest extends ContextTestSupport {
+
+ @Override
+ protected void setUp() throws Exception {
+ deleteDirectory("target/output");
+ super.setUp();
+ }
+
+ /**
+ * Reproducer for the memory leak: CAMEL-10048
+ */
+ public void testMemoryLeakInExceptionHandlerCaching() throws Exception {
+ int messageCount = 100;
+ for (int i = 0; i < messageCount; i++) {
+ template.sendBody("direct:start", "message " + i);
+ }
+ RoutingSlip routingSlip = context.getProcessor("memory-leak", RoutingSlip.class);
+ assertNotNull(routingSlip);
+
+ Map<?, ?> errorHandlers = getRoutingSlipErrorHandlers(routingSlip);
+ assertEquals("Error handlers cache must contain only one value", 1, errorHandlers.size());
+ }
+
+ private Map<?, ?> getRoutingSlipErrorHandlers(RoutingSlip routingSlip) throws Exception {
+ Field errorHandlersField = routingSlip.getClass().getDeclaredField("errorHandlers");
+ errorHandlersField.setAccessible(true);
+ Map<?, ?> errorHandlers = (Map<?, ?>) errorHandlersField.get(routingSlip);
+ return errorHandlers;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ errorHandler(deadLetterChannel("mock:dead"));
+
+ from("direct:start")
+ .routingSlip(method(SlipProvider.class)).id("memory-leak");
+ }
+ };
+ }
+
+ public static class SlipProvider {
+
+ public String computeSlip(String body) {
+ /*
+ * It is important to have a processor here, that does not extend
+ * AsyncProcessor. Only in this case
+ * AsyncProcessorConverterHelper.convert() creates a new object,
+ * thus leading to a memory leak. For example, if you replace file
+ * endpoint with mock endpoint, then everything goes fine, because
+ * MockEndpoint.createProducer() creates an implementation of
+ * AsyncProcessor.
+ */
+ return "file:target/output";
+ }
+ }
+}
\ No newline at end of file
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakUniqueSlipsTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakUniqueSlipsTest.java
new file mode 100644
index 0000000..39db36c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakUniqueSlipsTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.RoutingSlip;
+
+public class RoutingSlipMemoryLeakUniqueSlipsTest extends ContextTestSupport {
+
+ private static final AtomicInteger counter = new AtomicInteger(0);
+
+ @Override
+ protected void setUp() throws Exception {
+ deleteDirectory("target/output");
+ super.setUp();
+ }
+
+ /**
+ * Reproducer for the memory leak: CAMEL-10048/CAMEL-10050
+ */
+ public void testMemoryLeakInExceptionHandlerCaching() throws Exception {
+ int messageCount = 100;
+ for (int i = 0; i < messageCount; i++) {
+ template.sendBody("direct:start", "message " + i);
+ }
+ RoutingSlip routingSlip = context.getProcessor("memory-leak", RoutingSlip.class);
+ assertNotNull(routingSlip);
+
+ Map<?, ?> errorHandlers = getRoutingSlipErrorHandlers(routingSlip);
+ assertEquals("Error handlers cache must contain only one value", 1, errorHandlers.size());
+ }
+
+ private Map<?, ?> getRoutingSlipErrorHandlers(RoutingSlip routingSlip) throws Exception {
+ Field errorHandlersField = routingSlip.getClass().getDeclaredField("errorHandlers");
+ errorHandlersField.setAccessible(true);
+ Map<?, ?> errorHandlers = (Map<?, ?>) errorHandlersField.get(routingSlip);
+ return errorHandlers;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ errorHandler(deadLetterChannel("mock:dead"));
+
+ from("direct:start")
+ .routingSlip(method(SlipProvider.class)).id("memory-leak");
+ }
+ };
+ }
+
+ public static class SlipProvider {
+
+ public String computeSlip(String body) {
+ /*
+ * It is important to have a processor here, that does not extend
+ * AsyncProcessor. Only in this case
+ * AsyncProcessorConverterHelper.convert() creates a new object,
+ * thus leading to a memory leak. For example, if you replace file
+ * endpoint with mock endpoint, then everything goes fine, because
+ * MockEndpoint.createProducer() creates an implementation of
+ * AsyncProcessor.
+ */
+ return "mock:" + counter.incrementAndGet();
+ }
+ }
+}
\ No newline at end of file
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipNotStopErrorHandlerTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipNotStopErrorHandlerTest.java
new file mode 100644
index 0000000..1b56196
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipNotStopErrorHandlerTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class RoutingSlipNotStopErrorHandlerTest extends ContextTestSupport {
+
+ private static final String DIRECT_START = "direct:start";
+ private static final String THROWING_ROUTE = "direct:throwingRoute";
+
+ public static class CustomRoutingSlip {
+
+ public String router() {
+ return THROWING_ROUTE;
+ }
+ }
+
+ @Test
+ public void testRoutingSlipNotStopErrorHandler() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(2);
+
+ template.sendBody(DIRECT_START, "ABC");
+
+ template.sendBody(THROWING_ROUTE, "123");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ errorHandler(deadLetterChannel("mock:result")
+ .maximumRedeliveries(1)
+ .redeliveryDelay(10)
+ .retriesExhaustedLogLevel(LoggingLevel.ERROR)
+ .retryAttemptedLogLevel(LoggingLevel.WARN)
+ .logStackTrace(true)
+ .logRetryStackTrace(true)
+ );
+
+ from(DIRECT_START).routingSlip(method(CustomRoutingSlip.class, "router"));
+
+ from(THROWING_ROUTE).process().exchange(o -> {
+ throw new IllegalStateException();
+ });
+ }
+ };
+ }
+}
--
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.