You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/01 15:32:07 UTC

[camel] branch master updated (04d6af4 -> a62ac66)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 04d6af4  Regen for commit 8a95f94516e5e260fffb3a2d881f97c0d487a050
     new 0530144  CAMEL-16272: camel-core - Optimize redelivery error handler to reduce object allocations
     new a62ac66  CAMEL-16272: camel-core - Optimize redelivery error handler to reduce object allocations

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../errorhandler/RedeliveryErrorHandler.java       | 62 ++++++++++++----------
 .../camel/component/file/FilePollEnrichTest.java   | 10 +---
 2 files changed, 35 insertions(+), 37 deletions(-)


[camel] 02/02: CAMEL-16272: camel-core - Optimize redelivery error handler to reduce object allocations

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a62ac666223777ce6d15269b3a0b6a04b433dbaa
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 1 16:18:47 2021 +0100

    CAMEL-16272: camel-core - Optimize redelivery error handler to reduce object allocations
---
 .../camel/processor/errorhandler/RedeliveryErrorHandler.java     | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index bc52b06..3c0eaf3 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -356,13 +356,14 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
         }
 
         @Override
-        public void done(boolean doneSync) {
-            reactiveExecutor.schedule(this);
+        public String toString() {
+            return "SimpleTask";
         }
 
         @Override
-        public String toString() {
-            return "SimpleTask";
+        public void done(boolean doneSync) {
+            // the run method decides what to do when we are done
+            run();
         }
 
         /**


[camel] 01/02: CAMEL-16272: camel-core - Optimize redelivery error handler to reduce object allocations

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 05301445f1d372e143eb1091ea75beda3f1e371b
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Mar 1 15:32:23 2021 +0100

    CAMEL-16272: camel-core - Optimize redelivery error handler to reduce object allocations
---
 .../errorhandler/RedeliveryErrorHandler.java       | 61 ++++++++++++----------
 .../camel/component/file/FilePollEnrichTest.java   | 10 +---
 2 files changed, 34 insertions(+), 37 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index d557bb7..bc52b06 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -342,40 +342,25 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
         return null;
     }
 
-    private final class SimpleDoneTask implements AsyncCallback {
-
-        private final SimpleTask task;
-
-        private SimpleDoneTask(SimpleTask task) {
-            this.task = task;
-        }
-
-        @Override
-        public void done(boolean doneSync) {
-            // only continue with callback if we are done
-            if (isDone(task.exchange)) {
-                reactiveExecutor.schedule(task.callback);
-            } else {
-                // error occurred so loop back around and call ourselves
-                reactiveExecutor.schedule(task);
-            }
-        }
-    }
-
     /**
      * Simple task to perform calling the processor with no redelivery support
      */
-    protected class SimpleTask implements Runnable {
+    protected class SimpleTask implements Runnable, AsyncCallback {
         private final ExtendedExchange exchange;
         private final AsyncCallback callback;
-        private final SimpleDoneTask doneTask = new SimpleDoneTask(this);
+        private boolean first = true;
 
-        public SimpleTask(Exchange exchange, AsyncCallback callback) {
+        SimpleTask(Exchange exchange, AsyncCallback callback) {
             this.exchange = (ExtendedExchange) exchange;
             this.callback = callback;
         }
 
         @Override
+        public void done(boolean doneSync) {
+            reactiveExecutor.schedule(this);
+        }
+
+        @Override
         public String toString() {
             return "SimpleTask";
         }
@@ -402,18 +387,39 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                 callback.done(false);
                 return;
             }
+            if (exchange.isInterrupted()) {
+                // mark the exchange to stop continue routing when interrupted
+                // as we do not want to continue routing (for example a task has been cancelled)
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Is exchangeId: {} interrupted? true", exchange.getExchangeId());
+                }
+                exchange.setRouteStop(true);
+                // we should not continue routing so call callback
+                callback.done(false);
+                return;
+            }
 
-            if (exchange.getException() != null) {
+            // only new failure if the exchange has exception
+            // and it has not been handled by the failure processor before
+            // or not exhausted
+            boolean failure = exchange.getException() != null
+                    && !ExchangeHelper.isFailureHandled(exchange)
+                    && !exchange.isRedeliveryExhausted();
+
+            if (failure) {
                 // previous processing cause an exception
                 handleException();
                 onExceptionOccurred();
                 prepareExchangeAfterFailure(exchange);
-
                 // we do not support redelivery so continue callback
                 reactiveExecutor.schedule(callback);
+            } else if (first) {
+                // first time call the target processor
+                first = false;
+                outputAsync.process(exchange, this);
             } else {
-                // Simple delivery
-                outputAsync.process(exchange, doneTask);
+                // we are done so continue callback
+                reactiveExecutor.schedule(callback);
             }
         }
 
@@ -798,7 +804,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                 // and it has not been handled by the error processor
                 if (isDone(exchange)) {
                     reactiveExecutor.schedule(callback);
-                    return;
                 } else {
                     // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
                     reactiveExecutor.schedule(this);
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
index fda93b5..5718743 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FilePollEnrichTest.java
@@ -17,11 +17,9 @@
 package org.apache.camel.component.file;
 
 import java.io.File;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.jupiter.api.BeforeEach;
@@ -61,13 +59,7 @@ public class FilePollEnrichTest extends ContextTestSupport {
             public void configure() throws Exception {
                 from("timer:foo?period=1000").routeId("foo").log("Trigger timer foo")
                         .pollEnrich("file:target/data/pollenrich?move=done", 5000).convertBodyTo(String.class)
-                        .log("Polled filed ${file:name}").to("mock:result").process(new Processor() {
-                            public void process(Exchange exchange) throws Exception {
-                                // force stop route after use to prevent firing
-                                // timer again
-                                exchange.getContext().getRouteController().stopRoute("foo", 100, TimeUnit.MILLISECONDS);
-                            }
-                        });
+                        .log("Polled filed ${file:name}").to("mock:result");
             }
         };
     }