You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/01/04 18:52:12 UTC

[camel] 02/07: CAMEL-12120: routingSlip and dynamicRouter should use an embedded dynamicProcessor to send to endpoints instead of error handler cache.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 021026be1effad794fa32a0f0d2c7a29596dc633
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jan 4 15:18:02 2018 +0100

    CAMEL-12120: routingSlip and dynamicRouter should use an embedded dynamicProcessor to send to endpoints instead of error handler cache.
---
 .../camel/model/DynamicRouterDefinition.java       | 21 ++++++
 .../apache/camel/model/RoutingSlipDefinition.java  | 21 ++++++
 .../org/apache/camel/processor/RoutingSlip.java    | 68 ++++-------------
 .../camel/issues/RoutingSlipMemoryLeakTwoTest.java | 84 ---------------------
 .../RoutingSlipMemoryLeakUniqueSlipsTest.java      | 87 ----------------------
 5 files changed, 57 insertions(+), 224 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java b/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
index 9d39830..505fb01 100644
--- a/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
@@ -23,13 +23,19 @@ import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
 
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.ErrorHandlerFactory;
+import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.DynamicRouter;
+import org.apache.camel.processor.SendDynamicProcessor;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
 
+import static org.apache.camel.builder.ExpressionBuilder.headerExpression;
+
 /**
  * Routes messages based on dynamic rules
  */
@@ -81,6 +87,21 @@ public class DynamicRouterDefinition<Type extends ProcessorDefinition<Type>> ext
         if (getCacheSize() != null) {
             dynamicRouter.setCacheSize(getCacheSize());
         }
+
+        // use dynamic processor to send to the computed slip endpoint
+        SendDynamicProcessor dynamicProcessor = new SendDynamicProcessor(headerExpression(Exchange.SLIP_ENDPOINT));
+        dynamicProcessor.setCamelContext(routeContext.getCamelContext());
+        if (getCacheSize() != null) {
+            dynamicProcessor.setCacheSize(getCacheSize());
+        }
+
+        // and wrap this in an error handler
+        ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
+        // create error handler (create error handler directly to keep it light weight,
+        // instead of using ProcessorDefinition.wrapInErrorHandler)
+        AsyncProcessor errorHandler = (AsyncProcessor) builder.createErrorHandler(routeContext, dynamicProcessor);
+        dynamicRouter.setErrorHandler(errorHandler);
+
         return dynamicRouter;
     }
 
diff --git a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
index 5d02a36..8815640 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
@@ -23,14 +23,20 @@ import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
 
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.ErrorHandlerFactory;
+import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.model.language.HeaderExpression;
 import org.apache.camel.processor.RoutingSlip;
+import org.apache.camel.processor.SendDynamicProcessor;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
 
+import static org.apache.camel.builder.ExpressionBuilder.headerExpression;
+
 /**
  * Routes a message through a series of steps that are pre-determined (the slip)
  */
@@ -91,6 +97,21 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
         if (getCacheSize() != null) {
             routingSlip.setCacheSize(getCacheSize());
         }
+
+        // use dynamic processor to send to the computed slip endpoint
+        SendDynamicProcessor dynamicProcessor = new SendDynamicProcessor(headerExpression(Exchange.SLIP_ENDPOINT));
+        dynamicProcessor.setCamelContext(routeContext.getCamelContext());
+        if (getCacheSize() != null) {
+            dynamicProcessor.setCacheSize(getCacheSize());
+        }
+
+        // and wrap this in an error handler
+        ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
+        // create error handler (create error handler directly to keep it light weight,
+        // instead of using ProcessorDefinition.wrapInErrorHandler)
+        AsyncProcessor errorHandler = (AsyncProcessor) builder.createErrorHandler(routeContext, dynamicProcessor);
+        routingSlip.setErrorHandler(errorHandler);
+
         return routingSlip;
     }
 
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index 4c4e574..ccfb808 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -17,21 +17,17 @@
 package org.apache.camel.processor;
 
 import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.AsyncProducerCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
