You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2023/03/30 04:39:59 UTC

[druid] branch master updated: Fix bug in k8s task runner in handling deleted jobs (#14001)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 44abe2b96f Fix bug in k8s task runner in handling deleted jobs (#14001)
44abe2b96f is described below

commit 44abe2b96f97b5e0d4e630809ae318d32f7f3351
Author: George Shiqi Wu <ge...@berkeley.edu>
AuthorDate: Thu Mar 30 00:39:52 2023 -0400

    Fix bug in k8s task runner in handling deleted jobs (#14001)
    
    With the KubernetesTaskRunner, if a task is manually shut down via the web console while running, or the corresponding k8s job is manually deleted, the thread responsible for overseeing the task gets stuck in a loop. When the job is deleted, the fabric8 client sends that thread a single event in which the job is null, and this event does not satisfy the wait condition.
    
    This means that the thread is stuck waiting on a fabric8 event (the job being successful) that will never arrive, so it stays blocked until maxTaskDuration (default 4 hours) elapses. If a user of the extension is trying to use a limited taskqueue maxSize, this can cause problems because the k8s executor pool is unable to pick up additional tasks (its threads are stuck waiting on old tasks that have already been deleted).
---
 .../overlord/common/DruidKubernetesPeonClient.java |  5 ++++-
 .../druid/k8s/overlord/common/JobResponse.java     |  2 +-
 .../common/DruidKubernetesPeonClientTest.java      | 26 ++++++++++++++++++----
 3 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java
index 226121d5a5..e60b3b6f8e 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClient.java
@@ -106,10 +106,13 @@ public class DruidKubernetesPeonClient implements KubernetesPeonClient
                       .inNamespace(namespace)
                       .withName(taskId.getK8sTaskId())
                       .waitUntilCondition(
-                          x -> x != null && x.getStatus() != null && x.getStatus().getActive() == null,
+                          x -> (x == null) || (x.getStatus() != null && x.getStatus().getActive() == null),
                           howLong,
                           unit
                       );
+      if (job == null) {
+        return new JobResponse(job, PeonPhase.FAILED);
+      }
       if (job.getStatus().getSucceeded() != null) {
         return new JobResponse(job, PeonPhase.SUCCEEDED);
       }
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java
index 0148b53cce..6f39944951 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java
@@ -55,7 +55,7 @@ public class JobResponse
   {
     Optional<Long> duration = Optional.absent();
     try {
-      if (job.getStatus() != null
+      if (job != null && job.getStatus() != null
           && job.getStatus().getStartTime() != null
           && job.getStatus().getCompletionTime() != null) {
         duration = Optional.of((long) new Period(
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java
index da77d4b61e..a10bcb526e 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/DruidKubernetesPeonClientTest.java
@@ -30,8 +30,8 @@ import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
 import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
 import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
+import io.fabric8.kubernetes.api.model.batch.v1.JobStatusBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import org.junit.jupiter.api.Assertions;
@@ -60,9 +60,27 @@ public class DruidKubernetesPeonClientTest
     DruidKubernetesPeonClient client = new DruidKubernetesPeonClient(new TestKubernetesClient(this.client), "test",
                                                                      false
     );
-    Assertions.assertThrows(KubernetesClientTimeoutException.class, () -> {
-      client.waitForJobCompletion(new K8sTaskId("some-task"), 1, TimeUnit.SECONDS);
-    });
+    JobResponse jobResponse = client.waitForJobCompletion(new K8sTaskId("some-task"), 1, TimeUnit.SECONDS);
+    Assertions.assertEquals(PeonPhase.FAILED, jobResponse.getPhase());
+    Assertions.assertNull(jobResponse.getJob());
+  }
+
+  @Test
+  void testWaitingForAPodToGetReadySuccess()
+  {
+    DruidKubernetesPeonClient peonClient = new DruidKubernetesPeonClient(new TestKubernetesClient(this.client), "test",
+        false
+    );
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName("sometask")
+        .endMetadata()
+        .withStatus(new JobStatusBuilder().withActive(null).withSucceeded(1).build())
+        .build();
+    client.batch().v1().jobs().inNamespace("test").create(job);
+    JobResponse jobResponse = peonClient.waitForJobCompletion(new K8sTaskId("sometask"), 1, TimeUnit.SECONDS);
+    Assertions.assertEquals(PeonPhase.SUCCEEDED, jobResponse.getPhase());
+    Assertions.assertEquals(job.getStatus().getSucceeded(), jobResponse.getJob().getStatus().getSucceeded());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org