You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2021/11/19 06:59:41 UTC

[druid] branch master updated: Add worker category dimension (#11554)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c51136  Add worker category dimension (#11554)
3c51136 is described below

commit 3c511360989a5710e47f7d40cdaa5873f60652d5
Author: Nikhil Navadiya <nn...@gmail.com>
AuthorDate: Thu Nov 18 22:59:07 2021 -0800

    Add worker category dimension (#11554)
    
    * Add worker category as dimension in TaskSlotCountStatsMonitor
    
    * Change description
    
    * Add workerConfig as field
    
    * Modify HttpRemoteTaskRunnerTest to test worker category in task slot metrics
    
    * Fixing tests
    
    * Fixing alerts
    
    * Adding unit test in SingleTaskBackgroundRunnerTest for task slot metrics APIs
    
    * Resolving false positive spell check
    
    * addressing comments
    
    * throw UnsupportedOperationException for task slot metrics APIs in SingleTaskBackgroundRunner
    
    Co-authored-by: Nikhil Navadiya <nn...@twitter.com>
---
 docs/operations/metrics.md                         | 10 ++--
 .../main/resources/defaultMetricDimensions.json    | 10 ++--
 .../druid/indexing/overlord/ForkingTaskRunner.java | 32 ++++++++---
 .../druid/indexing/overlord/RemoteTaskRunner.java  | 56 +++++++++++++------
 .../overlord/SingleTaskBackgroundRunner.java       | 26 +++++----
 .../apache/druid/indexing/overlord/TaskMaster.java | 10 ++--
 .../apache/druid/indexing/overlord/TaskRunner.java | 13 ++---
 .../indexing/overlord/ThreadingTaskRunner.java     | 28 +++++++---
 .../overlord/hrtr/HttpRemoteTaskRunner.java        | 56 +++++++++++++------
 .../indexing/common/task/IngestionTestBase.java    | 11 ++--
 .../indexing/overlord/RemoteTaskRunnerTest.java    | 35 ++++++------
 .../druid/indexing/overlord/TaskQueueTest.java     | 21 ++++----
 .../druid/indexing/overlord/TestTaskRunner.java    | 10 ++--
 .../overlord/hrtr/HttpRemoteTaskRunnerTest.java    | 63 +++++++++++++---------
 .../druid/indexing/overlord/http/OverlordTest.java | 12 +++--
 .../server/metrics/TaskSlotCountStatsMonitor.java  | 11 ++--
 .../server/metrics/TaskSlotCountStatsProvider.java | 12 +++--
 .../metrics/TaskSlotCountStatsMonitorTest.java     | 23 ++++----
 18 files changed, 278 insertions(+), 161 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 1c7c736..ef39085 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -210,11 +210,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
 |`task/running/count`|Number of current running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
 |`task/pending/count`|Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
 |`task/waiting/count`|Number of current waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
-|`taskSlot/total/count`|Number of total task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
-|`taskSlot/idle/count`|Number of idle task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
-|`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
-|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
-|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.| |Varies.|
+|`taskSlot/total/count`|Number of total task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
+|`taskSlot/idle/count`|Number of idle task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
+|`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
+|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
+|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
 
 ## Shuffle metrics (Native parallel task)
 
diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index 5ac0886..6c0fa75 100644
--- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -63,11 +63,11 @@
   "task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
   "task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
 
-  "taskSlot/total/count" : { "dimensions" : [], "type" : "gauge" },
-  "taskSlot/idle/count" : { "dimensions" : [], "type" : "gauge" },
-  "taskSlot/busy/count" : { "dimensions" : [], "type" : "gauge" },
-  "taskSlot/lazy/count" : { "dimensions" : [], "type" : "gauge" },
-  "taskSlot/blacklisted/count" : { "dimensions" : [], "type" : "gauge" },
+  "taskSlot/total/count" : { "dimensions" : ["category"], "type" : "gauge" },
+  "taskSlot/idle/count" : { "dimensions" : ["category"], "type" : "gauge" },
+  "taskSlot/busy/count" : { "dimensions" : ["category"], "type" : "gauge" },
+  "taskSlot/lazy/count" : { "dimensions" : ["category"], "type" : "gauge" },
+  "taskSlot/blacklisted/count" : { "dimensions" : ["category"], "type" : "gauge" },
 
   "task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer" },
   "segment/added/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 212c039..f9c36ef 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -27,6 +27,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -98,6 +99,7 @@ public class ForkingTaskRunner
   private final ListeningExecutorService exec;
   private final PortFinder portFinder;
   private final StartupLoggingConfig startupLoggingConfig;
+  private final WorkerConfig workerConfig;
 
   private volatile boolean stopping = false;
 
@@ -120,6 +122,7 @@ public class ForkingTaskRunner
     this.node = node;
     this.portFinder = new PortFinder(config.getStartPort(), config.getEndPort(), config.getPorts());
     this.startupLoggingConfig = startupLoggingConfig;
+    this.workerConfig = workerConfig;
     this.exec = MoreExecutors.listeningDecorator(
         Execs.multiThreaded(workerConfig.getCapacity(), "forking-task-runner-%d")
     );
@@ -666,7 +669,15 @@ public class ForkingTaskRunner
   }
 
   @Override
