You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/31 17:55:39 UTC
[camel] branch master updated: CAMEL-14466: camel-core - Optimize
reduce error handler memory usage
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 2dc8d0f CAMEL-14466: camel-core - Optimize reduce error handler memory usage
2dc8d0f is described below
commit 2dc8d0f65767fd94d3fb77fa44c5dd2086e3a0b5
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Jan 31 18:28:08 2020 +0100
CAMEL-14466: camel-core - Optimize reduce error handler memory usage
---
.../errorhandler/RedeliveryErrorHandler.java | 282 +++++++++++++++++++--
.../DefaultErrorHandlerLogStackTraceTest.java | 47 ++++
.../org/apache/camel/processor/RollbackTest.java | 3 -
...orHandlerOnExceptionOccurredProcessorTest.java} | 23 +-
.../OnExceptionOccurredProcessorTest.java | 4 +-
5 files changed, 319 insertions(+), 40 deletions(-)
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 68131d7..3863d4c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -88,6 +88,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
protected final boolean useOriginalMessagePolicy;
protected final boolean useOriginalBodyPolicy;
protected boolean redeliveryEnabled;
+ protected boolean simpleTask;
protected volatile boolean preparingShutdown;
protected final ExchangeFormatter exchangeFormatter;
protected final boolean customExchangeFormatter;
@@ -163,13 +164,18 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
*/
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
- // Create the redelivery state object for this exchange
- RedeliveryState state = new RedeliveryState(exchange, callback);
+ // Create the redelivery task object for this exchange (optimize to only create task can do redelivery or not)
+ Runnable task;
+ if (simpleTask) {
+ task = new SimpleTask(exchange, callback);
+ } else {
+ task = new RedeliveryTask(exchange, callback);
+ }
// Run it
if (exchange.isTransacted()) {
- reactiveExecutor.scheduleSync(state);
+ reactiveExecutor.scheduleSync(task);
} else {
- reactiveExecutor.scheduleMain(state);
+ reactiveExecutor.scheduleMain(task);
}
return false;
}
@@ -228,8 +234,8 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
* <p>Determines the redelivery delay time by first inspecting the Message header {@link Exchange#REDELIVERY_DELAY}
* and if not present, defaulting to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}</p>
*
- * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryState#redeliveryDelay}
- * and {@link RedeliveryState#redeliveryCounter} are copied in.</p>
+ * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryTask#redeliveryDelay}
+ * and {@link RedeliveryTask#redeliveryCounter} are copied in.</p>
*
* @param exchange The current exchange in question.
* @param redeliveryPolicy The RedeliveryPolicy to use in the calculation.
@@ -338,27 +344,245 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
}
/**
- * Contains the current redelivery state
+ * Simple task to perform calling the processor with no redelivery support
*/
- protected class RedeliveryState implements Runnable {
- Exchange original;
- ExtendedExchange exchange;
- AsyncCallback callback;
- int redeliveryCounter;
- long redeliveryDelay;
- Predicate retryWhilePredicate;
+ protected class SimpleTask implements Runnable {
+ private ExtendedExchange exchange;
+ private AsyncCallback callback;
+
+ public SimpleTask(Exchange exchange, AsyncCallback callback) {
+ // init with values from the error handler
+ this.exchange = (ExtendedExchange) exchange;
+ this.callback = callback;
+ }
+
+ @Override
+ public String toString() {
+ return "SimpleTask";
+ }
+
+ /**
+ * Processing logic.
+ */
+ @Override
+ public void run() {
+ // can we still run
+ if (!isRunAllowed()) {
+ LOG.trace("Run not allowed, will reject executing exchange: {}", exchange);
+ if (exchange.getException() == null) {
+ exchange.setException(new RejectedExecutionException());
+ }
+ callback.done(false);
+ return;
+ }
+
+ if (exchange.getException() != null) {
+ // previous processing cause an exception
+ handleException();
+ onExceptionOccurred();
+ prepareExchangeAfterFailure(exchange);
+
+ // we do not support redelivery so continue callback
+ reactiveExecutor.schedule(callback);
+ } else {
+ // Simple delivery
+ outputAsync.process(exchange, doneSync -> {
+ // only continue with callback if we are done
+ if (isDone(exchange)) {
+ reactiveExecutor.schedule(callback);
+ } else {
+ // error occurred so loop back around and call ourselves
+ reactiveExecutor.schedule(this);
+ }
+ });
+ }
+ }
+
+ protected boolean isRunAllowed() {
+ // if camel context is forcing a shutdown then do not allow running
+ boolean forceShutdown = shutdownStrategy.forceShutdown(RedeliveryErrorHandler.this);
+ if (forceShutdown) {
+ return false;
+ }
+
+ // we cannot run if we are stopping/stopped
+ return !isStoppingOrStopped();
+ }
+
+ protected void handleException() {
+ Exception e = exchange.getException();
+ // e is never null
+
+ Throwable previous = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
+ if (previous != null && previous != e) {
+ // a 2nd exception was thrown while handling a previous exception
+ // so we need to add the previous as suppressed by the new exception
+ // see also FatalFallbackErrorHandler
+ Throwable[] suppressed = e.getSuppressed();
+ boolean found = false;
+ for (Throwable t : suppressed) {
+ if (t == previous) {
+ found = true;
+ }
+ }
+ if (!found) {
+ e.addSuppressed(previous);
+ }
+ }
+
+ // store the original caused exception in a property, so we can restore it later
+ exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
+ }
+
+ /**
+ * Gives an optional configured OnExceptionOccurred processor a chance to process just after an exception
+ * was thrown while processing the Exchange. This allows to execute the processor at the same time the exception was thrown.
+ */
+ protected void onExceptionOccurred() {
+ if (onExceptionProcessor == null) {
+ return;
+ }
+
+ // run this synchronously as its just a Processor
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("OnExceptionOccurred processor {} is processing Exchange: {} due exception occurred", onExceptionProcessor, exchange);
+ }
+ onExceptionProcessor.process(exchange);
+ } catch (Throwable e) {
+ // we dont not want new exception to override existing, so log it as a WARN
+ LOG.warn("Error during processing OnExceptionOccurred. This exception is ignored.", e);
+ }
+ LOG.trace("OnExceptionOccurred processor done");
+ }
+
+ protected void prepareExchangeAfterFailure(final Exchange exchange) {
+ ExtendedExchange ee = (ExtendedExchange) exchange;
+
+ // we could not process the exchange so we let the failure processor handled it
+ ExchangeHelper.setFailureHandled(exchange);
+
+ // honor if already set a handling
+ boolean alreadySet = ee.getErrorHandlerHandled() != null;
+ if (alreadySet) {
+ boolean handled = ee.getErrorHandlerHandled() != null && ee.getErrorHandlerHandled();
+ LOG.trace("This exchange has already been marked for handling: {}", handled);
+ if (!handled) {
+ // exception not handled, put exception back in the exchange
+ exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
+ // and put failure endpoint back as well
+ exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
+ }
+ return;
+ }
+
+ // not handled by default
+ prepareExchangeAfterFailureNotHandled(exchange);
+ }
+
+ private void prepareExchangeAfterFailureNotHandled(Exchange exchange) {
+ ExtendedExchange ee = (ExtendedExchange) exchange;
+
+ LOG.trace("This exchange is not handled or continued so its marked as failed: {}", ee);
+ // exception not handled, put exception back in the exchange
+ ee.setErrorHandlerHandled(false);
+ ee.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
+ // and put failure endpoint back as well
+ ee.setProperty(Exchange.FAILURE_ENDPOINT, ee.getProperty(Exchange.TO_ENDPOINT));
+ // and store the route id so we know in which route we failed
+ UnitOfWork uow = ee.getUnitOfWork();
+ RouteContext rc = uow != null ? uow.getRouteContext() : null;
+ if (rc != null) {
+ ee.setProperty(Exchange.FAILURE_ROUTE_ID, rc.getRouteId());
+ }
+
+ // create log message
+ String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
+ msg = msg + ". Exhausted after delivery attempt: 1 caught: " + ee.getException();
+
+ // log that we failed delivery as we are exhausted
+ logFailedDelivery(exchange, msg, null);
+ }
+
+ private void logFailedDelivery(Exchange exchange, String message, Throwable e) {
+ if (logger == null) {
+ return;
+ }
+
+ if (e == null) {
+ e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
+ }
+
+ if (exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()) {
+ String msg = "Rollback " + ExchangeHelper.logIds(exchange);
+ Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
+ if (cause != null) {
+ msg = msg + " due: " + cause.getMessage();
+ }
+
+ // should we include message history
+ if (redeliveryPolicy.isLogExhaustedMessageHistory()) {
+ // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
+ ExchangeFormatter formatter = customExchangeFormatter
+ ? exchangeFormatter : (redeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
+ String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, false);
+ if (routeStackTrace != null) {
+ msg = msg + "\n" + routeStackTrace;
+ }
+ }
+
+ if (logger.getLevel() == LoggingLevel.ERROR) {
+ // log intended rollback on maximum WARN level (not ERROR)
+ logger.log(msg, LoggingLevel.WARN);
+ } else {
+ // otherwise use the desired logging level
+ logger.log(msg);
+ }
+ } else {
+ String msg = message;
+ // should we include message history
+ if (redeliveryPolicy.isLogExhaustedMessageHistory()) {
+ // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
+ ExchangeFormatter formatter = customExchangeFormatter
+ ? exchangeFormatter : (redeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
+ String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, e != null && redeliveryPolicy.isLogStackTrace());
+ if (routeStackTrace != null) {
+ msg = msg + "\n" + routeStackTrace;
+ }
+ }
+
+ if (e != null && redeliveryPolicy.isLogStackTrace()) {
+ logger.log(msg, e);
+ } else {
+ logger.log(msg);
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Task to perform calling the processor and handling redelivery if it fails (more advanced than ProcessTask)
+ */
+ protected class RedeliveryTask implements Runnable {
+ private Exchange original;
+ private ExtendedExchange exchange;
+ private AsyncCallback callback;
+ private int redeliveryCounter;
+ private long redeliveryDelay;
+ private Predicate retryWhilePredicate;
// default behavior which can be overloaded on a per exception basis
- RedeliveryPolicy currentRedeliveryPolicy;
- Processor failureProcessor;
- Processor onRedeliveryProcessor;
- Processor onExceptionProcessor;
- Predicate handledPredicate;
- Predicate continuedPredicate;
- boolean useOriginalInMessage;
- boolean useOriginalInBody;
-
- public RedeliveryState(Exchange exchange, AsyncCallback callback) {
+ private RedeliveryPolicy currentRedeliveryPolicy;
+ private Processor failureProcessor;
+ private Processor onRedeliveryProcessor;
+ private Processor onExceptionProcessor;
+ private Predicate handledPredicate;
+ private Predicate continuedPredicate;
+ private boolean useOriginalInMessage;
+ private boolean useOriginalInBody;
+
+ public RedeliveryTask(Exchange exchange, AsyncCallback callback) {
// init with values from the error handler
this.retryWhilePredicate = retryWhilePolicy;
this.currentRedeliveryPolicy = redeliveryPolicy;
@@ -377,11 +601,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
@Override
public String toString() {
- return "RedeliveryState";
+ return "RedeliveryTask";
}
/**
- * Redelivery logic.
+ * Processing and redelivery logic.
*/
@Override
public void run() {
@@ -1254,6 +1478,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
// reset flag when starting
preparingShutdown = false;
redeliverySleepCounter.set(0);
+
+ // calculate if we can use simple task or not
+ // if we need redelivery and other things then we cannot)
+ // however if we dont then its less memory overhead (and a bit less cpu) of using the simple task
+ simpleTask = deadLetter == null && !redeliveryEnabled && exceptionPolicies.isEmpty()
+ && onPrepareProcessor == null;
}
@Override
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerLogStackTraceTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerLogStackTraceTest.java
new file mode 100644
index 0000000..0f08c06
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerLogStackTraceTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class DefaultErrorHandlerLogStackTraceTest extends ContextTestSupport {
+
+ @Test
+ public void testLogStackTrace() throws Exception {
+ try {
+ template.sendBody("direct:start", "Hello World");
+ fail("Should fail");
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ errorHandler(defaultErrorHandler().logStackTrace(true).loggingLevel(LoggingLevel.WARN));
+
+ from("direct:start").log("Incoming ${body}").throwException(new IllegalArgumentException("Forced"));
+ }
+ };
+ }
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java
index 4047885..2dc0769 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java
@@ -66,9 +66,6 @@ public class RollbackTest extends ContextTestSupport {
assertNotNull(out.getException());
assertIsInstanceOf(RollbackExchangeException.class, out.getException());
assertEquals("Should be marked as rollback", true, out.isRollbackOnly());
- // should not try to redeliver if exchange was marked as rollback only
- assertEquals(0, out.getIn().getHeader(Exchange.REDELIVERY_COUNTER));
- assertEquals(false, out.getIn().getHeader(Exchange.REDELIVERED));
}
@Override
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionOccurredProcessorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/DefaultErrorHandlerOnExceptionOccurredProcessorTest.java
similarity index 76%
copy from core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionOccurredProcessorTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/onexception/DefaultErrorHandlerOnExceptionOccurredProcessorTest.java
index be9ac44..1a395fc 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionOccurredProcessorTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/DefaultErrorHandlerOnExceptionOccurredProcessorTest.java
@@ -23,7 +23,7 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.JndiRegistry;
import org.junit.Test;
-public class OnExceptionOccurredProcessorTest extends ContextTestSupport {
+public class DefaultErrorHandlerOnExceptionOccurredProcessorTest extends ContextTestSupport {
@Override
protected JndiRegistry createRegistry() throws Exception {
@@ -34,15 +34,16 @@ public class OnExceptionOccurredProcessorTest extends ContextTestSupport {
@Test
public void testOnExceptionOccurred() throws Exception {
- getMockEndpoint("mock:dead").expectedMessageCount(1);
-
- template.sendBody("direct:start", "Hello World");
-
- assertMockEndpointsSatisfied();
+ try {
+ template.sendBody("direct:start", "Hello World");
+ fail("Should throw exception");
+ } catch (Exception e) {
+ // expected
+ }
MyProcessor myProcessor = context.getRegistry().lookupByNameAndType("myProcessor", MyProcessor.class);
- // 1 = first time + 3 redelivery attempts
- assertEquals(1 + 3, myProcessor.getInvoked());
+ // 1 = first time + 0 redelivery attempts
+ assertEquals(1, myProcessor.getInvoked());
}
@Override
@@ -52,9 +53,11 @@ public class OnExceptionOccurredProcessorTest extends ContextTestSupport {
public void configure() throws Exception {
MyProcessor myProcessor = context.getRegistry().lookupByNameAndType("myProcessor", MyProcessor.class);
- errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(3).redeliveryDelay(0).onExceptionOccurred(myProcessor));
+ errorHandler(defaultErrorHandler().onExceptionOccurred(myProcessor));
+
+ from("direct:start").to("log:a").to("direct:foo").to("log:b");
- from("direct:start").throwException(new IllegalArgumentException("Forced"));
+ from("direct:foo").throwException(new IllegalArgumentException("Forced"));
}
};
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionOccurredProcessorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionOccurredProcessorTest.java
index be9ac44..618de36 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionOccurredProcessorTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionOccurredProcessorTest.java
@@ -54,7 +54,9 @@ public class OnExceptionOccurredProcessorTest extends ContextTestSupport {
errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(3).redeliveryDelay(0).onExceptionOccurred(myProcessor));
- from("direct:start").throwException(new IllegalArgumentException("Forced"));
+ from("direct:start").to("log:a").to("direct:foo").to("log:b");
+
+ from("direct:foo").throwException(new IllegalArgumentException("Forced"));
}
};
}