You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2020/09/29 06:50:56 UTC

[druid] branch master updated: Adding task slot count metrics to Druid Overlord (#10379)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8168e14  Adding task slot count metrics to Druid Overlord (#10379)
8168e14 is described below

commit 8168e14e9224c9459efda07b038269815975cf50
Author: Mainak Ghosh <mg...@twitter.com>
AuthorDate: Mon Sep 28 23:50:38 2020 -0700

    Adding task slot count metrics to Druid Overlord (#10379)
    
    * Adding more worker metrics to Druid Overlord
    
    * Changing the nomenclature from worker to peon as that represents the metrics that we want to monitor better
    
    * A few more instances of worker usage replaced with peon
    
    * Modifying the peon idle count logic to only use eligible workers' available capacity
    
    * Changing the naming to task slot count instead of peon
    
    * Adding some unit test coverage for the new task runner APIs
    
    * Addressing Review Comments
    
    * Modifying the TaskSlotCountStatsProvider apis so that overlords which are not leader do not emit these metrics
    
    * Fixing the spelling issue in the docs
    
    * Setting the annotation Nullable on the TaskSlotCountStatsProvider methods
---
 docs/operations/metrics.md                         |  5 ++
 .../main/resources/defaultMetricDimensions.json    |  6 ++
 .../druid/indexing/overlord/ForkingTaskRunner.java | 33 +++++++++
 .../apache/druid/indexing/overlord/PortFinder.java |  5 ++
 .../druid/indexing/overlord/RemoteTaskRunner.java  | 80 +++++++++++++++++---
 .../overlord/SingleTaskBackgroundRunner.java       | 30 ++++++++
 .../apache/druid/indexing/overlord/TaskMaster.java | 64 +++++++++++++++-
 .../apache/druid/indexing/overlord/TaskRunner.java | 13 ++++
 .../indexing/overlord/ThreadingTaskRunner.java     | 32 ++++++++
 .../overlord/hrtr/HttpRemoteTaskRunner.java        | 61 +++++++++++++++
 .../indexing/common/task/IngestionTestBase.java    | 30 ++++++++
 .../indexing/overlord/RemoteTaskRunnerTest.java    | 25 ++++++-
 .../druid/indexing/overlord/TestTaskRunner.java    | 30 ++++++++
 .../overlord/hrtr/HttpRemoteTaskRunnerTest.java    | 30 ++++++++
 .../druid/indexing/overlord/http/OverlordTest.java | 30 ++++++++
 .../server/metrics/TaskSlotCountStatsMonitor.java  | 57 ++++++++++++++
 .../server/metrics/TaskSlotCountStatsProvider.java | 55 ++++++++++++++
 .../metrics/TaskSlotCountStatsMonitorTest.java     | 86 ++++++++++++++++++++++
 .../java/org/apache/druid/cli/CliOverlord.java     |  2 +
 website/.spelling                                  |  1 +
 20 files changed, 662 insertions(+), 13 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 62b6f57..1b4ed7f 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -196,6 +196,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
 |`task/running/count`|Number of current running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
 |`task/pending/count`|Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
 |`task/waiting/count`|Number of current waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
+|`taskSlot/total/count`|Number of total task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
+|`taskSlot/idle/count`|Number of idle task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
+|`taskSlot/used/count`|Number of used (busy) task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
+|`taskSlot/lazy/count`|Number of total task slots in lazy-marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
+|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
 
 ## Coordination
 
diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index 859a9c6..1a62d70 100644
--- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -62,6 +62,12 @@
   "task/pending/count" : { "dimensions" : ["dataSource"], "type" : "count" },
   "task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "count" },
 
+  "taskSlot/total/count" : { "dimensions" : [], "type" : "count" },
+  "taskSlot/idle/count" : { "dimensions" : [], "type" : "count" },
+  "taskSlot/used/count" : { "dimensions" : [], "type" : "count" },
+  "taskSlot/lazy/count" : { "dimensions" : [], "type" : "count" },
+  "taskSlot/blacklisted/count" : { "dimensions" : [], "type" : "count" },
+
   "task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer" },
   "segment/added/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },
   "segment/moved/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index f29af34..2d08338 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -649,6 +649,39 @@ public class ForkingTaskRunner
     return Joiner.on(" ").join(maskedIterator);
   }
 