-  public long getTotalTaskSlotCount()
+  public Map<String, Long> getTotalTaskSlotCount()
+  {
+    if (config.getPorts() != null && !config.getPorts().isEmpty()) {
+      return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getPorts().size()));
+    }
+    return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getEndPort() - config.getStartPort() + 1));
+  }
+
+  public long getTotalTaskSlotCountLong()
   {
     if (config.getPorts() != null && !config.getPorts().isEmpty()) {
       return config.getPorts().size();
@@ -675,27 +686,32 @@ public class ForkingTaskRunner
   }
 
   @Override
-  public long getIdleTaskSlotCount()
+  public Map<String, Long> getIdleTaskSlotCount()
   {
-    return Math.max(getTotalTaskSlotCount() - getUsedTaskSlotCount(), 0);
+    return ImmutableMap.of(workerConfig.getCategory(), Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0));
   }
 
   @Override
-  public long getUsedTaskSlotCount()
+  public Map<String, Long> getUsedTaskSlotCount()
+  {
+    return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(portFinder.findUsedPortCount()));
+  }
+
+  public long getUsedTaskSlotCountLong()
   {
     return portFinder.findUsedPortCount();
   }
 
   @Override
-  public long getLazyTaskSlotCount()
+  public Map<String, Long> getLazyTaskSlotCount()
   {
-    return 0;
+    return ImmutableMap.of(workerConfig.getCategory(), 0L);
   }
 
   @Override
-  public long getBlacklistedTaskSlotCount()
+  public Map<String, Long> getBlacklistedTaskSlotCount()
   {
-    return 0;
+    return ImmutableMap.of(workerConfig.getCategory(), 0L);
   }
 
   protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index de0713b..3c1cbb6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -95,6 +95,7 @@ import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -1514,55 +1515,80 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   }
 
   @Override
-  public long getTotalTaskSlotCount()
+  public Map<String, Long> getTotalTaskSlotCount()
   {
-    long totalPeons = 0;
+    Map<String, Long> totalPeons = new HashMap<>();
     for (ImmutableWorkerInfo worker : getWorkers()) {
-      totalPeons += worker.getWorker().getCapacity();
+      String workerCategory = worker.getWorker().getCategory();
+      int workerCapacity = worker.getWorker().getCapacity();
+      totalPeons.compute(
+          workerCategory,
+          (category, totalCapacity) -> totalCapacity == null ? workerCapacity : totalCapacity + workerCapacity
+      );
     }
 
     return totalPeons;
   }
 
   @Override
-  public long getIdleTaskSlotCount()
+  public Map<String, Long> getIdleTaskSlotCount()
   {
-    long totalIdlePeons = 0;
+    Map<String, Long> totalIdlePeons = new HashMap<>();
     for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) {
-      totalIdlePeons += worker.getAvailableCapacity();
+      String workerCategory = worker.getWorker().getCategory();
+      int workerAvailableCapacity = worker.getAvailableCapacity();
+      totalIdlePeons.compute(
+          workerCategory,
+          (category, availableCapacity) -> availableCapacity == null ? workerAvailableCapacity : availableCapacity + workerAvailableCapacity
+      );
     }
 
     return totalIdlePeons;
   }
 
   @Override
