You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/31 17:55:39 UTC

[camel] branch master updated: CAMEL-14466: camel-core - Optimize reduce error handler memory usage

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 2dc8d0f  CAMEL-14466: camel-core - Optimize reduce error handler memory usage
2dc8d0f is described below

commit 2dc8d0f65767fd94d3fb77fa44c5dd2086e3a0b5
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Jan 31 18:28:08 2020 +0100

    CAMEL-14466: camel-core - Optimize reduce error handler memory usage
---
 .../errorhandler/RedeliveryErrorHandler.java       | 282 +++++++++++++++++++--
 .../DefaultErrorHandlerLogStackTraceTest.java      |  47 ++++
 .../org/apache/camel/processor/RollbackTest.java   |   3 -
 ...orHandlerOnExceptionOccurredProcessorTest.java} |  23 +-
 .../OnExceptionOccurredProcessorTest.java          |   4 +-
 5 files changed, 319 insertions(+), 40 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 68131d7..3863d4c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -88,6 +88,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
     protected final boolean useOriginalMessagePolicy;
     protected final boolean useOriginalBodyPolicy;
     protected boolean redeliveryEnabled;
+    protected boolean simpleTask;
     protected volatile boolean preparingShutdown;
     protected final ExchangeFormatter exchangeFormatter;
     protected final boolean customExchangeFormatter;
@@ -163,13 +164,18 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
      */
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        // Create the redelivery state object for this exchange
-        RedeliveryState state = new RedeliveryState(exchange, callback);
+        // Create the task object for this exchange (optimized to create the lighter SimpleTask when redelivery support is not needed)
+        Runnable task;
+        if (simpleTask) {
+            task = new SimpleTask(exchange, callback);
+        } else {
+            task = new RedeliveryTask(exchange, callback);
+        }
         // Run it
         if (exchange.isTransacted()) {
-            reactiveExecutor.scheduleSync(state);
+            reactiveExecutor.scheduleSync(task);
         } else {
-            reactiveExecutor.scheduleMain(state);
+            reactiveExecutor.scheduleMain(task);
         }
         return false;
     }
@@ -228,8 +234,8 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
      * <p>Determines the redelivery delay time by first inspecting the Message header {@link Exchange#REDELIVERY_DELAY}
      * and if not present, defaulting to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}</p>
      *
