You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/28 09:04:01 UTC

[camel] branch master updated: CAMEL-16271: camel-core - ReactiveExecutor add option to turn on|off statistics

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new c266ad3  CAMEL-16271: camel-core - ReactiveExecutor add option to turn on|off statistics
c266ad3 is described below

commit c266ad314b81328784421858ab576c5af1c57381
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Feb 28 10:03:06 2021 +0100

    CAMEL-16271: camel-core - ReactiveExecutor add option to turn on|off statistics
---
 .../reactive/vertx/VertXReactiveExecutor.java      | 11 +++++
 .../org/apache/camel/spi/ReactiveExecutor.java     | 10 +++++
 .../camel/impl/engine/DefaultReactiveExecutor.java | 49 +++++++++++++++++-----
 3 files changed, 59 insertions(+), 11 deletions(-)

diff --git a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
index 46c8c96..51c1c87 100644
--- a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
+++ b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
@@ -113,6 +113,17 @@ public class VertXReactiveExecutor extends ServiceSupport implements CamelContex
     }
 
     @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        // not in use
+    }
+
+    @Override
+    public boolean isStatisticsEnabled() {
+        // not in use
+        return false;
+    }
+
+    @Override
     public String toString() {
         return "camel-reactive-executor-vertx";
     }
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index ab88038..e78fb3d 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -54,4 +54,14 @@ public interface ReactiveExecutor {
      */
     boolean executeFromQueue();
 
+    /**
+     * To enable statistics
+     */
+    void setStatisticsEnabled(boolean statisticsEnabled);
+
+    /**
+     * Whether statistics is enabled
+     */
+    boolean isStatisticsEnabled();
+
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index cfd4baf..a2cb45b 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -19,6 +19,7 @@ package org.apache.camel.impl.engine;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Supplier;
 
 import org.apache.camel.StaticService;
@@ -46,9 +47,10 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
     });
 
     // use for statistics so we have insights at runtime
+    private boolean statisticsEnabled;
     private final AtomicInteger createdWorkers = new AtomicInteger();
-    private final AtomicInteger runningWorkers = new AtomicInteger();
-    private final AtomicInteger pendingTasks = new AtomicInteger();
+    private final LongAdder runningWorkers = new LongAdder();
+    private final LongAdder pendingTasks = new LongAdder();
 
     @Override
     public void schedule(Runnable runnable) {
@@ -70,6 +72,17 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
         return workers.get().executeFromQueue();
     }
 
+    @Override
+    @ManagedAttribute(description = "Whether statistics is enabled")
+    public boolean isStatisticsEnabled() {
+        return statisticsEnabled;
+    }
+
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        this.statisticsEnabled = statisticsEnabled;
+    }
+
     @ManagedAttribute(description = "Number of created workers")
     public int getCreatedWorkers() {
         return createdWorkers.get();
@@ -77,17 +90,17 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
 
     @ManagedAttribute(description = "Number of running workers")
     public int getRunningWorkers() {
-        return runningWorkers.get();
+        return runningWorkers.intValue();
     }
 
     @ManagedAttribute(description = "Number of pending tasks")
     public int getPendingTasks() {
-        return pendingTasks.get();
+        return pendingTasks.intValue();
     }
 
     @Override
     protected void doStop() throws Exception {
-        if (LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled() && statisticsEnabled) {
             LOG.debug("Stopping DefaultReactiveExecutor [createdWorkers: {}, runningWorkers: {}, pendingTasks: {}]",
                     getCreatedWorkers(), getRunningWorkers(), getPendingTasks());
         }
@@ -97,6 +110,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
 
         private final int number;
         private final DefaultReactiveExecutor executor;
+        private final boolean stats;
         private volatile Deque<Runnable> queue = new ArrayDeque<>();
         private volatile Deque<Deque<Runnable>> back;
         private volatile boolean running;
@@ -104,6 +118,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
         public Worker(int number, DefaultReactiveExecutor executor) {
             this.number = number;
             this.executor = executor;
+            this.stats = executor.isStatisticsEnabled();
         }
 
         void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
@@ -121,14 +136,20 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
             }
             if (first) {
                 queue.addFirst(runnable);
-                executor.pendingTasks.incrementAndGet();
+                if (stats) {
+                    executor.pendingTasks.increment();
+                }
             } else {
                 queue.addLast(runnable);
-                executor.pendingTasks.incrementAndGet();
+                if (stats) {
+                    executor.pendingTasks.increment();
+                }
             }
             if (!running || sync) {
                 running = true;
-                executor.runningWorkers.incrementAndGet();
+                if (stats) {
+                    executor.runningWorkers.increment();
+                }
                 try {
                     for (;;) {
                         final Runnable polled = queue.pollFirst();
@@ -141,7 +162,9 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
                             }
                         }
                         try {
-                            executor.pendingTasks.decrementAndGet();
+                            if (stats) {
+                                executor.pendingTasks.decrement();
+                            }
                             if (LOG.isTraceEnabled()) {
                                 LOG.trace("Worker #{} running: {}", number, runnable);
                             }
@@ -153,7 +176,9 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
                     }
                 } finally {
                     running = false;
-                    executor.runningWorkers.decrementAndGet();
+                    if (stats) {
+                        executor.runningWorkers.decrement();
+                    }
                 }
             } else {
                 if (LOG.isTraceEnabled()) {
@@ -168,7 +193,9 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
                 return false;
             }
             try {
-                executor.pendingTasks.decrementAndGet();
+                if (stats) {
+                    executor.pendingTasks.decrement();
+                }
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Running: {}", polled);
                 }