-  public long getUsedTaskSlotCount()
+  public Map<String, Long> getUsedTaskSlotCount()
   {
-    long totalUsedPeons = 0;
+    Map<String, Long> totalUsedPeons = new HashMap<>();
     for (ImmutableWorkerInfo worker : getWorkers()) {
-      totalUsedPeons += worker.getCurrCapacityUsed();
+      String workerCategory = worker.getWorker().getCategory();
+      int workerUsedCapacity = worker.getCurrCapacityUsed();
+      totalUsedPeons.compute(
+          workerCategory,
+          (category, usedCapacity) -> usedCapacity == null ? workerUsedCapacity : usedCapacity + workerUsedCapacity
+      );
     }
 
     return totalUsedPeons;
   }
 
   @Override
-  public long getLazyTaskSlotCount()
+  public Map<String, Long> getLazyTaskSlotCount()
   {
-    long totalLazyPeons = 0;
+    Map<String, Long> totalLazyPeons = new HashMap<>();
     for (Worker worker : getLazyWorkers()) {
-      totalLazyPeons += worker.getCapacity();
+      String workerCategory = worker.getCategory();
+      int workerLazyPeons = worker.getCapacity();
+      totalLazyPeons.compute(
+          workerCategory,
+          (category, lazyPeons) -> lazyPeons == null ? workerLazyPeons : lazyPeons + workerLazyPeons
+      );
     }
 
     return totalLazyPeons;
   }
 
   @Override
-  public long getBlacklistedTaskSlotCount()
+  public Map<String, Long> getBlacklistedTaskSlotCount()
   {
-    long totalBlacklistedPeons = 0;
+    Map<String, Long> totalBlacklistedPeons = new HashMap<>();
     for (ImmutableWorkerInfo worker : getBlackListedWorkers()) {
-      totalBlacklistedPeons += worker.getWorker().getCapacity();
+      String workerCategory = worker.getWorker().getCategory();
+      int workerBlacklistedPeons = worker.getWorker().getCapacity();
+      totalBlacklistedPeons.compute(
+          workerCategory,
+          (category, blacklistedPeons) -> blacklistedPeons == null ? workerBlacklistedPeons : blacklistedPeons + workerBlacklistedPeons
+      );
     }
 
     return totalBlacklistedPeons;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
index 24dba4f..b421f97 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
@@ -59,6 +59,7 @@ import org.joda.time.Interval;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
@@ -334,34 +335,39 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
     return Optional.absent();
   }
 
+  /* This method should be never called in peons */
   @Override
-  public long getTotalTaskSlotCount()
+  public Map<String, Long> getTotalTaskSlotCount()
   {
-    return 1;
+    throw new UnsupportedOperationException();
   }
 
+  /* This method should be never called in peons */
   @Override
-  public long getIdleTaskSlotCount()
+  public Map<String, Long> getIdleTaskSlotCount()
   {
-    return runningItem == null ? 1 : 0;
+    throw new UnsupportedOperationException();
   }
 
+  /* This method should be never called in peons */
   @Override
-  public long getUsedTaskSlotCount()
+  public Map<String, Long> getUsedTaskSlotCount()
   {
-    return runningItem == null ? 0 : 1;
+    throw new UnsupportedOperationException();
   }
 
+  /* This method should be never called in peons */
   @Override
-  public long getLazyTaskSlotCount()
+  public Map<String, Long> getLazyTaskSlotCount()
   {
-    return 0;
+    throw new UnsupportedOperationException();
   }
 
+  /* This method should be never called in peons */
   @Override
-  public long getBlacklistedTaskSlotCount()
+  public Map<String, Long> getBlacklistedTaskSlotCount()
   {
-    return 0;
+    throw new UnsupportedOperationException();
   }
 
   @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index 54287ed..5825a2f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -346,7 +346,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
 
   @Override
   @Nullable