-import org.apache.camel.ErrorHandlerFactory;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
 import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.Message;
-import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.Traceable;
 import org.apache.camel.builder.ExpressionBuilder;
@@ -44,7 +40,6 @@ import org.apache.camel.spi.RouteContext;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.KeyValueHolder;
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -73,20 +68,7 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
     protected Expression expression;
     protected String uriDelimiter;
     protected final CamelContext camelContext;
-    private final ConcurrentMap<PreparedErrorHandler, AsyncProcessor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, AsyncProcessor>();
-
-    /**
-     * Class that represents prepared fine grained error handlers when processing routingslip/dynamic-router exchanges
-     * <p/>
-     * This is similar to how multicast processor does.
-     */
-    static final class PreparedErrorHandler extends KeyValueHolder<String, Processor> {
-
-        PreparedErrorHandler(String key, Processor value) {
-            super(key, value);
-        }
-
-    }
+    protected AsyncProcessor errorHandler;
 
     /**
      * The iterator to be used for retrieving the next routing slip(s) to be used.
@@ -162,6 +144,14 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
         this.cacheSize = cacheSize;
     }
 
+    public AsyncProcessor getErrorHandler() {
+        return errorHandler;
+    }
+
+    public void setErrorHandler(AsyncProcessor errorHandler) {
+        this.errorHandler = errorHandler;
+    }
+
     @Override
     public String toString() {
         return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]";
@@ -332,38 +322,12 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
         boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class);
 
         // do not wrap in error handler if we are inside a try block
-        if (!tryBlock && routeContext != null) {
+        if (!tryBlock && routeContext != null && errorHandler != null) {
             // wrap the producer in error handler so we have fine grained error handling on
             // the output side instead of the input side
             // this is needed to support redelivery on that output alone and not doing redelivery
             // for the entire routingslip/dynamic-router block again which will start from scratch again
-
-            // create key for cache
-            final PreparedErrorHandler key = new PreparedErrorHandler(endpoint.getEndpointUri(), processor);
-
-            // lookup cached first to reuse and preserve memory
-            answer = errorHandlers.get(key);
-            if (answer != null) {
-                log.trace("Using existing error handler for: {}", processor);
-                return answer;
-            }
-
-            log.trace("Creating error handler for: {}", processor);
-            ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
-            // create error handler (create error handler directly to keep it light weight,
-            // instead of using ProcessorDefinition.wrapInErrorHandler)
-            try {
-                answer = (AsyncProcessor) builder.createErrorHandler(routeContext, processor);
-
-                // must start the error handler
-                ServiceHelper.startServices(answer);
-
-                // add to cache
-                errorHandlers.putIfAbsent(key, answer);
-
-            } catch (Exception e) {
-                throw ObjectHelper.wrapRuntimeCamelException(e);
-            }
+            answer = errorHandler;
         }
 
         return answer;
@@ -490,18 +454,16 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
                 log.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, cacheSize);
             }
         }
-        ServiceHelper.startService(producerCache);
+
+        ServiceHelper.startServices(producerCache, errorHandler);
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(producerCache);
+        ServiceHelper.stopServices(producerCache, errorHandler);
     }
 
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(producerCache, errorHandlers);
-
-        // only clear error handlers when shutting down
-        errorHandlers.clear();
+        ServiceHelper.stopAndShutdownServices(producerCache, errorHandler);
     }
 
     public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTwoTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTwoTest.java
deleted file mode 100644
index 34b174c..0000000
--- a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTwoTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.issues;
-
-import java.lang.reflect.Field;
-import java.util.Map;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.processor.RoutingSlip;
-
-public class RoutingSlipMemoryLeakTwoTest extends ContextTestSupport {
-
-    @Override
-    protected void setUp() throws Exception {
-        deleteDirectory("target/output");
-        super.setUp();
-    }
-
-    /**
-     * Reproducer for the memory leak: CAMEL-10048
-     */
-    public void testMemoryLeakInExceptionHandlerCaching() throws Exception {
-        int messageCount = 100;
-        for (int i = 0; i < messageCount; i++) {
-            template.sendBody("direct:start", "message " + i);
-        }
-        RoutingSlip routingSlip = context.getProcessor("memory-leak", RoutingSlip.class);
-        assertNotNull(routingSlip);
-
-        Map<?, ?> errorHandlers = getRoutingSlipErrorHandlers(routingSlip);
-        assertEquals("Error handlers cache must contain only one value", 1, errorHandlers.size());
-    }
-
-    private Map<?, ?> getRoutingSlipErrorHandlers(RoutingSlip routingSlip) throws Exception {
-        Field errorHandlersField = routingSlip.getClass().getDeclaredField("errorHandlers");
-        errorHandlersField.setAccessible(true);
-        Map<?, ?> errorHandlers = (Map<?, ?>) errorHandlersField.get(routingSlip);
-        return errorHandlers;
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                errorHandler(deadLetterChannel("mock:dead"));
-
-                from("direct:start")
-                    .routingSlip(method(SlipProvider.class)).id("memory-leak");
-            }
-        };
-    }
-
-    public static class SlipProvider {
-
-        public String computeSlip(String body) {
-            /*
-             * It is important to have a processor here, that does not extend
-             * AsyncProcessor. Only in this case
-             * AsyncProcessorConverterHelper.convert() creates a new object,
-             * thus leading to a memory leak. For example, if you replace file
-             * endpoint with mock endpoint, then everything goes fine, because
-             * MockEndpoint.createProducer() creates an implementation of
-             * AsyncProcessor.
-             */
-            return "file:target/output";
-        }
-    }
-}
\ No newline at end of file
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakUniqueSlipsTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakUniqueSlipsTest.java
deleted file mode 100644
index 39db36c..0000000
--- a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakUniqueSlipsTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.issues;
-
-import java.lang.reflect.Field;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.processor.RoutingSlip;
-
-public class RoutingSlipMemoryLeakUniqueSlipsTest extends ContextTestSupport {
-
-    private static final AtomicInteger counter = new AtomicInteger(0);
-
-    @Override
-    protected void setUp() throws Exception {
-        deleteDirectory("target/output");
-        super.setUp();
-    }
-
-    /**
-     * Reproducer for the memory leak: CAMEL-10048/CAMEL-10050
-     */
-    public void testMemoryLeakInExceptionHandlerCaching() throws Exception {
-        int messageCount = 100;
-        for (int i = 0; i < messageCount; i++) {
-            template.sendBody("direct:start", "message " + i);
-        }
-        RoutingSlip routingSlip = context.getProcessor("memory-leak", RoutingSlip.class);
-        assertNotNull(routingSlip);
-
-        Map<?, ?> errorHandlers = getRoutingSlipErrorHandlers(routingSlip);
-        assertEquals("Error handlers cache must contain only one value", 1, errorHandlers.size());
-    }
-
-    private Map<?, ?> getRoutingSlipErrorHandlers(RoutingSlip routingSlip) throws Exception {
-        Field errorHandlersField = routingSlip.getClass().getDeclaredField("errorHandlers");
-        errorHandlersField.setAccessible(true);
-        Map<?, ?> errorHandlers = (Map<?, ?>) errorHandlersField.get(routingSlip);
-        return errorHandlers;
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                errorHandler(deadLetterChannel("mock:dead"));
-
-                from("direct:start")
-                    .routingSlip(method(SlipProvider.class)).id("memory-leak");
-            }
-        };
-    }
-
-    public static class SlipProvider {
-
-        public String computeSlip(String body) {
-            /*
-             * It is important to have a processor here, that does not extend
-             * AsyncProcessor. Only in this case
-             * AsyncProcessorConverterHelper.convert() creates a new object,
-             * thus leading to a memory leak. For example, if you replace file
-             * endpoint with mock endpoint, then everything goes fine, because
-             * MockEndpoint.createProducer() creates an implementation of
-             * AsyncProcessor.
-             */
-            return "mock:" + counter.incrementAndGet();
-        }
-    }
-}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.