+  @Override
+  public long getTotalTaskSlotCount()
+  {
+    if (config.getPorts() != null && !config.getPorts().isEmpty()) {
+      return config.getPorts().size();
+    }
+    return config.getEndPort() - config.getStartPort() + 1;
+  }
+
+  @Override
+  public long getIdleTaskSlotCount()
+  {
+    return Math.max(getTotalTaskSlotCount() - getUsedTaskSlotCount(), 0);
+  }
+
+  @Override
+  public long getUsedTaskSlotCount()
+  {
+    return portFinder.findUsedPortCount();
+  }
+
+  @Override
+  public long getLazyTaskSlotCount()
+  {
+    return 0;
+  }
+
+  @Override
+  public long getBlacklistedTaskSlotCount()
+  {
+    return 0;
+  }
+
   protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
   {
     private final Task task;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/PortFinder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/PortFinder.java
index 1a66d1f..25cbfa6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/PortFinder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/PortFinder.java
@@ -79,6 +79,11 @@ public class PortFinder
     usedPorts.remove(port);
   }
 
+  public synchronized int findUsedPortCount()
+  {
+    return usedPorts.size();
+  }
+
   private int chooseFromCandidates()
   {
     for (int port : candidatePorts) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 352f75f..6fd3f7f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -823,17 +823,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
         synchronized (workersWithUnacknowledgedTask) {
           immutableZkWorker = strategy.findWorkerForTask(
               config,
-              ImmutableMap.copyOf(
-                  Maps.transformEntries(
-                      Maps.filterEntries(
-                          zkWorkers,
-                          input -> !lazyWorkers.containsKey(input.getKey()) &&
-                                   !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
-                                   !blackListedWorkers.contains(input.getValue())
-                      ),
-                      (String key, ZkWorker value) -> value.toImmutable()
-                  )
-              ),
+              ImmutableMap.copyOf(getWorkersEligibleToRunTasks()),
               task
           );
 
@@ -867,6 +857,19 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     }
   }
 
+  Map<String, ImmutableWorkerInfo> getWorkersEligibleToRunTasks()
+  {
+    return Maps.transformEntries(
+        Maps.filterEntries(
+            zkWorkers,
+            input -> !lazyWorkers.containsKey(input.getKey()) &&
+                     !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
+                     !blackListedWorkers.contains(input.getValue())
+        ),
+        (String key, ZkWorker value) -> value.toImmutable()
+    );
+  }
+
   /**
    * Creates a ZK entry under a specific path associated with a worker. The worker is responsible for
    * removing the task ZK entry and creating a task status ZK entry.
@@ -1434,4 +1437,59 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   {
     return workersWithUnacknowledgedTask;
   }
+
+  @Override
+  public long getTotalTaskSlotCount()
+  {
+    long totalPeons = 0;
+    for (ImmutableWorkerInfo worker : getWorkers()) {
+      totalPeons += worker.getWorker().getCapacity();
+    }
+
+    return totalPeons;
+  }
+
+  @Override
+  public long getIdleTaskSlotCount()
+  {
+    long totalIdlePeons = 0;
+    for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) {
+      totalIdlePeons += worker.getAvailableCapacity();
+    }
+
+    return totalIdlePeons;
+  }
+
+  @Override
+  public long getUsedTaskSlotCount()
+  {
+    long totalUsedPeons = 0;
+    for (ImmutableWorkerInfo worker : getWorkers()) {
+      totalUsedPeons += worker.getCurrCapacityUsed();
+    }
+
+    return totalUsedPeons;
+  }
+
+  @Override
+  public long getLazyTaskSlotCount()
+  {
+    long totalLazyPeons = 0;
+    for (Worker worker : getLazyWorkers()) {
+      totalLazyPeons += worker.getCapacity();
+    }
+
+    return totalLazyPeons;
+  }
+
+  @Override
+  public long getBlacklistedTaskSlotCount()
+  {
+    long totalBlacklistedPeons = 0;
+    for (ImmutableWorkerInfo worker : getBlackListedWorkers()) {
+      totalBlacklistedPeons += worker.getWorker().getCapacity();
+    }
+
+    return totalBlacklistedPeons;
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
index 13bd490..cfbbab4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
@@ -314,6 +314,36 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
   }
 
   @Override
+  public long getTotalTaskSlotCount()
+  {
+    return 1;
+  }
+
+  @Override
+  public long getIdleTaskSlotCount()
+  {
+    return runningItem == null ? 1 : 0;
+  }
+
+  @Override
+  public long getUsedTaskSlotCount()
+  {
+    return runningItem == null ? 0 : 1;
+  }
+
+  @Override
+  public long getLazyTaskSlotCount()
+  {
+    return 0;
+  }
+
+  @Override
+  public long getBlacklistedTaskSlotCount()
+  {
+    return 0;
+  }
+
+  @Override
   public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
   {
     return getQueryRunnerImpl(query);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index 0145439..a0dbcda 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -42,7 +42,9 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
 import org.apache.druid.server.metrics.TaskCountStatsProvider;
+import org.apache.druid.server.metrics.TaskSlotCountStatsProvider;
 
+import javax.annotation.Nullable;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
@@ -50,7 +52,7 @@ import java.util.concurrent.locks.ReentrantLock;
 /**
  * Encapsulates the indexer leadership lifecycle.
  */