-  public Long getTotalTaskSlotCount()
+  public Map<String, Long> getTotalTaskSlotCount()
   {
     Optional<TaskRunner> taskRunner = getTaskRunner();
     if (taskRunner.isPresent()) {
@@ -358,7 +358,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
 
   @Override
   @Nullable
-  public Long getIdleTaskSlotCount()
+  public Map<String, Long> getIdleTaskSlotCount()
   {
     Optional<TaskRunner> taskRunner = getTaskRunner();
     if (taskRunner.isPresent()) {
@@ -370,7 +370,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
 
   @Override
   @Nullable
-  public Long getUsedTaskSlotCount()
+  public Map<String, Long> getUsedTaskSlotCount()
   {
     Optional<TaskRunner> taskRunner = getTaskRunner();
     if (taskRunner.isPresent()) {
@@ -382,7 +382,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
 
   @Override
   @Nullable
-  public Long getLazyTaskSlotCount()
+  public Map<String, Long> getLazyTaskSlotCount()
   {
     Optional<TaskRunner> taskRunner = getTaskRunner();
     if (taskRunner.isPresent()) {
@@ -394,7 +394,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
 
   @Override
   @Nullable
-  public Long getBlacklistedTaskSlotCount()
+  public Map<String, Long> getBlacklistedTaskSlotCount()
   {
     Optional<TaskRunner> taskRunner = getTaskRunner();
     if (taskRunner.isPresent()) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
index 8623e82..5cbdabb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
@@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import javax.annotation.Nullable;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executor;
 
 /**
@@ -124,14 +125,14 @@ public interface TaskRunner
 
   /**
    * APIs useful for emitting statistics for @TaskSlotCountStatsMonitor
-  */
-  long getTotalTaskSlotCount();
+   */
+  Map<String, Long> getTotalTaskSlotCount();
 
-  long getIdleTaskSlotCount();
+  Map<String, Long> getIdleTaskSlotCount();
 
-  long getUsedTaskSlotCount();
+  Map<String, Long> getUsedTaskSlotCount();
 
-  long getLazyTaskSlotCount();
+  Map<String, Long> getLazyTaskSlotCount();
 
-  long getBlacklistedTaskSlotCount();
+  Map<String, Long> getBlacklistedTaskSlotCount();
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
index bff6d61..79a67b4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.overlord;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Futures;
@@ -63,6 +64,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -452,33 +454,43 @@ public class ThreadingTaskRunner
   }
 
   @Override
-  public long getTotalTaskSlotCount()
+  public Map<String, Long> getTotalTaskSlotCount()
+  {
+    return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(workerConfig.getCapacity()));
+  }
+
+  public long getTotalTaskSlotCountLong()
   {
     return workerConfig.getCapacity();
   }
 
   @Override
-  public long getIdleTaskSlotCount()
+  public Map<String, Long> getIdleTaskSlotCount()
   {
-    return Math.max(getTotalTaskSlotCount() - getUsedTaskSlotCount(), 0);
+    return ImmutableMap.of(workerConfig.getCategory(), Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0));
   }
 
   @Override
-  public long getUsedTaskSlotCount()
+  public Map<String, Long> getUsedTaskSlotCount()
+  {
+    return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(getRunningTasks().size()));
+  }
+
+  public long getUsedTaskSlotCountLong()
   {
     return getRunningTasks().size();
   }
 
   @Override
-  public long getLazyTaskSlotCount()
+  public Map<String, Long> getLazyTaskSlotCount()
   {
-    return 0;
+    return ImmutableMap.of(workerConfig.getCategory(), 0L);
   }
 
   @Override
-  public long getBlacklistedTaskSlotCount()
+  public Map<String, Long> getBlacklistedTaskSlotCount()
   {
-    return 0;
+    return ImmutableMap.of(workerConfig.getCategory(), 0L);
   }
 
   @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 59085a2..45af215 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -90,6 +90,7 @@ import java.io.InputStream;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -1628,55 +1629,80 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   }
 
   @Override
-  public long getTotalTaskSlotCount()
+  public Map<String, Long> getTotalTaskSlotCount()
   {
-    long totalPeons = 0;
+    Map<String, Long> totalPeons = new HashMap<>();
     for (ImmutableWorkerInfo worker : getWorkers()) {
-      totalPeons += worker.getWorker().getCapacity();
+      String workerCategory = worker.getWorker().getCategory();
+      int workerCapacity = worker.getWorker().getCapacity();
+      totalPeons.compute(
+          workerCategory,
+          (category, totalCapacity) -> totalCapacity == null ? workerCapacity : totalCapacity + workerCapacity
+      );
     }
 
     return totalPeons;
   }
 
   @Override
-  public long getIdleTaskSlotCount()
+  public Map<String, Long> getIdleTaskSlotCount()
   {
-    long totalIdlePeons = 0;
+    Map<String, Long> totalIdlePeons = new HashMap<>();
     for (ImmutableWorkerInfo worker : getWorkersEligibleToRunTasks().values()) {
-      totalIdlePeons += worker.getAvailableCapacity();
+      String workerCategory = worker.getWorker().getCategory();
+      int workerAvailableCapacity = worker.getAvailableCapacity();
+      totalIdlePeons.compute(
+          workerCategory,
+          (category, availableCapacity) -> availableCapacity == null ? workerAvailableCapacity : availableCapacity + workerAvailableCapacity
+      );
     }
 
     return totalIdlePeons;
   }
 
   @Override
-  public long getUsedTaskSlotCount()
+  public Map<String, Long> getUsedTaskSlotCount()
   {
-    long totalUsedPeons = 0;
+    Map<String, Long> totalUsedPeons = new HashMap<>();
     for (ImmutableWorkerInfo worker : getWorkers()) {
-      totalUsedPeons += worker.getCurrCapacityUsed();
+      String workerCategory = worker.getWorker().getCategory();
+      int workerUsedCapacity = worker.getCurrCapacityUsed();
+      totalUsedPeons.compute(
+          workerCategory,
+          (category, usedCapacity) -> usedCapacity == null ? workerUsedCapacity : usedCapacity + workerUsedCapacity
+      );
     }
 
     return totalUsedPeons;
   }
 
   @Override
-  public long getLazyTaskSlotCount()
+  public Map<String, Long> getLazyTaskSlotCount()
   {
-    long totalLazyPeons = 0;
+    Map<String, Long> totalLazyPeons = new HashMap<>();
     for (Worker worker : getLazyWorkers()) {
-      totalLazyPeons += worker.getCapacity();
+      String workerCategory = worker.getCategory();
+      int workerLazyPeons = worker.getCapacity();
+      totalLazyPeons.compute(
+          workerCategory,
+          (category, lazyPeons) -> lazyPeons == null ? workerLazyPeons : lazyPeons + workerLazyPeons
+      );
     }
 
     return totalLazyPeons;
   }
 
   @Override
-  public long getBlacklistedTaskSlotCount()
+  public Map<String, Long> getBlacklistedTaskSlotCount()
   {
-    long totalBlacklistedPeons = 0;
+    Map<String, Long> totalBlacklistedPeons = new HashMap<>();
     for (ImmutableWorkerInfo worker : getBlackListedWorkers()) {
-      totalBlacklistedPeons += worker.getWorker().getCapacity();
+      String workerCategory = worker.getWorker().getCategory();
+      int workerBlacklistedPeons = worker.getWorker().getCapacity();
+      totalBlacklistedPeons.compute(
+          workerCategory,
+          (category, blacklistedPeons) -> blacklistedPeons == null ? workerBlacklistedPeons : blacklistedPeons + workerBlacklistedPeons
+      );
     }
 
     return totalBlacklistedPeons;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 81a6bcc..f2cdf5c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -82,6 +82,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
 
@@ -405,31 +406,31 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
     }
 
     @Override
-    public long getTotalTaskSlotCount()
+    public Map<String, Long> getTotalTaskSlotCount()
     {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public long getIdleTaskSlotCount()
+    public Map<String, Long> getIdleTaskSlotCount()
     {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public long getUsedTaskSlotCount()
+    public Map<String, Long> getUsedTaskSlotCount()
     {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public long getLazyTaskSlotCount()
+    public Map<String, Long> getLazyTaskSlotCount()
     {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public long getBlacklistedTaskSlotCount()
+    public Map<String, Long> getBlacklistedTaskSlotCount()
     {
       throw new UnsupportedOperationException();
     }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index 589c6e1..edb6f64 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
 import org.apache.druid.indexing.worker.Worker;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -107,9 +108,9 @@ public class RemoteTaskRunnerTest
   {
     doSetup();
 
-    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
-    Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount());
-    Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount());
+    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
 
     ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
 
@@ -124,9 +125,9 @@ public class RemoteTaskRunnerTest
 
     cf.delete().guaranteed().forPath(JOINER.join(STATUS_PATH, task.getId()));
 
-    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
-    Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount());
-    Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount());
+    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
   }
 
   @Test
@@ -437,8 +438,8 @@ public class RemoteTaskRunnerTest
   public void testWorkerRemoved() throws Exception
   {
     doSetup();
-    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
-    Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount());
+    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
 
     Future<TaskStatus> future = remoteTaskRunner.run(task);
 
@@ -471,8 +472,8 @@ public class RemoteTaskRunnerTest
     );
     Assert.assertNull(cf.checkExists().forPath(STATUS_PATH));
 
-    Assert.assertEquals(0, remoteTaskRunner.getTotalTaskSlotCount());
-    Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount());
+    Assert.assertFalse(remoteTaskRunner.getTotalTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY));
+    Assert.assertFalse(remoteTaskRunner.getIdleTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY));
   }
 
   @Test
