You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pp...@apache.org on 2022/03/30 12:54:39 UTC
[ignite] branch ignite-2.13 updated: IGNITE-16648 Added task execution time metric to thread pools (#9892)
This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch ignite-2.13
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-2.13 by this push:
new 816104a IGNITE-16648 Added task execution time metric to thread pools (#9892)
816104a is described below
commit 816104afff67666472618d48d69e253715937bdf
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Tue Mar 29 12:45:10 2022 +0300
IGNITE-16648 Added task execution time metric to thread pools (#9892)
(cherry picked from commit 8d710aa53a2112ce1b8149043a1e3aa5d860112e)
---
docs/_docs/monitoring-metrics/new-metrics.adoc | 1 +
.../metric/impl/HistogramMetricImpl.java | 10 +++
.../internal/processors/pool/PoolProcessor.java | 9 +++
.../ignite/internal/util/GridMutableLong.java | 9 +++
.../ignite/internal/util/StripedExecutor.java | 49 +++++++++++--
.../thread/IgniteStripedThreadPoolExecutor.java | 22 +++++-
.../ignite/thread/IgniteThreadPoolExecutor.java | 85 +++++++++++++++++++++-
.../ignite/thread/ThreadPoolMetricsTest.java | 9 ++-
8 files changed, 182 insertions(+), 12 deletions(-)
diff --git a/docs/_docs/monitoring-metrics/new-metrics.adoc b/docs/_docs/monitoring-metrics/new-metrics.adoc
index 44a68fc..2442bbe 100644
--- a/docs/_docs/monitoring-metrics/new-metrics.adoc
+++ b/docs/_docs/monitoring-metrics/new-metrics.adoc
@@ -218,6 +218,7 @@ Register name: `threadPools.{thread_pool_name}`
|RejectedExecutionHandlerClass| string | Class name of current rejection handler.
|Shutdown | boolean| True if this executor has been shut down.
|TaskCount | long | Approximate total number of tasks that have been scheduled for execution.
+|TaskExecutionTime | histogram | Task execution time, in milliseconds.
|Terminated |boolean| True if all tasks have completed following shut down.
|Terminating |long| True if terminating but not yet terminated.
|ThreadFactoryClass| string| Class name of thread factory used to create new threads.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/HistogramMetricImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/HistogramMetricImpl.java
index 88267be..b174996 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/HistogramMetricImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metric/impl/HistogramMetricImpl.java
@@ -43,6 +43,16 @@ public class HistogramMetricImpl extends AbstractMetric implements HistogramMetr
}
/**
+ * @param name Name.
+ * @param toCopy Histogram metric to copy.
+ */
+ public HistogramMetricImpl(String name, HistogramMetricImpl toCopy) {
+ super(name, toCopy.description());
+
+ holder = toCopy.holder;
+ }
+
+ /**
* Sets value.
*
* @param x Value.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index 5d2f6a2..e47dfb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -120,6 +120,12 @@ public class PoolProcessor extends GridProcessorAdapter {
/** */
public static final String THRD_FACTORY_DESC = "Class name of thread factory used to create new threads.";
+ /** Task execution time metric name. */
+ public static final String TASK_EXEC_TIME = "TaskExecutionTime";
+
+ /** Task execution time metric description. */
+ public static final String TASK_EXEC_TIME_DESC = "Tasks execution times as histogram (milliseconds).";
+
/** Name of the system view for a data streamer {@link StripedExecutor} queue view. */
public static final String STREAM_POOL_QUEUE_VIEW = metricName("datastream", "threadpool", "queue");
@@ -135,6 +141,9 @@ public class PoolProcessor extends GridProcessorAdapter {
/** Group for a thread pools. */
public static final String THREAD_POOLS = "threadPools";
+ /** Histogram buckets for the task execution time metric (in milliseconds). */
+ public static final long[] TASK_EXEC_TIME_HISTOGRAM_BUCKETS = new long[] {100, 1000, 10000, 30000, 60000};
+
/** Executor service. */
@GridToStringExclude
private ThreadPoolExecutor execSvc;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridMutableLong.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridMutableLong.java
index fd8a29f..fbdecc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridMutableLong.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridMutableLong.java
@@ -57,6 +57,15 @@ public class GridMutableLong {
return v;
}
+ /**
+ * Sets the new value.
+ *
+ * @param v New value.
+ */
+ public void set(long v) {
+ this.v = v;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return Long.toString(v);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 18468bc..50d9b9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -38,6 +38,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
import org.apache.ignite.internal.processors.pool.MetricsAwareExecutorService;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -51,8 +52,12 @@ import org.jetbrains.annotations.NotNull;
import static java.util.stream.IntStream.range;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD;
+import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_SHUTDOWN_DESC;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_TERMINATED_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.TASK_EXEC_TIME;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.TASK_EXEC_TIME_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.TASK_EXEC_TIME_HISTOGRAM_BUCKETS;
/**
* Striped executor.
@@ -70,6 +75,10 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
/** */
private final IgniteLogger log;
+ /** Task execution time metric. */
+ @GridToStringExclude
+ private volatile HistogramMetricImpl execTime;
+
/**
* @param cnt Count.
* @param igniteInstanceName Node name.
@@ -117,13 +126,15 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
threshold = failureDetectionTimeout;
+ execTime = new HistogramMetricImpl(TASK_EXEC_TIME, TASK_EXEC_TIME_DESC, TASK_EXEC_TIME_HISTOGRAM_BUCKETS);
+
this.log = log;
try {
for (int i = 0; i < cnt; i++) {
stripes[i] = stealTasks
- ? new StripeConcurrentQueue(igniteInstanceName, poolName, i, log, stripes, errHnd, gridWorkerLsnr)
- : new StripeConcurrentQueue(igniteInstanceName, poolName, i, log, errHnd, gridWorkerLsnr);
+ ? new StripeConcurrentQueue(igniteInstanceName, poolName, i, log, stripes, errHnd, gridWorkerLsnr, execTime)
+ : new StripeConcurrentQueue(igniteInstanceName, poolName, i, log, errHnd, gridWorkerLsnr, execTime);
}
for (int i = 0; i < cnt; i++)
@@ -511,6 +522,15 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
this::stripesQueueSizes,
int[].class,
"Size of queue per stripe.");
+
+ HistogramMetricImpl execTime0 = execTime;
+
+ execTime = new HistogramMetricImpl(metricName(mreg.name(), TASK_EXEC_TIME), execTime0);
+
+ mreg.register(execTime);
+
+ for (Stripe stripe : stripes)
+ stripe.execTime = execTime;
}
/** {@inheritDoc} */
@@ -544,7 +564,10 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
protected Thread thread;
/** Critical failure handler. */
- private IgniteInClosure<Throwable> errHnd;
+ private final IgniteInClosure<Throwable> errHnd;
+
+ /** Task execution time consumer. */
+ private volatile HistogramMetricImpl execTime;
/**
* @param igniteInstanceName Ignite instance name.
@@ -553,6 +576,7 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
* @param log Logger.
* @param errHnd Exception handler.
* @param gridWorkerLsnr listener to link with stripe worker.
+ * @param execTime Task execution time consumer.
*/
public Stripe(
String igniteInstanceName,
@@ -560,7 +584,8 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
int idx,
IgniteLogger log,
IgniteInClosure<Throwable> errHnd,
- GridWorkerListener gridWorkerLsnr
+ GridWorkerListener gridWorkerLsnr,
+ HistogramMetricImpl execTime
) {
super(igniteInstanceName, poolName + "-stripe-" + idx, log, gridWorkerLsnr);
@@ -568,6 +593,7 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
this.idx = idx;
this.log = log;
this.errHnd = errHnd;
+ this.execTime = execTime;
}
/**
@@ -613,6 +639,8 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
finally {
active = false;
completedCnt++;
+
+ execTime.value(U.currentTimeMillis() - lastStartedTs);
}
}
@@ -703,6 +731,7 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
* @param log Logger.
* @param errHnd Critical failure handler.
* @param gridWorkerLsnr listener to link with stripe worker.
+ * @param execTime Task execution time metric.
*/
StripeConcurrentQueue(
String igniteInstanceName,
@@ -710,9 +739,10 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
int idx,
IgniteLogger log,
IgniteInClosure<Throwable> errHnd,
- GridWorkerListener gridWorkerLsnr
+ GridWorkerListener gridWorkerLsnr,
+ HistogramMetricImpl execTime
) {
- this(igniteInstanceName, poolName, idx, log, null, errHnd, gridWorkerLsnr);
+ this(igniteInstanceName, poolName, idx, log, null, errHnd, gridWorkerLsnr, execTime);
}
/**
@@ -722,6 +752,7 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
* @param log Logger.
* @param errHnd Critical failure handler.
* @param gridWorkerLsnr listener to link with stripe worker.
+ * @param execTime Task execution time metric.
*/
StripeConcurrentQueue(
String igniteInstanceName,
@@ -730,7 +761,8 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
IgniteLogger log,
Stripe[] others,
IgniteInClosure<Throwable> errHnd,
- GridWorkerListener gridWorkerLsnr
+ GridWorkerListener gridWorkerLsnr,
+ HistogramMetricImpl execTime
) {
super(
igniteInstanceName,
@@ -738,7 +770,8 @@ public class StripedExecutor implements ExecutorService, MetricsAwareExecutorSer
idx,
log,
errHnd,
- gridWorkerLsnr);
+ gridWorkerLsnr,
+ execTime);
this.others = others;
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
index cc28c00..c56866b 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
@@ -30,10 +30,13 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
import org.apache.ignite.internal.processors.pool.MetricsAwareExecutorService;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.NotNull;
+import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.ACTIVE_COUNT_DESC;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.COMPLETED_TASK_DESC;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.CORE_SIZE_DESC;
@@ -47,6 +50,8 @@ import static org.apache.ignite.internal.processors.pool.PoolProcessor.POOL_SIZE
import static org.apache.ignite.internal.processors.pool.PoolProcessor.QUEUE_SIZE_DESC;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.REJ_HND_DESC;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.TASK_COUNT_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.TASK_EXEC_TIME;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.TASK_EXEC_TIME_HISTOGRAM_BUCKETS;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.THRD_FACTORY_DESC;
/**
@@ -56,6 +61,10 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService, Metrics
/** Stripe pools. */
private final IgniteThreadPoolExecutor[] execs;
+ /** Task execution time metric. */
+ @GridToStringExclude
+ private volatile HistogramMetricImpl execTime;
+
/**
* Create striped thread pool.
*
@@ -76,6 +85,7 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService, Metrics
boolean allowCoreThreadTimeOut,
long keepAliveTime) {
execs = new IgniteThreadPoolExecutor[concurrentLvl];
+ execTime = new HistogramMetricImpl(TASK_EXEC_TIME, TASK_COUNT_DESC, TASK_EXEC_TIME_HISTOGRAM_BUCKETS);
ThreadFactory factory = new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, eHnd);
@@ -85,7 +95,8 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService, Metrics
1,
keepAliveTime,
new LinkedBlockingQueue<>(),
- factory);
+ factory,
+ execTime);
executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
@@ -226,6 +237,15 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService, Metrics
.value(execs[0].getRejectedExecutionHandler().getClass().getName());
mreg.objectMetric("ThreadFactoryClass", String.class, THRD_FACTORY_DESC)
.value(execs[0].getThreadFactory().getClass().getName());
+
+ HistogramMetricImpl execTime0 = execTime;
+
+ execTime = new HistogramMetricImpl(metricName(mreg.name(), TASK_EXEC_TIME), execTime0);
+
+ mreg.register(execTime);
+
+ for (IgniteThreadPoolExecutor exec : execs)
+ exec.executionTimeMetric(execTime);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
index 2c08514..1eb0038 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
@@ -25,8 +25,14 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
import org.apache.ignite.internal.processors.pool.MetricsAwareExecutorService;
+import org.apache.ignite.internal.util.GridMutableLong;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.ACTIVE_COUNT_DESC;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.COMPLETED_TASK_DESC;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.CORE_SIZE_DESC;
@@ -40,12 +46,23 @@ import static org.apache.ignite.internal.processors.pool.PoolProcessor.POOL_SIZE
import static org.apache.ignite.internal.processors.pool.PoolProcessor.QUEUE_SIZE_DESC;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.REJ_HND_DESC;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.TASK_COUNT_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.TASK_EXEC_TIME;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.TASK_EXEC_TIME_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.TASK_EXEC_TIME_HISTOGRAM_BUCKETS;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.THRD_FACTORY_DESC;
/**
* An {@link ExecutorService} that executes submitted tasks using pooled grid threads.
*/
public class IgniteThreadPoolExecutor extends ThreadPoolExecutor implements MetricsAwareExecutorService {
+ /** Thread local task start time. */
+ @GridToStringExclude
+ private final ThreadLocal<GridMutableLong> taskStartTime = ThreadLocal.withInitial(() -> new GridMutableLong(0));
+
+ /** Task execution time metric. */
+ @GridToStringExclude
+ private volatile HistogramMetricImpl execTime;
+
/**
* Creates a new service with the given initial parameters.
*
@@ -104,11 +121,10 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor implements Metr
BlockingQueue<Runnable> workQ,
byte plc,
UncaughtExceptionHandler eHnd) {
- super(
+ this(
corePoolSize,
maxPoolSize,
keepAliveTime,
- TimeUnit.MILLISECONDS,
workQ,
new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, plc, eHnd)
);
@@ -134,6 +150,38 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor implements Metr
long keepAliveTime,
BlockingQueue<Runnable> workQ,
ThreadFactory threadFactory) {
+ this(
+ corePoolSize,
+ maxPoolSize,
+ keepAliveTime,
+ workQ,
+ threadFactory,
+ null
+ );
+ }
+
+ /**
+ * Creates a new service with the given initial parameters.
+ *
+ * NOTE: There is a known bug. If 'corePoolSize' equals {@code 0},
+ * then the pool will degrade to a single-threaded pool.
+ *
+ * @param corePoolSize The number of threads to keep in the pool, even if they are idle.
+ * @param maxPoolSize The maximum number of threads to allow in the pool.
+ * @param keepAliveTime When the number of threads is greater than the core, this is the maximum time
+ * that excess idle threads will wait for new tasks before terminating.
+ * @param workQ The queue to use for holding tasks before they are executed. This queue will hold only the
+ * runnable tasks submitted by the {@link #execute(Runnable)} method.
+ * @param threadFactory Thread factory.
+ * @param execTime Task execution time metric.
+ */
+ protected IgniteThreadPoolExecutor(
+ int corePoolSize,
+ int maxPoolSize,
+ long keepAliveTime,
+ BlockingQueue<Runnable> workQ,
+ ThreadFactory threadFactory,
+ @Nullable HistogramMetricImpl execTime) {
super(
corePoolSize,
maxPoolSize,
@@ -143,6 +191,26 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor implements Metr
threadFactory,
new AbortPolicy()
);
+
+ this.execTime = execTime != null
+ ? execTime
+ : new HistogramMetricImpl(TASK_EXEC_TIME, TASK_EXEC_TIME_DESC, TASK_EXEC_TIME_HISTOGRAM_BUCKETS);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeExecute(Thread t, Runnable r) {
+ super.beforeExecute(t, r);
+
+ taskStartTime.get().set(U.currentTimeMillis());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterExecute(Runnable r, Throwable t) {
+ GridMutableLong val = taskStartTime.get();
+
+ execTime.value(U.currentTimeMillis() - val.get());
+
+ super.afterExecute(r, t);
}
/** {@inheritDoc} */
@@ -163,5 +231,18 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor implements Metr
() -> getRejectedExecutionHandler().getClass().getName(), String.class, REJ_HND_DESC);
mreg.register("ThreadFactoryClass",
() -> getThreadFactory().getClass().getName(), String.class, THRD_FACTORY_DESC);
+
+ HistogramMetricImpl execTime0 = execTime;
+
+ execTime = new HistogramMetricImpl(metricName(mreg.name(), TASK_EXEC_TIME), execTime0);
+
+ mreg.register(execTime);
+ }
+
+ /**
+ * @param execTime Task execution time metric.
+ */
+ protected void executionTimeMetric(HistogramMetricImpl execTime) {
+ this.execTime = execTime;
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/thread/ThreadPoolMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/thread/ThreadPoolMetricsTest.java
index 5bbf705..18132a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/thread/ThreadPoolMetricsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/thread/ThreadPoolMetricsTest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.AbstractTestPluginProvider;
import org.apache.ignite.spi.metric.BooleanMetric;
+import org.apache.ignite.spi.metric.HistogramMetric;
import org.apache.ignite.spi.metric.IntMetric;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
@@ -52,6 +53,7 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import static java.util.Arrays.stream;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.internal.IgnitionEx.gridx;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
@@ -206,8 +208,13 @@ public class ThreadPoolMetricsTest extends GridCommonAbstractTest {
for (Map.Entry<String, Function<PoolProcessor, ExecutorService>> entry : THREAD_POOL_METRICS.entrySet()) {
String metricsName = entry.getKey();
ExecutorService execSvc = entry.getValue().apply(poolProc);
+ MetricRegistry mreg = ignite.context().metric().registry(metricsName);
+ HistogramMetric execTimeMetric = mreg.findMetric(PoolProcessor.TASK_EXEC_TIME);
boolean stripedExecutor = execSvc instanceof StripedExecutor;
+ // Ensure that the execution time histogram can be reset.
+ execTimeMetric.reset();
+
cntr.set(taskCnt);
for (int i = 0; i < tasks.size(); i++) {
@@ -219,13 +226,13 @@ public class ThreadPoolMetricsTest extends GridCommonAbstractTest {
execSvc.execute(task);
}
- MetricRegistry mreg = ignite.context().metric().registry(metricsName);
String errMsg = "pool=" + mreg.name();
assertTrue(GridTestUtils.waitForCondition(() -> cntr.get() == 0, getTestTimeout()));
assertFalse(errMsg, ((BooleanMetric)mreg.findMetric("Shutdown")).value());
assertFalse(errMsg, ((BooleanMetric)mreg.findMetric("Terminated")).value());
assertTrue(errMsg, ((IntMetric)mreg.findMetric("ActiveCount")).value() > 0);
+ assertTrue(errMsg, stream(execTimeMetric.value()).sum() >= taskCnt);
if (stripedExecutor) {
assertTrue(errMsg, ((IntMetric)mreg.findMetric("StripesCount")).value() > 0);