You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/01/04 18:52:11 UTC

[camel] 01/07: CAMEL-12120: Routing Slip/Dynamic Router EIPs can cause shared error handlers to be stopped. Backported older working code. This re-introduces CAMEL-10050, so we need to come up with a better solution.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9687a0ca2e9572c0970bc227c7755592e8652dc1
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jan 4 10:12:57 2018 +0100

    CAMEL-12120: Routing Slip/Dynamic Router EIPs can cause shared error handlers to be stopped. Backported older working code. This re-introduces CAMEL-10050, so we need to come up with a better solution.
---
 .../org/apache/camel/processor/RoutingSlip.java    | 58 +++++++++------
 .../camel/issues/RoutingSlipMemoryLeakTwoTest.java | 84 +++++++++++++++++++++
 .../RoutingSlipMemoryLeakUniqueSlipsTest.java      | 87 ++++++++++++++++++++++
 .../issues/RoutingSlipNotStopErrorHandlerTest.java | 69 +++++++++++++++++
 4 files changed, 275 insertions(+), 23 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index 88aae16..4c4e574 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -17,6 +17,8 @@
 package org.apache.camel.processor;
 
 import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -42,6 +44,7 @@ import org.apache.camel.spi.RouteContext;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.KeyValueHolder;
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -70,6 +73,20 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
     protected Expression expression;
     protected String uriDelimiter;
     protected final CamelContext camelContext;
