Posted to commits@druid.apache.org by "georgew5656 (via GitHub)" <gi...@apache.org> on 2023/05/05 16:37:11 UTC

[GitHub] [druid] georgew5656 commented on a diff in pull request #14156: queue tasks in kubernetes task runner if capacity is fully utilized

georgew5656 commented on code in PR #14156:
URL: https://github.com/apache/druid/pull/14156#discussion_r1186294614


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -89,174 +69,128 @@
 
 /**
  * Runs tasks as k8s jobs using the "internal peon" verb.
- * One additional feature of this class is that kubernetes is the source of truth, so if you launch a task
- * shutdown druid, bring up druid, the task will keep running and the state will be updated when the cluster
- * comes back.  Thus while no tasks are technically restorable, all tasks once launched will run in isolation to the
- * extent possible without requiring the overlord consistently up during their lifetime.
  */
-
 public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
 {
-
   private static final EmittingLogger log = new EmittingLogger(KubernetesTaskRunner.class);
   private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
 
   // to cleanup old jobs that might not have been deleted.
   private final ScheduledExecutorService cleanupExecutor;
 
-  protected final ConcurrentHashMap<String, K8sWorkItem> tasks = new ConcurrentHashMap<>();
+  protected final ConcurrentHashMap<String, KubernetesWorkItem> tasks = new ConcurrentHashMap<>();
   protected final TaskAdapter adapter;
-  protected final KubernetesPeonClient client;
 
-  private final ObjectMapper mapper;
-  private final KubernetesTaskRunnerConfig k8sConfig;
-  private final TaskQueueConfig taskQueueConfig;
-  private final TaskLogs taskLogs;
+  private final KubernetesPeonClient client;
+  private final KubernetesTaskRunnerConfig config;
   private final ListeningExecutorService exec;
   private final HttpClient httpClient;
+  private final PeonLifecycleFactory peonLifecycleFactory;
 
 
   public KubernetesTaskRunner(
-      ObjectMapper mapper,
       TaskAdapter adapter,
-      KubernetesTaskRunnerConfig k8sConfig,
-      TaskQueueConfig taskQueueConfig,
-      TaskLogs taskLogs,
+      KubernetesTaskRunnerConfig config,
       KubernetesPeonClient client,
-      HttpClient httpClient
+      HttpClient httpClient,
+      PeonLifecycleFactory peonLifecycleFactory
   )
   {
-    this.mapper = mapper;
     this.adapter = adapter;
-    this.k8sConfig = k8sConfig;
-    this.taskQueueConfig = taskQueueConfig;
-    this.taskLogs = taskLogs;
+    this.config = config;
     this.client = client;
     this.httpClient = httpClient;
+    this.peonLifecycleFactory = peonLifecycleFactory;
     this.cleanupExecutor = Executors.newScheduledThreadPool(1);
     this.exec = MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(taskQueueConfig.getMaxSize(), "k8s-task-runner-%d")
-    );
-    Preconditions.checkArgument(
-        taskQueueConfig.getMaxSize() < Integer.MAX_VALUE,
-        "The task queue bounds how many concurrent k8s tasks you can have"
+        Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d")
     );
   }
 
-
   @Override
   public Optional<InputStream> streamTaskLog(String taskid, long offset)
   {
-    return client.getPeonLogs(new K8sTaskId(taskid));
+    KubernetesWorkItem workItem = tasks.get(taskid);
+    if (workItem == null) {
+      return Optional.absent();
+    }
+    return workItem.streamTaskLogs();
   }
 
   @Override
   public ListenableFuture<TaskStatus> run(Task task)
   {
     synchronized (tasks) {
-      tasks.computeIfAbsent(
-          task.getId(), k -> new K8sWorkItem(
-              client,
-              task,
-              exec.submit(() -> {
-                K8sTaskId k8sTaskId = new K8sTaskId(task);
-                try {
-                  JobResponse completedPhase;
-                  Optional<Job> existingJob = client.jobExists(k8sTaskId);
-                  if (!existingJob.isPresent()) {
-                    Job job = adapter.fromTask(task);
-                    log.info("Job created %s and ready to launch", k8sTaskId);
-                    Pod peonPod = client.launchJobAndWaitForStart(
-                        job,
-                        KubernetesTaskRunnerConfig.toMilliseconds(k8sConfig.k8sjobLaunchTimeout),
-                        TimeUnit.MILLISECONDS
-                    );
-                    log.info("Job %s launched in k8s", k8sTaskId);
-                    completedPhase = monitorJob(peonPod, k8sTaskId);
-                  } else {
-                    Job job = existingJob.get();
-                    if (job.getStatus().getActive() == null) {
-                      if (job.getStatus().getSucceeded() != null) {
-                        completedPhase = new JobResponse(job, PeonPhase.SUCCEEDED);
-                      } else {
-                        completedPhase = new JobResponse(job, PeonPhase.FAILED);
-                      }
-                    } else {
-                      // the job is active lets monitor it
-                      completedPhase = monitorJob(k8sTaskId);
-                    }
-                  }
-                  TaskStatus status = getTaskStatus(k8sTaskId, completedPhase);
-                  if (completedPhase.getJobDuration().isPresent()) {
-                    status = status.withDuration(completedPhase.getJobDuration().get());
-                  }
-                  updateStatus(task, status);
-                  return status;
-                }
-                catch (Exception e) {
-                  log.error(e, "Error with task: %s", k8sTaskId);
-                  throw e;
-                }
-                finally {
-                  // publish task logs
-                  Path log = Files.createTempFile(task.getId(), "log");
-                  try {
-                    Optional<InputStream> logStream = client.getPeonLogs(new K8sTaskId(task.getId()));
-                    if (logStream.isPresent()) {
-                      FileUtils.copyInputStreamToFile(logStream.get(), log.toFile());
-                    }
-                    taskLogs.pushTaskLog(task.getId(), log.toFile());
-                  }
-                  finally {
-                    Files.deleteIfExists(log);
-                  }
-                  client.cleanUpJob(new K8sTaskId(task.getId()));
-                  synchronized (tasks) {
-                    tasks.remove(task.getId());
-                  }
-                }
-              })
-          ));
+      tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))));
+      return tasks.get(task.getId()).getResult();
+    }
+  }
+
+  protected ListenableFuture<TaskStatus> join(Task task)
+  {
+    synchronized (tasks) {
+      tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))));
       return tasks.get(task.getId()).getResult();
     }
   }
 
-  JobResponse monitorJob(K8sTaskId k8sTaskId)
+  private TaskStatus runTask(Task task)
   {
-    return monitorJob(client.getMainJobPod(k8sTaskId), k8sTaskId);
+    return doTask(task, true);
   }
 
-  JobResponse monitorJob(Pod peonPod, K8sTaskId k8sTaskId)
+  private TaskStatus joinTask(Task task)
   {
-    if (peonPod == null) {
-      throw new ISE("Error in k8s launching peon pod for task %s", k8sTaskId);
-    }
-    return client.waitForJobCompletion(
-        k8sTaskId,
-        KubernetesTaskRunnerConfig.toMilliseconds(k8sConfig.maxTaskDuration),
-        TimeUnit.MILLISECONDS
-    );
+    return doTask(task, false);
   }
 
-  private TaskStatus getTaskStatus(K8sTaskId task, JobResponse jobResponse) throws IOException
+  @VisibleForTesting
+  protected TaskStatus doTask(Task task, boolean run)
   {
-    Optional<InputStream> maybeTaskStatusStream = taskLogs.streamTaskStatus(task.getOriginalTaskId());
-    if (maybeTaskStatusStream.isPresent()) {
-      String taskStatus = IOUtils.toString(maybeTaskStatusStream.get(), StandardCharsets.UTF_8);
-      return mapper.readValue(taskStatus, TaskStatus.class);
-    } else if (PeonPhase.SUCCEEDED.equals(jobResponse.getPhase())) {
-      // fallback to behavior before the introduction of task status streaming for backwards compatibility
-      return TaskStatus.success(task.getOriginalTaskId());
-    } else if (Objects.isNull(jobResponse.getJob())) {
-      return TaskStatus.failure(
-          task.getOriginalTaskId(),
-          StringUtils.format("Task [%s] failed kubernetes job disappeared before completion", task.getOriginalTaskId())
-      );
-    } else {
-      return TaskStatus.failure(
-          task.getOriginalTaskId(),
-          StringUtils.format("Task [%s] failed", task.getOriginalTaskId())
-      );
+    KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
+
+    synchronized (tasks) {
+      KubernetesWorkItem workItem = tasks.get(task.getId());
+
+      if (workItem == null) {
+        throw new ISE("Task [%s] disappeared", task.getId());
+      }
+
+      if (workItem.isShutdownRequested()) {
+        throw new ISE("Task [%s] has been shut down", task.getId());
+      }
+
+      workItem.setKubernetesPeonLifecycle(peonLifecycle);
+    }
+
+    try {
+      TaskStatus taskStatus;
+      if (run) {
+        taskStatus = peonLifecycle.run(
+            adapter.fromTask(task),
+            config.getTaskLaunchTimeout().toStandardDuration().getMillis(),
+            config.getTaskTimeout().toStandardDuration().getMillis()
+        );
+      } else {
+        taskStatus = peonLifecycle.join(
+            config.getTaskTimeout().toStandardDuration().getMillis()
+        );
+      }
+
+      updateStatus(task, taskStatus);
+
+      return taskStatus;
+    }
+
+    catch (Exception e) {
+      log.error(e, "Task [%s] execution caught an exception", task.getId());

Review Comment:
   In the join method of the KubernetesPeonLifecycle class, there's a finally block that cleans up the K8s Job. This will run even if the main try logic throws an error.
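   
   A minimal sketch of that pattern, assuming a hypothetical JobClient interface and method names (this is not the actual KubernetesPeonLifecycle code):
   
   ```java
   // Illustrative sketch only; JobClient and its methods are assumptions,
   // not the real KubernetesPeonLifecycle or KubernetesPeonClient API.
   import java.util.concurrent.TimeUnit;
   
   class PeonLifecycleSketch
   {
     private final JobClient client;
     private final String jobName;
   
     PeonLifecycleSketch(JobClient client, String jobName)
     {
       this.client = client;
       this.jobName = jobName;
     }
   
     // Wait for an already-running job to finish and return its result.
     String join(long timeoutMillis) throws Exception
     {
       try {
         // May throw if the cluster is unreachable or the wait times out.
         return client.waitForCompletion(jobName, timeoutMillis, TimeUnit.MILLISECONDS);
       }
       finally {
         // Runs whether the wait above returned normally or threw,
         // so the job gets deleted even on failure.
         client.deleteJob(jobName);
       }
     }
   
     // Minimal client surface assumed for this sketch.
     interface JobClient
     {
       String waitForCompletion(String jobName, long timeout, TimeUnit unit) throws Exception;
   
       void deleteJob(String jobName);
     }
   }
   ```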
   
   
   If there's some problem with this logic in the lifecycle class (for example, if k8s is offline), there is a cleanup executor that deletes jobs that have been around for a while.
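   
   That fallback could look roughly like the sketch below, again with an assumed JobClient rather than the actual Druid client API:
   
   ```java
   // Illustrative sketch only; JobClient and its methods are assumptions,
   // not the actual Druid cleanup implementation.
   import java.time.Duration;
   import java.time.Instant;
   import java.util.Map;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   
   class JobCleanupSketch
   {
     interface JobClient
     {
       // Creation timestamps keyed by job name.
       Map<String, Instant> listJobsWithCreationTime();
   
       void deleteJob(String jobName);
     }
   
     static ScheduledExecutorService startCleanup(JobClient client, Duration maxJobAge, Duration interval)
     {
       ScheduledExecutorService cleanupExecutor = Executors.newScheduledThreadPool(1);
       cleanupExecutor.scheduleAtFixedRate(
           () -> {
             Instant cutoff = Instant.now().minus(maxJobAge);
             // Delete any job older than the retention window, e.g. one whose
             // in-process lifecycle never got to clean it up.
             client.listJobsWithCreationTime().forEach((jobName, createdAt) -> {
               if (createdAt.isBefore(cutoff)) {
                 client.deleteJob(jobName);
               }
             });
           },
           interval.toMillis(),
           interval.toMillis(),
           TimeUnit.MILLISECONDS
       );
       return cleanupExecutor;
     }
   }
   ```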

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

