You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/28 09:04:01 UTC
[camel] branch master updated: CAMEL-16271: camel-core - ReactiveExecutor add option to turn on|off statistics
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new c266ad3 CAMEL-16271: camel-core - ReactiveExecutor add option to turn on|off statistics
c266ad3 is described below
commit c266ad314b81328784421858ab576c5af1c57381
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Feb 28 10:03:06 2021 +0100
CAMEL-16271: camel-core - ReactiveExecutor add option to turn on|off statistics
---
.../reactive/vertx/VertXReactiveExecutor.java | 11 +++++
.../org/apache/camel/spi/ReactiveExecutor.java | 10 +++++
.../camel/impl/engine/DefaultReactiveExecutor.java | 49 +++++++++++++++++-----
3 files changed, 59 insertions(+), 11 deletions(-)
diff --git a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
index 46c8c96..51c1c87 100644
--- a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
+++ b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
@@ -113,6 +113,17 @@ public class VertXReactiveExecutor extends ServiceSupport implements CamelContex
}
@Override
+ public void setStatisticsEnabled(boolean statisticsEnabled) {
+ // not in use
+ }
+
+ @Override
+ public boolean isStatisticsEnabled() {
+ // not in use
+ return false;
+ }
+
+ @Override
public String toString() {
return "camel-reactive-executor-vertx";
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index ab88038..e78fb3d 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -54,4 +54,14 @@ public interface ReactiveExecutor {
*/
boolean executeFromQueue();
+ /**
+ * To enable statistics
+ */
+ void setStatisticsEnabled(boolean statisticsEnabled);
+
+ /**
+ * Whether statistics is enabled
+ */
+ boolean isStatisticsEnabled();
+
}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index cfd4baf..a2cb45b 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -19,6 +19,7 @@ package org.apache.camel.impl.engine;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
import org.apache.camel.StaticService;
@@ -46,9 +47,10 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
});
// use for statistics so we have insights at runtime
+ private boolean statisticsEnabled;
private final AtomicInteger createdWorkers = new AtomicInteger();
- private final AtomicInteger runningWorkers = new AtomicInteger();
- private final AtomicInteger pendingTasks = new AtomicInteger();
+ private final LongAdder runningWorkers = new LongAdder();
+ private final LongAdder pendingTasks = new LongAdder();
@Override
public void schedule(Runnable runnable) {
@@ -70,6 +72,17 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
return workers.get().executeFromQueue();
}
+ @Override
+ @ManagedAttribute(description = "Whether statistics is enabled")
+ public boolean isStatisticsEnabled() {
+ return statisticsEnabled;
+ }
+
+ @Override
+ public void setStatisticsEnabled(boolean statisticsEnabled) {
+ this.statisticsEnabled = statisticsEnabled;
+ }
+
@ManagedAttribute(description = "Number of created workers")
public int getCreatedWorkers() {
return createdWorkers.get();
@@ -77,17 +90,17 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
@ManagedAttribute(description = "Number of running workers")
public int getRunningWorkers() {
- return runningWorkers.get();
+ return runningWorkers.intValue();
}
@ManagedAttribute(description = "Number of pending tasks")
public int getPendingTasks() {
- return pendingTasks.get();
+ return pendingTasks.intValue();
}
@Override
protected void doStop() throws Exception {
- if (LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled() && statisticsEnabled) {
LOG.debug("Stopping DefaultReactiveExecutor [createdWorkers: {}, runningWorkers: {}, pendingTasks: {}]",
getCreatedWorkers(), getRunningWorkers(), getPendingTasks());
}
@@ -97,6 +110,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
private final int number;
private final DefaultReactiveExecutor executor;
+ private final boolean stats;
private volatile Deque<Runnable> queue = new ArrayDeque<>();
private volatile Deque<Deque<Runnable>> back;
private volatile boolean running;
@@ -104,6 +118,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
public Worker(int number, DefaultReactiveExecutor executor) {
this.number = number;
this.executor = executor;
+ this.stats = executor.isStatisticsEnabled();
}
void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {
@@ -121,14 +136,20 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
}
if (first) {
queue.addFirst(runnable);
- executor.pendingTasks.incrementAndGet();
+ if (stats) {
+ executor.pendingTasks.increment();
+ }
} else {
queue.addLast(runnable);
- executor.pendingTasks.incrementAndGet();
+ if (stats) {
+ executor.pendingTasks.increment();
+ }
}
if (!running || sync) {
running = true;
- executor.runningWorkers.incrementAndGet();
+ if (stats) {
+ executor.runningWorkers.increment();
+ }
try {
for (;;) {
final Runnable polled = queue.pollFirst();
@@ -141,7 +162,9 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
}
}
try {
- executor.pendingTasks.decrementAndGet();
+ if (stats) {
+ executor.pendingTasks.decrement();
+ }
if (LOG.isTraceEnabled()) {
LOG.trace("Worker #{} running: {}", number, runnable);
}
@@ -153,7 +176,9 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
}
} finally {
running = false;
- executor.runningWorkers.decrementAndGet();
+ if (stats) {
+ executor.runningWorkers.decrement();
+ }
}
} else {
if (LOG.isTraceEnabled()) {
@@ -168,7 +193,9 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
return false;
}
try {
- executor.pendingTasks.decrementAndGet();
+ if (stats) {
+ executor.pendingTasks.decrement();
+ }
if (LOG.isTraceEnabled()) {
LOG.trace("Running: {}", polled);
}