-public class TaskMaster implements TaskCountStatsProvider
+public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsProvider
 {
   private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
 
@@ -338,4 +340,64 @@ public class TaskMaster implements TaskCountStatsProvider
       // fail silently since we are stopping anyway
     }
   }
+
+  @Override
+  @Nullable
+  public Long getTotalTaskSlotCount()
+  {
+    Optional<TaskRunner> taskRunner = getTaskRunner();
+    if (taskRunner.isPresent()) {
+      return taskRunner.get().getTotalTaskSlotCount();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  @Nullable
+  public Long getIdleTaskSlotCount()
+  {
+    Optional<TaskRunner> taskRunner = getTaskRunner();
+    if (taskRunner.isPresent()) {
+      return taskRunner.get().getIdleTaskSlotCount();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  @Nullable
+  public Long getUsedTaskSlotCount()
+  {
+    Optional<TaskRunner> taskRunner = getTaskRunner();
+    if (taskRunner.isPresent()) {
+      return taskRunner.get().getUsedTaskSlotCount();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  @Nullable
+  public Long getLazyTaskSlotCount()
+  {
+    Optional<TaskRunner> taskRunner = getTaskRunner();
+    if (taskRunner.isPresent()) {
+      return taskRunner.get().getLazyTaskSlotCount();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  @Nullable
+  public Long getBlacklistedTaskSlotCount()
+  {
+    Optional<TaskRunner> taskRunner = getTaskRunner();
+    if (taskRunner.isPresent()) {
+      return taskRunner.get().getBlacklistedTaskSlotCount();
+    } else {
+      return null;
+    }
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
index e9dab7f..8623e82 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
@@ -121,4 +121,17 @@ public interface TaskRunner
    * @return ScalingStats if the runner has an underlying resource which can scale, Optional.absent() otherwise
    */
   Optional<ScalingStats> getScalingStats();
+
+  /**
+   * Task slot count APIs used by {@link org.apache.druid.server.metrics.TaskSlotCountStatsMonitor}.
+   */
+  long getTotalTaskSlotCount();
+
+  long getIdleTaskSlotCount();
+
+  long getUsedTaskSlotCount();
+
+  long getLazyTaskSlotCount();
+
+  long getBlacklistedTaskSlotCount();
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
index 82cf9bc..e463d83 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
@@ -95,6 +95,7 @@ public class ThreadingTaskRunner
   private final MultipleFileTaskReportFileWriter taskReportFileWriter;
   private final ListeningExecutorService taskExecutor;
   private final ListeningExecutorService controlThreadExecutor;
+  private final WorkerConfig workerConfig;
 
   private volatile boolean stopping = false;
 
@@ -116,6 +117,7 @@ public class ThreadingTaskRunner
     this.node = node;
     this.appenderatorsManager = appenderatorsManager;
     this.taskReportFileWriter = (MultipleFileTaskReportFileWriter) taskReportFileWriter;
+    this.workerConfig = workerConfig;
     this.taskExecutor = MoreExecutors.listeningDecorator(
         Execs.multiThreaded(workerConfig.getCapacity(), "threading-task-runner-executor-%d")
     );
@@ -452,6 +454,36 @@ public class ThreadingTaskRunner
   }
 
   @Override
+  public long getTotalTaskSlotCount()
+  {
+    return workerConfig.getCapacity();
+  }
+
+  @Override
+  public long getIdleTaskSlotCount()
+  {
+    return Math.max(getTotalTaskSlotCount() - getUsedTaskSlotCount(), 0);
+  }
+
+  @Override
+  public long getUsedTaskSlotCount()
+  {
+    return getRunningTasks().size();
+  }
+
+  @Override
+  public long getLazyTaskSlotCount()
+  {
+    return 0;
+  }
+
+  @Override
+  public long getBlacklistedTaskSlotCount()
+  {
+    return 0;
+  }
+
+  @Override
   public <T> QueryRunner<T> getQueryRunnerForIntervals(
       Query<T> query,
       Iterable<Interval> intervals
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index d8a7cb6..67e86a0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -1342,6 +1343,11 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     ).collect(Collectors.toList());
   }
 
+  public Collection<ImmutableWorkerInfo> getBlackListedWorkers()
+  {
+    return ImmutableList.copyOf(Collections2.transform(blackListedWorkers.values(), WorkerHolder::toImmutable));
+  }
+
   /**
    * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} , used for read only.
    */
@@ -1547,6 +1553,61 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     }
   }
 
+  @Override
+  public long getTotalTaskSlotCount()
+  {
+    long totalPeons = 0;
+    for (ImmutableWorkerInfo worker : getWorkers()) {
+      totalPeons += worker.getWorker().getCapacity();
+    }
+
+    return totalPeons;
+  }
+
+  @Override
+  public long getIdleTaskSlotCount()
+  {
+    long totalIdlePeons = 0;
+    for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) {
+      totalIdlePeons += worker.getAvailableCapacity();
+    }
+
+    return totalIdlePeons;
+  }
+
+  @Override
+  public long getUsedTaskSlotCount()
+  {
+    long totalUsedPeons = 0;
+    for (ImmutableWorkerInfo worker : getWorkers()) {
+      totalUsedPeons += worker.getCurrCapacityUsed();
+    }
+
+    return totalUsedPeons;
+  }
+
+  @Override
+  public long getLazyTaskSlotCount()
+  {
+    long totalLazyPeons = 0;
+    for (Worker worker : getLazyWorkers()) {
+      totalLazyPeons += worker.getCapacity();
+    }
+
+    return totalLazyPeons;
+  }
+
+  @Override
+  public long getBlacklistedTaskSlotCount()
+  {
+    long totalBlacklistedPeons = 0;
+    for (ImmutableWorkerInfo worker : getBlackListedWorkers()) {
+      totalBlacklistedPeons += worker.getWorker().getCapacity();
+    }
+
+    return totalBlacklistedPeons;
+  }
+
   private static class HttpRemoteTaskRunnerWorkItem extends RemoteTaskRunnerWorkItem
   {
     enum State
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 80ce36f..e63b3e4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -383,5 +383,35 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
     {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public long getTotalTaskSlotCount()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getIdleTaskSlotCount()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getUsedTaskSlotCount()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getLazyTaskSlotCount()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getBlacklistedTaskSlotCount()
+    {
+      throw new UnsupportedOperationException();
+    }
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index b23e063..e7bf022 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -104,16 +104,26 @@ public class RemoteTaskRunnerTest
   {
     doSetup();
 
+    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
+    Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount());
+    Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount());
+
     ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
 
     Assert.assertTrue(taskAnnounced(task.getId()));
     mockWorkerRunningTask(task);
     Assert.assertTrue(workerRunningTask(task.getId()));
+
     mockWorkerCompleteSuccessfulTask(task);
     Assert.assertTrue(workerCompletedTask(result));
-
     Assert.assertEquals(task.getId(), result.get().getId());
     Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());
+
+    cf.delete().guaranteed().forPath(JOINER.join(STATUS_PATH, task.getId()));
+
+    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
+    Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount());
+    Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount());
   }
 
   @Test
@@ -421,6 +431,9 @@ public class RemoteTaskRunnerTest
   public void testWorkerRemoved() throws Exception
   {
     doSetup();
+    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
+    Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount());
+
     Future<TaskStatus> future = remoteTaskRunner.run(task);
 
     Assert.assertTrue(taskAnnounced(task.getId()));
@@ -449,6 +462,9 @@ public class RemoteTaskRunnerTest
         )
     );
     Assert.assertNull(cf.checkExists().forPath(STATUS_PATH));
