You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by kf...@apache.org on 2023/08/04 03:37:18 UTC

[druid] branch master updated: Report task/pending/time metrics for k8s based ingestion (#14698)

This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3335040b22 Report task/pending/time metrics for k8s based ingestion (#14698)
3335040b22 is described below

commit 3335040b2206588f4034bc8c6004d67c9ce75773
Author: YongGang <ma...@gmail.com>
AuthorDate: Thu Aug 3 20:37:11 2023 -0700

    Report task/pending/time metrics for k8s based ingestion (#14698)
    
    Changes:
    * Add and invoke `StateListener` when state changes in `KubernetesPeonLifecycle`
    * Report `task/pending/time` metric in `KubernetesTaskRunner` when state moves to RUNNING
---
 .../k8s/overlord/KubernetesPeonLifecycle.java      |  54 +++--
 .../overlord/KubernetesPeonLifecycleFactory.java   |   5 +-
 .../druid/k8s/overlord/KubernetesTaskRunner.java   |  42 +++-
 .../k8s/overlord/KubernetesTaskRunnerFactory.java  |   9 +-
 .../druid/k8s/overlord/KubernetesWorkItem.java     |   5 +
 .../druid/k8s/overlord/PeonLifecycleFactory.java   |   2 +-
 .../k8s/overlord/KubernetesPeonLifecycleTest.java  | 257 ++++++++++++++++++---
 .../overlord/KubernetesTaskRunnerFactoryTest.java  |  27 ++-
 .../k8s/overlord/KubernetesTaskRunnerTest.java     |  34 ++-
 .../druid/k8s/overlord/KubernetesWorkItemTest.java |  14 ++
 .../k8s/overlord/TestPeonLifecycleFactory.java     |   2 +-
 11 files changed, 384 insertions(+), 67 deletions(-)

diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index b0d483e527..302a568a23 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -45,6 +45,7 @@ import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -60,6 +61,12 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class KubernetesPeonLifecycle
 {
+  @FunctionalInterface
+  public interface TaskStateListener
+  {
+    void stateChanged(State state, String taskId);
+  }
+
   private static final EmittingLogger log = new EmittingLogger(KubernetesPeonLifecycle.class);
 
   protected enum State
@@ -79,6 +86,7 @@ public class KubernetesPeonLifecycle
   private final TaskLogs taskLogs;
   private final KubernetesPeonClient kubernetesClient;
   private final ObjectMapper mapper;
+  private final TaskStateListener stateListener;
 
   @MonotonicNonNull
   private LogWatch logWatch;
@@ -89,13 +97,15 @@ public class KubernetesPeonLifecycle
       Task task,
       KubernetesPeonClient kubernetesClient,
       TaskLogs taskLogs,
-      ObjectMapper mapper
+      ObjectMapper mapper,
+      TaskStateListener stateListener
   )
   {
     this.taskId = new K8sTaskId(task);
     this.kubernetesClient = kubernetesClient;
     this.taskLogs = taskLogs;
     this.mapper = mapper;
+    this.stateListener = stateListener;
   }
 
   /**
@@ -110,13 +120,7 @@ public class KubernetesPeonLifecycle
   protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) throws IllegalStateException
   {
     try {
-      Preconditions.checkState(
-          state.compareAndSet(State.NOT_STARTED, State.PENDING),
-          "Task [%s] failed to run: invalid peon lifecycle state transition [%s]->[%s]",
-          taskId.getOriginalTaskId(),
-          state.get(),
-          State.PENDING
-      );
+      updateState(new State[]{State.NOT_STARTED}, State.PENDING);
 
       // In case something bad happens and run is called twice on this KubernetesPeonLifecycle, reset taskLocation.
       taskLocation = null;
@@ -134,7 +138,7 @@ public class KubernetesPeonLifecycle
       throw e;
     }
     finally {
-      state.set(State.STOPPED);
+      stopTask();
     }
   }
 
@@ -148,16 +152,7 @@ public class KubernetesPeonLifecycle
   protected synchronized TaskStatus join(long timeout) throws IllegalStateException
   {
     try {
-      Preconditions.checkState(
-          (
-              state.compareAndSet(State.NOT_STARTED, State.RUNNING) ||
-              state.compareAndSet(State.PENDING, State.RUNNING)
-          ),
-          "Task [%s] failed to join: invalid peon lifecycle state transition [%s]->[%s]",
-          taskId.getOriginalTaskId(),
-          state.get(),
-          State.RUNNING
-      );
+      updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING);
 
       JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
           taskId,
@@ -176,7 +171,7 @@ public class KubernetesPeonLifecycle
         log.warn(e, "Task [%s] cleanup failed", taskId);
       }
 
-      state.set(State.STOPPED);
+      stopTask();
     }
   }
 
@@ -326,4 +321,23 @@ public class KubernetesPeonLifecycle
       log.warn(e, "Failed to manage temporary log file for task [%s]", taskId.getOriginalTaskId());
     }
   }
+
+  private void stopTask()
+  {
+    if (!State.STOPPED.equals(state.get())) {
+      updateState(new State[]{State.NOT_STARTED, State.PENDING, State.RUNNING}, State.STOPPED);
+    }
+  }
+
+  private void updateState(State[] acceptedStates, State targetState)
+  {
+    Preconditions.checkState(
+        Arrays.stream(acceptedStates).anyMatch(s -> state.compareAndSet(s, targetState)),
+        "Task [%s] failed to run: invalid peon lifecycle state transition [%s]->[%s]",
+        taskId.getOriginalTaskId(),
+        state.get(),
+        targetState
+    );
+    stateListener.stateChanged(state.get(), taskId.getOriginalTaskId());
+  }
 }
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java
index 2f2375789b..bf4e3a7125 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java
@@ -42,13 +42,14 @@ public class KubernetesPeonLifecycleFactory implements PeonLifecycleFactory
   }
 
   @Override
-  public KubernetesPeonLifecycle build(Task task)
+  public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener)
   {
     return new KubernetesPeonLifecycle(
         task,
         client,
         taskLogs,
-        mapper
+        mapper,
+        stateListener
     );
   }
 }
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 12c5bb6029..89d33c7404 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -31,16 +31,20 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.indexing.overlord.TaskRunnerListener;
 import org.apache.druid.indexing.overlord.TaskRunnerUtils;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
@@ -48,6 +52,7 @@ import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
 import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
 import org.apache.druid.tasklogs.TaskLogStreamer;
 import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.Duration;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -100,6 +105,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
   private final ListeningExecutorService exec;
   private final HttpClient httpClient;
   private final PeonLifecycleFactory peonLifecycleFactory;
+  private final ServiceEmitter emitter;
 
 
   public KubernetesTaskRunner(
@@ -107,7 +113,8 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
       KubernetesTaskRunnerConfig config,
       KubernetesPeonClient client,
       HttpClient httpClient,
-      PeonLifecycleFactory peonLifecycleFactory
+      PeonLifecycleFactory peonLifecycleFactory,
+      ServiceEmitter emitter
   )
   {
     this.adapter = adapter;
@@ -119,6 +126,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
     this.exec = MoreExecutors.listeningDecorator(
         Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d")
     );
+    this.emitter = emitter;
   }
 
   @Override
@@ -162,7 +170,10 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
   protected TaskStatus doTask(Task task, boolean run)
   {
     try {
-      KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
+      KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(
+          task,
+          this::emitTaskStateMetrics
+      );
 
       synchronized (tasks) {
         KubernetesWorkItem workItem = tasks.get(task.getId());
@@ -206,6 +217,33 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
     }
   }
 
+  @VisibleForTesting
+  protected void emitTaskStateMetrics(KubernetesPeonLifecycle.State state, String taskId)
+  {
+    switch (state) {
+      case RUNNING:
+        KubernetesWorkItem workItem;
+        synchronized (tasks) {
+          workItem = tasks.get(taskId);
+          if (workItem == null) {
+            log.error("Task [%s] disappeared", taskId);
+            return;
+          }
+        }
+        ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
+        IndexTaskUtils.setTaskDimensions(metricBuilder, workItem.getTask());
+        emitter.emit(
+            metricBuilder.build(
+                "task/pending/time",
+                new Duration(workItem.getCreatedTime(), DateTimes.nowUtc()).getMillis()
+            )
+        );
+      default:
+        // ignore other state transition now
+        return;
+    }
+  }
+
   @Override
   public void updateStatus(Task task, TaskStatus status)
   {
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
index 5b63872cab..76698ba8fe 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
@@ -28,6 +28,7 @@ import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.overlord.TaskRunnerFactory;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
 import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
@@ -54,6 +55,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
   private final TaskConfig taskConfig;
   private final Properties properties;
   private final DruidKubernetesClient druidKubernetesClient;
+  private final ServiceEmitter emitter;
   private KubernetesTaskRunner runner;
 
 
@@ -67,7 +69,8 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
       @Self DruidNode druidNode,
       TaskConfig taskConfig,
       Properties properties,
-      DruidKubernetesClient druidKubernetesClient
+      DruidKubernetesClient druidKubernetesClient,
+      ServiceEmitter emitter
   )
   {
     this.smileMapper = smileMapper;
@@ -79,6 +82,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
     this.taskConfig = taskConfig;
     this.properties = properties;
     this.druidKubernetesClient = druidKubernetesClient;
+    this.emitter = emitter;
   }
 
   @Override
@@ -96,7 +100,8 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
         kubernetesTaskRunnerConfig,
         peonClient,
         httpClient,
-        new KubernetesPeonLifecycleFactory(peonClient, taskLogs, smileMapper)
+        new KubernetesPeonLifecycleFactory(peonClient, taskLogs, smileMapper),
+        emitter
     );
     return runner;
   }
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
index 0bfbe1afa0..164a82b6ae 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
@@ -123,4 +123,9 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
   {
     return task.getDataSource();
   }
+
+  public Task getTask()
+  {
+    return task;
+  }
 }
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
index 2587069d75..2a234ebc57 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
@@ -23,5 +23,5 @@ import org.apache.druid.indexing.common.task.Task;
 
 public interface PeonLifecycleFactory
 {
-  KubernetesPeonLifecycle build(Task task);
+  KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener);
 }
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index d9160d31c9..1ec726f3fa 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -63,6 +63,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Mock TaskLogs taskLogs;
 
   @Mock LogWatch logWatch;
+  @Mock KubernetesPeonLifecycle.TaskStateListener stateListener;
 
   private ObjectMapper mapper;
   private Task task;
@@ -80,7 +81,14 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_run()
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper) {
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    )
+    {
       @Override
       protected synchronized TaskStatus join(long timeout)
       {
@@ -98,6 +106,11 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
 
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
 
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
+    EasyMock.expectLastCall().once();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+    EasyMock.expectLastCall().once();
+
     replayAll();
 
     TaskStatus taskStatus = peonLifecycle.run(job, 0L, 0L);
@@ -112,7 +125,14 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_run_whenCalledMultipleTimes_raisesIllegalStateException()
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper) {
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    )
+    {
       @Override
       protected synchronized TaskStatus join(long timeout)
       {
@@ -130,6 +150,11 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
 
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
 
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
+    EasyMock.expectLastCall().once();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+    EasyMock.expectLastCall().once();
+
     replayAll();
 
     peonLifecycle.run(job, 0L, 0L);
@@ -148,7 +173,14 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_run_whenExceptionRaised_setsRunnerTaskStateToNone()
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper) {
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    )
+    {
       @Override
       protected synchronized TaskStatus join(long timeout)
       {
@@ -164,9 +196,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
         EasyMock.eq(TimeUnit.MILLISECONDS)
     )).andReturn(null);
     EasyMock.expect(kubernetesClient.deletePeonJob(
-      new K8sTaskId(ID)
+        new K8sTaskId(ID)
     )).andReturn(true);
     Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
+    EasyMock.expectLastCall().once();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+    EasyMock.expectLastCall().once();
 
     replayAll();
 
@@ -183,7 +219,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
 
     EasyMock.expect(kubernetesClient.waitForPeonJobCompletion(
         EasyMock.eq(k8sTaskId),
@@ -194,6 +236,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
     taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
     EasyMock.expectLastCall();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
+    EasyMock.expectLastCall().once();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+    EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
     EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -213,7 +259,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_join() throws IOException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
 
     Job job = new JobBuilder()
         .withNewMetadata()
@@ -237,6 +289,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     ));
     taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
     EasyMock.expectLastCall();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
+    EasyMock.expectLastCall().once();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+    EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
     EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -256,7 +312,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() throws IOException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
 
     Job job = new JobBuilder()
         .withNewMetadata()
@@ -282,6 +344,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     EasyMock.expectLastCall();
     logWatch.close();
     EasyMock.expectLastCall();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
+    EasyMock.expectLastCall().once();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+    EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
     EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -307,7 +373,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
 
     Job job = new JobBuilder()
         .withNewMetadata()
@@ -327,6 +399,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
     taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
     EasyMock.expectLastCall();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
+    EasyMock.expectLastCall().once();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+    EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
     EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -348,7 +424,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFailedTaskStatus() throws IOException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
 
     Job job = new JobBuilder()
         .withNewMetadata()
@@ -368,6 +450,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException());
     taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
     EasyMock.expectLastCall();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
+    EasyMock.expectLastCall().once();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+    EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
     EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -389,7 +475,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() throws IOException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
 
     Job job = new JobBuilder()
         .withNewMetadata()
@@ -411,6 +503,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     );
     taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
     EasyMock.expectLastCall().andThrow(new IOException());
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
+    EasyMock.expectLastCall().once();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+    EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
     EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -427,11 +523,16 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState());
   }
 
-
   @Test
   public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_throwsException() throws IOException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
 
     EasyMock.expect(kubernetesClient.waitForPeonJobCompletion(
         EasyMock.eq(k8sTaskId),
@@ -443,6 +544,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
     EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
     taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
     EasyMock.expectLastCall();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
+    EasyMock.expectLastCall().once();
+    stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+    EasyMock.expectLastCall().once();
     logWatch.close();
     EasyMock.expectLastCall();
 
@@ -462,14 +567,26 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_shutdown_withNotStartedTaskState()
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     peonLifecycle.shutdown();
   }
 
   @Test
   public void test_shutdown_withPendingTaskState() throws NoSuchFieldException, IllegalAccessException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING);
 
     EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -484,7 +601,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_shutdown_withRunningTaskState() throws NoSuchFieldException, IllegalAccessException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
 
     EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -499,7 +622,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_shutdown_withStoppedTaskState() throws NoSuchFieldException, IllegalAccessException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED);
 
     peonLifecycle.shutdown();
@@ -508,7 +637,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_streamLogs_withNotStartedTaskState() throws NoSuchFieldException, IllegalAccessException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.NOT_STARTED);
 
     peonLifecycle.streamLogs();
@@ -517,7 +652,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_streamLogs_withPendingTaskState() throws NoSuchFieldException, IllegalAccessException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING);
 
     peonLifecycle.streamLogs();
@@ -526,7 +667,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_streamLogs_withRunningTaskState() throws NoSuchFieldException, IllegalAccessException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
 
     EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(
@@ -543,7 +690,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   @Test
   public void test_streamLogs_withStoppedTaskState() throws NoSuchFieldException, IllegalAccessException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED);
 
     peonLifecycle.streamLogs();
@@ -553,7 +706,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   public void test_getTaskLocation_withNotStartedTaskState_returnsUnknown()
       throws NoSuchFieldException, IllegalAccessException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.NOT_STARTED);
 
     Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
@@ -563,7 +722,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   public void test_getTaskLocation_withPendingTaskState_returnsUnknown()
       throws NoSuchFieldException, IllegalAccessException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING);
 
     Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
@@ -573,7 +738,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   public void test_getTaskLocation_withRunningTaskState_withoutPeonPod_returnsUnknown()
       throws NoSuchFieldException, IllegalAccessException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
 
     EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent());
@@ -589,7 +760,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   public void test_getTaskLocation_withRunningTaskState_withPeonPodWithoutStatus_returnsUnknown()
       throws NoSuchFieldException, IllegalAccessException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
 
     Pod pod = new PodBuilder()
@@ -611,7 +788,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatus_returnsLocation()
       throws NoSuchFieldException, IllegalAccessException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
 
     Pod pod = new PodBuilder()
@@ -640,7 +823,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   public void test_getTaskLocation_saveTaskLocation()
       throws NoSuchFieldException, IllegalAccessException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
 
     Pod pod = new PodBuilder()
@@ -669,7 +858,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithTLSAnnotation_returnsLocation()
       throws NoSuchFieldException, IllegalAccessException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
 
     Pod pod = new PodBuilder()
@@ -699,7 +894,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
   public void test_getTaskLocation_withStoppedTaskState_returnsUnknown()
       throws NoSuchFieldException, IllegalAccessException
   {
-    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+    KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+        task,
+        kubernetesClient,
+        taskLogs,
+        mapper,
+        stateListener
+    );
     setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED);
 
     Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
index 9d211cffba..ba9d2accf1 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
@@ -24,6 +24,7 @@ import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskConfigBuilder;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
 import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
 import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
@@ -32,6 +33,7 @@ import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
 import org.apache.druid.tasklogs.NoopTaskLogs;
 import org.apache.druid.tasklogs.TaskLogs;
+import org.easymock.Mock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,6 +52,7 @@ public class KubernetesTaskRunnerFactoryTest
   private Properties properties;
 
   private DruidKubernetesClient druidKubernetesClient;
+  @Mock private ServiceEmitter emitter;
 
   @Before
   public void setup()
@@ -86,7 +89,8 @@ public class KubernetesTaskRunnerFactoryTest
         druidNode,
         taskConfig,
         properties,
-        druidKubernetesClient
+        druidKubernetesClient,
+        emitter
     );
 
     KubernetesTaskRunner expectedRunner = factory.build();
@@ -107,7 +111,8 @@ public class KubernetesTaskRunnerFactoryTest
         druidNode,
         taskConfig,
         properties,
-        druidKubernetesClient
+        druidKubernetesClient,
+        emitter
     );
 
     KubernetesTaskRunner runner = factory.build();
@@ -133,7 +138,8 @@ public class KubernetesTaskRunnerFactoryTest
         druidNode,
         taskConfig,
         properties,
-        druidKubernetesClient
+        druidKubernetesClient,
+        emitter
     );
 
     KubernetesTaskRunner runner = factory.build();
@@ -157,7 +163,8 @@ public class KubernetesTaskRunnerFactoryTest
         druidNode,
         taskConfig,
         props,
-        druidKubernetesClient
+        druidKubernetesClient,
+        emitter
     );
 
     KubernetesTaskRunner runner = factory.build();
@@ -186,7 +193,8 @@ public class KubernetesTaskRunnerFactoryTest
         druidNode,
         taskConfig,
         props,
-        druidKubernetesClient
+        druidKubernetesClient,
+        emitter
     );
 
     Assert.assertThrows(
@@ -216,7 +224,8 @@ public class KubernetesTaskRunnerFactoryTest
         druidNode,
         taskConfig,
         props,
-        druidKubernetesClient
+        druidKubernetesClient,
+        emitter
     );
 
     KubernetesTaskRunner runner = factory.build();
@@ -240,7 +249,8 @@ public class KubernetesTaskRunnerFactoryTest
         druidNode,
         taskConfig,
         props,
-        druidKubernetesClient
+        druidKubernetesClient,
+        emitter
     );
 
     KubernetesTaskRunner runner = factory.build();
@@ -267,7 +277,8 @@ public class KubernetesTaskRunnerFactoryTest
         druidNode,
         taskConfig,
         props,
-        druidKubernetesClient
+        druidKubernetesClient,
+        emitter
     );
 
     KubernetesTaskRunner runner = factory.build();
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 0ef053a619..0359488802 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -33,6 +33,8 @@ import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
@@ -73,6 +75,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
   @Mock private TaskAdapter taskAdapter;
   @Mock private KubernetesPeonClient peonClient;
   @Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle;
+  @Mock private ServiceEmitter emitter;
 
   private KubernetesTaskRunnerConfig config;
   private KubernetesTaskRunner runner;
@@ -92,7 +95,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
         config,
         peonClient,
         httpClient,
-        new TestPeonLifecycleFactory(kubernetesPeonLifecycle)
+        new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
+        emitter
     );
   }
 
@@ -289,7 +293,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
         config,
         peonClient,
         httpClient,
-        new TestPeonLifecycleFactory(kubernetesPeonLifecycle)
+        new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
+        emitter
     ) {
       @Override
       protected ListenableFuture<TaskStatus> joinAsync(Task task)
@@ -325,7 +330,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
         config,
         peonClient,
         httpClient,
-        new TestPeonLifecycleFactory(kubernetesPeonLifecycle)
+        new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
+        emitter
     ) {
       @Override
       protected ListenableFuture<TaskStatus> joinAsync(Task task)
@@ -599,4 +605,26 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
 
     verifyAll();
   }
+
+  @Test
+  public void test_metricsReported_whenTaskStateChange()
+  {
+    KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+      @Override
+      public TaskLocation getLocation()
+      {
+        return TaskLocation.unknown();
+      }
+    };
+
+    runner.tasks.put(task.getId(), workItem);
+
+    emitter.emit(EasyMock.anyObject(ServiceEventBuilder.class));
+
+    replayAll();
+
+    runner.emitTaskStateMetrics(KubernetesPeonLifecycle.State.RUNNING, task.getId());
+
+    verifyAll();
+  }
 }
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
index b272230b07..5f95177048 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
@@ -55,6 +55,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
         task,
         null,
         null,
+        null,
         null
     ));
 
@@ -64,6 +65,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
             task,
             null,
             null,
+            null,
             null
         ))
     );
@@ -157,6 +159,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
         task,
         null,
         null,
+        null,
         null
     ));
 
@@ -170,6 +173,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
         task,
         null,
         null,
+        null,
         null
     ) {
       @Override
@@ -191,6 +195,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
         task,
         null,
         null,
+        null,
         null
     ) {
       @Override
@@ -212,6 +217,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
         task,
         null,
         null,
+        null,
         null
     ) {
       @Override
@@ -239,6 +245,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
         task,
         null,
         null,
+        null,
         null
     ));
     Assert.assertFalse(workItem.streamTaskLogs().isPresent());
@@ -257,6 +264,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
         task,
         null,
         null,
+        null,
         null
     ));
 
@@ -274,4 +282,10 @@ public class KubernetesWorkItemTest extends EasyMockSupport
   {
     Assert.assertEquals(task.getDataSource(), workItem.getDataSource());
   }
+
+  @Test
+  public void test_getTask()
+  {
+    Assert.assertEquals(task, workItem.getTask());
+  }
 }
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java
index 333b0490ba..8b8c43c0d7 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java
@@ -31,7 +31,7 @@ public class TestPeonLifecycleFactory implements PeonLifecycleFactory
   }
 
   @Override
-  public KubernetesPeonLifecycle build(Task task)
+  public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener)
   {
     return kubernetesPeonLifecycle;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org