You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/03 16:00:25 UTC

[camel] branch master updated: CAMEL-14354: ReactiveExecutor should avoid unnecessary object allocations for human descriptions for processors that create many objects for the toString representation of their processor chain.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 70ec443  CAMEL-14354: ReactiveExecutor should avoid unnecessary object allocations for human descriptions for processors that create many objects for the toString representation of their processor chain.
70ec443 is described below

commit 70ec443c612b04fa67dd66dfce2770ebe5da8dd8
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Jan 3 16:10:48 2020 +0100

    CAMEL-14354: ReactiveExecutor should avoid unnecessary object allocations for human descriptions for processors that create many objects for the toString representation of their processor chain.
---
 .../reactive/vertx/VertXReactiveExecutor.java      | 29 ++++++++++++++++------
 .../org/apache/camel/spi/ReactiveExecutor.java     | 12 +++------
 .../camel/impl/engine/DefaultReactiveExecutor.java | 21 +++++++++++++---
 .../camel/processor/CamelInternalProcessor.java    |  2 +-
 .../apache/camel/processor/MulticastProcessor.java |  2 +-
 .../java/org/apache/camel/processor/Pipeline.java  |  9 +++----
 .../processor/SharedCamelInternalProcessor.java    |  2 +-
 7 files changed, 49 insertions(+), 28 deletions(-)

diff --git a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
index 64f78d2..9465fea 100644
--- a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
+++ b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
@@ -49,29 +49,36 @@ public class VertXReactiveExecutor extends ServiceSupport implements ReactiveExe
     }
 
     @Override
-    public void schedule(Runnable runnable, String description) {
+    public void schedule(Runnable runnable) {
         LOG.trace("schedule: {}", runnable);
+        vertx.nettyEventLoopGroup().execute(runnable);
+    }
+
+    @Override
+    public void schedule(Runnable runnable, String description) {
         if (description != null) {
             runnable = describe(runnable, description);
         }
+        schedule(runnable);
+    }
+
+    @Override
+    public void scheduleMain(Runnable runnable) {
+        LOG.trace("scheduleMain: {}", runnable);
         vertx.nettyEventLoopGroup().execute(runnable);
     }
 
     @Override
     public void scheduleMain(Runnable runnable, String description) {
-        LOG.trace("scheduleMain: {}", runnable);
         if (description != null) {
             runnable = describe(runnable, description);
         }
-        vertx.nettyEventLoopGroup().execute(runnable);
+        scheduleMain(runnable);
     }
 
     @Override
-    public void scheduleSync(Runnable runnable, String description) {
+    public void scheduleSync(Runnable runnable) {
         LOG.trace("scheduleSync: {}", runnable);
-        if (description != null) {
-            runnable = describe(runnable, description);
-        }
         final Runnable task = runnable;
         vertx.executeBlocking(future -> {
             task.run();
@@ -80,6 +87,14 @@ public class VertXReactiveExecutor extends ServiceSupport implements ReactiveExe
     }
 
     @Override
+    public void scheduleSync(Runnable runnable, String description) {
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
+        scheduleSync(runnable);
+    }
+
+    @Override
     public boolean executeFromQueue() {
         // not supported so return false
         return false;
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index 2a4eb9f..fc3c4de 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -28,9 +28,7 @@ public interface ReactiveExecutor {
      *
      * @param runnable    the task
      */
-    default void schedule(Runnable runnable) {
-        schedule(runnable, null);
-    }
+    void schedule(Runnable runnable);
 
     /**
      * Schedules the task to be run
@@ -45,9 +43,7 @@ public interface ReactiveExecutor {
      *
      * @param runnable    the task
      */
-    default void scheduleMain(Runnable runnable) {
-        scheduleMain(runnable, null);
-    }
+    void scheduleMain(Runnable runnable);
 
     /**
      * Schedules the task to be prioritized and run asap
@@ -62,9 +58,7 @@ public interface ReactiveExecutor {
      *
      * @param runnable    the task
      */
-    default void scheduleSync(Runnable runnable) {
-        scheduleSync(runnable, null);
-    }
+    void scheduleSync(Runnable runnable);
 
     /**
      * Schedules the task to run synchronously
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index c441e34..7be0a46 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -52,11 +52,26 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
     private final AtomicLong pendingTasks = new AtomicLong();
 
     @Override
+    public void schedule(Runnable runnable) {
+        workers.get().schedule(runnable, true, false, false);
+    }
+
+    @Override
+    public void scheduleMain(Runnable runnable) {
+        workers.get().schedule(runnable, true, true, false);
+    }
+
+    @Override
+    public void scheduleSync(Runnable runnable) {
+        workers.get().schedule(runnable, false, true, true);
+    }
+
+    @Override
     public void scheduleMain(Runnable runnable, String description) {
         if (description != null) {
             runnable = describe(runnable, description);
         }
-        workers.get().schedule(runnable, true, true, false);
+        scheduleMain(runnable);
     }
 
     @Override
@@ -64,7 +79,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
         if (description != null) {
             runnable = describe(runnable, description);
         }
-        workers.get().schedule(runnable, true, false, false);
+        schedule(runnable);
     }
 
     @Override
@@ -72,7 +87,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
         if (description != null) {
             runnable = describe(runnable, description);
         }
-        workers.get().schedule(runnable, false, true, true);
+        scheduleSync(runnable);
     }
 
     @Override
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 5d5af65..7f273cc 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -240,7 +240,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
                     log.trace("Exchange processed and is continued routed asynchronously for exchangeId: {} -> {}",
                              exchange.getExchangeId(), exchange);
                 }
-            }, "CamelInternalProcessor - UnitOfWork - afterProcess - " + processor + " - " + exchange.getExchangeId());
+            });
             return false;
         }
     }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index b0d49f0..7546dbc 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -260,7 +260,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
         if (isParallelProcessing()) {
             executorService.submit(() -> camelContext.getReactiveExecutor().schedule(runnable));
         } else {
-            camelContext.getReactiveExecutor().schedule(runnable, "Multicast next step");
+            camelContext.getReactiveExecutor().schedule(runnable);
         }
     }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index fcc9baa..6ad6f8f 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -83,11 +83,9 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         if (exchange.isTransacted()) {
-            camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
-                    "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
+            camelContext.getReactiveExecutor().scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true));
         } else {
-            camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
-                    "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
+            camelContext.getReactiveExecutor().scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true));
         }
         return false;
     }
@@ -103,8 +101,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
             AsyncProcessor processor = processors.next();
 
             processor.process(exchange, doneSync ->
-                    camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, false),
-                            "Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]"));
+                    camelContext.getReactiveExecutor().schedule(() -> doProcess(exchange, callback, processors, false)));
         } else {
             ExchangeHelper.copyResults(exchange, exchange);
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index cd4405b..4eaa6a7 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -201,7 +201,7 @@ public class SharedCamelInternalProcessor {
                     LOG.trace("Exchange processed and is continued routed asynchronously for exchangeId: {} -> {}",
                             exchange.getExchangeId(), exchange);
                 }
-            }, "SharedCamelInternalProcessor - UnitOfWork - afterProcess - " + processor + " - " + exchange.getExchangeId());
+            });
             return sync;
         }
     }