You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by kf...@apache.org on 2023/08/04 03:37:18 UTC
[druid] branch master updated: Report task/pending/time metrics for k8s based ingestion (#14698)
This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3335040b22 Report task/pending/time metrics for k8s based ingestion (#14698)
3335040b22 is described below
commit 3335040b2206588f4034bc8c6004d67c9ce75773
Author: YongGang <ma...@gmail.com>
AuthorDate: Thu Aug 3 20:37:11 2023 -0700
Report task/pending/time metrics for k8s based ingestion (#14698)
Changes:
* Add and invoke `StateListener` when state changes in `KubernetesPeonLifecycle`
* Report `task/pending/time` metric in `KubernetesTaskRunner` when state moves to RUNNING
---
.../k8s/overlord/KubernetesPeonLifecycle.java | 54 +++--
.../overlord/KubernetesPeonLifecycleFactory.java | 5 +-
.../druid/k8s/overlord/KubernetesTaskRunner.java | 42 +++-
.../k8s/overlord/KubernetesTaskRunnerFactory.java | 9 +-
.../druid/k8s/overlord/KubernetesWorkItem.java | 5 +
.../druid/k8s/overlord/PeonLifecycleFactory.java | 2 +-
.../k8s/overlord/KubernetesPeonLifecycleTest.java | 257 ++++++++++++++++++---
.../overlord/KubernetesTaskRunnerFactoryTest.java | 27 ++-
.../k8s/overlord/KubernetesTaskRunnerTest.java | 34 ++-
.../druid/k8s/overlord/KubernetesWorkItemTest.java | 14 ++
.../k8s/overlord/TestPeonLifecycleFactory.java | 2 +-
11 files changed, 384 insertions(+), 67 deletions(-)
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
index b0d483e527..302a568a23 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java
@@ -45,6 +45,7 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -60,6 +61,12 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class KubernetesPeonLifecycle
{
+ @FunctionalInterface
+ public interface TaskStateListener
+ {
+ void stateChanged(State state, String taskId);
+ }
+
private static final EmittingLogger log = new EmittingLogger(KubernetesPeonLifecycle.class);
protected enum State
@@ -79,6 +86,7 @@ public class KubernetesPeonLifecycle
private final TaskLogs taskLogs;
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
+ private final TaskStateListener stateListener;
@MonotonicNonNull
private LogWatch logWatch;
@@ -89,13 +97,15 @@ public class KubernetesPeonLifecycle
Task task,
KubernetesPeonClient kubernetesClient,
TaskLogs taskLogs,
- ObjectMapper mapper
+ ObjectMapper mapper,
+ TaskStateListener stateListener
)
{
this.taskId = new K8sTaskId(task);
this.kubernetesClient = kubernetesClient;
this.taskLogs = taskLogs;
this.mapper = mapper;
+ this.stateListener = stateListener;
}
/**
@@ -110,13 +120,7 @@ public class KubernetesPeonLifecycle
protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) throws IllegalStateException
{
try {
- Preconditions.checkState(
- state.compareAndSet(State.NOT_STARTED, State.PENDING),
- "Task [%s] failed to run: invalid peon lifecycle state transition [%s]->[%s]",
- taskId.getOriginalTaskId(),
- state.get(),
- State.PENDING
- );
+ updateState(new State[]{State.NOT_STARTED}, State.PENDING);
// In case something bad happens and run is called twice on this KubernetesPeonLifecycle, reset taskLocation.
taskLocation = null;
@@ -134,7 +138,7 @@ public class KubernetesPeonLifecycle
throw e;
}
finally {
- state.set(State.STOPPED);
+ stopTask();
}
}
@@ -148,16 +152,7 @@ public class KubernetesPeonLifecycle
protected synchronized TaskStatus join(long timeout) throws IllegalStateException
{
try {
- Preconditions.checkState(
- (
- state.compareAndSet(State.NOT_STARTED, State.RUNNING) ||
- state.compareAndSet(State.PENDING, State.RUNNING)
- ),
- "Task [%s] failed to join: invalid peon lifecycle state transition [%s]->[%s]",
- taskId.getOriginalTaskId(),
- state.get(),
- State.RUNNING
- );
+ updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING);
JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
taskId,
@@ -176,7 +171,7 @@ public class KubernetesPeonLifecycle
log.warn(e, "Task [%s] cleanup failed", taskId);
}
- state.set(State.STOPPED);
+ stopTask();
}
}
@@ -326,4 +321,23 @@ public class KubernetesPeonLifecycle
log.warn(e, "Failed to manage temporary log file for task [%s]", taskId.getOriginalTaskId());
}
}
+
+ private void stopTask()
+ {
+ if (!State.STOPPED.equals(state.get())) {
+ updateState(new State[]{State.NOT_STARTED, State.PENDING, State.RUNNING}, State.STOPPED);
+ }
+ }
+
+ private void updateState(State[] acceptedStates, State targetState)
+ {
+ Preconditions.checkState(
+ Arrays.stream(acceptedStates).anyMatch(s -> state.compareAndSet(s, targetState)),
+ "Task [%s] failed to run: invalid peon lifecycle state transition [%s]->[%s]",
+ taskId.getOriginalTaskId(),
+ state.get(),
+ targetState
+ );
+ stateListener.stateChanged(state.get(), taskId.getOriginalTaskId());
+ }
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java
index 2f2375789b..bf4e3a7125 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java
@@ -42,13 +42,14 @@ public class KubernetesPeonLifecycleFactory implements PeonLifecycleFactory
}
@Override
- public KubernetesPeonLifecycle build(Task task)
+ public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener)
{
return new KubernetesPeonLifecycle(
task,
client,
taskLogs,
- mapper
+ mapper,
+ stateListener
);
}
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
index 12c5bb6029..89d33c7404 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java
@@ -31,16 +31,20 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
@@ -48,6 +52,7 @@ import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -100,6 +105,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
private final ListeningExecutorService exec;
private final HttpClient httpClient;
private final PeonLifecycleFactory peonLifecycleFactory;
+ private final ServiceEmitter emitter;
public KubernetesTaskRunner(
@@ -107,7 +113,8 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
KubernetesTaskRunnerConfig config,
KubernetesPeonClient client,
HttpClient httpClient,
- PeonLifecycleFactory peonLifecycleFactory
+ PeonLifecycleFactory peonLifecycleFactory,
+ ServiceEmitter emitter
)
{
this.adapter = adapter;
@@ -119,6 +126,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
this.exec = MoreExecutors.listeningDecorator(
Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d")
);
+ this.emitter = emitter;
}
@Override
@@ -162,7 +170,10 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
protected TaskStatus doTask(Task task, boolean run)
{
try {
- KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
+ KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(
+ task,
+ this::emitTaskStateMetrics
+ );
synchronized (tasks) {
KubernetesWorkItem workItem = tasks.get(task.getId());
@@ -206,6 +217,33 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
}
}
+ @VisibleForTesting
+ protected void emitTaskStateMetrics(KubernetesPeonLifecycle.State state, String taskId)
+ {
+ switch (state) {
+ case RUNNING:
+ KubernetesWorkItem workItem;
+ synchronized (tasks) {
+ workItem = tasks.get(taskId);
+ if (workItem == null) {
+ log.error("Task [%s] disappeared", taskId);
+ return;
+ }
+ }
+ ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
+ IndexTaskUtils.setTaskDimensions(metricBuilder, workItem.getTask());
+ emitter.emit(
+ metricBuilder.build(
+ "task/pending/time",
+ new Duration(workItem.getCreatedTime(), DateTimes.nowUtc()).getMillis()
+ )
+ );
+ default:
+ // ignore other state transition now
+ return;
+ }
+ }
+
@Override
public void updateStatus(Task task, TaskStatus status)
{
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
index 5b63872cab..76698ba8fe 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java
@@ -28,6 +28,7 @@ import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
@@ -54,6 +55,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
private final TaskConfig taskConfig;
private final Properties properties;
private final DruidKubernetesClient druidKubernetesClient;
+ private final ServiceEmitter emitter;
private KubernetesTaskRunner runner;
@@ -67,7 +69,8 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
@Self DruidNode druidNode,
TaskConfig taskConfig,
Properties properties,
- DruidKubernetesClient druidKubernetesClient
+ DruidKubernetesClient druidKubernetesClient,
+ ServiceEmitter emitter
)
{
this.smileMapper = smileMapper;
@@ -79,6 +82,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
this.taskConfig = taskConfig;
this.properties = properties;
this.druidKubernetesClient = druidKubernetesClient;
+ this.emitter = emitter;
}
@Override
@@ -96,7 +100,8 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
kubernetesTaskRunnerConfig,
peonClient,
httpClient,
- new KubernetesPeonLifecycleFactory(peonClient, taskLogs, smileMapper)
+ new KubernetesPeonLifecycleFactory(peonClient, taskLogs, smileMapper),
+ emitter
);
return runner;
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
index 0bfbe1afa0..164a82b6ae 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java
@@ -123,4 +123,9 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
{
return task.getDataSource();
}
+
+ public Task getTask()
+ {
+ return task;
+ }
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
index 2587069d75..2a234ebc57 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java
@@ -23,5 +23,5 @@ import org.apache.druid.indexing.common.task.Task;
public interface PeonLifecycleFactory
{
- KubernetesPeonLifecycle build(Task task);
+ KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener);
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index d9160d31c9..1ec726f3fa 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -63,6 +63,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Mock TaskLogs taskLogs;
@Mock LogWatch logWatch;
+ @Mock KubernetesPeonLifecycle.TaskStateListener stateListener;
private ObjectMapper mapper;
private Task task;
@@ -80,7 +81,14 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_run()
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper) {
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ )
+ {
@Override
protected synchronized TaskStatus join(long timeout)
{
@@ -98,6 +106,11 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
+ EasyMock.expectLastCall().once();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+ EasyMock.expectLastCall().once();
+
replayAll();
TaskStatus taskStatus = peonLifecycle.run(job, 0L, 0L);
@@ -112,7 +125,14 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_run_whenCalledMultipleTimes_raisesIllegalStateException()
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper) {
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ )
+ {
@Override
protected synchronized TaskStatus join(long timeout)
{
@@ -130,6 +150,11 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
+ EasyMock.expectLastCall().once();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+ EasyMock.expectLastCall().once();
+
replayAll();
peonLifecycle.run(job, 0L, 0L);
@@ -148,7 +173,14 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_run_whenExceptionRaised_setsRunnerTaskStateToNone()
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper) {
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ )
+ {
@Override
protected synchronized TaskStatus join(long timeout)
{
@@ -164,9 +196,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(null);
EasyMock.expect(kubernetesClient.deletePeonJob(
- new K8sTaskId(ID)
+ new K8sTaskId(ID)
)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
+ EasyMock.expectLastCall().once();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+ EasyMock.expectLastCall().once();
replayAll();
@@ -183,7 +219,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
EasyMock.expect(kubernetesClient.waitForPeonJobCompletion(
EasyMock.eq(k8sTaskId),
@@ -194,6 +236,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
+ EasyMock.expectLastCall().once();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+ EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -213,7 +259,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_join() throws IOException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
Job job = new JobBuilder()
.withNewMetadata()
@@ -237,6 +289,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
));
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
+ EasyMock.expectLastCall().once();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+ EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -256,7 +312,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() throws IOException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
Job job = new JobBuilder()
.withNewMetadata()
@@ -282,6 +344,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expectLastCall();
logWatch.close();
EasyMock.expectLastCall();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
+ EasyMock.expectLastCall().once();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+ EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -307,7 +373,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
Job job = new JobBuilder()
.withNewMetadata()
@@ -327,6 +399,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
+ EasyMock.expectLastCall().once();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+ EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -348,7 +424,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFailedTaskStatus() throws IOException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
Job job = new JobBuilder()
.withNewMetadata()
@@ -368,6 +450,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
+ EasyMock.expectLastCall().once();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+ EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -389,7 +475,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() throws IOException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
Job job = new JobBuilder()
.withNewMetadata()
@@ -411,6 +503,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
);
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall().andThrow(new IOException());
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
+ EasyMock.expectLastCall().once();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+ EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -427,11 +523,16 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState());
}
-
@Test
public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_throwsException() throws IOException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
EasyMock.expect(kubernetesClient.waitForPeonJobCompletion(
EasyMock.eq(k8sTaskId),
@@ -443,6 +544,10 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
+ EasyMock.expectLastCall().once();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+ EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
@@ -462,14 +567,26 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_shutdown_withNotStartedTaskState()
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
peonLifecycle.shutdown();
}
@Test
public void test_shutdown_withPendingTaskState() throws NoSuchFieldException, IllegalAccessException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING);
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -484,7 +601,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_shutdown_withRunningTaskState() throws NoSuchFieldException, IllegalAccessException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
@@ -499,7 +622,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_shutdown_withStoppedTaskState() throws NoSuchFieldException, IllegalAccessException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED);
peonLifecycle.shutdown();
@@ -508,7 +637,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_streamLogs_withNotStartedTaskState() throws NoSuchFieldException, IllegalAccessException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.NOT_STARTED);
peonLifecycle.streamLogs();
@@ -517,7 +652,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_streamLogs_withPendingTaskState() throws NoSuchFieldException, IllegalAccessException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING);
peonLifecycle.streamLogs();
@@ -526,7 +667,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_streamLogs_withRunningTaskState() throws NoSuchFieldException, IllegalAccessException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(
@@ -543,7 +690,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
@Test
public void test_streamLogs_withStoppedTaskState() throws NoSuchFieldException, IllegalAccessException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED);
peonLifecycle.streamLogs();
@@ -553,7 +706,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_withNotStartedTaskState_returnsUnknown()
throws NoSuchFieldException, IllegalAccessException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.NOT_STARTED);
Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
@@ -563,7 +722,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_withPendingTaskState_returnsUnknown()
throws NoSuchFieldException, IllegalAccessException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING);
Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
@@ -573,7 +738,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_withRunningTaskState_withoutPeonPod_returnsUnknown()
throws NoSuchFieldException, IllegalAccessException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent());
@@ -589,7 +760,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_withRunningTaskState_withPeonPodWithoutStatus_returnsUnknown()
throws NoSuchFieldException, IllegalAccessException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
Pod pod = new PodBuilder()
@@ -611,7 +788,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatus_returnsLocation()
throws NoSuchFieldException, IllegalAccessException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
Pod pod = new PodBuilder()
@@ -640,7 +823,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_saveTaskLocation()
throws NoSuchFieldException, IllegalAccessException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
Pod pod = new PodBuilder()
@@ -669,7 +858,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithTLSAnnotation_returnsLocation()
throws NoSuchFieldException, IllegalAccessException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);
Pod pod = new PodBuilder()
@@ -699,7 +894,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void test_getTaskLocation_withStoppedTaskState_returnsUnknown()
throws NoSuchFieldException, IllegalAccessException
{
- KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ );
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED);
Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
index 9d211cffba..ba9d2accf1 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java
@@ -24,6 +24,7 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
@@ -32,6 +33,7 @@ import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.NoopTaskLogs;
import org.apache.druid.tasklogs.TaskLogs;
+import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -50,6 +52,7 @@ public class KubernetesTaskRunnerFactoryTest
private Properties properties;
private DruidKubernetesClient druidKubernetesClient;
+ @Mock private ServiceEmitter emitter;
@Before
public void setup()
@@ -86,7 +89,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
properties,
- druidKubernetesClient
+ druidKubernetesClient,
+ emitter
);
KubernetesTaskRunner expectedRunner = factory.build();
@@ -107,7 +111,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
properties,
- druidKubernetesClient
+ druidKubernetesClient,
+ emitter
);
KubernetesTaskRunner runner = factory.build();
@@ -133,7 +138,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
properties,
- druidKubernetesClient
+ druidKubernetesClient,
+ emitter
);
KubernetesTaskRunner runner = factory.build();
@@ -157,7 +163,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
props,
- druidKubernetesClient
+ druidKubernetesClient,
+ emitter
);
KubernetesTaskRunner runner = factory.build();
@@ -186,7 +193,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
props,
- druidKubernetesClient
+ druidKubernetesClient,
+ emitter
);
Assert.assertThrows(
@@ -216,7 +224,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
props,
- druidKubernetesClient
+ druidKubernetesClient,
+ emitter
);
KubernetesTaskRunner runner = factory.build();
@@ -240,7 +249,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
props,
- druidKubernetesClient
+ druidKubernetesClient,
+ emitter
);
KubernetesTaskRunner runner = factory.build();
@@ -267,7 +277,8 @@ public class KubernetesTaskRunnerFactoryTest
druidNode,
taskConfig,
props,
- druidKubernetesClient
+ druidKubernetesClient,
+ emitter
);
KubernetesTaskRunner runner = factory.build();
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 0ef053a619..0359488802 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -33,6 +33,8 @@ import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
@@ -73,6 +75,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
@Mock private TaskAdapter taskAdapter;
@Mock private KubernetesPeonClient peonClient;
@Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle;
+ @Mock private ServiceEmitter emitter;
private KubernetesTaskRunnerConfig config;
private KubernetesTaskRunner runner;
@@ -92,7 +95,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
config,
peonClient,
httpClient,
- new TestPeonLifecycleFactory(kubernetesPeonLifecycle)
+ new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
+ emitter
);
}
@@ -289,7 +293,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
config,
peonClient,
httpClient,
- new TestPeonLifecycleFactory(kubernetesPeonLifecycle)
+ new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
+ emitter
) {
@Override
protected ListenableFuture<TaskStatus> joinAsync(Task task)
@@ -325,7 +330,8 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
config,
peonClient,
httpClient,
- new TestPeonLifecycleFactory(kubernetesPeonLifecycle)
+ new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
+ emitter
) {
@Override
protected ListenableFuture<TaskStatus> joinAsync(Task task)
@@ -599,4 +605,26 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
verifyAll();
}
+
+ @Test
+ public void test_metricsReported_whenTaskStateChange()
+ {
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+ @Override
+ public TaskLocation getLocation()
+ {
+ return TaskLocation.unknown();
+ }
+ };
+
+ runner.tasks.put(task.getId(), workItem);
+
+ emitter.emit(EasyMock.anyObject(ServiceEventBuilder.class));
+
+ replayAll();
+
+ runner.emitTaskStateMetrics(KubernetesPeonLifecycle.State.RUNNING, task.getId());
+
+ verifyAll();
+ }
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
index b272230b07..5f95177048 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
@@ -55,6 +55,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
+ null,
null
));
@@ -64,6 +65,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
+ null,
null
))
);
@@ -157,6 +159,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
+ null,
null
));
@@ -170,6 +173,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
+ null,
null
) {
@Override
@@ -191,6 +195,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
+ null,
null
) {
@Override
@@ -212,6 +217,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
+ null,
null
) {
@Override
@@ -239,6 +245,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
+ null,
null
));
Assert.assertFalse(workItem.streamTaskLogs().isPresent());
@@ -257,6 +264,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
task,
null,
null,
+ null,
null
));
@@ -274,4 +282,10 @@ public class KubernetesWorkItemTest extends EasyMockSupport
{
Assert.assertEquals(task.getDataSource(), workItem.getDataSource());
}
+
+ @Test
+ public void test_getTask()
+ {
+ Assert.assertEquals(task, workItem.getTask());
+ }
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java
index 333b0490ba..8b8c43c0d7 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java
@@ -31,7 +31,7 @@ public class TestPeonLifecycleFactory implements PeonLifecycleFactory
}
@Override
- public KubernetesPeonLifecycle build(Task task)
+ public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener)
{
return kubernetesPeonLifecycle;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org