@@ -677,9 +678,9 @@ public class RemoteTaskRunnerTest
     );
     Assert.assertEquals(1, lazyworkers.size());
     Assert.assertEquals(1, remoteTaskRunner.getLazyWorkers().size());
-    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount());
-    Assert.assertEquals(0, remoteTaskRunner.getIdleTaskSlotCount());
-    Assert.assertEquals(3, remoteTaskRunner.getLazyTaskSlotCount());
+    Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertFalse(remoteTaskRunner.getIdleTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY));
+    Assert.assertEquals(3, remoteTaskRunner.getLazyTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
   }
 
   @Test
@@ -990,12 +991,12 @@ public class RemoteTaskRunnerTest
     mockWorkerCompleteFailedTask(task1);
     Assert.assertTrue(taskFuture1.get().isFailure());
     Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
-    Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount());
+    Assert.assertFalse(remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY));
 
     Future<TaskStatus> taskFuture2 = remoteTaskRunner.run(task2);
     Assert.assertTrue(taskAnnounced(task2.getId()));
     mockWorkerRunningTask(task2);
-    Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount());
+    Assert.assertFalse(remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY));
 
     Future<TaskStatus> taskFuture3 = remoteTaskRunner.run(task3);
     Assert.assertTrue(taskAnnounced(task3.getId()));
