Posted to commits@druid.apache.org by su...@apache.org on 2023/02/09 02:07:30 UTC

[druid] branch master updated: Allow users to add additional metadata to ingestion metrics (#13760)

This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 714ac07b52 Allow users to add additional metadata to ingestion metrics (#13760)
714ac07b52 is described below

commit 714ac07b524fcdc4d5a81a8f58af91ed03c78be3
Author: Suneet Saldanha <su...@apache.org>
AuthorDate: Wed Feb 8 18:07:23 2023 -0800

    Allow users to add additional metadata to ingestion metrics (#13760)
    
    * Allow users to add additional metadata to ingestion metrics
    
    When submitting an ingestion spec, users may pass a map of metadata
    in the ingestion spec config that will be added to ingestion metrics.
    
    This will make it possible for operators to tag metrics with other
    metadata that doesn't necessarily line up with the existing tags
    like taskId.
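    
    For illustration, a hypothetical spec carrying such metadata might look
    roughly like the following, using the final form of the feature (a tags
    map inside the spec's context field, per the docs change below); the
    task type and tag values here are made up:
    
        {
          "type": "index_parallel",
          "spec": { "...": "..." },
          "context": {
            "tags": {
              "team": "metrics-platform",
              "pipeline": "clickstream-hourly"
            }
          }
        }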
    
    Druid clusters that ingest these metrics can take advantage of the
    nested data columns feature to process this additional metadata.
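    
    As a rough sketch (the exact event layout depends on the emitter in use,
    and all field values here are illustrative), a metric emitted with these
    tags could then look something like:
    
        {
          "feed": "metrics",
          "timestamp": "2023-02-09T02:00:00.000Z",
          "service": "druid/middleManager",
          "metric": "ingest/events/processed",
          "value": 1024,
          "dataSource": "clickstream",
          "taskId": "index_parallel_clickstream_abc",
          "taskType": "index_parallel",
          "tags": {
            "team": "metrics-platform",
            "pipeline": "clickstream-hourly"
          }
        }
    
    Because the tags value arrives as a nested object, a cluster ingesting
    these metrics can model it with nested data columns and group or filter
    on individual tag keys.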
    
    * rename to tags
    
    * docs
    
    * tests
    
    * fix test
    
    * make code cov happy
    
    * checkstyle
---
 .../util/emitter/service/ServiceMetricEvent.java   |   8 ++
 .../emitter/service/ServiceMetricEventTest.java    |  23 ++++
 docs/operations/metrics.md                         |  99 +++++++++---------
 .../common/TaskRealtimeMetricsMonitorBuilder.java  |   9 +-
 .../common/stats/TaskRealtimeMetricsMonitor.java   |   8 +-
 .../common/task/AbstractBatchIndexTask.java        |   2 +
 .../druid/indexing/common/task/IndexTaskUtils.java |  11 +-
 .../supervisor/SeekableStreamSupervisor.java       | 116 ++++++++++++---------
 .../supervisor/SeekableStreamSupervisorSpec.java   |   6 ++
 .../common/TaskRealtimeMetricsMonitorTest.java     |  97 +++++++++++++++++
 .../indexing/common/task/IndexTaskUtilsTest.java   |  82 +++++++++++++++
 .../SeekableStreamSupervisorSpecTest.java          |  97 +++++++++++++++--
 .../SeekableStreamSupervisorStateTest.java         |  15 +++
 .../java/org/apache/druid/query/DruidMetrics.java  |   2 +
 14 files changed, 466 insertions(+), 109 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java b/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
index f40491ce70..1ba264b4e3 100644
--- a/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
+++ b/core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
@@ -149,6 +149,14 @@ public class ServiceMetricEvent implements Event
       return this;
     }
 
+    public Builder setDimensionIfNotNull(String dim, Object value)
+    {
+      if (value != null) {
+        userDims.put(dim, value);
+      }
+      return this;
+    }
+
     public Builder setDimension(String dim, Object value)
     {
       userDims.put(dim, value);
diff --git a/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java b/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
index 5a97f76564..42f299f238 100644
--- a/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java
@@ -27,8 +27,10 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
 
 /**
+ *
  */
 public class ServiceMetricEventTest
 {
@@ -291,4 +293,25 @@ public class ServiceMetricEventTest
     ServiceMetricEvent.builder().build("foo", 0 / 0f);
   }
 
+  @Test
+  public void testSetDimensionIfNotNullSetsNonNullDimension()
+  {
+    Map<String, String> userDimMap = ImmutableMap.of("k1", "v1", "k2", "v2");
+    ServiceMetricEvent target = ServiceMetricEvent.builder()
+                                                  .setDimensionIfNotNull("userDimMap", userDimMap)
+                                                  .build("foo", 1)
+                                                  .build("service", "host");
+    Assert.assertEquals(userDimMap, target.getUserDims().get("userDimMap"));
+  }
+
+  @Test
+  public void testSetDimensionIfNotNullShouldNotSetNullDimension()
+  {
+    ServiceMetricEvent target = ServiceMetricEvent.builder()
+                                                  .setDimensionIfNotNull("userDimMap", null)
+                                                  .build("foo", 1)
+                                                  .build("service", "host");
+    Assert.assertTrue(target.getUserDims().isEmpty());
+    Assert.assertNull(target.getUserDims().get("userDimMap"));
+  }
 }
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 2d75b23e85..4d4f999b6b 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -152,11 +152,11 @@ If SQL is enabled, the Broker will emit the following metrics for SQL.
 
 ## General native ingestion metrics
 
-|Metric|Description|Dimensions|Normal Value|
-|------|-----------|----------|------------|
-|`ingest/count`|Count of `1` every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions. |`dataSource`, `taskId`, `taskType`, `taskIngestionMode`|Always `1`.|
-|`ingest/segments/count`|Count of final segments created by job (includes tombstones). |`dataSource`, `taskId`, `taskType`, `taskIngestionMode`|At least `1`.|
-|`ingest/tombstones/count`|Count of tombstones created by job. |`dataSource`, `taskId`, `taskType`, `taskIngestionMode`|Zero or more for replace. Always zero for non-replace tasks (always zero for legacy replace, see below).|
+|Metric|Description| Dimensions                                              |Normal Value|
+|------|-----------|---------------------------------------------------------|------------|
+|`ingest/count`|Count of `1` every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions. | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` |Always `1`.|
+|`ingest/segments/count`|Count of final segments created by job (includes tombstones). | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` |At least `1`.|
+|`ingest/tombstones/count`|Count of tombstones created by job. | `dataSource`, `taskId`, `taskType`, `taskIngestionMode`, `tags` |Zero or more for replace. Always zero for non-replace tasks (always zero for legacy replace, see below).|
 
 The `taskIngestionMode` dimension includes the following modes: 
 * `APPEND`: a native ingestion job appending to existing segments 
@@ -167,12 +167,15 @@ The mode is decided using the values
 of the `isAppendToExisting` and `isDropExisting` flags in the
 task's `IOConfig` as follows:
 
-|`isAppendToExisting` | `isDropExisting` | mode |
-|---------------------|-------------------|------|
-`true` | `false` | `APPEND`|
-`true` | `true  ` | Invalid combination, exception thrown. |
-`false` | `false` | `REPLACE_LEGACY` (this is the default for native batch ingestion). |
-`false` | `true` | `REPLACE`|
+| `isAppendToExisting` | `isDropExisting` | mode |
+|----------------------|-------------------|------|
+| `true`               | `false` | `APPEND`|
+| `true`               | `true`   | Invalid combination, exception thrown. |
+| `false`              | `false` | `REPLACE_LEGACY` (this is the default for native batch ingestion). |
+| `false`              | `true` | `REPLACE`|
+
+The `tags` dimension is reported only for metrics emitted by ingestion tasks whose ingestion spec includes a `tags`
+field in its `context`. `tags` is expected to be a map of string to object.
 
 ### Ingestion metrics for Kafka
 
@@ -180,10 +183,10 @@ These metrics apply to the [Kafka indexing service](../development/extensions-co
 
 |Metric|Description|Dimensions|Normal Value|
 |------|-----------|----------|------------|
-|`ingest/kafka/lag`|Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`|Greater than 0, should not be a very high number. |
-|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`|Greater than 0, should not be a very high number. |
-|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`|Greater than 0, should not be a very high number. |
-|`ingest/kafka/partitionLag`|Partition-wise lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `partition`|Greater than 0, should not be a very high number. |
+|`ingest/kafka/lag`|Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, should not be a very high number. |
+|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, should not be a very high number. |
+|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, should not be a very high number. |
+|`ingest/kafka/partitionLag`|Partition-wise lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `partition`, `tags`|Greater than 0, should not be a very high number. |
 
 ### Ingestion metrics for Kinesis
 
@@ -191,10 +194,10 @@ These metrics apply to the [Kinesis indexing service](../development/extensions-
 
 |Metric|Description|Dimensions|Normal Value|
 |------|-----------|----------|------------|
-|`ingest/kinesis/lag/time`|Total lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`|Greater than 0, up to max Kinesis retention period in milliseconds. |
-|`ingest/kinesis/maxLag/time`|Max lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`|Greater than 0, up to max Kinesis retention period in milliseconds. |
-|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`|Greater than 0, up to max Kinesis retention period in milliseconds. |
-|`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `partition`|Greater than 0, up to max Kinesis retention period in milliseconds. |
+|`ingest/kinesis/lag/time`|Total lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up to max Kinesis retention period in milliseconds. |
+|`ingest/kinesis/maxLag/time`|Max lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up to max Kinesis retention period in milliseconds. |
+|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis across all shards. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up to max Kinesis retention period in milliseconds. |
+|`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds between the current message sequence number consumed by the Kinesis indexing tasks and latest sequence number in Kinesis. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `partition`, `tags`|Greater than 0, up to max Kinesis retention period in milliseconds. |
 
 ### Other ingestion metrics
 
@@ -203,26 +206,26 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
 
 |Metric|Description|Dimensions|Normal Value|
 |------|-----------|----------|------------|
-|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`|0|
-|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`|0|
-|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`|0|
-|`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`|Equal to the number of events per emission period.|
+|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `tags`|0|
+|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `tags`|0|
+|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `tags`|0|
+|`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`, `tags`|Equal to the number of events per emission period.|
 |`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`|Your number of events with rollup.|
-|`ingest/persists/count`|Number of times persist occurred.|`dataSource`, `taskId`, `taskType`|Depends on configuration.|
-|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|`dataSource`, `taskId`, `taskType`|Depends on configuration. Generally a few minutes at most.|
-|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|`dataSource`, `taskId`, `taskType`|Depends on configuration. Generally a few minutes at most.|
-|`ingest/persists/backPressure`|Milliseconds spent creating persist tasks and blocking waiting for them to finish.|`dataSource`, `taskId`, `taskType`|0 or very low|
-|`ingest/persists/failed`|Number of persists that failed.|`dataSource`, `taskId`, `taskType`|0|
-|`ingest/handoff/failed`|Number of handoffs that failed.|`dataSource`, `taskId`, `taskType`|0|
-|`ingest/merge/time`|Milliseconds spent merging intermediate segments.|`dataSource`, `taskId`, `taskType`|Depends on configuration. Generally a few minutes at most.|
-|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|`dataSource`, `taskId`, `taskType`|Depends on configuration. Generally a few minutes at most.|
-|`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, `taskId`, `taskType`|Varies. Generally greater than 0 once every segment granular period if cluster operating normally.|
-|`ingest/sink/count`|Number of sinks not handoffed.|`dataSource`, `taskId`, `taskType`|1~3|
-|`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. |`dataSource`, `taskId`, `taskType`|Greater than 0, depends on the time carried in event. |
-|`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator.|`dataSource`|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
-|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`| < 1s |
-|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`| < 10 seconds|
-|`ingest/handoff/time`|Total time taken for each set of segments handed off.|`dataSource`, `taskId`, `taskType`|Depends on coordinator cycle time.|
+|`ingest/persists/count`|Number of times persist occurred.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration.|
+|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.|
+|`ingest/persists/cpu`|Cpu time in Nanoseconds spent on doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.|
+|`ingest/persists/backPressure`|Milliseconds spent creating persist tasks and blocking waiting for them to finish.|`dataSource`, `taskId`, `taskType`, `tags`|0 or very low|
+|`ingest/persists/failed`|Number of persists that failed.|`dataSource`, `taskId`, `taskType`, `tags`|0|
+|`ingest/handoff/failed`|Number of handoffs that failed.|`dataSource`, `taskId`, `taskType`, `tags`|0|
+|`ingest/merge/time`|Milliseconds spent merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.|
+|`ingest/merge/cpu`|Cpu time in Nanoseconds spent on merging intermediate segments.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on configuration. Generally a few minutes at most.|
+|`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, `taskId`, `taskType`, `tags`|Varies. Generally greater than 0 once every segment granular period if cluster operating normally.|
+|`ingest/sink/count`|Number of sinks not handoffed.|`dataSource`, `taskId`, `taskType`, `tags`|1~3|
+|`ingest/events/messageGap`|Time gap in milliseconds between the latest ingested event timestamp and the current system timestamp of metrics emission. |`dataSource`, `taskId`, `taskType`, `tags`|Greater than 0, depends on the time carried in event. |
+|`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator.|`dataSource`, `tags`|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
+|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s |
+|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
+|`ingest/handoff/time`|Total time taken for each set of segments handed off.|`dataSource`, `taskId`, `taskType`, `tags`|Depends on coordinator cycle time.|
 
 Note: If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0.
 
@@ -230,19 +233,20 @@ Note: If the JVM does not support CPU time measurement for the current thread, `
 
 |Metric|Description| Dimensions                                                 |Normal Value|
 |------|-----------|------------------------------------------------------------|------------|
-|`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, `taskType`, `taskStatus`|Varies|
-|`task/pending/time`|Milliseconds taken for a task to wait for running.| `dataSource`, `taskId`, `taskType`|Varies|
-|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`, `taskActionType`|< 1000 (subsecond)|
-|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies from subsecond to a few seconds, based on action type.|
-|`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies|
-|`task/action/failed/count`|Number of task actions that failed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`|Varies|
+|`task/run/time`|Milliseconds taken to run a task.| `dataSource`, `taskId`, `taskType`, `taskStatus`, `tags`|Varies|
+|`task/pending/time`|Milliseconds taken for a task to wait for running.| `dataSource`, `taskId`, `taskType`, `tags`|Varies|
+|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|< 1000 (subsecond)|
+|`task/action/run/time`|Milliseconds taken to execute a task action.| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies from subsecond to a few seconds, based on action type.|
+|`task/action/success/count`|Number of task actions that were executed successfully during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies|
+|`task/action/failed/count`|Number of task actions that failed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskId`, `taskType`, `taskActionType`, `tags`|Varies|
 |`task/action/batch/queueTime`|Milliseconds spent by a batch of task actions in queue. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on the `batchAllocationWaitTime` and number of batches in queue.|
 |`task/action/batch/runTime`|Milliseconds taken to execute a batch of task actions. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies from subsecond to a few seconds, based on action type and batch size.|
 |`task/action/batch/size`|Number of task actions in a batch that was executed during the emission period. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|Varies based on number of concurrent task actions.|
 |`task/action/batch/attempts`|Number of execution attempts for a single batch of task actions. Currently only being emitted for [batched `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions).| `dataSource`, `taskActionType`, `interval`|1 if there are no failures or retries.|
-|`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
-|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
-|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| `dataSource`, `taskId`, `taskType`, `interval`|Varies|
+|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.| `dataSource`, `taskType`, `taskId`, `segmentAvailabilityConfirmed`, `tags`|Varies|
+|`segment/added/bytes`|Size in bytes of new segments created.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies|
+|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies|
+|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| `dataSource`, `taskId`, `taskType`, `interval`, `tags`|Varies|
 |`task/success/count`|Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.| `dataSource`|Varies|
 |`task/failed/count`|Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|`dataSource`|Varies|
 |`task/running/count`|Number of current running tasks. This metric is only available if the `TaskCountStatsMonitor` module is included.|`dataSource`|Varies|
@@ -253,7 +257,6 @@ Note: If the JVM does not support CPU time measurement for the current thread, `
 |`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the `TaskSlotCountStatsMonitor` module is included.| `category`|Varies|
 |`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the `TaskSlotCountStatsMonitor` module is included.| `category`|Varies|
 |`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the `TaskSlotCountStatsMonitor` module is included.| `category`|Varies|
-|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.| `dataSource`, `taskType`, `taskId`, `segmentAvailabilityConfirmed` |Varies|
 |`worker/task/failed/count`|Number of failed tasks run on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included, and is only supported for middleManager nodes.| `category`, `workerVersion`|Varies|
 |`worker/task/success/count`|Number of successful tasks run on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included, and is only supported for middleManager nodes.| `category`,`workerVersion`|Varies|
 |`worker/taskSlot/idle/count`|Number of idle task slots on the reporting worker per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included, and is only supported for middleManager nodes.| `category`, `workerVersion`|Varies|
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
index 03452ac355..a07ad4eaad 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java
@@ -45,7 +45,11 @@ public class TaskRealtimeMetricsMonitorBuilder
     );
   }
 
-  public static TaskRealtimeMetricsMonitor build(Task task, FireDepartment fireDepartment, RowIngestionMeters meters)
+  public static TaskRealtimeMetricsMonitor build(
+      Task task,
+      FireDepartment fireDepartment,
+      RowIngestionMeters meters
+  )
   {
     return new TaskRealtimeMetricsMonitor(
         fireDepartment,
@@ -53,7 +57,8 @@ public class TaskRealtimeMetricsMonitorBuilder
         ImmutableMap.of(
             DruidMetrics.TASK_ID, new String[]{task.getId()},
             DruidMetrics.TASK_TYPE, new String[]{task.getType()}
-        )
+            ),
+        task.getContextValue(DruidMetrics.TAGS)
     );
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
index 91c842a05f..f708bf95d8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
@@ -31,6 +31,7 @@ import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 
+import javax.annotation.Nullable;
 import java.util.Map;
 
 /**
@@ -46,6 +47,8 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
   private final FireDepartment fireDepartment;
   private final RowIngestionMeters rowIngestionMeters;
   private final Map<String, String[]> dimensions;
+  @Nullable
+  private final Map<String, Object> metricTags;
 
   private FireDepartmentMetrics previousFireDepartmentMetrics;
   private RowIngestionMetersTotals previousRowIngestionMetersTotals;
@@ -53,12 +56,14 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
   public TaskRealtimeMetricsMonitor(
       FireDepartment fireDepartment,
       RowIngestionMeters rowIngestionMeters,
-      Map<String, String[]> dimensions
+      Map<String, String[]> dimensions,
+      @Nullable Map<String, Object> metricTags
   )
   {
     this.fireDepartment = fireDepartment;
     this.rowIngestionMeters = rowIngestionMeters;
     this.dimensions = ImmutableMap.copyOf(dimensions);
+    this.metricTags = metricTags;
     previousFireDepartmentMetrics = new FireDepartmentMetrics();
     previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 0, 0);
   }
@@ -80,6 +85,7 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
           thrownAway
       );
     }
+    builder.setDimensionIfNotNull(DruidMetrics.TAGS, metricTags);
     emitter.emit(builder.build("ingest/events/thrownAway", thrownAway));
 
     final long unparseable = rowIngestionMetersTotals.getUnparseable()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 6e6e12f2ad..2f19c1a5a0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -60,6 +60,7 @@ import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -691,6 +692,7 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
               .setDimension("dataSource", getDataSource())
               .setDimension("taskType", getType())
               .setDimension("taskId", getId())
+              .setDimensionIfNotNull(DruidMetrics.TAGS, getContextValue(DruidMetrics.TAGS))
               .setDimension("segmentAvailabilityConfirmed", segmentAvailabilityConfirmationCompleted)
               .build("task/segmentAvailability/wait/time", segmentAvailabilityWaitTimeMs)
       );
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
index 44b838f69a..05be8c1941 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
@@ -40,6 +40,7 @@ import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 public class IndexTaskUtils
 {
@@ -112,6 +113,10 @@ public class IndexTaskUtils
     metricBuilder.setDimension(DruidMetrics.TASK_ID, task.getId());
     metricBuilder.setDimension(DruidMetrics.TASK_TYPE, task.getType());
     metricBuilder.setDimension(DruidMetrics.DATASOURCE, task.getDataSource());
+    metricBuilder.setDimensionIfNotNull(
+        DruidMetrics.TAGS,
+        task.<Map<String, Object>>getContextValue(DruidMetrics.TAGS)
+    );
   }
 
   public static void setTaskDimensions(final ServiceMetricEvent.Builder metricBuilder, final AbstractTask task)
@@ -119,7 +124,11 @@ public class IndexTaskUtils
     metricBuilder.setDimension(DruidMetrics.TASK_ID, task.getId());
     metricBuilder.setDimension(DruidMetrics.TASK_TYPE, task.getType());
     metricBuilder.setDimension(DruidMetrics.DATASOURCE, task.getDataSource());
-    metricBuilder.setDimension(DruidMetrics.TASK_INGESTION_MODE, ((AbstractTask) task).getIngestionMode());
+    metricBuilder.setDimension(DruidMetrics.TASK_INGESTION_MODE, task.getIngestionMode());
+    metricBuilder.setDimensionIfNotNull(
+        DruidMetrics.TAGS,
+        task.<Map<String, Object>>getContextValue(DruidMetrics.TAGS)
+    );
   }
 
   public static void setTaskStatusDimensions(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 68cf3bf796..0ae9aad963 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -143,7 +143,8 @@ import java.util.stream.Stream;
  * @param <PartitionIdType>    the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type
  * @param <SequenceOffsetType> the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers
  */
-public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity> implements Supervisor
+public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity>
+    implements Supervisor
 {
   public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
 
@@ -433,27 +434,31 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         try {
           long nowTime = System.currentTimeMillis();
           if (spec.isSuspended()) {
-            log.info("Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is suspended",
-                    dataSource
+            log.info(
+                "Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is suspended",
+                dataSource
             );
             return;
           }
           log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]", pendingCompletionTaskGroups,
-                  dataSource
+                    dataSource
           );
           for (CopyOnWriteArrayList<TaskGroup> list : pendingCompletionTaskGroups.values()) {
             if (!list.isEmpty()) {
               log.info(
-                      "Skipping DynamicAllocationTasksNotice execution for datasource [%s] because following tasks are pending [%s]",
-                      dataSource, pendingCompletionTaskGroups
+                  "Skipping DynamicAllocationTasksNotice execution for datasource [%s] because following tasks are pending [%s]",
+                  dataSource,
+                  pendingCompletionTaskGroups
               );
               return;
             }
           }
           if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
             log.info(
-                    "DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
-                    nowTime - dynamicTriggerLastRunTime, autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), dataSource
+                "DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
+                nowTime - dynamicTriggerLastRunTime,
+                autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(),
+                dataSource
             );
             return;
           }
@@ -479,18 +484,20 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   /**
    * This method determines how to do scale actions based on collected lag points.
    * If scale action is triggered :
-   *    First of all, call gracefulShutdownInternal() which will change the state of current datasource ingest tasks from reading to publishing.
-   *    Secondly, clear all the stateful data structures: activelyReadingTaskGroups, partitionGroups, partitionOffsets, pendingCompletionTaskGroups, partitionIds. These structures will be rebuiled in the next 'RunNotice'.
-   *    Finally, change the taskCount in SeekableStreamSupervisorIOConfig and sync it to MetadataStorage.
+   * First of all, call gracefulShutdownInternal() which will change the state of current datasource ingest tasks from reading to publishing.
+   * Secondly, clear all the stateful data structures: activelyReadingTaskGroups, partitionGroups, partitionOffsets, pendingCompletionTaskGroups, partitionIds. These structures will be rebuilt in the next 'RunNotice'.
+   * Finally, change the taskCount in SeekableStreamSupervisorIOConfig and sync it to MetadataStorage.
    * After the taskCount is changed in SeekableStreamSupervisorIOConfig, next RunNotice will create scaled number of ingest tasks without resubmitting the supervisor.
+   *
    * @param desiredActiveTaskCount desired taskCount computed from AutoScaler
    * @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 'changeTaskCount'.
-   *         If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis.
+   * If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis.
    * @throws InterruptedException
    * @throws ExecutionException
    * @throws TimeoutException
    */
-  private boolean changeTaskCount(int desiredActiveTaskCount) throws InterruptedException, ExecutionException, TimeoutException
+  private boolean changeTaskCount(int desiredActiveTaskCount)
+      throws InterruptedException, ExecutionException, TimeoutException
   {
     int currentActiveTaskCount;
     Collection<TaskGroup> activeTaskGroups = activelyReadingTaskGroups.values();
@@ -500,8 +507,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       return false;
     } else {
       log.info(
-              "Starting scale action, current active task count is [%d] and desired task count is [%d] for dataSource [%s].",
-              currentActiveTaskCount, desiredActiveTaskCount, dataSource
+          "Starting scale action, current active task count is [%d] and desired task count is [%d] for dataSource [%s].",
+          currentActiveTaskCount,
+          desiredActiveTaskCount,
+          dataSource
       );
       gracefulShutdownInternal();
       changeTaskCountInIOConfig(desiredActiveTaskCount);
@@ -796,14 +805,14 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       log.info("Running Task autoscaler for datasource [%s]", dataSource);
 
       workerThreads = (this.tuningConfig.getWorkerThreads() != null
-              ? this.tuningConfig.getWorkerThreads()
-              : Math.min(10, autoScalerConfig.getTaskCountMax()));
+                       ? this.tuningConfig.getWorkerThreads()
+                       : Math.min(10, autoScalerConfig.getTaskCountMax()));
 
       maxNumTasks = autoScalerConfig.getTaskCountMax() * this.ioConfig.getReplicas();
     } else {
       workerThreads = (this.tuningConfig.getWorkerThreads() != null
-              ? this.tuningConfig.getWorkerThreads()
-              : Math.min(10, this.ioConfig.getTaskCount()));
+                       ? this.tuningConfig.getWorkerThreads()
+                       : Math.min(10, this.ioConfig.getTaskCount()));
 
       maxNumTasks = this.ioConfig.getTaskCount() * this.ioConfig.getReplicas();
     }
@@ -1246,7 +1255,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * Collect row ingestion stats from all tasks managed by this supervisor.
    *
    * @return A map of groupId->taskId->task row stats
-   *
    * @throws InterruptedException
    * @throws ExecutionException
    * @throws TimeoutException
@@ -1321,7 +1329,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * Collect parse errors from all tasks managed by this supervisor.
    *
    * @return A list of parse error strings
-   *
    * @throws InterruptedException
    * @throws ExecutionException
    * @throws TimeoutException
@@ -1975,12 +1982,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   /**
    * Returns a Pair of information about a task:
-   *
+   * <p>
    * Left-hand side: Status of the task from {@link SeekableStreamIndexTaskClient#getStatusAsync}.
-   *
+   * <p>
    * Right-hand side: If status is {@link SeekableStreamIndexTaskRunner.Status#PUBLISHING}, end offsets from
    * {@link SeekableStreamIndexTaskClient#getEndOffsetsAsync}. Otherwise, null.
-   *
+   * <p>
    * Used by {@link #discoverTasks()}.
    */
   private ListenableFuture<Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdType, SequenceOffsetType>>> getStatusAndPossiblyEndOffsets(
@@ -2049,14 +2056,18 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                   log.info("Resumed task [%s] in first supervisor run.", taskId);
                 } else {
                   log.warn("Failed to resume task [%s] in first supervisor run.", taskId);
-                  killTask(taskId,
-                           "Killing forcefully as task could not be resumed in the first supervisor run after Overlord change.");
+                  killTask(
+                      taskId,
+                      "Killing forcefully as task could not be resumed in the first supervisor run after Overlord change."
+                  );
                 }
               }
               catch (Exception e) {
                 log.warn(e, "Failed to resume task [%s] in first supervisor run.", taskId);
-                killTask(taskId,
-                         "Killing forcefully as task could not be resumed in the first supervisor run after Overlord change.");
+                killTask(
+                    taskId,
+                    "Killing forcefully as task could not be resumed in the first supervisor run after Overlord change."
+                );
               }
             }
           },
@@ -2341,8 +2352,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * Determines whether a given task was created by the current version of the supervisor.
    * Uses the Task object mapped to this taskId in the {@code activeTaskMap}.
    * If not found in the map, fetch it from the metadata store.
-   * @param taskGroupId task group id
-   * @param taskId task id
+   *
+   * @param taskGroupId   task group id
+   * @param taskId        task id
    * @param activeTaskMap Set of active tasks that were pre-fetched
    * @return true if the task was created by the current supervisor
    */
@@ -2628,8 +2640,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     recordSupplierLock.lock();
     try {
       final Set<StreamPartition<PartitionIdType>> partitions = partitionIds.stream()
-                                         .map(partitionId -> new StreamPartition<>(ioConfig.getStream(), partitionId))
-                                         .collect(Collectors.toSet());
+                                                                           .map(partitionId -> new StreamPartition<>(
+                                                                               ioConfig.getStream(),
+                                                                               partitionId
+                                                                           ))
+                                                                           .collect(Collectors.toSet());
       if (!recordSupplier.getAssignment().containsAll(partitions)) {
         recordSupplier.assign(partitions);
         try {
@@ -2740,7 +2755,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * by this method.
    *
    * @param availablePartitions
-   *
    * @return a remapped copy of partitionGroups, containing only the partitions in availablePartitions
    */
   protected Map<Integer, Set<PartitionIdType>> recomputePartitionGroupsForExpiration(
@@ -2757,7 +2771,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    *
    * @param currentMetadata     The current DataSourceMetadata from metadata storage
    * @param expiredPartitionIds The set of expired partition IDs.
-   *
    * @return currentMetadata but with any expired partitions removed.
    */
   protected SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetadataWithExpiredPartitions(
@@ -3396,7 +3409,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * should be removed from the starting offsets sent to the tasks.
    *
    * @param startingOffsets
-   *
    * @return startingOffsets with entries for expired partitions removed
    */
   protected Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> filterExpiredPartitionsFromStartingOffsets(
@@ -3821,7 +3833,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                 pendingCompletionTaskGroups
                     .values()
                     .stream()
-                    .flatMap(taskGroups -> taskGroups.stream().flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()))
+                    .flatMap(taskGroups -> taskGroups.stream()
+                                                     .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()))
                     .flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream())
             ).collect(Collectors.toMap(
                 Entry::getKey,
@@ -3829,7 +3842,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                 (v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
             ));
 
-        partitionIds.forEach(partitionId -> currentOffsets.putIfAbsent(partitionId, offsetsFromMetadataStorage.get(partitionId)));
+        partitionIds.forEach(partitionId -> currentOffsets.putIfAbsent(
+            partitionId,
+            offsetsFromMetadataStorage.get(partitionId)
+        ));
         return currentOffsets;
       }
     }
@@ -3935,6 +3951,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   /**
    * Get all active tasks from metadata storage
+   *
    * @return map from taskId to Task
    */
   private Map<String, Task> getActiveTaskMap()
@@ -3969,7 +3986,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * the given replicas count
    *
    * @return list of specific kafka/kinesis index tasks
-   *
    * @throws JsonProcessingException
    */
   protected abstract List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType>> createIndexTasks(
@@ -3987,7 +4003,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * different between Kafka/Kinesis since Kinesis uses String as partition id
    *
    * @param partition partition id
-   *
    * @return taskgroup id
    */
   protected abstract int getTaskGroupIdForPartition(PartitionIdType partition);
@@ -3997,7 +4012,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * of [kafka/kinesis]DataSourceMetadata
    *
    * @param metadata datasource metadata
-   *
    * @return true if isInstance else false
    */
   protected abstract boolean checkSourceMetadataMatch(DataSourceMetadata metadata);
@@ -4007,7 +4021,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * [Kafka/Kinesis]IndexTask
    *
    * @param task task
-   *
    * @return true if isInstance else false
    */
   protected abstract boolean doesTaskTypeMatchSupervisor(Task task);
@@ -4017,7 +4030,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    *
    * @param stream stream name
    * @param map    partitionId -> sequence
-   *
    * @return specific instance of datasource metadata
    */
   protected abstract SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetaDataForReset(
@@ -4152,9 +4164,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     try {
       emitter.emit(
           ServiceMetricEvent.builder()
-              .setDimension("noticeType", noticeType)
-              .setDimension("dataSource", dataSource)
-              .build("ingest/notices/time", timeInMillis)
+                            .setDimension("noticeType", noticeType)
+                            .setDimension("dataSource", dataSource)
+                            .setDimensionIfNotNull(DruidMetrics.TAGS, spec.getContextValue(DruidMetrics.TAGS))
+                            .build("ingest/notices/time", timeInMillis)
       );
     }
     catch (Exception e) {
@@ -4171,8 +4184,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     try {
       emitter.emit(
           ServiceMetricEvent.builder()
-              .setDimension("dataSource", dataSource)
-              .build("ingest/notices/queueSize", getNoticesQueueSize())
+                            .setDimension("dataSource", dataSource)
+                            .setDimensionIfNotNull(DruidMetrics.TAGS, spec.getContextValue(DruidMetrics.TAGS))
+                            .build("ingest/notices/queueSize", getNoticesQueueSize())
       );
     }
     catch (Exception e) {
@@ -4207,12 +4221,14 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         }
 
         LagStats lagStats = computeLags(partitionLags);
+        Map<String, Object> metricTags = spec.getContextValue(DruidMetrics.TAGS);
         for (Map.Entry<PartitionIdType, Long> entry : partitionLags.entrySet()) {
           emitter.emit(
               ServiceMetricEvent.builder()
                                 .setDimension(DruidMetrics.DATASOURCE, dataSource)
                                 .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
                                 .setDimension(DruidMetrics.PARTITION, entry.getKey())
+                                .setDimensionIfNotNull(DruidMetrics.TAGS, metricTags)
                                 .build(
                                     StringUtils.format("ingest/%s/partitionLag%s", type, suffix),
                                     entry.getValue()
@@ -4223,18 +4239,21 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
             ServiceMetricEvent.builder()
                               .setDimension(DruidMetrics.DATASOURCE, dataSource)
                               .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
+                              .setDimensionIfNotNull(DruidMetrics.TAGS, metricTags)
                               .build(StringUtils.format("ingest/%s/lag%s", type, suffix), lagStats.getTotalLag())
         );
         emitter.emit(
             ServiceMetricEvent.builder()
                               .setDimension(DruidMetrics.DATASOURCE, dataSource)
                               .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
+                              .setDimensionIfNotNull(DruidMetrics.TAGS, metricTags)
                               .build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), lagStats.getMaxLag())
         );
         emitter.emit(
             ServiceMetricEvent.builder()
                               .setDimension(DruidMetrics.DATASOURCE, dataSource)
                               .setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
+                              .setDimensionIfNotNull(DruidMetrics.TAGS, metricTags)
                               .build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), lagStats.getAvgLag())
         );
       };
@@ -4250,7 +4269,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
 
   /**
-   *  This method computes maxLag, totalLag and avgLag
+   * This method computes maxLag, totalLag and avgLag
+   *
    * @param partitionLags lags per partition
    */
   protected LagStats computeLags(Map<PartitionIdType, Long> partitionLags)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 0ac57f8df7..90b0b70563 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -132,6 +132,12 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
     return context;
   }
 
+  @Nullable
+  public <ContextValueType> ContextValueType getContextValue(String key)
+  {
+    return context == null ? null : (ContextValueType) context.get(key);
+  }
+
   public ServiceEmitter getEmitter()
   {
     return emitter;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
new file mode 100644
index 0000000000..b50e68d13c
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.realtime.FireDepartment;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TaskRealtimeMetricsMonitorTest
+{
+  private static final Map<String, String[]> DIMENSIONS = ImmutableMap.of(
+      "dim1",
+      new String[]{"v1", "v2"},
+      "dim2",
+      new String[]{"vv"}
+  );
+
+  private static final Map<String, Object> TAGS = ImmutableMap.of("author", "Author Name", "version", 10);
+
+  @Mock(answer = Answers.RETURNS_MOCKS)
+  private FireDepartment fireDepartment;
+  @Mock(answer = Answers.RETURNS_MOCKS)
+  private RowIngestionMeters rowIngestionMeters;
+  @Mock
+  private ServiceEmitter emitter;
+  private Map<String, ServiceMetricEvent> emittedEvents;
+  private TaskRealtimeMetricsMonitor target;
+
+  @Before
+  public void setUp()
+  {
+    emittedEvents = new HashMap<>();
+    Mockito.doCallRealMethod().when(emitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
+    Mockito
+        .doAnswer(invocation -> {
+          ServiceMetricEvent e = invocation.getArgument(0);
+          emittedEvents.put(e.getMetric(), e);
+          return null;
+        })
+        .when(emitter).emit(ArgumentMatchers.any(Event.class));
+    target = new TaskRealtimeMetricsMonitor(fireDepartment, rowIngestionMeters, DIMENSIONS, TAGS);
+  }
+
+  @Test
+  public void testdoMonitorShouldEmitUserProvidedTags()
+  {
+    target.doMonitor(emitter);
+    for (ServiceMetricEvent sme : emittedEvents.values()) {
+      Assert.assertEquals(TAGS, sme.getUserDims().get(DruidMetrics.TAGS));
+    }
+  }
+
+  @Test
+  public void testdoMonitorWithoutTagsShouldNotEmitTags()
+  {
+    target = new TaskRealtimeMetricsMonitor(fireDepartment, rowIngestionMeters, DIMENSIONS, null);
+    target.doMonitor(emitter);
+    for (ServiceMetricEvent sme : emittedEvents.values()) {
+      Assert.assertFalse(sme.getUserDims().containsKey(DruidMetrics.TAGS));
+    }
+  }
+}
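The monitor implementation itself is not reproduced in this excerpt, but the tests above pin down its contract: the fourth constructor argument is the user-supplied tags map, and a null map means no tags dimension on emitted events. A hedged sketch of how a task could wire its context tags into the monitor (the committed wiring may differ):

    // Sketch; constructor arguments follow the test above, the tags lookup is illustrative.
    TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor(
        fireDepartment,
        rowIngestionMeters,
        dimensions,
        task.getContextValue(DruidMetrics.TAGS) // null when the task context has no tags
    );
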
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskUtilsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskUtilsTest.java
new file mode 100644
index 0000000000..8543f893fd
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskUtilsTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Map;
+
+@RunWith(MockitoJUnitRunner.class)
+public class IndexTaskUtilsTest
+{
+  private static final Map<String, Object> METRIC_TAGS = ImmutableMap.of("k1", "v1", "k2", 20);
+  @Mock
+  private Task task;
+  @Mock
+  private AbstractTask abstractTask;
+  private ServiceMetricEvent.Builder metricBuilder;
+
+  @Before
+  public void setUp()
+  {
+    metricBuilder = ServiceMetricEvent.builder();
+    Mockito.when(task.getContextValue(DruidMetrics.TAGS)).thenReturn(METRIC_TAGS);
+    Mockito.when(abstractTask.getContextValue(DruidMetrics.TAGS)).thenReturn(METRIC_TAGS);
+  }
+
+  @Test
+  public void testSetTaskDimensionsWithContextTagsShouldSetTags()
+  {
+    IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+    Assert.assertEquals(METRIC_TAGS, metricBuilder.getDimension(DruidMetrics.TAGS));
+  }
+
+  @Test
+  public void testSetTaskDimensionsForAbstractTaskWithContextTagsShouldSetTags()
+  {
+    IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask);
+    Assert.assertEquals(METRIC_TAGS, metricBuilder.getDimension(DruidMetrics.TAGS));
+  }
+
+  @Test
+  public void testSetTaskDimensionsWithoutTagsShouldNotSetTags()
+  {
+    Mockito.when(task.getContextValue(DruidMetrics.TAGS)).thenReturn(null);
+    IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+    Assert.assertNull(metricBuilder.getDimension(DruidMetrics.TAGS));
+  }
+
+  @Test
+  public void testSetTaskDimensionsForAbstractTaskWithoutTagsShouldNotSetTags()
+  {
+    Mockito.when(abstractTask.getContextValue(DruidMetrics.TAGS)).thenReturn(null);
+    IndexTaskUtils.setTaskDimensions(metricBuilder, abstractTask);
+    Assert.assertNull(metricBuilder.getDimension(DruidMetrics.TAGS));
+  }
+}
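Likewise, the IndexTaskUtils hunk is not shown here, but these tests fix its behavior: setTaskDimensions copies the task's context tags onto the metric builder when they exist and leaves the dimension unset otherwise. The tags-related step reduces to something like the following (a sketch consistent with the tests, not necessarily the committed code):

    // Other task dimensions (taskId, taskType, dataSource, ...) are set as before.
    metricBuilder.setDimensionIfNotNull(DruidMetrics.TAGS, task.getContextValue(DruidMetrics.TAGS));
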
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index b449867546..52e1600585 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -674,14 +674,15 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
     EasyMock.replay(ingestionSchema);
 
     EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
-            .andReturn(mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis",
-                                                           "1",
-                                                           "enableTaskAutoScaler",
-                                                           true,
-                                                           "taskCountMax",
-                                                           "4",
-                                                           "taskCountMin",
-                                                           "1"
+            .andReturn(mapper.convertValue(ImmutableMap.of(
+                "lagCollectionIntervalMillis",
+                "1",
+                "enableTaskAutoScaler",
+                true,
+                "taskCountMax",
+                "4",
+                "taskCountMin",
+                "1"
             ), AutoScalerConfig.class))
             .anyTimes();
     EasyMock.replay(seekableStreamSupervisorIOConfig);
@@ -931,7 +932,8 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
         null,
         null,
         new IdleConfig(true, null)
-    ){
+    )
+    {
     };
 
     EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
@@ -981,6 +983,83 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
     Assert.assertTrue(Objects.requireNonNull(spec.getIoConfig().getIdleConfig()).isEnabled());
   }
 
+  @Test
+  public void testGetContextValueWithNullContextShouldReturnNull()
+  {
+    mockIngestionSchema();
+    TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        null,
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    );
+    Assert.assertNull(spec.getContextValue("key"));
+  }
+
+  @Test
+  public void testGetContextValueForNonExistentKeyShouldReturnNull()
+  {
+    mockIngestionSchema();
+    TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        ImmutableMap.of("key", "value"),
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    );
+    Assert.assertNull(spec.getContextValue("key_not_exists"));
+  }
+
+  @Test
+  public void testGetContextValueForKeyShouldReturnValue()
+  {
+    mockIngestionSchema();
+    TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(
+        ingestionSchema,
+        ImmutableMap.of("key", "value"),
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        indexTaskClientFactory,
+        mapper,
+        emitter,
+        monitorSchedulerConfig,
+        rowIngestionMetersFactory,
+        supervisorStateManagerConfig,
+        supervisor4,
+        "id1"
+    );
+    Assert.assertEquals("value", spec.getContextValue("key"));
+  }
+
+  private void mockIngestionSchema()
+  {
+    EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+  }
+
   private static DataSchema getDataSchema()
   {
     List<DimensionSchema> dimensions = new ArrayList<>();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 61260221bb..d622984b0b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -68,6 +68,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
@@ -110,6 +111,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
   private static final String SHARD_ID = "0";
   private static final StreamPartition<String> SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID);
   private static final String EXCEPTION_MSG = "I had an exception";
+  private static final Map<String, Object> METRIC_TAGS = ImmutableMap.of("k1", "v1", "k2", 20);
 
   private TaskStorage taskStorage;
   private TaskMaster taskMaster;
@@ -151,6 +153,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig()).anyTimes();
     EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
 
     EasyMock.expect(taskClientFactory.build(
         EasyMock.anyString(),
@@ -792,16 +795,22 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     Assert.assertEquals(6, events.size());
     Assert.assertEquals("ingest/test/lag", events.get(0).toMap().get("metric"));
     Assert.assertEquals(850L, events.get(0).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(0).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("ingest/test/maxLag", events.get(1).toMap().get("metric"));
     Assert.assertEquals(500L, events.get(1).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(1).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("ingest/test/avgLag", events.get(2).toMap().get("metric"));
     Assert.assertEquals(283L, events.get(2).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(2).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("ingest/test/lag/time", events.get(3).toMap().get("metric"));
     Assert.assertEquals(45000L, events.get(3).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(3).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("ingest/test/maxLag/time", events.get(4).toMap().get("metric"));
     Assert.assertEquals(20000L, events.get(4).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(4).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("ingest/test/avgLag/time", events.get(5).toMap().get("metric"));
     Assert.assertEquals(15000L, events.get(5).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(5).toMap().get(DruidMetrics.TAGS));
     verifyAll();
   }
 
@@ -872,10 +881,13 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     Assert.assertEquals(3, events.size());
     Assert.assertEquals("ingest/test/lag/time", events.get(0).toMap().get("metric"));
     Assert.assertEquals(45000L, events.get(0).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(0).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("ingest/test/maxLag/time", events.get(1).toMap().get("metric"));
     Assert.assertEquals(20000L, events.get(1).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(1).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("ingest/test/avgLag/time", events.get(2).toMap().get("metric"));
     Assert.assertEquals(15000L, events.get(2).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(2).toMap().get(DruidMetrics.TAGS));
     verifyAll();
   }
 
@@ -909,6 +921,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     Assert.assertEquals(1, events.size());
     Assert.assertEquals("ingest/notices/queueSize", events.get(0).toMap().get("metric"));
     Assert.assertEquals(0, events.get(0).toMap().get("value"));
+    Assert.assertEquals(METRIC_TAGS, events.get(0).toMap().get(DruidMetrics.TAGS));
     Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
     verifyAll();
   }
@@ -936,6 +949,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     events = filterMetrics(events, whitelist);
     Assert.assertEquals(1, events.size());
     Assert.assertEquals("ingest/notices/time", events.get(0).toMap().get("metric"));
+    Assert.assertEquals(METRIC_TAGS, events.get(0).toMap().get(DruidMetrics.TAGS));
     Assert.assertTrue(String.valueOf(events.get(0).toMap().get("value")), (long) events.get(0).toMap().get("value") > 0);
     Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
     Assert.assertEquals("run_notice", events.get(0).toMap().get("noticeType"));
@@ -1063,6 +1077,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     }).anyTimes();
     EasyMock.expect(spec.isSuspended()).andReturn(suspended).anyTimes();
     EasyMock.expect(spec.getType()).andReturn("test").anyTimes();
+    EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
 
     EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
     EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
index b3858258e1..50482ce7f4 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
@@ -52,6 +52,8 @@ public class DruidMetrics
 
   public static final String PARTITION = "partition";
 
+  public static final String TAGS = "tags";
+
   public static int findNumComplexAggs(List<AggregatorFactory> aggs)
   {
     int retVal = 0;