+
+    Assert.assertEquals(0, remoteTaskRunner.getTotalTaskSlotCount());
+    Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount());
   }
 
   @Test
@@ -621,6 +637,9 @@ public class RemoteTaskRunnerTest
     );
     Assert.assertEquals(1, lazyworkers.size());
     Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size());
+    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
+    Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount());
+    Assert.assertEquals(3, remoteTaskRunner.getLazyTaskSlotCount());
   }
 
   @Test
@@ -931,10 +950,12 @@ public class RemoteTaskRunnerTest
     mockWorkerCompleteFailedTask(task1);
     Assert.assertTrue(taskFuture1.get().isFailure());
     Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
+    Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount());
 
     Future<TaskStatus> taskFuture2 = remoteTaskRunner.run(task2);
     Assert.assertTrue(taskAnnounced(task2.getId()));
     mockWorkerRunningTask(task2);
+    Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount());
 
     Future<TaskStatus> taskFuture3 = remoteTaskRunner.run(task3);
     Assert.assertTrue(taskAnnounced(task3.getId()));
@@ -942,10 +963,12 @@ public class RemoteTaskRunnerTest
     mockWorkerCompleteFailedTask(task3);
     Assert.assertTrue(taskFuture3.get().isFailure());
     Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size());
+    Assert.assertEquals(3, remoteTaskRunner.getBlacklistedTaskSlotCount());
 
     mockWorkerCompleteSuccessfulTask(task2);
     Assert.assertTrue(taskFuture2.get().isSuccess());
     Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