@@ -1003,12 +1004,12 @@ public class RemoteTaskRunnerTest
     mockWorkerCompleteFailedTask(task3);
     Assert.assertTrue(taskFuture3.get().isFailure());
     Assert.assertEquals(1, remoteTaskRunner.getBlackListedWorkers().size());
-    Assert.assertEquals(3, remoteTaskRunner.getBlacklistedTaskSlotCount());
+    Assert.assertEquals(3, remoteTaskRunner.getBlacklistedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
 
     mockWorkerCompleteSuccessfulTask(task2);
     Assert.assertTrue(taskFuture2.get().isSuccess());
     Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
-    Assert.assertEquals(0, remoteTaskRunner.getBlacklistedTaskSlotCount());
+    Assert.assertFalse(remoteTaskRunner.getBlacklistedTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY));
   }
 
   @Test
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index 0b3d872..dc5fc0e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
 import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
 import org.apache.druid.indexing.overlord.config.TaskLockConfig;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
@@ -493,33 +494,33 @@ public class TaskQueueTest extends IngestionTestBase
     }
 
     @Override
-    public long getTotalTaskSlotCount()
+    public Map<String, Long> getTotalTaskSlotCount()
     {
-      return 0;
+      return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L);
     }
 
     @Override
-    public long getIdleTaskSlotCount()
+    public Map<String, Long> getIdleTaskSlotCount()
     {
-      return 0;
+      return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L);
     }
 
     @Override
-    public long getUsedTaskSlotCount()
+    public Map<String, Long> getUsedTaskSlotCount()
     {
-      return 0;
+      return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L);
     }
 
     @Override
-    public long getLazyTaskSlotCount()
+    public Map<String, Long> getLazyTaskSlotCount()
     {
-      return 0;
+      return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L);
     }
 
     @Override
-    public long getBlacklistedTaskSlotCount()
+    public Map<String, Long> getBlacklistedTaskSlotCount()
     {
-      return 0;
+      return ImmutableMap.of(WorkerConfig.DEFAULT_CATEGORY, 0L);
     }
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
index df293f0..3a5df86 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java
@@ -273,31 +273,31 @@ public class TestTaskRunner implements TaskRunner, QuerySegmentWalker
   }
 
   @Override
