You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by "abhishekagarwal87 (via GitHub)" <gi...@apache.org> on 2023/05/04 07:33:49 UTC

[GitHub] [druid] abhishekagarwal87 commented on a diff in pull request #14156: queue tasks in kubernetes task runner if capacity is fully utilized

abhishekagarwal87 commented on code in PR #14156:
URL: https://github.com/apache/druid/pull/14156#discussion_r1182425149


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodStatus;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
+import org.apache.druid.k8s.overlord.common.JobResponse;
+import org.apache.druid.k8s.overlord.common.K8sTaskId;
+import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
+import org.apache.druid.tasklogs.TaskLogs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class KubernetesPeonLifecycle
+{
+  private static final EmittingLogger log = new EmittingLogger(KubernetesPeonLifecycle.class);
+
+  protected enum State
+  {
+    /** Lifecycle's state before {@link #run(Job, long, long)} or {@link #join(long)} is called. */
+    NOT_STARTED,
+    /** Lifecycle's state since {@link #run(Job, long, long)} is called. */
+    PENDING,
+    /** Lifecycle's state since {@link #join(long)} is called. */
+    RUNNING,
+    /** Lifecycle's state since the task has completed. */
+    STOPPED
+  }
+
+  private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
+  private final K8sTaskId taskId;
+  private final TaskLogs taskLogs;
+  private final KubernetesPeonClient kubernetesClient;
+  private final ObjectMapper mapper;
+
+  protected KubernetesPeonLifecycle(
+      Task task,
+      KubernetesPeonClient kubernetesClient,
+      TaskLogs taskLogs,
+      ObjectMapper mapper
+  )
+  {
+    this.taskId = new K8sTaskId(task);
+    this.kubernetesClient = kubernetesClient;
+    this.taskLogs = taskLogs;
+    this.mapper = mapper;
+  }
+
+  /**
+   * Run a Kubernetes Job
+   *
+   * @param job
+   * @param launchTimeout
+   * @param timeout
+   * @return
+   * @throws IllegalStateException
+   */
+  protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) throws IllegalStateException
+  {
+    try {
+      Preconditions.checkState(
+          state.compareAndSet(State.NOT_STARTED, State.PENDING),
+          "Task [%s] failed to run: invalid peon lifecycle state transition [%s]->[%s]",
+          taskId.getOriginalTaskId(),
+          state.get(),
+          State.PENDING
+      );
+
+      kubernetesClient.launchPeonJobAndWaitForStart(
+          job,
+          launchTimeout,
+          TimeUnit.MILLISECONDS
+      );
+
+      return join(timeout);
+    }
+    finally {
+      state.set(State.STOPPED);
+    }
+  }
+
+  /**
+   * Join existing Kubernetes Job
+   *
+   * @param timeout
+   * @return
+   * @throws IllegalStateException
+   */
+  protected synchronized TaskStatus join(long timeout) throws IllegalStateException
+  {
+    try {
+      Preconditions.checkState(
+          (
+              state.compareAndSet(State.NOT_STARTED, State.RUNNING) ||
+              state.compareAndSet(State.PENDING, State.RUNNING)
+          ),
+          "Task [%s] failed to join: invalid peon lifecycle state transition [%s]->[%s]",
+          taskId.getOriginalTaskId(),
+          state.get(),
+          State.RUNNING
+      );
+
+      JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
+          taskId,
+          timeout,
+          TimeUnit.MILLISECONDS
+      );
+
+      saveLogs();
+
+      return getTaskStatus(jobResponse.getJobDuration());
+    }
+    finally {
+      try {
+        shutdown();
+      }
+      catch (Exception e) {
+        log.warn(e, "Task [%s] shutdown failed", taskId);
+      }
+
+      state.set(State.STOPPED);
+    }
+  }
+
+  /**
+   * Shutdown Kubernetes job and associated pods
+   *
+   * Behavior: Deletes Kubernetes job which sends a kill signal to the containers running in
+   * the job's associated pod.
+   *
+   * Task state will be set by the thread running the run(...) or join(...) commands
+   */
+  protected void shutdown()
+  {
+    if (State.PENDING.equals(state.get()) || State.RUNNING.equals(state.get())) {
+      kubernetesClient.deletePeonJob(taskId);
+    }
+  }
+
+  /**
+   * Stream logs from the Kubernetes pod running the peon process
+   *
+   * @return
+   */
+  protected Optional<InputStream> streamLogs()
+  {
+    if (!State.RUNNING.equals(state.get())) {
+      return Optional.absent();
+    }
+    return kubernetesClient.getPeonLogs(taskId);
+  }
+
+  /**
+   * Get peon lifecycle state
+   *
+   * @return
+   */
+  protected State getState()
+  {
+    return state.get();
+  }
+
+  /**
+   * Get task location for the Kubernetes pod running the peon process
+   *
+   * @return
+   */
+  protected TaskLocation getTaskLocation()
+  {
+    if (!State.RUNNING.equals(state.get())) {
+      return TaskLocation.unknown();
+    }
+
+    Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId);
+    if (!maybePod.isPresent()) {
+      return TaskLocation.unknown();
+    }
+
+    Pod pod = maybePod.get();
+    PodStatus podStatus = pod.getStatus();
+
+    if (podStatus == null || podStatus.getPodIP() == null) {
+      return TaskLocation.unknown();
+    }
+
+    return TaskLocation.create(
+        podStatus.getPodIP(),
+        DruidK8sConstants.PORT,
+        DruidK8sConstants.TLS_PORT,
+        Boolean.parseBoolean(pod.getMetadata()
+            .getAnnotations()
+            .getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")
+        )
+    );
+  }
+
+  private TaskStatus getTaskStatus(long duration)
+  {
+    TaskStatus taskStatus;
+    try {
+      Optional<InputStream> maybeTaskStatusStream = taskLogs.streamTaskStatus(taskId.getOriginalTaskId());
+      if (maybeTaskStatusStream.isPresent()) {
+        taskStatus = mapper.readValue(
+            IOUtils.toString(maybeTaskStatusStream.get(), StandardCharsets.UTF_8),
+            TaskStatus.class
+        );
+      } else {
+        taskStatus = TaskStatus.failure(taskId.getOriginalTaskId(), "task status not found");
+      }
+    }
+    catch (IOException e) {
+      log.error(e, "Failed to load task status for task [%s]", taskId.getOriginalTaskId());
+      taskStatus = TaskStatus.failure(
+          taskId.getOriginalTaskId(),
+          StringUtils.format("error loading status: %s", e.getMessage())
+      );
+    }
+
+    return taskStatus.withDuration(duration);
+  }
+
+  private void saveLogs()
+  {
+    try {
+      Path file = Files.createTempFile(taskId.getOriginalTaskId(), "file");
+      try {
+        Optional<InputStream> maybeLogStream = streamLogs();
+        if (maybeLogStream.isPresent()) {
+          FileUtils.copyInputStreamToFile(maybeLogStream.get(), file.toFile());
+        }

Review Comment:
   Again, a debug log here would be helpful.



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodStatus;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
+import org.apache.druid.k8s.overlord.common.JobResponse;
+import org.apache.druid.k8s.overlord.common.K8sTaskId;
+import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
+import org.apache.druid.tasklogs.TaskLogs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class KubernetesPeonLifecycle
+{
+  private static final EmittingLogger log = new EmittingLogger(KubernetesPeonLifecycle.class);
+
+  protected enum State
+  {
+    /** Lifecycle's state before {@link #run(Job, long, long)} or {@link #join(long)} is called. */
+    NOT_STARTED,
+    /** Lifecycle's state since {@link #run(Job, long, long)} is called. */
+    PENDING,
+    /** Lifecycle's state since {@link #join(long)} is called. */
+    RUNNING,
+    /** Lifecycle's state since the task has completed. */
+    STOPPED
+  }
+
+  private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
+  private final K8sTaskId taskId;
+  private final TaskLogs taskLogs;
+  private final KubernetesPeonClient kubernetesClient;
+  private final ObjectMapper mapper;
+
+  protected KubernetesPeonLifecycle(
+      Task task,
+      KubernetesPeonClient kubernetesClient,
+      TaskLogs taskLogs,
+      ObjectMapper mapper
+  )
+  {
+    this.taskId = new K8sTaskId(task);
+    this.kubernetesClient = kubernetesClient;
+    this.taskLogs = taskLogs;
+    this.mapper = mapper;
+  }
+
+  /**
+   * Run a Kubernetes Job
+   *
+   * @param job
+   * @param launchTimeout
+   * @param timeout
+   * @return
+   * @throws IllegalStateException
+   */
+  protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) throws IllegalStateException
+  {
+    try {
+      Preconditions.checkState(
+          state.compareAndSet(State.NOT_STARTED, State.PENDING),
+          "Task [%s] failed to run: invalid peon lifecycle state transition [%s]->[%s]",
+          taskId.getOriginalTaskId(),
+          state.get(),
+          State.PENDING
+      );
+
+      kubernetesClient.launchPeonJobAndWaitForStart(
+          job,
+          launchTimeout,
+          TimeUnit.MILLISECONDS
+      );
+
+      return join(timeout);
+    }
+    finally {
+      state.set(State.STOPPED);
+    }
+  }
+
+  /**
+   * Join existing Kubernetes Job
+   *
+   * @param timeout
+   * @return
+   * @throws IllegalStateException
+   */
+  protected synchronized TaskStatus join(long timeout) throws IllegalStateException
+  {
+    try {
+      Preconditions.checkState(
+          (
+              state.compareAndSet(State.NOT_STARTED, State.RUNNING) ||
+              state.compareAndSet(State.PENDING, State.RUNNING)
+          ),
+          "Task [%s] failed to join: invalid peon lifecycle state transition [%s]->[%s]",
+          taskId.getOriginalTaskId(),
+          state.get(),
+          State.RUNNING
+      );
+
+      JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
+          taskId,
+          timeout,
+          TimeUnit.MILLISECONDS
+      );
+
+      saveLogs();
+
+      return getTaskStatus(jobResponse.getJobDuration());
+    }
+    finally {
+      try {
+        shutdown();
+      }
+      catch (Exception e) {
+        log.warn(e, "Task [%s] shutdown failed", taskId);
+      }
+
+      state.set(State.STOPPED);
+    }
+  }
+
+  /**
+   * Shutdown Kubernetes job and associated pods
+   *
+   * Behavior: Deletes Kubernetes job which sends a kill signal to the containers running in
+   * the job's associated pod.
+   *
+   * Task state will be set by the thread running the run(...) or join(...) commands
+   */
+  protected void shutdown()
+  {
+    if (State.PENDING.equals(state.get()) || State.RUNNING.equals(state.get())) {
+      kubernetesClient.deletePeonJob(taskId);
+    }
+  }
+
+  /**
+   * Stream logs from the Kubernetes pod running the peon process
+   *
+   * @return
+   */
+  protected Optional<InputStream> streamLogs()
+  {
+    if (!State.RUNNING.equals(state.get())) {
+      return Optional.absent();
+    }
+    return kubernetesClient.getPeonLogs(taskId);
+  }
+
+  /**
+   * Get peon lifecycle state
+   *
+   * @return
+   */
+  protected State getState()
+  {
+    return state.get();
+  }
+
+  /**
+   * Get task location for the Kubernetes pod running the peon process
+   *
+   * @return
+   */
+  protected TaskLocation getTaskLocation()
+  {
+    if (!State.RUNNING.equals(state.get())) {
+      return TaskLocation.unknown();
+    }
+
+    Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId);
+    if (!maybePod.isPresent()) {
+      return TaskLocation.unknown();
+    }
+
+    Pod pod = maybePod.get();
+    PodStatus podStatus = pod.getStatus();
+
+    if (podStatus == null || podStatus.getPodIP() == null) {
+      return TaskLocation.unknown();
+    }
+
+    return TaskLocation.create(
+        podStatus.getPodIP(),
+        DruidK8sConstants.PORT,
+        DruidK8sConstants.TLS_PORT,
+        Boolean.parseBoolean(pod.getMetadata()
+            .getAnnotations()
+            .getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")
+        )
+    );
+  }
+
+  private TaskStatus getTaskStatus(long duration)
+  {
+    TaskStatus taskStatus;
+    try {
+      Optional<InputStream> maybeTaskStatusStream = taskLogs.streamTaskStatus(taskId.getOriginalTaskId());
+      if (maybeTaskStatusStream.isPresent()) {
+        taskStatus = mapper.readValue(
+            IOUtils.toString(maybeTaskStatusStream.get(), StandardCharsets.UTF_8),
+            TaskStatus.class
+        );
+      } else {
+        taskStatus = TaskStatus.failure(taskId.getOriginalTaskId(), "task status not found");
+      }
+    }
+    catch (IOException e) {
+      log.error(e, "Failed to load task status for task [%s]", taskId.getOriginalTaskId());
+      taskStatus = TaskStatus.failure(
+          taskId.getOriginalTaskId(),
+          StringUtils.format("error loading status: %s", e.getMessage())
+      );
+    }
+
+    return taskStatus.withDuration(duration);
+  }
+
+  private void saveLogs()
+  {
+    try {
+      Path file = Files.createTempFile(taskId.getOriginalTaskId(), "file");

Review Comment:
   Why not use "log" instead of "file" as the temp file's suffix?



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodStatus;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
+import org.apache.druid.k8s.overlord.common.JobResponse;
+import org.apache.druid.k8s.overlord.common.K8sTaskId;
+import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
+import org.apache.druid.tasklogs.TaskLogs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class KubernetesPeonLifecycle
+{
+  private static final EmittingLogger log = new EmittingLogger(KubernetesPeonLifecycle.class);
+
+  protected enum State
+  {
+    /** Lifecycle's state before {@link #run(Job, long, long)} or {@link #join(long)} is called. */
+    NOT_STARTED,
+    /** Lifecycle's state since {@link #run(Job, long, long)} is called. */
+    PENDING,
+    /** Lifecycle's state since {@link #join(long)} is called. */
+    RUNNING,
+    /** Lifecycle's state since the task has completed. */
+    STOPPED
+  }
+
+  private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
+  private final K8sTaskId taskId;
+  private final TaskLogs taskLogs;
+  private final KubernetesPeonClient kubernetesClient;
+  private final ObjectMapper mapper;
+
+  protected KubernetesPeonLifecycle(
+      Task task,
+      KubernetesPeonClient kubernetesClient,
+      TaskLogs taskLogs,
+      ObjectMapper mapper
+  )
+  {
+    this.taskId = new K8sTaskId(task);
+    this.kubernetesClient = kubernetesClient;
+    this.taskLogs = taskLogs;
+    this.mapper = mapper;
+  }
+
+  /**
+   * Run a Kubernetes Job
+   *
+   * @param job
+   * @param launchTimeout
+   * @param timeout
+   * @return
+   * @throws IllegalStateException
+   */
+  protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) throws IllegalStateException
+  {
+    try {
+      Preconditions.checkState(
+          state.compareAndSet(State.NOT_STARTED, State.PENDING),
+          "Task [%s] failed to run: invalid peon lifecycle state transition [%s]->[%s]",
+          taskId.getOriginalTaskId(),
+          state.get(),
+          State.PENDING
+      );
+
+      kubernetesClient.launchPeonJobAndWaitForStart(
+          job,
+          launchTimeout,
+          TimeUnit.MILLISECONDS
+      );
+
+      return join(timeout);
+    }
+    finally {
+      state.set(State.STOPPED);
+    }
+  }
+
+  /**
+   * Join existing Kubernetes Job
+   *
+   * @param timeout
+   * @return
+   * @throws IllegalStateException
+   */
+  protected synchronized TaskStatus join(long timeout) throws IllegalStateException
+  {
+    try {
+      Preconditions.checkState(
+          (
+              state.compareAndSet(State.NOT_STARTED, State.RUNNING) ||
+              state.compareAndSet(State.PENDING, State.RUNNING)
+          ),
+          "Task [%s] failed to join: invalid peon lifecycle state transition [%s]->[%s]",
+          taskId.getOriginalTaskId(),
+          state.get(),
+          State.RUNNING
+      );
+
+      JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
+          taskId,
+          timeout,
+          TimeUnit.MILLISECONDS
+      );
+
+      saveLogs();
+
+      return getTaskStatus(jobResponse.getJobDuration());
+    }
+    finally {
+      try {
+        shutdown();
+      }
+      catch (Exception e) {
+        log.warn(e, "Task [%s] shutdown failed", taskId);
+      }
+
+      state.set(State.STOPPED);
+    }
+  }
+
+  /**
+   * Shutdown Kubernetes job and associated pods
+   *
+   * Behavior: Deletes Kubernetes job which sends a kill signal to the containers running in
+   * the job's associated pod.
+   *
+   * Task state will be set by the thread running the run(...) or join(...) commands
+   */
+  protected void shutdown()
+  {
+    if (State.PENDING.equals(state.get()) || State.RUNNING.equals(state.get())) {
+      kubernetesClient.deletePeonJob(taskId);
+    }
+  }
+
+  /**
+   * Stream logs from the Kubernetes pod running the peon process
+   *
+   * @return
+   */
+  protected Optional<InputStream> streamLogs()
+  {
+    if (!State.RUNNING.equals(state.get())) {
+      return Optional.absent();
+    }
+    return kubernetesClient.getPeonLogs(taskId);
+  }
+
+  /**
+   * Get peon lifecycle state
+   *
+   * @return
+   */
+  protected State getState()
+  {
+    return state.get();
+  }
+
+  /**
+   * Get task location for the Kubernetes pod running the peon process
+   *
+   * @return
+   */
+  protected TaskLocation getTaskLocation()
+  {
+    if (!State.RUNNING.equals(state.get())) {
+      return TaskLocation.unknown();

Review Comment:
   Some debug logging here would be useful in case we see a problem where the task location is somehow always unknown.



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodStatus;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
+import org.apache.druid.k8s.overlord.common.JobResponse;
+import org.apache.druid.k8s.overlord.common.K8sTaskId;
+import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
+import org.apache.druid.tasklogs.TaskLogs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class KubernetesPeonLifecycle
+{
+  private static final EmittingLogger log = new EmittingLogger(KubernetesPeonLifecycle.class);
+
+  protected enum State
+  {
+    /** Lifecycle's state before {@link #run(Job, long, long)} or {@link #join(long)} is called. */
+    NOT_STARTED,
+    /** Lifecycle's state since {@link #run(Job, long, long)} is called. */
+    PENDING,
+    /** Lifecycle's state since {@link #join(long)} is called. */
+    RUNNING,
+    /** Lifecycle's state since the task has completed. */
+    STOPPED
+  }
+
+  private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
+  private final K8sTaskId taskId;
+  private final TaskLogs taskLogs;
+  private final KubernetesPeonClient kubernetesClient;
+  private final ObjectMapper mapper;
+
+  protected KubernetesPeonLifecycle(

Review Comment:
   Is this class expected to be subclassed? What is the reason for marking the methods in this class as protected?



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodStatus;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
+import org.apache.druid.k8s.overlord.common.JobResponse;
+import org.apache.druid.k8s.overlord.common.K8sTaskId;
+import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
+import org.apache.druid.tasklogs.TaskLogs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class KubernetesPeonLifecycle

Review Comment:
   Please add some Javadoc here explaining what this class does.



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodStatus;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
+import org.apache.druid.k8s.overlord.common.JobResponse;
+import org.apache.druid.k8s.overlord.common.K8sTaskId;
+import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
+import org.apache.druid.tasklogs.TaskLogs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class KubernetesPeonLifecycle
+{
+  private static final EmittingLogger log = new EmittingLogger(KubernetesPeonLifecycle.class);
+
+  protected enum State
+  {
+    /** Lifecycle's state before {@link #run(Job, long, long)} or {@link #join(long)} is called. */
+    NOT_STARTED,
+    /** Lifecycle's state since {@link #run(Job, long, long)} is called. */
+    PENDING,
+    /** Lifecycle's state since {@link #join(long)} is called. */
+    RUNNING,
+    /** Lifecycle's state since the task has completed. */
+    STOPPED
+  }
+
+  private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
+  private final K8sTaskId taskId;
+  private final TaskLogs taskLogs;
+  private final KubernetesPeonClient kubernetesClient;
+  private final ObjectMapper mapper;
+
+  protected KubernetesPeonLifecycle(
+      Task task,
+      KubernetesPeonClient kubernetesClient,
+      TaskLogs taskLogs,
+      ObjectMapper mapper
+  )
+  {
+    this.taskId = new K8sTaskId(task);
+    this.kubernetesClient = kubernetesClient;
+    this.taskLogs = taskLogs;
+    this.mapper = mapper;
+  }
+
+  /**
+   * Run a Kubernetes Job
+   *
+   * @param job
+   * @param launchTimeout
+   * @param timeout
+   * @return
+   * @throws IllegalStateException
+   */
+  protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) throws IllegalStateException
+  {
+    try {
+      Preconditions.checkState(
+          state.compareAndSet(State.NOT_STARTED, State.PENDING),
+          "Task [%s] failed to run: invalid peon lifecycle state transition [%s]->[%s]",
+          taskId.getOriginalTaskId(),
+          state.get(),
+          State.PENDING
+      );
+
+      kubernetesClient.launchPeonJobAndWaitForStart(

Review Comment:
   Maybe not in this PR, but if there is a timeout exception here, we should try to kill the pod if one was launched, perhaps inside the implementation of this method itself.



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -89,174 +69,128 @@
 
 /**
  * Runs tasks as k8s jobs using the "internal peon" verb.
- * One additional feature of this class is that kubernetes is the source of truth, so if you launch a task
- * shutdown druid, bring up druid, the task will keep running and the state will be updated when the cluster
- * comes back.  Thus while no tasks are technically restorable, all tasks once launched will run in isolation to the
- * extent possible without requiring the overlord consistently up during their lifetime.
  */
-
 public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
 {
-
   private static final EmittingLogger log = new EmittingLogger(KubernetesTaskRunner.class);
   private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
 
   // to cleanup old jobs that might not have been deleted.
   private final ScheduledExecutorService cleanupExecutor;
 
-  protected final ConcurrentHashMap<String, K8sWorkItem> tasks = new ConcurrentHashMap<>();
+  protected final ConcurrentHashMap<String, KubernetesWorkItem> tasks = new ConcurrentHashMap<>();
   protected final TaskAdapter adapter;
-  protected final KubernetesPeonClient client;
 
-  private final ObjectMapper mapper;
-  private final KubernetesTaskRunnerConfig k8sConfig;
-  private final TaskQueueConfig taskQueueConfig;
-  private final TaskLogs taskLogs;
+  private final KubernetesPeonClient client;
+  private final KubernetesTaskRunnerConfig config;
   private final ListeningExecutorService exec;
   private final HttpClient httpClient;
+  private final PeonLifecycleFactory peonLifecycleFactory;
 
 
   public KubernetesTaskRunner(
-      ObjectMapper mapper,
       TaskAdapter adapter,
-      KubernetesTaskRunnerConfig k8sConfig,
-      TaskQueueConfig taskQueueConfig,
-      TaskLogs taskLogs,
+      KubernetesTaskRunnerConfig config,
       KubernetesPeonClient client,
-      HttpClient httpClient
+      HttpClient httpClient,
+      PeonLifecycleFactory peonLifecycleFactory
   )
   {
-    this.mapper = mapper;
     this.adapter = adapter;
-    this.k8sConfig = k8sConfig;
-    this.taskQueueConfig = taskQueueConfig;
-    this.taskLogs = taskLogs;
+    this.config = config;
     this.client = client;
     this.httpClient = httpClient;
+    this.peonLifecycleFactory = peonLifecycleFactory;
     this.cleanupExecutor = Executors.newScheduledThreadPool(1);
     this.exec = MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(taskQueueConfig.getMaxSize(), "k8s-task-runner-%d")
-    );
-    Preconditions.checkArgument(
-        taskQueueConfig.getMaxSize() < Integer.MAX_VALUE,
-        "The task queue bounds how many concurrent k8s tasks you can have"
+        Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d")
     );
   }
 
-
   @Override
   public Optional<InputStream> streamTaskLog(String taskid, long offset)
   {
-    return client.getPeonLogs(new K8sTaskId(taskid));
+    KubernetesWorkItem workItem = tasks.get(taskid);
+    if (workItem == null) {
+      return Optional.absent();
+    }
+    return workItem.streamTaskLogs();
   }
 
   @Override
   public ListenableFuture<TaskStatus> run(Task task)
   {
     synchronized (tasks) {
-      tasks.computeIfAbsent(
-          task.getId(), k -> new K8sWorkItem(
-              client,
-              task,
-              exec.submit(() -> {
-                K8sTaskId k8sTaskId = new K8sTaskId(task);
-                try {
-                  JobResponse completedPhase;
-                  Optional<Job> existingJob = client.jobExists(k8sTaskId);
-                  if (!existingJob.isPresent()) {
-                    Job job = adapter.fromTask(task);
-                    log.info("Job created %s and ready to launch", k8sTaskId);
-                    Pod peonPod = client.launchJobAndWaitForStart(
-                        job,
-                        KubernetesTaskRunnerConfig.toMilliseconds(k8sConfig.k8sjobLaunchTimeout),
-                        TimeUnit.MILLISECONDS
-                    );
-                    log.info("Job %s launched in k8s", k8sTaskId);
-                    completedPhase = monitorJob(peonPod, k8sTaskId);
-                  } else {
-                    Job job = existingJob.get();
-                    if (job.getStatus().getActive() == null) {
-                      if (job.getStatus().getSucceeded() != null) {
-                        completedPhase = new JobResponse(job, PeonPhase.SUCCEEDED);
-                      } else {
-                        completedPhase = new JobResponse(job, PeonPhase.FAILED);
-                      }
-                    } else {
-                      // the job is active lets monitor it
-                      completedPhase = monitorJob(k8sTaskId);
-                    }
-                  }
-                  TaskStatus status = getTaskStatus(k8sTaskId, completedPhase);
-                  if (completedPhase.getJobDuration().isPresent()) {
-                    status = status.withDuration(completedPhase.getJobDuration().get());
-                  }
-                  updateStatus(task, status);
-                  return status;
-                }
-                catch (Exception e) {
-                  log.error(e, "Error with task: %s", k8sTaskId);
-                  throw e;
-                }
-                finally {
-                  // publish task logs
-                  Path log = Files.createTempFile(task.getId(), "log");
-                  try {
-                    Optional<InputStream> logStream = client.getPeonLogs(new K8sTaskId(task.getId()));
-                    if (logStream.isPresent()) {
-                      FileUtils.copyInputStreamToFile(logStream.get(), log.toFile());
-                    }
-                    taskLogs.pushTaskLog(task.getId(), log.toFile());
-                  }
-                  finally {
-                    Files.deleteIfExists(log);
-                  }
-                  client.cleanUpJob(new K8sTaskId(task.getId()));
-                  synchronized (tasks) {
-                    tasks.remove(task.getId());
-                  }
-                }
-              })
-          ));
+      tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))));
+      return tasks.get(task.getId()).getResult();
+    }
+  }
+
+  protected ListenableFuture<TaskStatus> join(Task task)

Review Comment:
   Can this be renamed to `joinAsync`? We use a similar convention elsewhere, e.g. the `stopAsync` / `stop` methods in SeekableStreamSupervisor.



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -318,61 +261,56 @@ public Optional<InputStream> streamTaskReports(String taskid) throws IOException
   @Override
   public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
   {
-    return ImmutableList.of();
+    List<Pair<Task, ListenableFuture<TaskStatus>>> tasks = new ArrayList<>();
+    for (Job job : client.getPeonJobs()) {
+      try {
+        Task task = adapter.toTask(job);
+        tasks.add(Pair.of(task, join(task)));
+      }
+      catch (IOException e) {
+        log.error("Error deserializing task from job [%s]", job.getMetadata().getName());

Review Comment:
   Please log the original exception as well.



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java:
##########
@@ -19,37 +19,218 @@
 
 package org.apache.druid.k8s.overlord.common;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.LogWatch;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 
 import java.io.InputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * A Kubernetes client wrapper to assist with peon task managment.
- * It provides a high level api to retreive jobs, launch jobs, delete jobs and various other
- * tasks like getting task logs, listing all active tasks.
- */
-public interface KubernetesPeonClient
+public class KubernetesPeonClient
 {
+  private static final EmittingLogger log = new EmittingLogger(KubernetesPeonClient.class);
+
+  private final KubernetesClientApi clientApi;
+  private final String namespace;
+  private final boolean debugJobs;
+
+  public KubernetesPeonClient(KubernetesClientApi clientApi, String namespace, boolean debugJobs)
+  {
+    this.clientApi = clientApi;
+    this.namespace = namespace;
+    this.debugJobs = debugJobs;
+  }
+
+  public Pod launchPeonJobAndWaitForStart(Job job, long howLong, TimeUnit timeUnit)
+  {
+    long start = System.currentTimeMillis();
+    // launch job
+    return clientApi.executeRequest(client -> {
+      client.batch().v1().jobs().inNamespace(namespace).resource(job).create();
+      K8sTaskId taskId = new K8sTaskId(job.getMetadata().getName());
+      log.info("Successfully submitted job: %s ... waiting for job to launch", taskId);
+      // wait until the pod is running or complete or failed, any of those is fine
+      Pod mainPod = getPeonPodWithRetries(taskId);
+      Pod result = client.pods().inNamespace(namespace).withName(mainPod.getMetadata().getName())
+                         .waitUntilCondition(pod -> {
+                           if (pod == null) {
+                             return false;
+                           }
+                           return pod.getStatus() != null && pod.getStatus().getPodIP() != null;
+                         }, howLong, timeUnit);
+      long duration = System.currentTimeMillis() - start;
+      log.info("Took task %s %d ms for pod to startup", taskId, duration);
+      return result;
+    });
+  }
 
-  Optional<Job> jobExists(K8sTaskId taskId);
+  public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, TimeUnit unit)
+  {
+    return clientApi.executeRequest(client -> {
+      Job job = client.batch()
+                      .v1()
+                      .jobs()
+                      .inNamespace(namespace)
+                      .withName(taskId.getK8sTaskId())
+                      .waitUntilCondition(
+                          x -> (x == null) || (x.getStatus() != null && x.getStatus().getActive() == null
+                          && (x.getStatus().getFailed() != null || x.getStatus().getSucceeded() != null)),
+                          howLong,
+                          unit
+                      );
+      if (job == null) {
+        log.info("K8s job for the task [%s] was not found. It can happen if the task was canceled", taskId);
+        return new JobResponse(null, PeonPhase.FAILED);
+      }
+      if (job.getStatus().getSucceeded() != null) {
+        return new JobResponse(job, PeonPhase.SUCCEEDED);
+      }
+      log.warn("Task %s failed with status %s", taskId, job.getStatus());
+      return new JobResponse(job, PeonPhase.FAILED);
+    });
+  }
 
-  Pod launchJobAndWaitForStart(Job job, long howLong, TimeUnit timeUnit);
+  public boolean deletePeonJob(K8sTaskId taskId)
+  {
+    if (!debugJobs) {
+      Boolean result = clientApi.executeRequest(client -> !client.batch()
+                                                                 .v1()
+                                                                 .jobs()
+                                                                 .inNamespace(namespace)
+                                                                 .withName(taskId.getK8sTaskId())
+                                                                 .delete().isEmpty());
+      if (result) {
+        log.info("Cleaned up k8s task: %s", taskId);
+      } else {
+        log.info("K8s task does not exist: %s", taskId);
+      }
+      return result;
+    } else {
+      log.info("Not cleaning up task %s due to flag: debugJobs=true", taskId);
+      return true;
+    }
+  }
 
-  JobResponse waitForJobCompletion(K8sTaskId taskId, long howLong, TimeUnit timeUnit);
+  public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
+  {
+    KubernetesClient k8sClient = clientApi.getClient();
+    try {
+      LogWatch logWatch = k8sClient.batch()
+                                   .v1()
+                                   .jobs()
+                                   .inNamespace(namespace)
+                                   .withName(taskId.getK8sTaskId())
+                                   .inContainer("main")
+                                   .watchLog();
+      if (logWatch == null) {
+        k8sClient.close();
+        return Optional.absent();
+      }
+      return Optional.of(new LogWatchInputStream(k8sClient, logWatch));
+    }
+    catch (Exception e) {
+      log.error(e, "Error streaming logs from task: %s", taskId);
+      k8sClient.close();
+      return Optional.absent();
+    }
+  }
 
-  boolean cleanUpJob(K8sTaskId taskId);
+  public List<Job> getPeonJobs()
+  {
+    return clientApi.executeRequest(client -> client.batch()
+                                                    .v1()
+                                                    .jobs()
+                                                    .inNamespace(namespace)
+                                                    .withLabel(DruidK8sConstants.LABEL_KEY)
+                                                    .list()
+                                                    .getItems());
+  }
 
-  Optional<InputStream> getPeonLogs(K8sTaskId taskId);
+  public int deleteCompletedPeonJobsOlderThan(long howFarBack, TimeUnit timeUnit)
+  {
+    AtomicInteger numDeleted = new AtomicInteger();
+    return clientApi.executeRequest(client -> {
+      List<Job> jobs = getJobsToCleanup(getPeonJobs(), howFarBack, timeUnit);
+      jobs.forEach(x -> {
+        if (!client.batch()
+                   .v1()
+                   .jobs()
+                   .inNamespace(namespace)
+                   .withName(x.getMetadata().getName())
+                   .delete().isEmpty()) {
+          numDeleted.incrementAndGet();

Review Comment:
   We should log a warning if deleting a job fails.



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java:
##########
@@ -19,37 +19,218 @@
 
 package org.apache.druid.k8s.overlord.common;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.LogWatch;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 
 import java.io.InputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * A Kubernetes client wrapper to assist with peon task managment.
- * It provides a high level api to retreive jobs, launch jobs, delete jobs and various other
- * tasks like getting task logs, listing all active tasks.
- */
-public interface KubernetesPeonClient
+public class KubernetesPeonClient
 {
+  private static final EmittingLogger log = new EmittingLogger(KubernetesPeonClient.class);
+
+  private final KubernetesClientApi clientApi;
+  private final String namespace;
+  private final boolean debugJobs;
+
+  public KubernetesPeonClient(KubernetesClientApi clientApi, String namespace, boolean debugJobs)
+  {
+    this.clientApi = clientApi;
+    this.namespace = namespace;
+    this.debugJobs = debugJobs;
+  }
+
+  public Pod launchPeonJobAndWaitForStart(Job job, long howLong, TimeUnit timeUnit)
+  {
+    long start = System.currentTimeMillis();
+    // launch job
+    return clientApi.executeRequest(client -> {
+      client.batch().v1().jobs().inNamespace(namespace).resource(job).create();
+      K8sTaskId taskId = new K8sTaskId(job.getMetadata().getName());
+      log.info("Successfully submitted job: %s ... waiting for job to launch", taskId);
+      // wait until the pod is running or complete or failed, any of those is fine
+      Pod mainPod = getPeonPodWithRetries(taskId);
+      Pod result = client.pods().inNamespace(namespace).withName(mainPod.getMetadata().getName())
+                         .waitUntilCondition(pod -> {
+                           if (pod == null) {
+                             return false;
+                           }
+                           return pod.getStatus() != null && pod.getStatus().getPodIP() != null;
+                         }, howLong, timeUnit);
+      long duration = System.currentTimeMillis() - start;
+      log.info("Took task %s %d ms for pod to startup", taskId, duration);

Review Comment:
   Please follow the message-formatting conventions for logs and exceptions: https://github.com/apache/druid/blob/master/dev/style-conventions.md#message-formatting-logs-and-exceptions



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -89,174 +69,128 @@
 
 /**
  * Runs tasks as k8s jobs using the "internal peon" verb.
- * One additional feature of this class is that kubernetes is the source of truth, so if you launch a task
- * shutdown druid, bring up druid, the task will keep running and the state will be updated when the cluster
- * comes back.  Thus while no tasks are technically restorable, all tasks once launched will run in isolation to the
- * extent possible without requiring the overlord consistently up during their lifetime.
  */
-
 public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
 {
-
   private static final EmittingLogger log = new EmittingLogger(KubernetesTaskRunner.class);
   private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
 
   // to cleanup old jobs that might not have been deleted.
   private final ScheduledExecutorService cleanupExecutor;
 
-  protected final ConcurrentHashMap<String, K8sWorkItem> tasks = new ConcurrentHashMap<>();
+  protected final ConcurrentHashMap<String, KubernetesWorkItem> tasks = new ConcurrentHashMap<>();
   protected final TaskAdapter adapter;
-  protected final KubernetesPeonClient client;
 
-  private final ObjectMapper mapper;
-  private final KubernetesTaskRunnerConfig k8sConfig;
-  private final TaskQueueConfig taskQueueConfig;
-  private final TaskLogs taskLogs;
+  private final KubernetesPeonClient client;
+  private final KubernetesTaskRunnerConfig config;
   private final ListeningExecutorService exec;
   private final HttpClient httpClient;
+  private final PeonLifecycleFactory peonLifecycleFactory;
 
 
   public KubernetesTaskRunner(
-      ObjectMapper mapper,
       TaskAdapter adapter,
-      KubernetesTaskRunnerConfig k8sConfig,
-      TaskQueueConfig taskQueueConfig,
-      TaskLogs taskLogs,
+      KubernetesTaskRunnerConfig config,
       KubernetesPeonClient client,
-      HttpClient httpClient
+      HttpClient httpClient,
+      PeonLifecycleFactory peonLifecycleFactory
   )
   {
-    this.mapper = mapper;
     this.adapter = adapter;
-    this.k8sConfig = k8sConfig;
-    this.taskQueueConfig = taskQueueConfig;
-    this.taskLogs = taskLogs;
+    this.config = config;
     this.client = client;
     this.httpClient = httpClient;
+    this.peonLifecycleFactory = peonLifecycleFactory;
     this.cleanupExecutor = Executors.newScheduledThreadPool(1);
     this.exec = MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(taskQueueConfig.getMaxSize(), "k8s-task-runner-%d")
-    );
-    Preconditions.checkArgument(
-        taskQueueConfig.getMaxSize() < Integer.MAX_VALUE,
-        "The task queue bounds how many concurrent k8s tasks you can have"
+        Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d")
     );
   }
 
-
   @Override
   public Optional<InputStream> streamTaskLog(String taskid, long offset)
   {
-    return client.getPeonLogs(new K8sTaskId(taskid));
+    KubernetesWorkItem workItem = tasks.get(taskid);
+    if (workItem == null) {
+      return Optional.absent();
+    }
+    return workItem.streamTaskLogs();
   }
 
   @Override
   public ListenableFuture<TaskStatus> run(Task task)
   {
     synchronized (tasks) {
-      tasks.computeIfAbsent(
-          task.getId(), k -> new K8sWorkItem(
-              client,
-              task,
-              exec.submit(() -> {
-                K8sTaskId k8sTaskId = new K8sTaskId(task);
-                try {
-                  JobResponse completedPhase;
-                  Optional<Job> existingJob = client.jobExists(k8sTaskId);
-                  if (!existingJob.isPresent()) {
-                    Job job = adapter.fromTask(task);
-                    log.info("Job created %s and ready to launch", k8sTaskId);
-                    Pod peonPod = client.launchJobAndWaitForStart(
-                        job,
-                        KubernetesTaskRunnerConfig.toMilliseconds(k8sConfig.k8sjobLaunchTimeout),
-                        TimeUnit.MILLISECONDS
-                    );
-                    log.info("Job %s launched in k8s", k8sTaskId);
-                    completedPhase = monitorJob(peonPod, k8sTaskId);
-                  } else {
-                    Job job = existingJob.get();
-                    if (job.getStatus().getActive() == null) {
-                      if (job.getStatus().getSucceeded() != null) {
-                        completedPhase = new JobResponse(job, PeonPhase.SUCCEEDED);
-                      } else {
-                        completedPhase = new JobResponse(job, PeonPhase.FAILED);
-                      }
-                    } else {
-                      // the job is active lets monitor it
-                      completedPhase = monitorJob(k8sTaskId);
-                    }
-                  }
-                  TaskStatus status = getTaskStatus(k8sTaskId, completedPhase);
-                  if (completedPhase.getJobDuration().isPresent()) {
-                    status = status.withDuration(completedPhase.getJobDuration().get());
-                  }
-                  updateStatus(task, status);
-                  return status;
-                }
-                catch (Exception e) {
-                  log.error(e, "Error with task: %s", k8sTaskId);
-                  throw e;
-                }
-                finally {
-                  // publish task logs
-                  Path log = Files.createTempFile(task.getId(), "log");
-                  try {
-                    Optional<InputStream> logStream = client.getPeonLogs(new K8sTaskId(task.getId()));
-                    if (logStream.isPresent()) {
-                      FileUtils.copyInputStreamToFile(logStream.get(), log.toFile());
-                    }
-                    taskLogs.pushTaskLog(task.getId(), log.toFile());
-                  }
-                  finally {
-                    Files.deleteIfExists(log);
-                  }
-                  client.cleanUpJob(new K8sTaskId(task.getId()));
-                  synchronized (tasks) {
-                    tasks.remove(task.getId());
-                  }
-                }
-              })
-          ));
+      tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))));
+      return tasks.get(task.getId()).getResult();
+    }
+  }
+
+  protected ListenableFuture<TaskStatus> join(Task task)
+  {
+    synchronized (tasks) {
+      tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))));
       return tasks.get(task.getId()).getResult();
     }
   }
 
-  JobResponse monitorJob(K8sTaskId k8sTaskId)
+  private TaskStatus runTask(Task task)
   {
-    return monitorJob(client.getMainJobPod(k8sTaskId), k8sTaskId);
+    return doTask(task, true);
   }
 
-  JobResponse monitorJob(Pod peonPod, K8sTaskId k8sTaskId)
+  private TaskStatus joinTask(Task task)
   {
-    if (peonPod == null) {
-      throw new ISE("Error in k8s launching peon pod for task %s", k8sTaskId);
-    }
-    return client.waitForJobCompletion(
-        k8sTaskId,
-        KubernetesTaskRunnerConfig.toMilliseconds(k8sConfig.maxTaskDuration),
-        TimeUnit.MILLISECONDS
-    );
+    return doTask(task, false);
   }
 
-  private TaskStatus getTaskStatus(K8sTaskId task, JobResponse jobResponse) throws IOException
+  @VisibleForTesting
+  protected TaskStatus doTask(Task task, boolean run)
   {
-    Optional<InputStream> maybeTaskStatusStream = taskLogs.streamTaskStatus(task.getOriginalTaskId());
-    if (maybeTaskStatusStream.isPresent()) {
-      String taskStatus = IOUtils.toString(maybeTaskStatusStream.get(), StandardCharsets.UTF_8);
-      return mapper.readValue(taskStatus, TaskStatus.class);
-    } else if (PeonPhase.SUCCEEDED.equals(jobResponse.getPhase())) {
-      // fallback to behavior before the introduction of task status streaming for backwards compatibility
-      return TaskStatus.success(task.getOriginalTaskId());
-    } else if (Objects.isNull(jobResponse.getJob())) {
-      return TaskStatus.failure(
-          task.getOriginalTaskId(),
-          StringUtils.format("Task [%s] failed kubernetes job disappeared before completion", task.getOriginalTaskId())
-      );
-    } else {
-      return TaskStatus.failure(
-          task.getOriginalTaskId(),
-          StringUtils.format("Task [%s] failed", task.getOriginalTaskId())
-      );
+    KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
+
+    synchronized (tasks) {
+      KubernetesWorkItem workItem = tasks.get(task.getId());
+
+      if (workItem == null) {
+        throw new ISE("Task [%s] disappeared", task.getId());
+      }
+
+      if (workItem.isShutdownRequested()) {
+        throw new ISE("Task [%s] has been shut down", task.getId());
+      }
+
+      workItem.setKubernetesPeonLifecycle(peonLifecycle);
+    }
+
+    try {
+      TaskStatus taskStatus;
+      if (run) {
+        taskStatus = peonLifecycle.run(
+            adapter.fromTask(task),
+            config.getTaskLaunchTimeout().toStandardDuration().getMillis(),
+            config.getTaskTimeout().toStandardDuration().getMillis()
+        );
+      } else {
+        taskStatus = peonLifecycle.join(
+            config.getTaskTimeout().toStandardDuration().getMillis()
+        );
+      }
+
+      updateStatus(task, taskStatus);
+
+      return taskStatus;
+    }
+
+    catch (Exception e) {
+      log.error(e, "Task [%s] execution caught an exception", task.getId());

Review Comment:
   Do we clean up the task when such an exception happens?



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -89,174 +69,128 @@
 
 /**
  * Runs tasks as k8s jobs using the "internal peon" verb.
- * One additional feature of this class is that kubernetes is the source of truth, so if you launch a task

Review Comment:
   Let's use this PR itself if we can.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org