+    Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount());
   }
 
   @Test
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
index 41a4366..5fc2989 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
@@ -269,6 +269,36 @@ public class TestTaskRunner implements TaskRunner, QuerySegmentWalker
   }
 
   @Override
+  public long getTotalTaskSlotCount()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getIdleTaskSlotCount()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getUsedTaskSlotCount()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getLazyTaskSlotCount()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getBlacklistedTaskSlotCount()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void start()
   {
     // No state startup required
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index fe1df77..2ca1dff 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -842,6 +842,10 @@ public class HttpRemoteTaskRunnerTest
 
     taskRunner.start();
 
+    Assert.assertEquals(0, taskRunner.getTotalTaskSlotCount());
+    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount());
+    Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount());
+
     AtomicInteger ticks = new AtomicInteger();
 
     DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
@@ -884,12 +888,20 @@ public class HttpRemoteTaskRunnerTest
 
     druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1));
 
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount());
+    Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount());
+    Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount());
+
     taskRunner.run(task1);
 
     while (ticks.get() < 1) {
       Thread.sleep(100);
     }
 
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount());
+    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount());
+    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount());
+
     DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
         new DruidNode("service", "host2", false, 8080, null, true, false),
         NodeRole.MIDDLE_MANAGER,
@@ -918,12 +930,20 @@ public class HttpRemoteTaskRunnerTest
 
     druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode2));
 
+    Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount());
+    Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount());
+    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount());
+
     taskRunner.run(task2);
 
     while (ticks.get() < 2) {
       Thread.sleep(100);
     }
 