-  public long getTotalTaskSlotCount()
+  public Map<String, Long> getTotalTaskSlotCount()
   {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public long getIdleTaskSlotCount()
+  public Map<String, Long> getIdleTaskSlotCount()
   {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public long getUsedTaskSlotCount()
+  public Map<String, Long> getUsedTaskSlotCount()
   {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public long getLazyTaskSlotCount()
+  public Map<String, Long> getLazyTaskSlotCount()
   {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public long getBlacklistedTaskSlotCount()
+  public Map<String, Long> getBlacklistedTaskSlotCount()
   {
     throw new UnsupportedOperationException();
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 7eeb50e..5a67611 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -813,6 +813,7 @@ public class HttpRemoteTaskRunnerTest
 
     Task task1 = NoopTask.create("task-id-1", 0);
     Task task2 = NoopTask.create("task-id-2", 0);
+    String additionalWorkerCategory = "category2";
 
     ConcurrentMap<String, CustomFunction> workerHolders = new ConcurrentHashMap<>();
 
@@ -864,9 +865,9 @@ public class HttpRemoteTaskRunnerTest
 
     taskRunner.start();
 
-    Assert.assertEquals(0, taskRunner.getTotalTaskSlotCount());
-    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount());
-    Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount());
+    Assert.assertTrue(taskRunner.getTotalTaskSlotCount().isEmpty());
+    Assert.assertTrue(taskRunner.getIdleTaskSlotCount().isEmpty());
+    Assert.assertTrue(taskRunner.getUsedTaskSlotCount().isEmpty());
 
     AtomicInteger ticks = new AtomicInteger();
 
@@ -910,9 +911,9 @@ public class HttpRemoteTaskRunnerTest
 
     druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode1));
 
-    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount());
-    Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount());
-    Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount());
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
 
     taskRunner.run(task1);
 
@@ -920,16 +921,16 @@ public class HttpRemoteTaskRunnerTest
       Thread.sleep(100);
     }
 
-    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount());
-    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount());
-    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount());
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
 
     DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
         new DruidNode("service", "host2", false, 8080, null, true, false),
         NodeRole.MIDDLE_MANAGER,
         ImmutableMap.of(
             WorkerNodeService.DISCOVERY_SERVICE_KEY,
-            new WorkerNodeService("ip2", 1, "0", WorkerConfig.DEFAULT_CATEGORY)
+            new WorkerNodeService("ip2", 1, "0", additionalWorkerCategory)
         )
     );
 
@@ -952,9 +953,12 @@ public class HttpRemoteTaskRunnerTest
 
     druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode2));
 
-    Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount());
-    Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount());
-    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount());
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue());
+    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(additionalWorkerCategory).longValue());
+    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue());
 
     taskRunner.run(task2);
 
@@ -962,9 +966,12 @@ public class HttpRemoteTaskRunnerTest
       Thread.sleep(100);
     }
 
-    Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount());
-    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount());
-    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount());
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue());
+    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(additionalWorkerCategory));
+    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue());
 
     DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode(
         new DruidNode("service", "host3", false, 8080, null, true, false),
@@ -994,10 +1001,14 @@ public class HttpRemoteTaskRunnerTest
 
     druidNodeDiscovery.listener.nodesAdded(ImmutableList.of(druidNode3));
 
-    Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount());
-    Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount());
-    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount());
-    Assert.assertEquals(0, taskRunner.getLazyTaskSlotCount());
+    Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue());
+    Assert.assertEquals(1, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(additionalWorkerCategory));
+    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue());
+    Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(WorkerConfig.DEFAULT_CATEGORY));
+    Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(additionalWorkerCategory));
 
     Assert.assertEquals(task1.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
     Assert.assertEquals(task2.getId(), Iterables.getOnlyElement(taskRunner.getPendingTasks()).getTaskId());
@@ -1008,10 +1019,14 @@ public class HttpRemoteTaskRunnerTest
                  .getHost()
     );
 
