You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pp...@apache.org on 2022/03/17 15:29:41 UTC
[ignite] branch master updated: IGNITE-16698 Fixed metrics in the IgniteStripedThreadPoolExecutor (#9893)
This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3aed85a IGNITE-16698 Fixed metrics in the IgniteStripedThreadPoolExecutor (#9893)
3aed85a is described below
commit 3aed85a2a8775b3ac51996dbbe1312cf493f665d
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Thu Mar 17 18:29:04 2022 +0300
IGNITE-16698 Fixed metrics in the IgniteStripedThreadPoolExecutor (#9893)
---
.../pool/MetricsAwareExecutorService.java | 32 ++++
.../internal/processors/pool/PoolProcessor.java | 111 +-----------
.../ignite/internal/util/StripedExecutor.java | 44 ++++-
.../thread/IgniteStripedThreadPoolExecutor.java | 133 ++++++++++++++-
.../ignite/thread/IgniteThreadPoolExecutor.java | 39 ++++-
.../ignite/thread/ThreadPoolMetricsTest.java | 188 ++++++++++++++++-----
6 files changed, 398 insertions(+), 149 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/MetricsAwareExecutorService.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/MetricsAwareExecutorService.java
new file mode 100644
index 0000000..bfdd4d5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/MetricsAwareExecutorService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.pool;
+
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+
+/**
+ * Executor service that supports self-registration of metrics.
+ */
+public interface MetricsAwareExecutorService {
+ /**
+ * Register thread pool metrics.
+ *
+ * @param mreg Metrics registry.
+ */
+ public void registerMetrics(MetricRegistry mreg);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index ec15118..199e7dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -28,8 +28,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -69,7 +67,6 @@ import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_RUNNER_THREAD_PREFIX;
@@ -601,8 +598,8 @@ public class PoolProcessor extends GridProcessorAdapter {
monitorExecutor("GridSchemaExecutor", schemaExecSvc);
monitorExecutor("GridRebalanceExecutor", rebalanceExecSvc);
monitorExecutor("GridRebalanceStripedExecutor", rebalanceStripedExecSvc);
-
- monitorStripedPool("GridDataStreamExecutor", dataStreamerExecSvc);
+ monitorExecutor("GridDataStreamExecutor", dataStreamerExecSvc);
+ monitorExecutor("StripedExecutor", stripedExecSvc);
if (idxExecSvc != null)
monitorExecutor("GridIndexingExecutor", idxExecSvc);
@@ -610,11 +607,6 @@ public class PoolProcessor extends GridProcessorAdapter {
if (ctx.config().getConnectorConfiguration() != null)
monitorExecutor("GridRestExecutor", restExecSvc);
- if (stripedExecSvc != null) {
- // Striped executor uses a custom adapter.
- monitorStripedPool("StripedExecutor", stripedExecSvc);
- }
-
if (snpExecSvc != null)
monitorExecutor("GridSnapshotExecutor", snpExecSvc);
@@ -928,103 +920,12 @@ public class PoolProcessor extends GridProcessorAdapter {
* @param execSvc Executor to register a metric for.
*/
private void monitorExecutor(String name, ExecutorService execSvc) {
- MetricRegistry mreg = ctx.metric().registry(metricName(THREAD_POOLS, name));
-
- if (execSvc instanceof ThreadPoolExecutor) {
- ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
-
- mreg.register("ActiveCount", exec::getActiveCount, ACTIVE_COUNT_DESC);
- mreg.register("CompletedTaskCount", exec::getCompletedTaskCount, COMPLETED_TASK_DESC);
- mreg.register("CorePoolSize", exec::getCorePoolSize, CORE_SIZE_DESC);
- mreg.register("LargestPoolSize", exec::getLargestPoolSize, LARGEST_SIZE_DESC);
- mreg.register("MaximumPoolSize", exec::getMaximumPoolSize, MAX_SIZE_DESC);
- mreg.register("PoolSize", exec::getPoolSize, POOL_SIZE_DESC);
- mreg.register("TaskCount", exec::getTaskCount, TASK_COUNT_DESC);
- mreg.register("QueueSize", () -> exec.getQueue().size(), QUEUE_SIZE_DESC);
- mreg.register("KeepAliveTime", () -> exec.getKeepAliveTime(MILLISECONDS), KEEP_ALIVE_TIME_DESC);
- mreg.register("Shutdown", exec::isShutdown, IS_SHUTDOWN_DESC);
- mreg.register("Terminated", exec::isTerminated, IS_TERMINATED_DESC);
- mreg.register("Terminating", exec::isTerminating, IS_TERMINATING_DESC);
- mreg.register("RejectedExecutionHandlerClass", () -> {
- RejectedExecutionHandler hnd = exec.getRejectedExecutionHandler();
-
- return hnd == null ? "" : hnd.getClass().getName();
- }, String.class, REJ_HND_DESC);
- mreg.register("ThreadFactoryClass", () -> {
- ThreadFactory factory = exec.getThreadFactory();
-
- return factory == null ? "" : factory.getClass().getName();
- }, String.class, THRD_FACTORY_DESC);
- }
- else {
- mreg.longMetric("ActiveCount", ACTIVE_COUNT_DESC).value(0);
- mreg.longMetric("CompletedTaskCount", COMPLETED_TASK_DESC).value(0);
- mreg.longMetric("CorePoolSize", CORE_SIZE_DESC).value(0);
- mreg.longMetric("LargestPoolSize", LARGEST_SIZE_DESC).value(0);
- mreg.longMetric("MaximumPoolSize", MAX_SIZE_DESC).value(0);
- mreg.longMetric("PoolSize", POOL_SIZE_DESC).value(0);
- mreg.longMetric("TaskCount", TASK_COUNT_DESC);
- mreg.longMetric("QueueSize", QUEUE_SIZE_DESC).value(0);
- mreg.longMetric("KeepAliveTime", KEEP_ALIVE_TIME_DESC).value(0);
- mreg.register("Shutdown", execSvc::isShutdown, IS_SHUTDOWN_DESC);
- mreg.register("Terminated", execSvc::isTerminated, IS_TERMINATED_DESC);
- mreg.longMetric("Terminating", IS_TERMINATING_DESC);
- mreg.objectMetric("RejectedExecutionHandlerClass", String.class, REJ_HND_DESC).value("");
- mreg.objectMetric("ThreadFactoryClass", String.class, THRD_FACTORY_DESC).value("");
+ if (!(execSvc instanceof MetricsAwareExecutorService)) {
+ throw new UnsupportedOperationException(
+ "Executor '" + name + "' does not implement '" + MetricsAwareExecutorService.class.getSimpleName() + "'.");
}
- }
- /**
- * Creates a {@link MetricRegistry} for a stripped executor.
- *
- * @param name name of the bean to register
- * @param svc Executor.
- */
- private void monitorStripedPool(String name, StripedExecutor svc) {
- MetricRegistry mreg = ctx.metric().registry(metricName(THREAD_POOLS, name));
-
- mreg.register("DetectStarvation",
- svc::detectStarvation,
- "True if possible starvation in striped pool is detected.");
-
- mreg.register("StripesCount",
- svc::stripesCount,
- "Stripes count.");
-
- mreg.register("Shutdown",
- svc::isShutdown,
- "True if this executor has been shut down.");
-
- mreg.register("Terminated",
- svc::isTerminated,
- "True if all tasks have completed following shut down.");
-
- mreg.register("TotalQueueSize",
- svc::queueSize,
- "Total queue size of all stripes.");
-
- mreg.register("TotalCompletedTasksCount",
- svc::completedTasks,
- "Completed tasks count of all stripes.");
-
- mreg.register("StripesCompletedTasksCounts",
- svc::stripesCompletedTasks,
- long[].class,
- "Number of completed tasks per stripe.");
-
- mreg.register("ActiveCount",
- svc::activeStripesCount,
- "Number of active tasks of all stripes.");
-
- mreg.register("StripesActiveStatuses",
- svc::stripesActiveStatuses,
- boolean[].class,
- "Number of active tasks per stripe.");
-
- mreg.register("StripesQueueSizes",
- svc::stripesQueueSizes,
- int[].class,
- "Size of queue per stripe.");
+ ((MetricsAwareExecutorService)execSvc).registerMetrics(ctx.metric().registry(metricName(THREAD_POOLS, name)));
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index fb78f68..18468bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -37,6 +37,8 @@ import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.pool.MetricsAwareExecutorService;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -49,11 +51,13 @@ import org.jetbrains.annotations.NotNull;
import static java.util.stream.IntStream.range;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_SHUTDOWN_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_TERMINATED_DESC;
/**
* Striped executor.
*/
-public class StripedExecutor implements ExecutorService {
+public class StripedExecutor implements ExecutorService, MetricsAwareExecutorService {
/** @see IgniteSystemProperties#IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD */
public static final int DFLT_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD = 4;
@@ -472,6 +476,44 @@ public class StripedExecutor implements ExecutorService {
}
/** {@inheritDoc} */
+ @Override public void registerMetrics(MetricRegistry mreg) {
+ mreg.register("StripesCount", this::stripesCount, "Stripes count.");
+ mreg.register("Shutdown", this::isShutdown, IS_SHUTDOWN_DESC);
+ mreg.register("Terminated", this::isTerminated, IS_TERMINATED_DESC);
+
+ mreg.register("DetectStarvation",
+ this::detectStarvation,
+ "True if possible starvation in striped pool is detected.");
+
+ mreg.register("TotalQueueSize",
+ this::queueSize,
+ "Total queue size of all stripes.");
+
+ mreg.register("TotalCompletedTasksCount",
+ this::completedTasks,
+ "Completed tasks count of all stripes.");
+
+ mreg.register("StripesCompletedTasksCounts",
+ this::stripesCompletedTasks,
+ long[].class,
+ "Number of completed tasks per stripe.");
+
+ mreg.register("ActiveCount",
+ this::activeStripesCount,
+ "Number of active tasks of all stripes.");
+
+ mreg.register("StripesActiveStatuses",
+ this::stripesActiveStatuses,
+ boolean[].class,
+ "Number of active tasks per stripe.");
+
+ mreg.register("StripesQueueSizes",
+ this::stripesQueueSizes,
+ int[].class,
+ "Size of queue per stripe.");
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(StripedExecutor.class, this);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
index 8a40dc2..cc28c00 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
@@ -29,15 +29,32 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.pool.MetricsAwareExecutorService;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.NotNull;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.ACTIVE_COUNT_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.COMPLETED_TASK_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.CORE_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_SHUTDOWN_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_TERMINATED_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_TERMINATING_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.KEEP_ALIVE_TIME_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.LARGEST_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.MAX_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.POOL_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.QUEUE_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.REJ_HND_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.TASK_COUNT_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.THRD_FACTORY_DESC;
+
/**
* An {@link ExecutorService} that executes submitted tasks using pooled grid threads.
*/
-public class IgniteStripedThreadPoolExecutor implements ExecutorService {
- /** */
- private final ExecutorService[] execs;
+public class IgniteStripedThreadPoolExecutor implements ExecutorService, MetricsAwareExecutorService {
+ /** Stripe pools. */
+ private final IgniteThreadPoolExecutor[] execs;
/**
* Create striped thread pool.
@@ -58,7 +75,7 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
UncaughtExceptionHandler eHnd,
boolean allowCoreThreadTimeOut,
long keepAliveTime) {
- execs = new ExecutorService[concurrentLvl];
+ execs = new IgniteThreadPoolExecutor[concurrentLvl];
ThreadFactory factory = new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, eHnd);
@@ -192,6 +209,114 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
}
/** {@inheritDoc} */
+ @Override public void registerMetrics(MetricRegistry mreg) {
+ mreg.register("ActiveCount", this::activeCount, ACTIVE_COUNT_DESC);
+ mreg.register("CompletedTaskCount", this::completedTaskCount, COMPLETED_TASK_DESC);
+ mreg.intMetric("CorePoolSize", CORE_SIZE_DESC).value(execs.length);
+ mreg.register("LargestPoolSize", this::largestPoolSize, LARGEST_SIZE_DESC);
+ mreg.intMetric("MaximumPoolSize", MAX_SIZE_DESC).value(execs.length);
+ mreg.register("PoolSize", this::poolSize, POOL_SIZE_DESC);
+ mreg.register("TaskCount", this::taskCount, TASK_COUNT_DESC);
+ mreg.register("QueueSize", this::queueSize, QUEUE_SIZE_DESC);
+ mreg.longMetric("KeepAliveTime", KEEP_ALIVE_TIME_DESC).value(execs[0].getKeepAliveTime(TimeUnit.MILLISECONDS));
+ mreg.register("Shutdown", this::isShutdown, IS_SHUTDOWN_DESC);
+ mreg.register("Terminated", this::isTerminated, IS_TERMINATED_DESC);
+ mreg.register("Terminating", this::terminating, IS_TERMINATING_DESC);
+ mreg.objectMetric("RejectedExecutionHandlerClass", String.class, REJ_HND_DESC)
+ .value(execs[0].getRejectedExecutionHandler().getClass().getName());
+ mreg.objectMetric("ThreadFactoryClass", String.class, THRD_FACTORY_DESC)
+ .value(execs[0].getThreadFactory().getClass().getName());
+ }
+
+ /**
+ * @return {@code true} only if every stripe pool is in the process of terminating after shutdown or shutdownNow but
+ * has not completely terminated.
+ */
+ private boolean terminating() {
+ for (IgniteThreadPoolExecutor exec : execs) {
+ if (!exec.isTerminating())
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * @return Total number of tasks currently held in the task queues of all stripe pools.
+ */
+ private int queueSize() {
+ int queueSize = 0;
+
+ for (IgniteThreadPoolExecutor exec : execs)
+ queueSize += exec.getQueue().size();
+
+ return queueSize;
+ }
+
+ /**
+ * @return Approximate total number of tasks that have ever been scheduled for execution. Because the states of
+ * tasks and threads may change dynamically during computation, the returned value is only an approximation.
+ */
+ private long taskCount() {
+ long taskCnt = 0;
+
+ for (IgniteThreadPoolExecutor exec : execs)
+ taskCnt += exec.getTaskCount();
+
+ return taskCnt;
+ }
+
+ /**
+ * @return Approximate total number of tasks that have completed execution. Because the states of tasks and threads
+ * may change dynamically during computation, the returned value is only an approximation, but one that does not
+ * ever decrease across successive calls.
+ */
+ private long completedTaskCount() {
+ long completedTaskCnt = 0;
+
+ for (IgniteThreadPoolExecutor exec : execs)
+ completedTaskCnt += exec.getCompletedTaskCount();
+
+ return completedTaskCnt;
+ }
+
+ /**
+ * @return Approximate number of threads that are actively executing tasks.
+ */
+ private int activeCount() {
+ int activeCnt = 0;
+
+ for (IgniteThreadPoolExecutor exec : execs)
+ activeCnt += exec.getActiveCount();
+
+ return activeCnt;
+ }
+
+ /**
+ * @return Current number of threads summed over all stripe pools.
+ */
+ private int poolSize() {
+ int poolSize = 0;
+
+ for (IgniteThreadPoolExecutor exec : execs)
+ poolSize += exec.getPoolSize();
+
+ return poolSize;
+ }
+
+ /**
+ * @return Sum of the largest pool sizes ever reached by each stripe pool (the per-stripe peaks may not have coincided).
+ */
+ private int largestPoolSize() {
+ int largestPoolSize = 0;
+
+ for (IgniteThreadPoolExecutor exec : execs)
+ largestPoolSize += exec.getLargestPoolSize();
+
+ return largestPoolSize;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteStripedThreadPoolExecutor.class, this);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
index 20d2e49..2c08514 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
@@ -24,11 +24,28 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.pool.MetricsAwareExecutorService;
+
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.ACTIVE_COUNT_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.COMPLETED_TASK_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.CORE_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_SHUTDOWN_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_TERMINATED_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_TERMINATING_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.KEEP_ALIVE_TIME_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.LARGEST_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.MAX_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.POOL_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.QUEUE_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.REJ_HND_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.TASK_COUNT_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.THRD_FACTORY_DESC;
/**
* An {@link ExecutorService} that executes submitted tasks using pooled grid threads.
*/
-public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
+public class IgniteThreadPoolExecutor extends ThreadPoolExecutor implements MetricsAwareExecutorService {
/**
* Creates a new service with the given initial parameters.
*
@@ -127,4 +144,24 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
new AbortPolicy()
);
}
+
+ /** {@inheritDoc} */
+ @Override public void registerMetrics(MetricRegistry mreg) {
+ mreg.register("ActiveCount", this::getActiveCount, ACTIVE_COUNT_DESC);
+ mreg.register("CompletedTaskCount", this::getCompletedTaskCount, COMPLETED_TASK_DESC);
+ mreg.register("CorePoolSize", this::getCorePoolSize, CORE_SIZE_DESC);
+ mreg.register("LargestPoolSize", this::getLargestPoolSize, LARGEST_SIZE_DESC);
+ mreg.register("MaximumPoolSize", this::getMaximumPoolSize, MAX_SIZE_DESC);
+ mreg.register("PoolSize", this::getPoolSize, POOL_SIZE_DESC);
+ mreg.register("TaskCount", this::getTaskCount, TASK_COUNT_DESC);
+ mreg.register("QueueSize", () -> getQueue().size(), QUEUE_SIZE_DESC);
+ mreg.register("KeepAliveTime", () -> getKeepAliveTime(TimeUnit.MILLISECONDS), KEEP_ALIVE_TIME_DESC);
+ mreg.register("Shutdown", this::isShutdown, IS_SHUTDOWN_DESC);
+ mreg.register("Terminated", this::isTerminated, IS_TERMINATED_DESC);
+ mreg.register("Terminating", this::isTerminating, IS_TERMINATING_DESC);
+ mreg.register("RejectedExecutionHandlerClass",
+ () -> getRejectedExecutionHandler().getClass().getName(), String.class, REJ_HND_DESC);
+ mreg.register("ThreadFactoryClass",
+ () -> getThreadFactory().getClass().getName(), String.class, THRD_FACTORY_DESC);
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/thread/ThreadPoolMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/thread/ThreadPoolMetricsTest.java
index 12c9527..a0e6ea7 100644
--- a/modules/core/src/test/java/org/apache/ignite/thread/ThreadPoolMetricsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/thread/ThreadPoolMetricsTest.java
@@ -17,18 +17,38 @@
package org.apache.ignite.thread;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.ExecutorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
+import org.apache.ignite.internal.util.StripedExecutor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.spi.metric.BooleanMetric;
+import org.apache.ignite.spi.metric.IntMetric;
+import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -44,23 +64,8 @@ import static org.apache.ignite.testframework.GridTestUtils.runAsync;
* Tests that thread pool metrics are available before the start of all Ignite components happened.
*/
public class ThreadPoolMetricsTest extends GridCommonAbstractTest {
- /** Names of the general thread pool metrics. */
- private static final Collection<String> THREAD_POOL_METRICS = Arrays.asList(
- metricName(THREAD_POOLS, "GridUtilityCacheExecutor"),
- metricName(THREAD_POOLS, "GridExecutionExecutor"),
- metricName(THREAD_POOLS, "GridServicesExecutor"),
- metricName(THREAD_POOLS, "GridSystemExecutor"),
- metricName(THREAD_POOLS, "GridClassLoadingExecutor"),
- metricName(THREAD_POOLS, "GridManagementExecutor"),
- metricName(THREAD_POOLS, "GridAffinityExecutor"),
- metricName(THREAD_POOLS, "GridCallbackExecutor"),
- metricName(THREAD_POOLS, "GridQueryExecutor"),
- metricName(THREAD_POOLS, "GridSchemaExecutor"),
- metricName(THREAD_POOLS, "GridRebalanceExecutor"),
- metricName(THREAD_POOLS, "GridThinClientExecutor"),
- metricName(THREAD_POOLS, "GridRebalanceStripedExecutor"),
- metricName(THREAD_POOLS, "GridDataStreamExecutor")
- );
+ /** Custom executor name. */
+ private static final String CUSTOM_EXEC_NAME = "user-pool";
/** Names of the system views for the thread pools. */
private static final Collection<String> THREAD_POOL_VIEWS = Arrays.asList(
@@ -68,22 +73,67 @@ public class ThreadPoolMetricsTest extends GridCommonAbstractTest {
STREAM_POOL_QUEUE_VIEW
);
- /** Latch that indicates whether the start of Ignite components was launched. */
- public final CountDownLatch startLaunchedLatch = new CountDownLatch(1);
-
- /** Latch that indicates whether the start of Ignite components was unblocked. */
- public final CountDownLatch startUnblockedLatch = new CountDownLatch(1);
+ /** Mapping of the metric group name to the thread pool instance. */
+ private static final Map<String, Function<PoolProcessor, ExecutorService>> THREAD_POOL_METRICS =
+ new HashMap<String, Function<PoolProcessor, ExecutorService>>() {{
+ put(metricName(THREAD_POOLS, "GridUtilityCacheExecutor"), PoolProcessor::utilityCachePool);
+ put(metricName(THREAD_POOLS, "GridExecutionExecutor"), PoolProcessor::getExecutorService);
+ put(metricName(THREAD_POOLS, "GridServicesExecutor"), PoolProcessor::getServiceExecutorService);
+ put(metricName(THREAD_POOLS, "GridSystemExecutor"), PoolProcessor::getSystemExecutorService);
+ put(metricName(THREAD_POOLS, "GridClassLoadingExecutor"), PoolProcessor::getPeerClassLoadingExecutorService);
+ put(metricName(THREAD_POOLS, "GridManagementExecutor"), PoolProcessor::getManagementExecutorService);
+ put(metricName(THREAD_POOLS, "GridAffinityExecutor"), PoolProcessor::getAffinityExecutorService);
+ put(metricName(THREAD_POOLS, "GridCallbackExecutor"), PoolProcessor::asyncCallbackPool);
+ put(metricName(THREAD_POOLS, "GridQueryExecutor"), PoolProcessor::getQueryExecutorService);
+ put(metricName(THREAD_POOLS, "GridSchemaExecutor"), PoolProcessor::getSchemaExecutorService);
+ put(metricName(THREAD_POOLS, "GridRebalanceExecutor"), PoolProcessor::getRebalanceExecutorService);
+ put(metricName(THREAD_POOLS, "GridRebalanceStripedExecutor"), PoolProcessor::getStripedRebalanceExecutorService);
+ put(metricName(THREAD_POOLS, "GridThinClientExecutor"), PoolProcessor::getThinClientExecutorService);
+ put(metricName(THREAD_POOLS, "GridDataStreamExecutor"), PoolProcessor::getDataStreamerExecutorService);
+ put(metricName(THREAD_POOLS, "StripedExecutor"), PoolProcessor::getStripedExecutorService);
+ put(metricName(THREAD_POOLS, "GridRestExecutor"), PoolProcessor::getRestExecutorService);
+ put(metricName(THREAD_POOLS, "GridSnapshotExecutor"), PoolProcessor::getSnapshotExecutorService);
+ put(metricName(THREAD_POOLS, "GridReencryptionExecutor"), PoolProcessor::getReencryptionExecutorService);
+ put(metricName(THREAD_POOLS, CUSTOM_EXEC_NAME), proc -> (ExecutorService)proc.customExecutor(CUSTOM_EXEC_NAME));
+ }};
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
- .setPluginProviders(new AbstractTestPluginProvider() {
- /** {@inheritDoc} */
+ .setDataStorageConfiguration(
+ new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(true))
+ )
+ .setConnectorConfiguration(new ConnectorConfiguration())
+ .setExecutorConfiguration(new ExecutorConfiguration().setName(CUSTOM_EXEC_NAME));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * Tests that thread pool metrics are available before the start of all Ignite components happened.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testThreadPoolMetricsRegistry() throws Exception {
+ // Latch that indicates whether the start of Ignite components was launched.
+ CountDownLatch startLaunchedLatch = new CountDownLatch(1);
+
+ // Latch that indicates whether the start of Ignite components was unblocked.
+ CountDownLatch startUnblockedLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture<IgniteEx> startFut = runAsync(() -> startGrid(
+ getConfiguration(getTestIgniteInstanceName()).setPluginProviders(new AbstractTestPluginProvider() {
@Override public String name() {
return "test-stuck-plugin";
}
- /** {@inheritDoc} */
@Override public void onIgniteStart() {
startLaunchedLatch.countDown();
@@ -94,18 +144,7 @@ public class ThreadPoolMetricsTest extends GridCommonAbstractTest {
throw new IgniteException(e);
}
}
- });
- }
-
- /**
- * Tests that thread pool metrics are available before the start of all Ignite components happened.
- *
- * @throws Exception If failed.
- */
- @Test
- @SuppressWarnings("Convert2MethodRef")
- public void testThreadPoolMetrics() throws Exception {
- IgniteInternalFuture<IgniteEx> startFut = runAsync(() -> startGrid());
+ })));
try {
assertTrue(startLaunchedLatch.await(getTestTimeout(), MILLISECONDS));
@@ -115,7 +154,7 @@ public class ThreadPoolMetricsTest extends GridCommonAbstractTest {
assertTrue(StreamSupport.stream(srv.context().metric().spliterator(), false)
.map(ReadOnlyMetricRegistry::name)
.collect(Collectors.toSet())
- .containsAll(THREAD_POOL_METRICS));
+ .containsAll(THREAD_POOL_METRICS.keySet()));
assertTrue(StreamSupport.stream(srv.context().systemView().spliterator(), false)
.map(SystemView::name)
@@ -128,4 +167,77 @@ public class ThreadPoolMetricsTest extends GridCommonAbstractTest {
startFut.get(getTestTimeout(), MILLISECONDS);
}
+
+ /**
+ * Tests basic thread pool metrics.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testThreadPoolMetrics() throws Exception {
+ IgniteEx ignite = startGrid(getTestIgniteInstanceName());
+ CountDownLatch longTaskLatch = new CountDownLatch(1);
+ PoolProcessor poolProc = ignite.context().pools();
+ List<Runnable> tasks = new ArrayList<>();
+ AtomicInteger cntr = new AtomicInteger();
+ int taskCnt = 10;
+
+ for (int i = 0; i < taskCnt; i++) {
+ tasks.add(new GridTestUtils.IgniteRunnableX() {
+ @Override public void runx() throws Exception {
+ U.sleep(1);
+
+ cntr.decrementAndGet();
+ }
+ });
+ }
+
+ // Add a long task to check the metric of active tasks.
+ tasks.add(new GridTestUtils.IgniteRunnableX() {
+ @Override public void runx() throws Exception {
+ U.await(longTaskLatch, getTestTimeout(), MILLISECONDS);
+ }
+ });
+
+ for (Map.Entry<String, Function<PoolProcessor, ExecutorService>> entry : THREAD_POOL_METRICS.entrySet()) {
+ String metricsName = entry.getKey();
+ ExecutorService execSvc = entry.getValue().apply(poolProc);
+ boolean stripedExecutor = execSvc instanceof StripedExecutor;
+
+ cntr.set(taskCnt);
+
+ for (int i = 0; i < tasks.size(); i++) {
+ Runnable task = tasks.get(i);
+
+ if (execSvc instanceof IgniteStripedThreadPoolExecutor)
+ ((IgniteStripedThreadPoolExecutor)execSvc).execute(task, i);
+ else
+ execSvc.execute(task);
+ }
+
+ MetricRegistry mreg = ignite.context().metric().registry(metricsName);
+ String errMsg = "pool=" + mreg.name();
+
+ assertTrue(GridTestUtils.waitForCondition(() -> cntr.get() == 0, getTestTimeout()));
+ assertFalse(errMsg, ((BooleanMetric)mreg.findMetric("Shutdown")).value());
+ assertFalse(errMsg, ((BooleanMetric)mreg.findMetric("Terminated")).value());
+ assertTrue(errMsg, ((IntMetric)mreg.findMetric("ActiveCount")).value() > 0);
+
+ if (stripedExecutor) {
+ assertTrue(errMsg, ((IntMetric)mreg.findMetric("StripesCount")).value() > 0);
+ assertTrue(errMsg, ((LongMetric)mreg.findMetric("TotalCompletedTasksCount")).value() >= taskCnt);
+
+ continue;
+ }
+
+ assertTrue(errMsg, ((LongMetric)mreg.findMetric("CompletedTaskCount")).value() >= taskCnt);
+ assertFalse(errMsg, F.isEmpty(mreg.findMetric("ThreadFactoryClass").getAsString()));
+ assertFalse(errMsg, F.isEmpty(mreg.findMetric("RejectedExecutionHandlerClass").getAsString()));
+ assertTrue(errMsg, ((IntMetric)mreg.findMetric("CorePoolSize")).value() > 0);
+ assertTrue(errMsg, ((IntMetric)mreg.findMetric("LargestPoolSize")).value() > 0);
+ assertTrue(errMsg, ((IntMetric)mreg.findMetric("MaximumPoolSize")).value() > 0);
+ }
+
+ longTaskLatch.countDown();
+ }
}