You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2022/04/26 16:44:59 UTC

[druid] branch master updated: Worker level task metrics (#12446)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 564d6defd4 Worker level task metrics (#12446)
564d6defd4 is described below

commit 564d6defd47749d55dd07e5549d7264cbc1c4019
Author: zachjsh <za...@gmail.com>
AuthorDate: Tue Apr 26 12:44:44 2022 -0400

    Worker level task metrics (#12446)
    
    * * fix metric name inconsistency
    
    * * add task slot metrics for middle managers
    
    * * add new WorkerTaskCountStatsMonitor to report task count metrics
      from worker
    
    * * more stuff
    
    * * remove unused variable
    
    * * more stuff
    
    * * add javadocs
    
    * * fix checkstyle
    
    * * fix hadoop test failure
    
    * * cleanup
    
    * * add more code coverage in tests
    
    * * fix test failure
    
    * * add docs
    
    * * increase code coverage
    
    * * fix spelling
    
    * * fix failing tests
    
    * * remove dead code
    
    * * fix spelling
---
 docs/configuration/index.md                        |   2 +
 docs/operations/metrics.md                         |   5 +
 .../main/resources/defaultMetricDimensions.json    |   8 +-
 .../druid/indexing/overlord/ForkingTaskRunner.java |  73 ++++++-
 .../indexing/overlord/ForkingTaskRunnerTest.java   |  16 ++
 .../metrics/WorkerTaskCountStatsMonitor.java       |  79 ++++++++
 .../metrics/WorkerTaskCountStatsProvider.java      |  63 ++++++
 .../metrics/WorkerTaskCountStatsMonitorTest.java   | 216 +++++++++++++++++++++
 .../org/apache/druid/cli/CliMiddleManager.java     |   2 +
 website/.spelling                                  |   1 +
 10 files changed, 454 insertions(+), 11 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index f4c268ae9d..e598b36ea0 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -383,6 +383,8 @@ Metric monitoring is an essential part of Druid operations.  The following monit
 |`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
 |`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. See the description of the metrics here: https://github.com/apache/druid/pull/4973.|
 |`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.|
+|`org.apache.druid.server.metrics.TaskSlotCountStatsMonitor`|Reports metrics about task slot usage per emission period.|
+|`org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor`|Reports the number of successful/failed tasks and metrics about task slot usage (total/idle/used) for the reporting worker, per emission period. Only supported by middleManager node types.|
 
 For example, you might configure monitors on all processes for system and JVM information within `common.runtime.properties` as follows:
 
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index a5894ed005..8124acd1c9 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -214,6 +214,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
 |`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
 |`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
 |`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.|dataSource, taskType, taskId, segmentAvailabilityConfirmed|Varies.|
+|`worker/task/failed/count`|Number of failed tasks run on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
+|`worker/task/success/count`|Number of successful tasks run on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
+|`worker/taskSlot/idle/count`|Number of idle task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
+|`worker/taskSlot/total/count`|Number of total task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
+|`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
 
 ## Shuffle metrics (Native parallel task)
 
diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index 6c0fa75d1a..a1f5a85479 100644
--- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -63,9 +63,15 @@
   "task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
   "task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
 
+  "worker/task/failed/count" : { "dimensions" : ["category", "version"], "type" : "count" },
+  "worker/task/success/count" : { "dimensions" : ["category", "version"], "type" : "count" },
+  "worker/taskSlot/idle/count" : { "dimensions" : ["category", "version"], "type" : "gauge" },
+  "worker/taskSlot/total/count" : { "dimensions" : ["category", "version"], "type" : "gauge" },
+  "worker/taskSlot/used/count" : { "dimensions" : ["category", "version"], "type" : "gauge" },
+
   "taskSlot/total/count" : { "dimensions" : ["category"], "type" : "gauge" },
   "taskSlot/idle/count" : { "dimensions" : ["category"], "type" : "gauge" },
-  "taskSlot/busy/count" : { "dimensions" : ["category"], "type" : "gauge" },
+  "taskSlot/used/count" : { "dimensions" : ["category"], "type" : "gauge" },
   "taskSlot/lazy/count" : { "dimensions" : ["category"], "type" : "gauge" },
   "taskSlot/blacklisted/count" : { "dimensions" : ["category"], "type" : "gauge" },
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index c35e10cb25..77c515502d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -63,6 +63,7 @@ import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
 import org.apache.druid.server.metrics.MonitorsConfig;
+import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider;
 import org.apache.druid.tasklogs.TaskLogPusher;
 import org.apache.druid.tasklogs.TaskLogStreamer;
 import org.joda.time.DateTime;
@@ -83,13 +84,14 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Runs tasks in separate processes using the "internal peon" verb.
  */
 public class ForkingTaskRunner
     extends BaseRestorableTaskRunner<ForkingTaskRunner.ForkingTaskRunnerWorkItem>
-    implements TaskLogStreamer
+    implements TaskLogStreamer, WorkerTaskCountStatsProvider
 {
   private static final EmittingLogger LOGGER = new EmittingLogger(ForkingTaskRunner.class);
   private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
@@ -104,6 +106,11 @@ public class ForkingTaskRunner
 
   private volatile boolean stopping = false;
 
+  private static final AtomicLong LAST_REPORTED_FAILED_TASK_COUNT = new AtomicLong();
+  private static final AtomicLong FAILED_TASK_COUNT = new AtomicLong();
+  private static final AtomicLong SUCCESSFUL_TASK_COUNT = new AtomicLong();
+  private static final AtomicLong LAST_REPORTED_SUCCESSFUL_TASK_COUNT = new AtomicLong();
+
   @Inject
   public ForkingTaskRunner(
       ForkingTaskRunnerConfig config,
@@ -399,7 +406,11 @@ public class ForkingTaskRunner
                             )
                         );
                       }
-
+                      if (status.isSuccess()) {
+                        SUCCESSFUL_TASK_COUNT.incrementAndGet();
+                      } else {
+                        FAILED_TASK_COUNT.incrementAndGet();
+                      }
                       TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
                       return status;
                     }
@@ -690,18 +701,12 @@ public class ForkingTaskRunner
   @Override
   public Map<String, Long> getTotalTaskSlotCount()
   {
-    if (config.getPorts() != null && !config.getPorts().isEmpty()) {
-      return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getPorts().size()));
-    }
-    return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getEndPort() - config.getStartPort() + 1));
+    return ImmutableMap.of(workerConfig.getCategory(), getTotalTaskSlotCountLong());
   }
 
   public long getTotalTaskSlotCountLong()
   {
-    if (config.getPorts() != null && !config.getPorts().isEmpty()) {
-      return config.getPorts().size();
-    }
-    return config.getEndPort() - config.getStartPort() + 1;
+    return workerConfig.getCapacity();
   }
 
   @Override
@@ -733,6 +738,54 @@ public class ForkingTaskRunner
     return ImmutableMap.of(workerConfig.getCategory(), 0L);
   }
 
+  @Override
+  public Long getWorkerFailedTaskCount()
+  {
+    // Advance the watermark in one atomic step (the counter only grows) so racing callers never report a failure twice.
+    long total = FAILED_TASK_COUNT.get();
+    long reported = LAST_REPORTED_FAILED_TASK_COUNT.getAndAccumulate(total, Math::max);
+    return Math.max(total - reported, 0L);
+  }
+
+  @Override
+  public Long getWorkerIdleTaskSlotCount()
+  {
+    return Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0);
+  }
+
+  @Override
+  public Long getWorkerUsedTaskSlotCount()
+  {
+    return (long) portFinder.findUsedPortCount();
+  }
+
+  @Override
+  public Long getWorkerTotalTaskSlotCount()
+  {
+    return getTotalTaskSlotCountLong();
+  }
+
+  @Override
+  public String getWorkerCategory()
+  {
+    return workerConfig.getCategory();
+  }
+
+  @Override
+  public String getWorkerVersion()
+  {
+    return workerConfig.getVersion();
+  }
+
+  @Override
+  public Long getWorkerSuccessfulTaskCount()
+  {
+    // Advance the watermark in one atomic step (the counter only grows) so racing callers never report a success twice.
+    long total = SUCCESSFUL_TASK_COUNT.get();
+    long reported = LAST_REPORTED_SUCCESSFUL_TASK_COUNT.getAndAccumulate(total, Math::max);
+    return Math.max(total - reported, 0L);
+  }
+
   protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
   {
     private final Task task;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
index ae5c49d4af..2ba8e9d59d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
@@ -231,6 +231,12 @@ public class ForkingTaskRunnerTest
       @Override
       int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
       {
+        WorkerConfig workerConfig = new WorkerConfig();
+        Assert.assertEquals(1L, (long) this.getWorkerUsedTaskSlotCount());
+        Assert.assertEquals(workerConfig.getCapacity(), (long) this.getWorkerTotalTaskSlotCount());
+        Assert.assertEquals(workerConfig.getCapacity() - 1, (long) this.getWorkerIdleTaskSlotCount());
+        Assert.assertEquals(workerConfig.getCategory(), this.getWorkerCategory());
+        Assert.assertEquals(workerConfig.getVersion(), this.getWorkerVersion());
         // Emulate task process failure
         return 1;
       }
@@ -242,6 +248,8 @@ public class ForkingTaskRunnerTest
         "Task execution process exited unsuccessfully with code[1]. See middleManager logs for more details.",
         status.getErrorMsg()
     );
+    Assert.assertEquals(1L, (long) forkingTaskRunner.getWorkerFailedTaskCount());
+    Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerSuccessfulTaskCount());
   }
 
   @Test
@@ -294,6 +302,12 @@ public class ForkingTaskRunnerTest
       @Override
       int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
       {
+        WorkerConfig workerConfig = new WorkerConfig();
+        Assert.assertEquals(1L, (long) this.getWorkerUsedTaskSlotCount());
+        Assert.assertEquals(workerConfig.getCapacity(), (long) this.getWorkerTotalTaskSlotCount());
+        Assert.assertEquals(workerConfig.getCapacity() - 1, (long) this.getWorkerIdleTaskSlotCount());
+        Assert.assertEquals(workerConfig.getCategory(), this.getWorkerCategory());
+        Assert.assertEquals(workerConfig.getVersion(), this.getWorkerVersion());
         return 0;
       }
     };
@@ -301,6 +315,8 @@ public class ForkingTaskRunnerTest
     final TaskStatus status = forkingTaskRunner.run(task).get();
     Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
     Assert.assertNull(status.getErrorMsg());
+    Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerFailedTaskCount());
+    Assert.assertEquals(1L, (long) forkingTaskRunner.getWorkerSuccessfulTaskCount());
   }
 
   @Test
diff --git a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java
new file mode 100644
index 0000000000..cfd01610a1
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+
+import java.util.Set;
+
+public class WorkerTaskCountStatsMonitor extends AbstractMonitor
+{
+  private final WorkerTaskCountStatsProvider statsProvider;
+  private final String workerCategory;
+  private final String workerVersion;
+  private final boolean isMiddleManager;
+
+  @Inject
+  public WorkerTaskCountStatsMonitor(
+      Injector injector,
+      @Self Set<NodeRole> nodeRoles
+  )
+  {
+    this.isMiddleManager = nodeRoles.contains(NodeRole.MIDDLE_MANAGER);
+    if (isMiddleManager) {
+      this.statsProvider = injector.getInstance(WorkerTaskCountStatsProvider.class);
+      this.workerCategory = statsProvider.getWorkerCategory();
+      this.workerVersion = statsProvider.getWorkerVersion();
+    } else {
+      this.statsProvider = null;
+      this.workerCategory = null;
+      this.workerVersion = null;
+    }
+  }
+
+  @Override
+  public boolean doMonitor(ServiceEmitter emitter)
+  {
+    if (isMiddleManager) {
+      emit(emitter, "worker/task/failed/count", statsProvider.getWorkerFailedTaskCount());
+      emit(emitter, "worker/task/success/count", statsProvider.getWorkerSuccessfulTaskCount());
+      emit(emitter, "worker/taskSlot/idle/count", statsProvider.getWorkerIdleTaskSlotCount());
+      emit(emitter, "worker/taskSlot/total/count", statsProvider.getWorkerTotalTaskSlotCount());
+      emit(emitter, "worker/taskSlot/used/count", statsProvider.getWorkerUsedTaskSlotCount());
+    }
+    return true;
+  }
+
+  private void emit(ServiceEmitter emitter, String metricName, Long value)
+  {
+    if (value != null) {
+      final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
+      builder.setDimension("category", workerCategory);
+      builder.setDimension("version", workerVersion);
+      emitter.emit(builder.build(metricName, value));
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsProvider.java
new file mode 100644
index 0000000000..3ca9df5624
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+/**
+ * Provides task and task slot count statistics at the level of individual worker nodes. These metrics
+ * are reported by workers, like middle-managers.
+ */
+public interface WorkerTaskCountStatsProvider
+{
+  /**
+   * The number of failed tasks run on worker during emission period.
+   */
+  Long getWorkerFailedTaskCount();
+
+  /**
+   * The number of successful tasks run on worker during emission period.
+   */
+  Long getWorkerSuccessfulTaskCount();
+
+  /**
+   * The number of idle task slots on worker.
+   */
+  Long getWorkerIdleTaskSlotCount();
+
+  /**
+   * The number of total task slots on worker.
+   */
+  Long getWorkerTotalTaskSlotCount();
+
+  /**
+   * The number of used task slots on worker.
+   */
+  Long getWorkerUsedTaskSlotCount();
+
+
+  /**
+   * The worker category.
+   */
+  String getWorkerCategory();
+
+  /**
+   * The worker version.
+   */
+  String getWorkerVersion();
+}
diff --git a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
new file mode 100644
index 0000000000..17702c0fad
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+public class WorkerTaskCountStatsMonitorTest
+{
+  private Injector injectorForMiddleManager;
+  private Injector injectorForMiddleManagerNullStats;
+  private Injector injectorForPeon;
+
+  private WorkerTaskCountStatsProvider statsProvider;
+  private WorkerTaskCountStatsProvider nullStatsProvider;
+
+  @Before
+  public void setUp()
+  {
+    statsProvider = new WorkerTaskCountStatsProvider()
+    {
+      @Override
+      public Long getWorkerTotalTaskSlotCount()
+      {
+        return 5L;
+      }
+
+      @Override
+      public Long getWorkerFailedTaskCount()
+      {
+        return 4L;
+      }
+
+      @Override
+      public Long getWorkerIdleTaskSlotCount()
+      {
+        return 3L;
+      }
+
+      @Override
+      public Long getWorkerSuccessfulTaskCount()
+      {
+        return 2L;
+      }
+
+      @Override
+      public Long getWorkerUsedTaskSlotCount()
+      {
+        return 1L;
+      }
+
+      @Override
+      public String getWorkerCategory()
+      {
+        return "workerCategory";
+      }
+
+      @Override
+      public String getWorkerVersion()
+      {
+        return "workerVersion";
+      }
+    };
+
+    nullStatsProvider = new WorkerTaskCountStatsProvider()
+    {
+      @Nullable
+      @Override
+      public Long getWorkerTotalTaskSlotCount()
+      {
+        return null;
+      }
+
+      @Nullable
+      @Override
+      public Long getWorkerFailedTaskCount()
+      {
+        return null;
+      }
+
+      @Nullable
+      @Override
+      public Long getWorkerIdleTaskSlotCount()
+      {
+        return null;
+      }
+
+      @Nullable
+      @Override
+      public Long getWorkerSuccessfulTaskCount()
+      {
+        return null;
+      }
+
+      @Nullable
+      @Override
+      public Long getWorkerUsedTaskSlotCount()
+      {
+        return null;
+      }
+
+      @Nullable
+      @Override
+      public String getWorkerCategory()
+      {
+        return null;
+      }
+
+      @Nullable
+      @Override
+      public String getWorkerVersion()
+      {
+        return null;
+      }
+    };
+
+    injectorForMiddleManager = Guice.createInjector(
+        ImmutableList.of(
+            (Module) binder -> {
+              binder.bind(WorkerTaskCountStatsProvider.class).toInstance(statsProvider);
+            }
+        )
+    );
+
+    injectorForMiddleManagerNullStats = Guice.createInjector(
+        ImmutableList.of(
+            (Module) binder -> {
+              binder.bind(WorkerTaskCountStatsProvider.class).toInstance(nullStatsProvider);
+            }
+        )
+    );
+
+    injectorForPeon = Guice.createInjector(
+        ImmutableList.of(
+            (Module) binder -> {}
+        )
+    );
+  }
+
+  @Test
+  public void testMonitor()
+  {
+    final WorkerTaskCountStatsMonitor monitor =
+        new WorkerTaskCountStatsMonitor(injectorForMiddleManager, ImmutableSet.of(NodeRole.MIDDLE_MANAGER));
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+    monitor.doMonitor(emitter);
+    Assert.assertEquals(5, emitter.getEvents().size());
+    Assert.assertEquals("worker/task/failed/count", emitter.getEvents().get(0).toMap().get("metric"));
+    Assert.assertEquals("workerCategory", emitter.getEvents().get(0).toMap().get("category"));
+    Assert.assertEquals("workerVersion", emitter.getEvents().get(0).toMap().get("version"));
+    Assert.assertEquals(4L, emitter.getEvents().get(0).toMap().get("value"));
+    Assert.assertEquals("worker/task/success/count", emitter.getEvents().get(1).toMap().get("metric"));
+    Assert.assertEquals("workerCategory", emitter.getEvents().get(1).toMap().get("category"));
+    Assert.assertEquals("workerVersion", emitter.getEvents().get(1).toMap().get("version"));
+    Assert.assertEquals(2L, emitter.getEvents().get(1).toMap().get("value"));
+    Assert.assertEquals("worker/taskSlot/idle/count", emitter.getEvents().get(2).toMap().get("metric"));
+    Assert.assertEquals("workerCategory", emitter.getEvents().get(2).toMap().get("category"));
+    Assert.assertEquals("workerVersion", emitter.getEvents().get(2).toMap().get("version"));
+    Assert.assertEquals(3L, emitter.getEvents().get(2).toMap().get("value"));
+    Assert.assertEquals("worker/taskSlot/total/count", emitter.getEvents().get(3).toMap().get("metric"));
+    Assert.assertEquals("workerCategory", emitter.getEvents().get(3).toMap().get("category"));
+    Assert.assertEquals("workerVersion", emitter.getEvents().get(3).toMap().get("version"));
+    Assert.assertEquals(5L, emitter.getEvents().get(3).toMap().get("value"));
+    Assert.assertEquals("worker/taskSlot/used/count", emitter.getEvents().get(4).toMap().get("metric"));
+    Assert.assertEquals("workerCategory", emitter.getEvents().get(4).toMap().get("category"));
+    Assert.assertEquals("workerVersion", emitter.getEvents().get(4).toMap().get("version"));
+    Assert.assertEquals(1L, emitter.getEvents().get(4).toMap().get("value"));
+  }
+
+  @Test
+  public void testMonitorWithNulls()
+  {
+    final WorkerTaskCountStatsMonitor monitor =
+        new WorkerTaskCountStatsMonitor(injectorForMiddleManagerNullStats, ImmutableSet.of(NodeRole.MIDDLE_MANAGER));
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+    monitor.doMonitor(emitter);
+    Assert.assertEquals(0, emitter.getEvents().size());
+  }
+
+  @Test
+  public void testMonitorNotMiddleManager()
+  {
+    final WorkerTaskCountStatsMonitor monitor =
+        new WorkerTaskCountStatsMonitor(injectorForPeon, ImmutableSet.of(NodeRole.PEON));
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+    monitor.doMonitor(emitter);
+    Assert.assertEquals(0, emitter.getEvents().size());
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index a005e3a48d..ed424ba037 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -78,6 +78,7 @@ import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.http.SelfDiscoveryResource;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
+import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider;
 import org.apache.druid.timeline.PruneLastCompactionState;
 import org.eclipse.jetty.server.Server;
 
@@ -137,6 +138,7 @@ public class CliMiddleManager extends ServerRunnable
 
             binder.bind(TaskRunner.class).to(ForkingTaskRunner.class);
             binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
+            binder.bind(WorkerTaskCountStatsProvider.class).to(ForkingTaskRunner.class);
 
             binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
             binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexSupervisorTaskClient>>() {})
diff --git a/website/.spelling b/website/.spelling
index e83d1b8598..89ad85dd32 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1428,6 +1428,7 @@ Sys
 SysMonitor
 TaskCountStatsMonitor
 TaskSlotCountStatsMonitor
+WorkerTaskCountStatsMonitor
 bufferCapacity
 bufferpoolName
 cms


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org