You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/01 15:32:08 UTC

[camel] 01/02: CAMEL-16272: camel-core - Optimize redelivery error handler to reduce object allocations

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 05301445f1d372e143eb1091ea75beda3f1e371b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 1 15:32:23 2021 +0100

    CAMEL-16272: camel-core - Optimize redelivery error handler to reduce object allocations
---
 .../errorhandler/RedeliveryErrorHandler.java       | 61 ++++++++++++----------
 .../camel/component/file/FilePollEnrichTest.java   | 10 +---
 2 files changed, 34 insertions(+), 37 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index d557bb7..bc52b06 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -342,40 +342,25 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
         return null;
     }
 
-    private final class SimpleDoneTask implements AsyncCallback {
-
-        private final SimpleTask task;
-
-        private SimpleDoneTask(SimpleTask task) {
-            this.task = task;
-        }
-
-        @Override
-        public void done(boolean doneSync) {
-            // only continue with callback if we are done
-            if (isDone(task.exchange)) {
-                reactiveExecutor.schedule(task.callback);
-            } else {
-                // error occurred so loop back around and call ourselves
-                reactiveExecutor.schedule(task);
-            }
-        }
-    }
-
     /**
      * Simple task to perform calling the processor with no redelivery support
      */
-    protected class SimpleTask implements Runnable {
+    protected class SimpleTask implements Runnable, AsyncCallback {
         private final ExtendedExchange exchange;
         private final AsyncCallback callback;
-        private final SimpleDoneTask doneTask = new SimpleDoneTask(this);
+        private boolean first = true;
 
-        public SimpleTask(Exchange exchange, AsyncCallback callback) {
+        SimpleTask(Exchange exchange, AsyncCallback callback) {
             this.exchange = (ExtendedExchange) exchange;
             this.callback = callback;
         }
 
         @Override
+        public void done(boolean doneSync) {
+            reactiveExecutor.schedule(this);
+        }
+
+        @Override
         public String toString() {
             return "SimpleTask";
         }
@@ -402,18 +387,39 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                 callback.done(false);
                 return;
             }
+            if (exchange.isInterrupted()) {
+                // mark the exchange to stop continue routing when interrupted
+                // as we do not want to continue routing (for example a task has been cancelled)
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Is exchangeId: {} interrupted? true", exchange.getExchangeId());
+                }
+                exchange.setRouteStop(true);
+                // we should not continue routing so call callback
+                callback.done(false);
+                return;
+            }
 
-            if (exchange.getException() != null) {
+            // only new failure if the exchange has exception
+            // and it has not been handled by the failure processor before
+            // or not exhausted
+            boolean failure = exchange.getException() != null
+                    && !ExchangeHelper.isFailureHandled(exchange)
+                    && !exchange.isRedeliveryExhausted();
+
+            if (failure) {
                 // previous processing cause an exception
                 handleException();
                 onExceptionOccurred();
                 prepareExchangeAfterFailure(exchange);
-
                 // we do not support redelivery so continue callback
                 reactiveExecutor.schedule(callback);
+            } else if (first) {
+                // first time call the target processor
+                first = false;
+                outputAsync.process(exchange, this);
             } else {
-                // Simple delivery
-                outputAsync.process(exchange, doneTask);
+                // we are done so continue callback
+                reactiveExecutor.schedule(callback);
             }
         }
 
@@ -798,7 +804,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                 // and it has not been handled by the error processor
                 if (isDone(exchange)) {
                     reactiveExecutor.schedule(callback);
-                    return;
                 } else {
                     // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
                     reactiveExecutor.schedule(this);
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
index fda93b5..5718743 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
@@ -17,11 +17,9 @@
 package org.apache.camel.component.file;
 
 import java.io.File;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.jupiter.api.BeforeEach;
@@ -61,13 +59,7 @@ public class FilePollEnrichTest extends ContextTestSupport {
             public void configure() throws Exception {
                 from("timer:foo?period=1000").routeId("foo").log("Trigger timer foo")
                         .pollEnrich("file:target/data/pollenrich?move=done", 5000).convertBodyTo(String.class)
-                        .log("Polled filed ${file:name}").to("mock:result").process(new Processor() {
-                            public void process(Exchange exchange) throws Exception {
-                                // force stop route after use to prevent firing
-                                // timer again
-                                exchange.getContext().getRouteController().stopRoute("foo", 100, TimeUnit.MILLISECONDS);
-                            }
-                        });
+                        .log("Polled filed ${file:name}").to("mock:result");
             }
         };
     }