You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/01 15:32:08 UTC
[camel] 01/02: CAMEL-16272: camel-core - Optimize redelivery error handler to reduce object allocations
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 05301445f1d372e143eb1091ea75beda3f1e371b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 1 15:32:23 2021 +0100
CAMEL-16272: camel-core - Optimize redelivery error handler to reduce object allocations
---
.../errorhandler/RedeliveryErrorHandler.java | 61 ++++++++++++----------
.../camel/component/file/FilePollEnrichTest.java | 10 +---
2 files changed, 34 insertions(+), 37 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index d557bb7..bc52b06 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -342,40 +342,25 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
return null;
}
- private final class SimpleDoneTask implements AsyncCallback {
-
- private final SimpleTask task;
-
- private SimpleDoneTask(SimpleTask task) {
- this.task = task;
- }
-
- @Override
- public void done(boolean doneSync) {
- // only continue with callback if we are done
- if (isDone(task.exchange)) {
- reactiveExecutor.schedule(task.callback);
- } else {
- // error occurred so loop back around and call ourselves
- reactiveExecutor.schedule(task);
- }
- }
- }
-
/**
* Simple task to perform calling the processor with no redelivery support
*/
- protected class SimpleTask implements Runnable {
+ protected class SimpleTask implements Runnable, AsyncCallback {
private final ExtendedExchange exchange;
private final AsyncCallback callback;
- private final SimpleDoneTask doneTask = new SimpleDoneTask(this);
+ private boolean first = true;
- public SimpleTask(Exchange exchange, AsyncCallback callback) {
+ SimpleTask(Exchange exchange, AsyncCallback callback) {
this.exchange = (ExtendedExchange) exchange;
this.callback = callback;
}
@Override
+ public void done(boolean doneSync) {
+ reactiveExecutor.schedule(this);
+ }
+
+ @Override
public String toString() {
return "SimpleTask";
}
@@ -402,18 +387,39 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
callback.done(false);
return;
}
+ if (exchange.isInterrupted()) {
+ // mark the exchange to stop routing when it has been interrupted,
+ // as we do not want to continue routing (for example, when a task has been cancelled)
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Is exchangeId: {} interrupted? true", exchange.getExchangeId());
+ }
+ exchange.setRouteStop(true);
+ // we should not continue routing so call callback
+ callback.done(false);
+ return;
+ }
- if (exchange.getException() != null) {
+ // treat this as a new failure only if the exchange has an exception,
+ // the exception has not already been handled by the failure processor,
+ // and redelivery has not been exhausted
+ boolean failure = exchange.getException() != null
+ && !ExchangeHelper.isFailureHandled(exchange)
+ && !exchange.isRedeliveryExhausted();
+
+ if (failure) {
// previous processing cause an exception
handleException();
onExceptionOccurred();
prepareExchangeAfterFailure(exchange);
-
// we do not support redelivery so continue callback
reactiveExecutor.schedule(callback);
+ } else if (first) {
+ // first time call the target processor
+ first = false;
+ outputAsync.process(exchange, this);
} else {
- // Simple delivery
- outputAsync.process(exchange, doneTask);
+ // we are done so continue callback
+ reactiveExecutor.schedule(callback);
}
}
@@ -798,7 +804,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
// and it has not been handled by the error processor
if (isDone(exchange)) {
reactiveExecutor.schedule(callback);
- return;
} else {
// error occurred so loop back around which we do by invoking the processAsyncErrorHandler
reactiveExecutor.schedule(this);
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
index fda93b5..5718743 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
@@ -17,11 +17,9 @@
package org.apache.camel.component.file;
import java.io.File;
-import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.jupiter.api.BeforeEach;
@@ -61,13 +59,7 @@ public class FilePollEnrichTest extends ContextTestSupport {
public void configure() throws Exception {
from("timer:foo?period=1000").routeId("foo").log("Trigger timer foo")
.pollEnrich("file:target/data/pollenrich?move=done", 5000).convertBodyTo(String.class)
- .log("Polled filed ${file:name}").to("mock:result").process(new Processor() {
- public void process(Exchange exchange) throws Exception {
- // force stop route after use to prevent firing
- // timer again
- exchange.getContext().getRouteController().stopRoute("foo", 100, TimeUnit.MILLISECONDS);
- }
- });
+ .log("Polled filed ${file:name}").to("mock:result");
}
};
}