+    private final ConcurrentMap<PreparedErrorHandler, AsyncProcessor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, AsyncProcessor>();
+
+    /**
+     * Class that represents prepared fine-grained error handlers when processing routing slip/dynamic router exchanges.
+     * <p/>
+     * This is similar to what the multicast processor does.
+     */
+    static final class PreparedErrorHandler extends KeyValueHolder<String, Processor> {
+
+        PreparedErrorHandler(String key, Processor value) {
+            super(key, value);
+        }
+
+    }
 
     /**
      * The iterator to be used for retrieving the next routing slip(s) to be used.
@@ -321,6 +338,16 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
             // this is needed to support redelivery on that output alone and not doing redelivery
             // for the entire routingslip/dynamic-router block again which will start from scratch again
 
+            // create key for cache
+            final PreparedErrorHandler key = new PreparedErrorHandler(endpoint.getEndpointUri(), processor);
+
+            // look up the cache first so an existing error handler is reused, which saves memory
+            answer = errorHandlers.get(key);
+            if (answer != null) {
+                log.trace("Using existing error handler for: {}", processor);
+                return answer;
+            }
+
             log.trace("Creating error handler for: {}", processor);
             ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
             // create error handler (create error handler directly to keep it light weight,
@@ -331,6 +358,9 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
                 // must start the error handler
                 ServiceHelper.startServices(answer);
 
+                // add to cache
+                errorHandlers.putIfAbsent(key, answer);
+
             } catch (Exception e) {
                 throw ObjectHelper.wrapRuntimeCamelException(e);
             }
@@ -430,16 +460,7 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
 
                             // copy results back to the original exchange
                             ExchangeHelper.copyResults(original, current);
-
-                            if (target instanceof DeadLetterChannel) {
-                                Processor deadLetter = ((DeadLetterChannel) target).getDeadLetter();
-                                try {
-                                    ServiceHelper.stopService(deadLetter);
-                                } catch (Exception e) {
-                                    log.warn("Error stopping DeadLetterChannel error handler on routing slip. This exception is ignored.", e);
-                                }
-                            }
-                        } catch (Throwable e) {
+                       } catch (Throwable e) {
                             exchange.setException(e);
                         }
 
@@ -449,18 +470,6 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
                     }
                 });
 
-                // stop error handler if we completed synchronously
-                if (answer) {
-                    if (target instanceof DeadLetterChannel) {
-                        Processor deadLetter = ((DeadLetterChannel) target).getDeadLetter();
-                        try {
-                            ServiceHelper.stopService(deadLetter);
-                        } catch (Exception e) {
-                            log.warn("Error stopping DeadLetterChannel error handler on routing slip. This exception is ignored.", e);
-                        }
-                    }
-                }
-
                 return answer;
             }
         });
@@ -489,7 +498,10 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
     }
 
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(producerCache);
+        ServiceHelper.stopAndShutdownServices(producerCache, errorHandlers);
+
+        // only clear error handlers when shutting down
+        errorHandlers.clear();
     }
 
     public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTwoTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTwoTest.java
new file mode 100644
index 0000000..34b174c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTwoTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.RoutingSlip;
+
+public class RoutingSlipMemoryLeakTwoTest extends ContextTestSupport {
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("target/output");
+        super.setUp();
+    }
+
+    /**
+     * Reproducer for the memory leak: CAMEL-10048
+     */
+    public void testMemoryLeakInExceptionHandlerCaching() throws Exception {
+        int messageCount = 100;
+        for (int i = 0; i < messageCount; i++) {
+            template.sendBody("direct:start", "message " + i);
+        }
+        RoutingSlip routingSlip = context.getProcessor("memory-leak", RoutingSlip.class);
+        assertNotNull(routingSlip);
+
+        Map<?, ?> errorHandlers = getRoutingSlipErrorHandlers(routingSlip);
+        assertEquals("Error handlers cache must contain only one value", 1, errorHandlers.size());
+    }
+
+    private Map<?, ?> getRoutingSlipErrorHandlers(RoutingSlip routingSlip) throws Exception {
+        Field errorHandlersField = routingSlip.getClass().getDeclaredField("errorHandlers");
+        errorHandlersField.setAccessible(true);
+        Map<?, ?> errorHandlers = (Map<?, ?>) errorHandlersField.get(routingSlip);
+        return errorHandlers;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                from("direct:start")
+                    .routingSlip(method(SlipProvider.class)).id("memory-leak");
+            }
+        };
+    }
+
+    public static class SlipProvider {
+
+        public String computeSlip(String body) {
+            /*
+             * It is important to have a processor here, that does not extend
+             * AsyncProcessor. Only in this case
+             * AsyncProcessorConverterHelper.convert() creates a new object,
+             * thus leading to a memory leak. For example, if you replace file
+             * endpoint with mock endpoint, then everything goes fine, because
+             * MockEndpoint.createProducer() creates an implementation of
+             * AsyncProcessor.
+             */
+            return "file:target/output";
+        }
+    }
+}
\ No newline at end of file
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakUniqueSlipsTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakUniqueSlipsTest.java
new file mode 100644
index 0000000..39db36c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakUniqueSlipsTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.RoutingSlip;
+
+public class RoutingSlipMemoryLeakUniqueSlipsTest extends ContextTestSupport {
+
+    private static final AtomicInteger counter = new AtomicInteger(0);
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("target/output");
+        super.setUp();
+    }
+
+    /**
+     * Reproducer for the memory leak: CAMEL-10048/CAMEL-10050
+     */
+    public void testMemoryLeakInExceptionHandlerCaching() throws Exception {
+        int messageCount = 100;
+        for (int i = 0; i < messageCount; i++) {
+            template.sendBody("direct:start", "message " + i);
+        }
+        RoutingSlip routingSlip = context.getProcessor("memory-leak", RoutingSlip.class);
+        assertNotNull(routingSlip);
+
+        Map<?, ?> errorHandlers = getRoutingSlipErrorHandlers(routingSlip);
+        assertEquals("Error handlers cache must contain only one value", 1, errorHandlers.size());
+    }
+
+    private Map<?, ?> getRoutingSlipErrorHandlers(RoutingSlip routingSlip) throws Exception {
+        Field errorHandlersField = routingSlip.getClass().getDeclaredField("errorHandlers");
+        errorHandlersField.setAccessible(true);
+        Map<?, ?> errorHandlers = (Map<?, ?>) errorHandlersField.get(routingSlip);
+        return errorHandlers;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                from("direct:start")
+                    .routingSlip(method(SlipProvider.class)).id("memory-leak");
+            }
+        };
+    }
+
+    public static class SlipProvider {
+
+        public String computeSlip(String body) {
+            /*
+             * Each call returns a different mock endpoint URI (mock:1,
+             * mock:2, ...), so every exchange is routed with a unique slip.
+             * NOTE(review): the comment previously here appears to have been
+             * copied from the file-endpoint reproducer, where a processor
+             * that is not an AsyncProcessor makes
+             * AsyncProcessorConverterHelper.convert() create a new object,
+             * thus leading to a memory leak. This provider uses mock endpoints.
+             */
+            return "mock:" + counter.incrementAndGet();
+        }
+    }
+}
\ No newline at end of file
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipNotStopErrorHandlerTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipNotStopErrorHandlerTest.java
new file mode 100644
index 0000000..1b56196
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipNotStopErrorHandlerTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class RoutingSlipNotStopErrorHandlerTest extends ContextTestSupport {
+
+    private static final String DIRECT_START = "direct:start";
+    private static final String THROWING_ROUTE = "direct:throwingRoute";
+
+    public static class CustomRoutingSlip {
+
+        public String router() {
+            return THROWING_ROUTE;
+        }
+    }
+
+    @Test
+    public void testRoutingSlipNotStopErrorHandler() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(2);
+
+        template.sendBody(DIRECT_START, "ABC");
+
+        template.sendBody(THROWING_ROUTE, "123");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                errorHandler(deadLetterChannel("mock:result")
+                    .maximumRedeliveries(1)
+                    .redeliveryDelay(10)
+                    .retriesExhaustedLogLevel(LoggingLevel.ERROR)
+                    .retryAttemptedLogLevel(LoggingLevel.WARN)
+                    .logStackTrace(true)
+                    .logRetryStackTrace(true)
+                );
+
+                from(DIRECT_START).routingSlip(method(CustomRoutingSlip.class, "router"));
+
+                from(THROWING_ROUTE).process().exchange(o -> {
+                    throw new IllegalStateException();
+                });
+            }
+        };
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.