+    Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount());
+    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount());
+    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount());
+
     DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode(
         new DruidNode("service", "host3", false, 8080, null, true, false),
         NodeRole.MIDDLE_MANAGER,
@@ -952,6 +972,11 @@ public class HttpRemoteTaskRunnerTest
 
     druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode3));
 
+    Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount());
+    Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount());
+    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount());
+    Assert.assertEquals(0, taskRunner.getLazyTaskSlotCount());
+
     Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
     Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId());
 
@@ -960,6 +985,11 @@ public class HttpRemoteTaskRunnerTest
         Iterables.getOnlyElement(taskRunner.markWorkersLazy(Predicates.alwaysTrue(), Integer.MAX_VALUE))
                  .getHost()
     );
+
+    Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount());
+    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount());
+    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount());
+    Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount());
   }
 
   /*
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 52373ea..3a73a38 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -453,6 +453,36 @@ public class OverlordTest
     }
 
     @Override
+    public long getTotalTaskSlotCount()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getIdleTaskSlotCount()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getUsedTaskSlotCount()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getLazyTaskSlotCount()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getBlacklistedTaskSlotCount()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
     public void start()
     {
       //Do nothing
diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitor.java
new file mode 100644
index 0000000..d8accc8
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.inject.Inject;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+
+/**
+ * Monitor that reports task slot counts obtained from a {@link TaskSlotCountStatsProvider}.
+ */
+public class TaskSlotCountStatsMonitor extends AbstractMonitor
+{
+  private final TaskSlotCountStatsProvider statsProvider;
+
+  /**
+   * @param statsProvider supplier of the counts reported on every monitoring cycle
+   */
+  @Inject
+  public TaskSlotCountStatsMonitor(TaskSlotCountStatsProvider statsProvider)
+  {
+    this.statsProvider = statsProvider;
+  }
+
+  /**
+   * Emits one metric event per task slot category, in the order total, idle, used, lazy, blacklisted.
+   * A category whose count is {@code null} produces no event.
+   *
+   * @param emitter destination for the metric events
+   * @return always {@code true}
+   */
+  @Override
+  public boolean doMonitor(ServiceEmitter emitter)
+  {
+    emitIfPresent(emitter, "taskSlot/total/count", statsProvider.getTotalTaskSlotCount());
+    emitIfPresent(emitter, "taskSlot/idle/count", statsProvider.getIdleTaskSlotCount());
+    emitIfPresent(emitter, "taskSlot/used/count", statsProvider.getUsedTaskSlotCount());
+    emitIfPresent(emitter, "taskSlot/lazy/count", statsProvider.getLazyTaskSlotCount());
+    emitIfPresent(emitter, "taskSlot/blacklisted/count", statsProvider.getBlacklistedTaskSlotCount());
+    return true;
+  }
+
+  /**
+   * Emits a single metric event named {@code metricName} carrying {@code value}, unless {@code value} is null.
+   */
+  private static void emitIfPresent(ServiceEmitter emitter, String metricName, Long value)
+  {
+    if (value == null) {
+      // The provider had no count for this category; report nothing rather than a placeholder.
+      return;
+    }
+    emitter.emit(new ServiceMetricEvent.Builder().build(metricName, value.longValue()));
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java
new file mode 100644
index 0000000..eb46fa5
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import javax.annotation.Nullable;
+
+/**
+ * Source of the task slot counts reported by {@link TaskSlotCountStatsMonitor}.
+ *
+ * <p>Every method may return {@code null} when the count is not available, for example on an Overlord that is not
+ * the leader. {@link TaskSlotCountStatsMonitor} emits no event for a {@code null} count.
+ */
+public interface TaskSlotCountStatsProvider
+{
+  /**
+   * Returns the total number of task slots for the current emission period.
+   *
+   * @return the total task slot count, or {@code null} if it is not available
+   */
+  @Nullable
+  Long getTotalTaskSlotCount();
+
+  /**
+   * Returns the number of idle task slots for the current emission period.
+   *
+   * @return the idle task slot count, or {@code null} if it is not available
+   */
+  @Nullable
+  Long getIdleTaskSlotCount();
+
+  /**
+   * Returns the number of used task slots for the current emission period.
+   *
+   * @return the used task slot count, or {@code null} if it is not available
+   */
+  @Nullable
+  Long getUsedTaskSlotCount();
+
+  /**
+   * Returns the total number of task slots on lazy-marked MiddleManagers and Indexers for the current emission
+   * period.
+   *
+   * @return the lazy task slot count, or {@code null} if it is not available
+   */
+  @Nullable
+  Long getLazyTaskSlotCount();
+
+  /**
+   * Returns the total number of task slots on blacklisted MiddleManagers and Indexers for the current emission
+   * period.
+   *
+   * @return the blacklisted task slot count, or {@code null} if it is not available
+   */
+  @Nullable
+  Long getBlacklistedTaskSlotCount();
+}
diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java
new file mode 100644
index 0000000..2c5c52b
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TaskSlotCountStatsMonitorTest
+{
+  private TaskSlotCountStatsProvider statsProvider;
+
+  @Before
+  public void setUp()
+  {
+    statsProvider = new TaskSlotCountStatsProvider()
+    {
+      @Override
+      public Long getTotalTaskSlotCount()
+      {
+        return 1L;
+      }
+
+      @Override
+      public Long getIdleTaskSlotCount()
+      {
+        return 1L;
+      }
+
+      @Override
+      public Long getUsedTaskSlotCount()
+      {
+        return 1L;
+      }
+
+      @Override
+      public Long getLazyTaskSlotCount()
+      {
+        return 1L;
+      }
+
+      @Override
+      public Long getBlacklistedTaskSlotCount()
+      {
+        return 1L;
+      }
+    };
+  }
+
+  @Test
+  public void testMonitor()
+  {
+    final TaskSlotCountStatsMonitor monitor = new TaskSlotCountStatsMonitor(statsProvider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+    monitor.doMonitor(emitter);
+    Assert.assertEquals(5, emitter.getEvents().size());
+    Assert.assertEquals("taskSlot/total/count", emitter.getEvents().get(0).toMap().get("metric"));
+    Assert.assertEquals(1L, emitter.getEvents().get(0).toMap().get("value"));
+    Assert.assertEquals("taskSlot/idle/count", emitter.getEvents().get(1).toMap().get("metric"));
+    Assert.assertEquals(1L, emitter.getEvents().get(1).toMap().get("value"));
+    Assert.assertEquals("taskSlot/used/count", emitter.getEvents().get(2).toMap().get("metric"));
+    Assert.assertEquals(1L, emitter.getEvents().get(2).toMap().get("value"));
+    Assert.assertEquals("taskSlot/lazy/count", emitter.getEvents().get(3).toMap().get("metric"));
+    Assert.assertEquals(1L, emitter.getEvents().get(3).toMap().get("value"));
+    Assert.assertEquals("taskSlot/blacklisted/count", emitter.getEvents().get(4).toMap().get("metric"));
+    Assert.assertEquals(1L, emitter.getEvents().get(4).toMap().get("value"));
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index e08e5ea..50cef33 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -113,6 +113,7 @@ import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
 import org.apache.druid.server.metrics.TaskCountStatsProvider;
+import org.apache.druid.server.metrics.TaskSlotCountStatsProvider;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthenticationUtils;
 import org.apache.druid.server.security.Authenticator;
@@ -179,6 +180,7 @@ public class CliOverlord extends ServerRunnable
 
             binder.bind(TaskMaster.class).in(ManageLifecycle.class);
             binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class);
+            binder.bind(TaskSlotCountStatsProvider.class).to(TaskMaster.class);
 
             binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class);
             binder.bind(
diff --git a/website/.spelling b/website/.spelling
index dcf6140..a783d32 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1192,6 +1192,7 @@ RealtimeMetricsMonitor
 Sys
 SysMonitor
 TaskCountStatsMonitor
+TaskSlotCountStatsMonitor
 bufferCapacity
 bufferpoolName
 cms


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org