-     * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryState#redeliveryDelay}
-     * and {@link RedeliveryState#redeliveryCounter} are copied in.</p>
+     * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryTask#redeliveryDelay}
+     * and {@link RedeliveryTask#redeliveryCounter} are copied in.</p>
      *
      * @param exchange The current exchange in question.
      * @param redeliveryPolicy The RedeliveryPolicy to use in the calculation.
@@ -338,27 +344,245 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
     }
 
     /**
-     * Contains the current redelivery state
+     * Simple task to perform calling the processor with no redelivery support
      */
-    protected class RedeliveryState implements Runnable {
-        Exchange original;
-        ExtendedExchange exchange;
-        AsyncCallback callback;
-        int redeliveryCounter;
-        long redeliveryDelay;
-        Predicate retryWhilePredicate;
+    protected class SimpleTask implements Runnable {
+        private ExtendedExchange exchange;
+        private AsyncCallback callback;
+
+        public SimpleTask(Exchange exchange, AsyncCallback callback) {
+            // init with values from the error handler
+            this.exchange = (ExtendedExchange) exchange;
+            this.callback = callback;
+        }
+
+        @Override
+        public String toString() {
+            return "SimpleTask";
+        }
+
+        /**
+         * Processing logic.
+         */
+        @Override
+        public void run() {
+            // can we still run
+            if (!isRunAllowed()) {
+                LOG.trace("Run not allowed, will reject executing exchange: {}", exchange);
+                if (exchange.getException() == null) {
+                    exchange.setException(new RejectedExecutionException());
+                }
+                callback.done(false);
+                return;
+            }
+
+            if (exchange.getException() != null) {
+                // previous processing caused an exception
+                handleException();
+                onExceptionOccurred();
+                prepareExchangeAfterFailure(exchange);
+
+                // we do not support redelivery so continue callback
+                reactiveExecutor.schedule(callback);
+            } else {
+                // Simple delivery
+                outputAsync.process(exchange, doneSync -> {
+                    // only continue with callback if we are done
+                    if (isDone(exchange)) {
+                        reactiveExecutor.schedule(callback);
+                    } else {
+                        // error occurred so loop back around and call ourselves
+                        reactiveExecutor.schedule(this);
+                    }
+                });
+            }
+        }
+
+        protected boolean isRunAllowed() {
+            // if camel context is forcing a shutdown then do not allow running
+            boolean forceShutdown = shutdownStrategy.forceShutdown(RedeliveryErrorHandler.this);
+            if (forceShutdown) {
+                return false;
+            }
+
+            // we cannot run if we are stopping/stopped
+            return !isStoppingOrStopped();
+        }
+
+        protected void handleException() {
+            Exception e = exchange.getException();
+            // e is never null
+
+            Throwable previous = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
+            if (previous != null && previous != e) {
+                // a 2nd exception was thrown while handling a previous exception
+                // so we need to add the previous as suppressed by the new exception
+                // see also FatalFallbackErrorHandler
+                Throwable[] suppressed = e.getSuppressed();
+                boolean found = false;
+                for (Throwable t : suppressed) {
+                    if (t == previous) {
+                        found = true;
+                    }
+                }
+                if (!found) {
+                    e.addSuppressed(previous);
+                }
+            }
+
+            // store the original caused exception in a property, so we can restore it later
+            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
+        }
+
+        /**
+         * Gives an optional configured OnExceptionOccurred processor a chance to process just after an exception
+         * was thrown while processing the Exchange. This allows to execute the processor at the same time the exception was thrown.
+         */
+        protected void onExceptionOccurred() {
+            if (onExceptionProcessor == null) {
+                return;
+            }
+
+            // run this synchronously as its just a Processor
+            try {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("OnExceptionOccurred processor {} is processing Exchange: {} due exception occurred", onExceptionProcessor, exchange);
+                }
+                onExceptionProcessor.process(exchange);
+            } catch (Throwable e) {
+                // we do not want a new exception to override the existing one, so log it as a WARN
+                LOG.warn("Error during processing OnExceptionOccurred. This exception is ignored.", e);
+            }
+            LOG.trace("OnExceptionOccurred processor done");
+        }
+
+        protected void prepareExchangeAfterFailure(final Exchange exchange) {
+            ExtendedExchange ee = (ExtendedExchange) exchange;
+
+            // we could not process the exchange so we let the failure processor handle it
+            ExchangeHelper.setFailureHandled(exchange);
+
+            // honor if already set a handling
+            boolean alreadySet = ee.getErrorHandlerHandled() != null;
+            if (alreadySet) {
+                boolean handled = ee.getErrorHandlerHandled() != null && ee.getErrorHandlerHandled();
+                LOG.trace("This exchange has already been marked for handling: {}", handled);
+                if (!handled) {
+                    // exception not handled, put exception back in the exchange
+                    exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
+                    // and put failure endpoint back as well
+                    exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+                }
+                return;
+            }
+
+            // not handled by default
+            prepareExchangeAfterFailureNotHandled(exchange);
+        }
+
+        private void prepareExchangeAfterFailureNotHandled(Exchange exchange) {
+            ExtendedExchange ee = (ExtendedExchange) exchange;
+
+            LOG.trace("This exchange is not handled or continued so its marked as failed: {}", ee);
+            // exception not handled, put exception back in the exchange
+            ee.setErrorHandlerHandled(false);
+            ee.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
+            // and put failure endpoint back as well
+            ee.setProperty(Exchange.FAILURE_ENDPOINT, ee.getProperty(Exchange.TO_ENDPOINT));
+            // and store the route id so we know in which route we failed
+            UnitOfWork uow = ee.getUnitOfWork();
+            RouteContext rc = uow != null ? uow.getRouteContext() : null;
+            if (rc != null) {
+                ee.setProperty(Exchange.FAILURE_ROUTE_ID, rc.getRouteId());
+            }
+
+            // create log message
+            String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
+            msg = msg + ". Exhausted after delivery attempt: 1 caught: " + ee.getException();
+
+            // log that we failed delivery as we are exhausted
+            logFailedDelivery(exchange, msg, null);
+        }
+
+        private void logFailedDelivery(Exchange exchange, String message, Throwable e) {
+            if (logger == null) {
+                return;
+            }
+
+            if (e == null) {
+                e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
+            }
+
+            if (exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()) {
+                String msg = "Rollback " + ExchangeHelper.logIds(exchange);
+                Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
+                if (cause != null) {
+                    msg = msg + " due: " + cause.getMessage();
+                }
+
+                // should we include message history
+                if (redeliveryPolicy.isLogExhaustedMessageHistory()) {
+                    // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
+                    ExchangeFormatter formatter = customExchangeFormatter
+                            ? exchangeFormatter : (redeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
+                    String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, false);
+                    if (routeStackTrace != null) {
+                        msg = msg + "\n" + routeStackTrace;
+                    }
+                }
+
+                if (logger.getLevel() == LoggingLevel.ERROR) {
+                    // log intended rollback on maximum WARN level (not ERROR)
+                    logger.log(msg, LoggingLevel.WARN);
+                } else {
+                    // otherwise use the desired logging level
+                    logger.log(msg);
+                }
+            } else {
+                String msg = message;
+                // should we include message history
+                if (redeliveryPolicy.isLogExhaustedMessageHistory()) {
+                    // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
+                    ExchangeFormatter formatter = customExchangeFormatter
+                            ? exchangeFormatter : (redeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
+                    String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, e != null && redeliveryPolicy.isLogStackTrace());
+                    if (routeStackTrace != null) {
+                        msg = msg + "\n" + routeStackTrace;
+                    }
+                }
+
+                if (e != null && redeliveryPolicy.isLogStackTrace()) {
+                    logger.log(msg, e);
+                } else {
+                    logger.log(msg);
+                }
+            }
+        }
+   }
+
+
+    /**
+     * Task to perform calling the processor and handling redelivery if it fails (more advanced than SimpleTask)
+     */
+    protected class RedeliveryTask implements Runnable {
+        private Exchange original;
+        private ExtendedExchange exchange;
+        private AsyncCallback callback;
+        private int redeliveryCounter;
+        private long redeliveryDelay;
+        private Predicate retryWhilePredicate;
 
         // default behavior which can be overloaded on a per exception basis
-        RedeliveryPolicy currentRedeliveryPolicy;
-        Processor failureProcessor;
-        Processor onRedeliveryProcessor;
-        Processor onExceptionProcessor;
-        Predicate handledPredicate;
-        Predicate continuedPredicate;
-        boolean useOriginalInMessage;
-        boolean useOriginalInBody;
-
-        public RedeliveryState(Exchange exchange, AsyncCallback callback) {
+        private RedeliveryPolicy currentRedeliveryPolicy;
+        private Processor failureProcessor;
+        private Processor onRedeliveryProcessor;
+        private Processor onExceptionProcessor;
+        private Predicate handledPredicate;
+        private Predicate continuedPredicate;
+        private boolean useOriginalInMessage;
+        private boolean useOriginalInBody;
+
+        public RedeliveryTask(Exchange exchange, AsyncCallback callback) {
             // init with values from the error handler
             this.retryWhilePredicate = retryWhilePolicy;
             this.currentRedeliveryPolicy = redeliveryPolicy;
@@ -377,11 +601,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
 
         @Override
         public String toString() {
-            return "RedeliveryState";
+            return "RedeliveryTask";
         }
 
         /**
-         * Redelivery logic.
+         * Processing and redelivery logic.
          */
         @Override
         public void run() {
@@ -1254,6 +1478,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
         // reset flag when starting
         preparingShutdown = false;
         redeliverySleepCounter.set(0);
+
+        // calculate if we can use simple task or not
+        // if we need redelivery and other things then we cannot
+        // however if we don't then there is less memory overhead (and a bit less cpu) when using the simple task
+        simpleTask = deadLetter == null && !redeliveryEnabled && exceptionPolicies.isEmpty()
+                && onPrepareProcessor == null;
     }
 
     @Override
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerLogStackTraceTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerLogStackTraceTest.java
new file mode 100644
index 0000000..0f08c06
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerLogStackTraceTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class DefaultErrorHandlerLogStackTraceTest extends ContextTestSupport {
+
+    @Test
+    public void testLogStackTrace() throws Exception {
+        try {
+            template.sendBody("direct:start", "Hello World");
+            fail("Should fail");
+        } catch (Exception e) {
+            // ignore
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(defaultErrorHandler().logStackTrace(true).loggingLevel(LoggingLevel.WARN));
+
+                from("direct:start").log("Incoming ${body}").throwException(new IllegalArgumentException("Forced"));
+            }
+        };
+    }
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java
index 4047885..2dc0769 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java
@@ -66,9 +66,6 @@ public class RollbackTest extends ContextTestSupport {
         assertNotNull(out.getException());
         assertIsInstanceOf(RollbackExchangeException.class, out.getException());
         assertEquals("Should be marked as rollback", true, out.isRollbackOnly());
-        // should not try to redeliver if exchange was marked as rollback only
-        assertEquals(0, out.getIn().getHeader(Exchange.REDELIVERY_COUNTER));
-        assertEquals(false, out.getIn().getHeader(Exchange.REDELIVERED));
     }
 
     @Override
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionOccurredProcessorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/DefaultErrorHandlerOnExceptionOccurredProcessorTest.java
similarity index 76%
copy from core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionOccurredProcessorTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/onexception/DefaultErrorHandlerOnExceptionOccurredProcessorTest.java
index be9ac44..1a395fc 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionOccurredProcessorTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/DefaultErrorHandlerOnExceptionOccurredProcessorTest.java
@@ -23,7 +23,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.JndiRegistry;
 import org.junit.Test;
 
-public class OnExceptionOccurredProcessorTest extends ContextTestSupport {
+public class DefaultErrorHandlerOnExceptionOccurredProcessorTest extends ContextTestSupport {
 
     @Override
     protected JndiRegistry createRegistry() throws Exception {
@@ -34,15 +34,16 @@ public class OnExceptionOccurredProcessorTest extends ContextTestSupport {
 
     @Test
     public void testOnExceptionOccurred() throws Exception {
-        getMockEndpoint("mock:dead").expectedMessageCount(1);
-
-        template.sendBody("direct:start", "Hello World");
-
-        assertMockEndpointsSatisfied();
+        try {
+            template.sendBody("direct:start", "Hello World");
+            fail("Should throw exception");
+        } catch (Exception e) {
+            // expected
+        }
 
         MyProcessor myProcessor = context.getRegistry().lookupByNameAndType("myProcessor", MyProcessor.class);
-        // 1 = first time + 3 redelivery attempts
-        assertEquals(1 + 3, myProcessor.getInvoked());
+        // 1 = first time + 0 redelivery attempts
+        assertEquals(1, myProcessor.getInvoked());
     }
 
     @Override
@@ -52,9 +53,11 @@ public class OnExceptionOccurredProcessorTest extends ContextTestSupport {
             public void configure() throws Exception {
                 MyProcessor myProcessor = context.getRegistry().lookupByNameAndType("myProcessor", MyProcessor.class);
 
-                errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(3).redeliveryDelay(0).onExceptionOccurred(myProcessor));
+                errorHandler(defaultErrorHandler().onExceptionOccurred(myProcessor));
+
+                from("direct:start").to("log:a").to("direct:foo").to("log:b");
 
-                from("direct:start").throwException(new IllegalArgumentException("Forced"));
+                from("direct:foo").throwException(new IllegalArgumentException("Forced"));
             }
         };
     }
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionOccurredProcessorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionOccurredProcessorTest.java
index be9ac44..618de36 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionOccurredProcessorTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionOccurredProcessorTest.java
@@ -54,7 +54,9 @@ public class OnExceptionOccurredProcessorTest extends ContextTestSupport {
 
                 errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(3).redeliveryDelay(0).onExceptionOccurred(myProcessor));
 
-                from("direct:start").throwException(new IllegalArgumentException("Forced"));
+                from("direct:start").to("log:a").to("direct:foo").to("log:b");
+
+                from("direct:foo").throwException(new IllegalArgumentException("Forced"));
             }
         };
     }