-    Assert.assertEquals(3, taskRunner.getTotalTaskSlotCount());
-    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount());
-    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount());
-    Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount());
+    Assert.assertEquals(2, taskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(1, taskRunner.getTotalTaskSlotCount().get(additionalWorkerCategory).longValue());
+    Assert.assertEquals(0, taskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertFalse(taskRunner.getIdleTaskSlotCount().containsKey(additionalWorkerCategory));
+    Assert.assertEquals(1, taskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertEquals(0, taskRunner.getUsedTaskSlotCount().get(additionalWorkerCategory).longValue());
+    Assert.assertEquals(1, taskRunner.getLazyTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
+    Assert.assertFalse(taskRunner.getLazyTaskSlotCount().containsKey(additionalWorkerCategory));
   }
 
   /*
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index aa70b2c..5655a13 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -80,9 +80,11 @@ import org.junit.Test;
 import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -455,31 +457,31 @@ public class OverlordTest
     }
 
     @Override
-    public long getTotalTaskSlotCount()
+    public Map<String, Long> getTotalTaskSlotCount()
     {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public long getIdleTaskSlotCount()
+    public Map<String, Long> getIdleTaskSlotCount()
     {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public long getUsedTaskSlotCount()
+    public Map<String, Long> getUsedTaskSlotCount()
     {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public long getLazyTaskSlotCount()
+    public Map<String, Long> getLazyTaskSlotCount()
     {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public long getBlacklistedTaskSlotCount()
+    public Map<String, Long> getBlacklistedTaskSlotCount()
     {
       throw new UnsupportedOperationException();
     }
diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitor.java
index d8accc8..3431e1e 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitor.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitor.java
@@ -24,6 +24,8 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.metrics.AbstractMonitor;
 
+import java.util.Map;
+
 public class TaskSlotCountStatsMonitor extends AbstractMonitor
 {
   private final TaskSlotCountStatsProvider statsProvider;
@@ -47,11 +49,14 @@ public class TaskSlotCountStatsMonitor extends AbstractMonitor
     return true;
   }
 
-  private void emit(ServiceEmitter emitter, String key, Long count)
+  private void emit(ServiceEmitter emitter, String key, Map<String, Long> counts)
   {
     final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
-    if (count != null) {
-      emitter.emit(builder.build(key, count.longValue()));
+    if (counts != null) {
+      counts.forEach((k, v) -> {
+        builder.setDimension("category", k);
+        emitter.emit(builder.build(key, v));
+      });
     }
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java
index eb46fa5..e7a7249 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java
@@ -21,35 +21,37 @@ package org.apache.druid.server.metrics;
 
 import javax.annotation.Nullable;
 
+import java.util.Map;
+
 public interface TaskSlotCountStatsProvider
 {
   /**
    * Return the number of total task slots during emission period.
    */
   @Nullable
-  Long getTotalTaskSlotCount();
+  Map<String, Long> getTotalTaskSlotCount();
 
   /**
    * Return the number of idle task slots during emission period.
    */
   @Nullable
-  Long getIdleTaskSlotCount();
+  Map<String, Long> getIdleTaskSlotCount();
 
   /**
    * Return the number of used task slots during emission period.
    */
   @Nullable
-  Long getUsedTaskSlotCount();
+  Map<String, Long> getUsedTaskSlotCount();
 
   /**
    * Return the total number of task slots in lazy marked middlemanagers and indexers during emission period.
    */
   @Nullable
-  Long getLazyTaskSlotCount();
+  Map<String, Long> getLazyTaskSlotCount();
 
   /**
    * Return the total number of task slots in blacklisted middlemanagers and indexers during emission period.
    */
   @Nullable
-  Long getBlacklistedTaskSlotCount();
+  Map<String, Long> getBlacklistedTaskSlotCount();
 }
diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java
index 2c5c52b..be33a4c 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java
@@ -19,11 +19,14 @@
 
 package org.apache.druid.server.metrics;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Map;
+
 public class TaskSlotCountStatsMonitorTest
 {
   private TaskSlotCountStatsProvider statsProvider;
@@ -34,33 +37,33 @@ public class TaskSlotCountStatsMonitorTest
     statsProvider = new TaskSlotCountStatsProvider()
     {
       @Override
-      public Long getTotalTaskSlotCount()
+      public Map<String, Long> getTotalTaskSlotCount()
       {
-        return 1L;
+        return ImmutableMap.of("c1", 1L);
       }
 
       @Override
-      public Long getIdleTaskSlotCount()
+      public Map<String, Long> getIdleTaskSlotCount()
       {
-        return 1L;
+        return ImmutableMap.of("c1", 1L);
       }
 
       @Override
-      public Long getUsedTaskSlotCount()
+      public Map<String, Long> getUsedTaskSlotCount()
       {
-        return 1L;
+        return ImmutableMap.of("c1", 1L);
       }
 
       @Override
-      public Long getLazyTaskSlotCount()
+      public Map<String, Long> getLazyTaskSlotCount()
       {
-        return 1L;
+        return ImmutableMap.of("c1", 1L);
       }
 
       @Override
-      public Long getBlacklistedTaskSlotCount()
+      public Map<String, Long> getBlacklistedTaskSlotCount()
       {
-        return 1L;
+        return ImmutableMap.of("c1", 1L);
       }
     };
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org