You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pp...@apache.org on 2022/03/17 15:29:41 UTC

[ignite] branch master updated: IGNITE-16698 Fixed metrics in the IgniteStripedThreadPoolExecutor (#9893)

This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 3aed85a  IGNITE-16698 Fixed metrics in the IgniteStripedThreadPoolExecutor (#9893)
3aed85a is described below

commit 3aed85a2a8775b3ac51996dbbe1312cf493f665d
Author: Pavel Pereslegin <xx...@gmail.com>
AuthorDate: Thu Mar 17 18:29:04 2022 +0300

    IGNITE-16698 Fixed metrics in the IgniteStripedThreadPoolExecutor (#9893)
---
 .../pool/MetricsAwareExecutorService.java          |  32 ++++
 .../internal/processors/pool/PoolProcessor.java    | 111 +-----------
 .../ignite/internal/util/StripedExecutor.java      |  44 ++++-
 .../thread/IgniteStripedThreadPoolExecutor.java    | 133 ++++++++++++++-
 .../ignite/thread/IgniteThreadPoolExecutor.java    |  39 ++++-
 .../ignite/thread/ThreadPoolMetricsTest.java       | 188 ++++++++++++++++-----
 6 files changed, 398 insertions(+), 149 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/MetricsAwareExecutorService.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/MetricsAwareExecutorService.java
new file mode 100644
index 0000000..bfdd4d5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/MetricsAwareExecutorService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.pool;
+
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+
+/**
+ * Executor service that supports self-registration of metrics.
+ */
+public interface MetricsAwareExecutorService {
+    /**
+     * Register thread pool metrics.
+     *
+     * @param mreg Metrics registry.
+     */
+    public void registerMetrics(MetricRegistry mreg);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index ec15118..199e7dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -28,8 +28,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -69,7 +67,6 @@ import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_RUNNER_THREAD_PREFIX;
@@ -601,8 +598,8 @@ public class PoolProcessor extends GridProcessorAdapter {
         monitorExecutor("GridSchemaExecutor", schemaExecSvc);
         monitorExecutor("GridRebalanceExecutor", rebalanceExecSvc);
         monitorExecutor("GridRebalanceStripedExecutor", rebalanceStripedExecSvc);
-
-        monitorStripedPool("GridDataStreamExecutor", dataStreamerExecSvc);
+        monitorExecutor("GridDataStreamExecutor", dataStreamerExecSvc);
+        monitorExecutor("StripedExecutor", stripedExecSvc);
 
         if (idxExecSvc != null)
             monitorExecutor("GridIndexingExecutor", idxExecSvc);
@@ -610,11 +607,6 @@ public class PoolProcessor extends GridProcessorAdapter {
         if (ctx.config().getConnectorConfiguration() != null)
             monitorExecutor("GridRestExecutor", restExecSvc);
 
-        if (stripedExecSvc != null) {
-            // Striped executor uses a custom adapter.
-            monitorStripedPool("StripedExecutor", stripedExecSvc);
-        }
-
         if (snpExecSvc != null)
             monitorExecutor("GridSnapshotExecutor", snpExecSvc);
 
@@ -928,103 +920,12 @@ public class PoolProcessor extends GridProcessorAdapter {
      * @param execSvc Executor to register a metric for.
      */
     private void monitorExecutor(String name, ExecutorService execSvc) {
-        MetricRegistry mreg = ctx.metric().registry(metricName(THREAD_POOLS, name));
-
-        if (execSvc instanceof ThreadPoolExecutor) {
-            ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
-
-            mreg.register("ActiveCount", exec::getActiveCount, ACTIVE_COUNT_DESC);
-            mreg.register("CompletedTaskCount", exec::getCompletedTaskCount, COMPLETED_TASK_DESC);
-            mreg.register("CorePoolSize", exec::getCorePoolSize, CORE_SIZE_DESC);
-            mreg.register("LargestPoolSize", exec::getLargestPoolSize, LARGEST_SIZE_DESC);
-            mreg.register("MaximumPoolSize", exec::getMaximumPoolSize, MAX_SIZE_DESC);
-            mreg.register("PoolSize", exec::getPoolSize, POOL_SIZE_DESC);
-            mreg.register("TaskCount", exec::getTaskCount, TASK_COUNT_DESC);
-            mreg.register("QueueSize", () -> exec.getQueue().size(), QUEUE_SIZE_DESC);
-            mreg.register("KeepAliveTime", () -> exec.getKeepAliveTime(MILLISECONDS), KEEP_ALIVE_TIME_DESC);
-            mreg.register("Shutdown", exec::isShutdown, IS_SHUTDOWN_DESC);
-            mreg.register("Terminated", exec::isTerminated, IS_TERMINATED_DESC);
-            mreg.register("Terminating", exec::isTerminating, IS_TERMINATING_DESC);
-            mreg.register("RejectedExecutionHandlerClass", () -> {
-                RejectedExecutionHandler hnd = exec.getRejectedExecutionHandler();
-
-                return hnd == null ? "" : hnd.getClass().getName();
-            }, String.class, REJ_HND_DESC);
-            mreg.register("ThreadFactoryClass", () -> {
-                ThreadFactory factory = exec.getThreadFactory();
-
-                return factory == null ? "" : factory.getClass().getName();
-            }, String.class, THRD_FACTORY_DESC);
-        }
-        else {
-            mreg.longMetric("ActiveCount", ACTIVE_COUNT_DESC).value(0);
-            mreg.longMetric("CompletedTaskCount", COMPLETED_TASK_DESC).value(0);
-            mreg.longMetric("CorePoolSize", CORE_SIZE_DESC).value(0);
-            mreg.longMetric("LargestPoolSize", LARGEST_SIZE_DESC).value(0);
-            mreg.longMetric("MaximumPoolSize", MAX_SIZE_DESC).value(0);
-            mreg.longMetric("PoolSize", POOL_SIZE_DESC).value(0);
-            mreg.longMetric("TaskCount", TASK_COUNT_DESC);
-            mreg.longMetric("QueueSize", QUEUE_SIZE_DESC).value(0);
-            mreg.longMetric("KeepAliveTime", KEEP_ALIVE_TIME_DESC).value(0);
-            mreg.register("Shutdown", execSvc::isShutdown, IS_SHUTDOWN_DESC);
-            mreg.register("Terminated", execSvc::isTerminated, IS_TERMINATED_DESC);
-            mreg.longMetric("Terminating", IS_TERMINATING_DESC);
-            mreg.objectMetric("RejectedExecutionHandlerClass", String.class, REJ_HND_DESC).value("");
-            mreg.objectMetric("ThreadFactoryClass", String.class, THRD_FACTORY_DESC).value("");
+        if (!(execSvc instanceof MetricsAwareExecutorService)) {
+            throw new UnsupportedOperationException(
+                "Executor '" + name + "' does not implement '" + MetricsAwareExecutorService.class.getSimpleName() + "'.");
         }
-    }
 
-    /**
-     * Creates a {@link MetricRegistry} for a stripped executor.
-     *
-     * @param name name of the bean to register
-     * @param svc Executor.
-     */
-    private void monitorStripedPool(String name, StripedExecutor svc) {
-        MetricRegistry mreg = ctx.metric().registry(metricName(THREAD_POOLS, name));
-
-        mreg.register("DetectStarvation",
-            svc::detectStarvation,
-            "True if possible starvation in striped pool is detected.");
-
-        mreg.register("StripesCount",
-            svc::stripesCount,
-            "Stripes count.");
-
-        mreg.register("Shutdown",
-            svc::isShutdown,
-            "True if this executor has been shut down.");
-
-        mreg.register("Terminated",
-            svc::isTerminated,
-            "True if all tasks have completed following shut down.");
-
-        mreg.register("TotalQueueSize",
-            svc::queueSize,
-            "Total queue size of all stripes.");
-
-        mreg.register("TotalCompletedTasksCount",
-            svc::completedTasks,
-            "Completed tasks count of all stripes.");
-
-        mreg.register("StripesCompletedTasksCounts",
-            svc::stripesCompletedTasks,
-            long[].class,
-            "Number of completed tasks per stripe.");
-
-        mreg.register("ActiveCount",
-            svc::activeStripesCount,
-            "Number of active tasks of all stripes.");
-
-        mreg.register("StripesActiveStatuses",
-            svc::stripesActiveStatuses,
-            boolean[].class,
-            "Number of active tasks per stripe.");
-
-        mreg.register("StripesQueueSizes",
-            svc::stripesQueueSizes,
-            int[].class,
-            "Size of queue per stripe.");
+        ((MetricsAwareExecutorService)execSvc).registerMetrics(ctx.metric().registry(metricName(THREAD_POOLS, name)));
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index fb78f68..18468bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -37,6 +37,8 @@ import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.pool.MetricsAwareExecutorService;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -49,11 +51,13 @@ import org.jetbrains.annotations.NotNull;
 
 import static java.util.stream.IntStream.range;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_SHUTDOWN_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_TERMINATED_DESC;
 
 /**
  * Striped executor.
  */
-public class StripedExecutor implements ExecutorService {
+public class StripedExecutor implements ExecutorService, MetricsAwareExecutorService {
     /** @see IgniteSystemProperties#IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD */
     public static final int DFLT_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD = 4;
 
@@ -472,6 +476,44 @@ public class StripedExecutor implements ExecutorService {
     }
 
     /** {@inheritDoc} */
+    @Override public void registerMetrics(MetricRegistry mreg) {
+        mreg.register("StripesCount", this::stripesCount, "Stripes count.");
+        mreg.register("Shutdown", this::isShutdown, IS_SHUTDOWN_DESC);
+        mreg.register("Terminated", this::isTerminated, IS_TERMINATED_DESC);
+
+        mreg.register("DetectStarvation",
+            this::detectStarvation,
+            "True if possible starvation in striped pool is detected.");
+
+        mreg.register("TotalQueueSize",
+            this::queueSize,
+            "Total queue size of all stripes.");
+
+        mreg.register("TotalCompletedTasksCount",
+            this::completedTasks,
+            "Completed tasks count of all stripes.");
+
+        mreg.register("StripesCompletedTasksCounts",
+            this::stripesCompletedTasks,
+            long[].class,
+            "Number of completed tasks per stripe.");
+
+        mreg.register("ActiveCount",
+            this::activeStripesCount,
+            "Number of active tasks of all stripes.");
+
+        mreg.register("StripesActiveStatuses",
+            this::stripesActiveStatuses,
+            boolean[].class,
+            "Number of active tasks per stripe.");
+
+        mreg.register("StripesQueueSizes",
+            this::stripesQueueSizes,
+            int[].class,
+            "Size of queue per stripe.");
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(StripedExecutor.class, this);
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
index 8a40dc2..cc28c00 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
@@ -29,15 +29,32 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.pool.MetricsAwareExecutorService;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.NotNull;
 
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.ACTIVE_COUNT_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.COMPLETED_TASK_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.CORE_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_SHUTDOWN_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_TERMINATED_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_TERMINATING_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.KEEP_ALIVE_TIME_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.LARGEST_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.MAX_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.POOL_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.QUEUE_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.REJ_HND_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.TASK_COUNT_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.THRD_FACTORY_DESC;
+
 /**
  * An {@link ExecutorService} that executes submitted tasks using pooled grid threads.
  */
-public class IgniteStripedThreadPoolExecutor implements ExecutorService {
-    /** */
-    private final ExecutorService[] execs;
+public class IgniteStripedThreadPoolExecutor implements ExecutorService, MetricsAwareExecutorService {
+    /** Stripe pools. */
+    private final IgniteThreadPoolExecutor[] execs;
 
     /**
      * Create striped thread pool.
@@ -58,7 +75,7 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
         UncaughtExceptionHandler eHnd,
         boolean allowCoreThreadTimeOut,
         long keepAliveTime) {
-        execs = new ExecutorService[concurrentLvl];
+        execs = new IgniteThreadPoolExecutor[concurrentLvl];
 
         ThreadFactory factory = new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, eHnd);
 
@@ -192,6 +209,114 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
     }
 
     /** {@inheritDoc} */
+    @Override public void registerMetrics(MetricRegistry mreg) {
+        mreg.register("ActiveCount", this::activeCount, ACTIVE_COUNT_DESC);
+        mreg.register("CompletedTaskCount", this::completedTaskCount, COMPLETED_TASK_DESC);
+        mreg.intMetric("CorePoolSize", CORE_SIZE_DESC).value(execs.length);
+        mreg.register("LargestPoolSize", this::largestPoolSize, LARGEST_SIZE_DESC);
+        mreg.intMetric("MaximumPoolSize", MAX_SIZE_DESC).value(execs.length);
+        mreg.register("PoolSize", this::poolSize, POOL_SIZE_DESC);
+        mreg.register("TaskCount", this::taskCount, TASK_COUNT_DESC);
+        mreg.register("QueueSize", this::queueSize, QUEUE_SIZE_DESC);
+        mreg.longMetric("KeepAliveTime", KEEP_ALIVE_TIME_DESC).value(execs[0].getKeepAliveTime(TimeUnit.MILLISECONDS));
+        mreg.register("Shutdown", this::isShutdown, IS_SHUTDOWN_DESC);
+        mreg.register("Terminated", this::isTerminated, IS_TERMINATED_DESC);
+        mreg.register("Terminating", this::terminating, IS_TERMINATING_DESC);
+        mreg.objectMetric("RejectedExecutionHandlerClass", String.class, REJ_HND_DESC)
+            .value(execs[0].getRejectedExecutionHandler().getClass().getName());
+        mreg.objectMetric("ThreadFactoryClass", String.class, THRD_FACTORY_DESC)
+            .value(execs[0].getThreadFactory().getClass().getName());
+    }
+
+    /**
+     * @return {@code true} only if every stripe pool is terminating after {@code shutdown} or {@code shutdownNow}
+     * but has not completely terminated.
+     */
+    private boolean terminating() {
+        for (IgniteThreadPoolExecutor exec : execs) {
+            if (!exec.isTerminating())
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * @return Total number of tasks waiting in the queues of all stripe pools.
+     */
+    private int queueSize() {
+        int queueSize = 0;
+
+        for (IgniteThreadPoolExecutor exec : execs)
+            queueSize += exec.getQueue().size();
+
+        return queueSize;
+    }
+
+    /**
+     * @return Approximate total number of tasks that have ever been scheduled for execution. Because the states of
+     * tasks and threads may change dynamically during computation, the returned value is only an approximation.
+     */
+    private long taskCount() {
+        long taskCnt = 0;
+
+        for (IgniteThreadPoolExecutor exec : execs)
+            taskCnt += exec.getTaskCount();
+
+        return taskCnt;
+    }
+
+    /**
+     * @return Approximate total number of tasks that have completed execution. Because the states of tasks and threads
+     * may change dynamically during computation, the returned value is only an approximation, but one that does not
+     * ever decrease across successive calls.
+     */
+    private long completedTaskCount() {
+        long completedTaskCnt = 0;
+
+        for (IgniteThreadPoolExecutor exec : execs)
+            completedTaskCnt += exec.getCompletedTaskCount();
+
+        return completedTaskCnt;
+    }
+
+    /**
+     * @return Approximate number of threads that are actively executing tasks, summed over all stripe pools.
+     */
+    private int activeCount() {
+        int activeCnt = 0;
+
+        for (IgniteThreadPoolExecutor exec : execs)
+            activeCnt += exec.getActiveCount();
+
+        return activeCnt;
+    }
+
+    /**
+     * @return Current number of threads, summed over all stripe pools.
+     */
+    private int poolSize() {
+        int poolSize = 0;
+
+        for (IgniteThreadPoolExecutor exec : execs)
+            poolSize += exec.getPoolSize();
+
+        return poolSize;
+    }
+
+    /**
+     * @return Sum of the largest pool sizes reached by each stripe pool (an upper bound on the simultaneous maximum).
+     */
+    private int largestPoolSize() {
+        int largestPoolSize = 0;
+
+        for (IgniteThreadPoolExecutor exec : execs)
+            largestPoolSize += exec.getLargestPoolSize();
+
+        return largestPoolSize;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteStripedThreadPoolExecutor.class, this);
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
index 20d2e49..2c08514 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
@@ -24,11 +24,28 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.pool.MetricsAwareExecutorService;
+
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.ACTIVE_COUNT_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.COMPLETED_TASK_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.CORE_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_SHUTDOWN_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_TERMINATED_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.IS_TERMINATING_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.KEEP_ALIVE_TIME_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.LARGEST_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.MAX_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.POOL_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.QUEUE_SIZE_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.REJ_HND_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.TASK_COUNT_DESC;
+import static org.apache.ignite.internal.processors.pool.PoolProcessor.THRD_FACTORY_DESC;
 
 /**
  * An {@link ExecutorService} that executes submitted tasks using pooled grid threads.
  */
-public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
+public class IgniteThreadPoolExecutor extends ThreadPoolExecutor implements MetricsAwareExecutorService {
     /**
      * Creates a new service with the given initial parameters.
      *
@@ -127,4 +144,24 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
             new AbortPolicy()
         );
     }
+
+    /** {@inheritDoc} */
+    @Override public void registerMetrics(MetricRegistry mreg) {
+        mreg.register("ActiveCount", this::getActiveCount, ACTIVE_COUNT_DESC);
+        mreg.register("CompletedTaskCount", this::getCompletedTaskCount, COMPLETED_TASK_DESC);
+        mreg.register("CorePoolSize", this::getCorePoolSize, CORE_SIZE_DESC);
+        mreg.register("LargestPoolSize", this::getLargestPoolSize, LARGEST_SIZE_DESC);
+        mreg.register("MaximumPoolSize", this::getMaximumPoolSize, MAX_SIZE_DESC);
+        mreg.register("PoolSize", this::getPoolSize, POOL_SIZE_DESC);
+        mreg.register("TaskCount", this::getTaskCount, TASK_COUNT_DESC);
+        mreg.register("QueueSize", () -> getQueue().size(), QUEUE_SIZE_DESC);
+        mreg.register("KeepAliveTime", () -> getKeepAliveTime(TimeUnit.MILLISECONDS), KEEP_ALIVE_TIME_DESC);
+        mreg.register("Shutdown", this::isShutdown, IS_SHUTDOWN_DESC);
+        mreg.register("Terminated", this::isTerminated, IS_TERMINATED_DESC);
+        mreg.register("Terminating", this::isTerminating, IS_TERMINATING_DESC);
+        mreg.register("RejectedExecutionHandlerClass",
+            () -> getRejectedExecutionHandler().getClass().getName(), String.class, REJ_HND_DESC);
+        mreg.register("ThreadFactoryClass",
+            () -> getThreadFactory().getClass().getName(), String.class, THRD_FACTORY_DESC);
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/thread/ThreadPoolMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/thread/ThreadPoolMetricsTest.java
index 12c9527..a0e6ea7 100644
--- a/modules/core/src/test/java/org/apache/ignite/thread/ThreadPoolMetricsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/thread/ThreadPoolMetricsTest.java
@@ -17,18 +17,38 @@
 
 package org.apache.ignite.thread;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.ExecutorConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
+import org.apache.ignite.internal.util.StripedExecutor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.spi.metric.BooleanMetric;
+import org.apache.ignite.spi.metric.IntMetric;
+import org.apache.ignite.spi.metric.LongMetric;
 import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
 import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
@@ -44,23 +64,8 @@ import static org.apache.ignite.testframework.GridTestUtils.runAsync;
  * Tests that thread pool metrics are available before the start of all Ignite components happened.
  */
 public class ThreadPoolMetricsTest extends GridCommonAbstractTest {
-    /** Names of the general thread pool metrics. */
-    private static final Collection<String> THREAD_POOL_METRICS = Arrays.asList(
-        metricName(THREAD_POOLS, "GridUtilityCacheExecutor"),
-        metricName(THREAD_POOLS, "GridExecutionExecutor"),
-        metricName(THREAD_POOLS, "GridServicesExecutor"),
-        metricName(THREAD_POOLS, "GridSystemExecutor"),
-        metricName(THREAD_POOLS, "GridClassLoadingExecutor"),
-        metricName(THREAD_POOLS, "GridManagementExecutor"),
-        metricName(THREAD_POOLS, "GridAffinityExecutor"),
-        metricName(THREAD_POOLS, "GridCallbackExecutor"),
-        metricName(THREAD_POOLS, "GridQueryExecutor"),
-        metricName(THREAD_POOLS, "GridSchemaExecutor"),
-        metricName(THREAD_POOLS, "GridRebalanceExecutor"),
-        metricName(THREAD_POOLS, "GridThinClientExecutor"),
-        metricName(THREAD_POOLS, "GridRebalanceStripedExecutor"),
-        metricName(THREAD_POOLS, "GridDataStreamExecutor")
-    );
+    /** Custom executor name. */
+    private static final String CUSTOM_EXEC_NAME = "user-pool";
 
     /** Names of the system views for the thread pools. */
     private static final Collection<String> THREAD_POOL_VIEWS = Arrays.asList(
@@ -68,22 +73,67 @@ public class ThreadPoolMetricsTest extends GridCommonAbstractTest {
         STREAM_POOL_QUEUE_VIEW
     );
 
-    /** Latch that indicates whether the start of Ignite components was launched. */
-    public final CountDownLatch startLaunchedLatch = new CountDownLatch(1);
-
-    /** Latch that indicates whether the start of Ignite components was unblocked. */
-    public final CountDownLatch startUnblockedLatch = new CountDownLatch(1);
+    /** Mapping of the metric group name to the thread pool instance. */
+    private static final Map<String, Function<PoolProcessor, ExecutorService>> THREAD_POOL_METRICS =
+        new HashMap<String, Function<PoolProcessor, ExecutorService>>() {{
+            put(metricName(THREAD_POOLS, "GridUtilityCacheExecutor"), PoolProcessor::utilityCachePool);
+            put(metricName(THREAD_POOLS, "GridExecutionExecutor"), PoolProcessor::getExecutorService);
+            put(metricName(THREAD_POOLS, "GridServicesExecutor"), PoolProcessor::getServiceExecutorService);
+            put(metricName(THREAD_POOLS, "GridSystemExecutor"), PoolProcessor::getSystemExecutorService);
+            put(metricName(THREAD_POOLS, "GridClassLoadingExecutor"), PoolProcessor::getPeerClassLoadingExecutorService);
+            put(metricName(THREAD_POOLS, "GridManagementExecutor"), PoolProcessor::getManagementExecutorService);
+            put(metricName(THREAD_POOLS, "GridAffinityExecutor"), PoolProcessor::getAffinityExecutorService);
+            put(metricName(THREAD_POOLS, "GridCallbackExecutor"), PoolProcessor::asyncCallbackPool);
+            put(metricName(THREAD_POOLS, "GridQueryExecutor"), PoolProcessor::getQueryExecutorService);
+            put(metricName(THREAD_POOLS, "GridSchemaExecutor"), PoolProcessor::getSchemaExecutorService);
+            put(metricName(THREAD_POOLS, "GridRebalanceExecutor"), PoolProcessor::getRebalanceExecutorService);
+            put(metricName(THREAD_POOLS, "GridRebalanceStripedExecutor"), PoolProcessor::getStripedRebalanceExecutorService);
+            put(metricName(THREAD_POOLS, "GridThinClientExecutor"), PoolProcessor::getThinClientExecutorService);
+            put(metricName(THREAD_POOLS, "GridDataStreamExecutor"), PoolProcessor::getDataStreamerExecutorService);
+            put(metricName(THREAD_POOLS, "StripedExecutor"), PoolProcessor::getStripedExecutorService);
+            put(metricName(THREAD_POOLS, "GridRestExecutor"), PoolProcessor::getRestExecutorService);
+            put(metricName(THREAD_POOLS, "GridSnapshotExecutor"), PoolProcessor::getSnapshotExecutorService);
+            put(metricName(THREAD_POOLS, "GridReencryptionExecutor"), PoolProcessor::getReencryptionExecutorService);
+            put(metricName(THREAD_POOLS, CUSTOM_EXEC_NAME), proc -> (ExecutorService)proc.customExecutor(CUSTOM_EXEC_NAME));
+        }};
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         return super.getConfiguration(igniteInstanceName)
-            .setPluginProviders(new AbstractTestPluginProvider() {
-                /** {@inheritDoc} */
+            .setDataStorageConfiguration(
+                new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration().setPersistenceEnabled(true))
+            )
+            .setConnectorConfiguration(new ConnectorConfiguration())
+            .setExecutorConfiguration(new ExecutorConfiguration().setName(CUSTOM_EXEC_NAME));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Tests that thread pool metrics are available before all Ignite components have started.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testThreadPoolMetricsRegistry() throws Exception {
+        // Latch that indicates whether the start of Ignite components was launched.
+        CountDownLatch startLaunchedLatch = new CountDownLatch(1);
+
+        // Latch that indicates whether the start of Ignite components was unblocked.
+        CountDownLatch startUnblockedLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<IgniteEx> startFut = runAsync(() -> startGrid(
+            getConfiguration(getTestIgniteInstanceName()).setPluginProviders(new AbstractTestPluginProvider() {
                 @Override public String name() {
                     return "test-stuck-plugin";
                 }
 
-                /** {@inheritDoc} */
                 @Override public void onIgniteStart() {
                     startLaunchedLatch.countDown();
 
@@ -94,18 +144,7 @@ public class ThreadPoolMetricsTest extends GridCommonAbstractTest {
                         throw new IgniteException(e);
                     }
                 }
-            });
-    }
-
-    /**
-     * Tests that thread pool metrics are available before the start of all Ignite components happened.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    @SuppressWarnings("Convert2MethodRef")
-    public void testThreadPoolMetrics() throws Exception {
-        IgniteInternalFuture<IgniteEx> startFut = runAsync(() -> startGrid());
+            })));
 
         try {
             assertTrue(startLaunchedLatch.await(getTestTimeout(), MILLISECONDS));
@@ -115,7 +154,7 @@ public class ThreadPoolMetricsTest extends GridCommonAbstractTest {
             assertTrue(StreamSupport.stream(srv.context().metric().spliterator(), false)
                 .map(ReadOnlyMetricRegistry::name)
                 .collect(Collectors.toSet())
-                .containsAll(THREAD_POOL_METRICS));
+                .containsAll(THREAD_POOL_METRICS.keySet()));
 
             assertTrue(StreamSupport.stream(srv.context().systemView().spliterator(), false)
                 .map(SystemView::name)
@@ -128,4 +167,77 @@ public class ThreadPoolMetricsTest extends GridCommonAbstractTest {
 
         startFut.get(getTestTimeout(), MILLISECONDS);
     }
+
+    /**
+     * Tests basic thread pool metrics: each pool runs short tasks plus one blocked task, then its metrics are checked.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testThreadPoolMetrics() throws Exception {
+        IgniteEx ignite = startGrid(getTestIgniteInstanceName());
+        CountDownLatch longTaskLatch = new CountDownLatch(1);
+        PoolProcessor poolProc = ignite.context().pools();
+        List<Runnable> tasks = new ArrayList<>();
+        AtomicInteger cntr = new AtomicInteger();
+        int taskCnt = 10;
+
+        for (int i = 0; i < taskCnt; i++) {
+            tasks.add(new GridTestUtils.IgniteRunnableX() {
+                @Override public void runx() throws Exception {
+                    U.sleep(1);
+
+                    cntr.decrementAndGet();
+                }
+            });
+        }
+
+        // Add a long task to check the metric of active tasks: it waits on longTaskLatch while the metrics are read.
+        tasks.add(new GridTestUtils.IgniteRunnableX() {
+            @Override public void runx() throws Exception {
+                U.await(longTaskLatch, getTestTimeout(), MILLISECONDS);
+            }
+        });
+
+        try {
+            for (Map.Entry<String, Function<PoolProcessor, ExecutorService>> entry : THREAD_POOL_METRICS.entrySet()) {
+                String metricsName = entry.getKey();
+                ExecutorService execSvc = entry.getValue().apply(poolProc);
+
+                cntr.set(taskCnt); // Re-arm the counter: the same task objects are submitted to every pool.
+
+                for (int i = 0; i < tasks.size(); i++) {
+                    if (execSvc instanceof IgniteStripedThreadPoolExecutor)
+                        ((IgniteStripedThreadPoolExecutor)execSvc).execute(tasks.get(i), i);
+                    else
+                        execSvc.execute(tasks.get(i));
+                }
+
+                MetricRegistry mreg = ignite.context().metric().registry(metricsName);
+                String errMsg = "pool=" + mreg.name();
+
+                assertTrue(errMsg, GridTestUtils.waitForCondition(() -> cntr.get() == 0, getTestTimeout()));
+                assertFalse(errMsg, ((BooleanMetric)mreg.findMetric("Shutdown")).value());
+                assertFalse(errMsg, ((BooleanMetric)mreg.findMetric("Terminated")).value());
+                assertTrue(errMsg, ((IntMetric)mreg.findMetric("ActiveCount")).value() > 0);
+
+                if (execSvc instanceof StripedExecutor) {
+                    assertTrue(errMsg, ((IntMetric)mreg.findMetric("StripesCount")).value() > 0);
+                    assertTrue(errMsg, ((LongMetric)mreg.findMetric("TotalCompletedTasksCount")).value() >= taskCnt);
+
+                    continue;
+                }
+
+                assertTrue(errMsg, ((LongMetric)mreg.findMetric("CompletedTaskCount")).value() >= taskCnt);
+                assertFalse(errMsg, F.isEmpty(mreg.findMetric("ThreadFactoryClass").getAsString()));
+                assertFalse(errMsg, F.isEmpty(mreg.findMetric("RejectedExecutionHandlerClass").getAsString()));
+                assertTrue(errMsg, ((IntMetric)mreg.findMetric("CorePoolSize")).value() > 0);
+                assertTrue(errMsg, ((IntMetric)mreg.findMetric("LargestPoolSize")).value() > 0);
+                assertTrue(errMsg, ((IntMetric)mreg.findMetric("MaximumPoolSize")).value() > 0);
+            }
+        }
+        finally {
+            longTaskLatch.countDown(); // Release the blocked tasks even when an assertion above has failed.
+        }
+    }
 }