Posted to commits@druid.apache.org by "suneet-s (via GitHub)" <gi...@apache.org> on 2023/05/18 15:38:15 UTC

[GitHub] [druid] suneet-s commented on a diff in pull request #14285: Fix log streaming

suneet-s commented on code in PR #14285:
URL: https://github.com/apache/druid/pull/14285#discussion_r1197955503


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -78,6 +79,8 @@
   private final KubernetesPeonClient kubernetesClient;
   private final ObjectMapper mapper;
 
+  private LogWatch logWatch;

Review Comment:
   Add an annotation to document that this field starts out null and, once set, is never reset to null
   ```suggestion
     @MonotonicNonNull
     private LogWatch logWatch;
   ```
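   `@MonotonicNonNull` (Checker Framework, `org.checkerframework.checker.nullness.qual`) is enforced statically by the Nullness Checker, and it guarantees that the field never goes back to null, not that it is assigned exactly once. If a run-time guarantee of a single assignment were wanted, a minimal sketch could look like this; `SetOnce` is a hypothetical helper, not part of Druid or the Checker Framework.

   ```java
   import java.util.concurrent.atomic.AtomicReference;

   // Hypothetical helper, not part of Druid: a run-time counterpart to @MonotonicNonNull.
   // The annotation only forbids resetting the field to null; this holder also
   // rejects any second assignment.
   final class SetOnce<T>
   {
     private final AtomicReference<T> ref = new AtomicReference<>();

     /** Stores the value if nothing has been stored yet; throws otherwise. */
     void set(T value)
     {
       if (value == null) {
         throw new IllegalArgumentException("value must be non-null");
       }
       // compareAndSet succeeds only while the reference is still null.
       if (!ref.compareAndSet(null, value)) {
         throw new IllegalStateException("value was already set");
       }
     }

     /** Returns the stored value, or null if set() has not been called yet. */
     T get()
     {
       return ref.get();
     }
   }
   ```

   Because the check uses `compareAndSet`, it also holds if two threads race to set the value: exactly one of them succeeds.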



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java:
##########
@@ -120,22 +120,46 @@ public boolean deletePeonJob(K8sTaskId taskId)
     }
   }
 
-  public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
+  public Optional<LogWatch> getPeonLogWatcher(K8sTaskId taskId)
   {
     KubernetesClient k8sClient = clientApi.getClient();
     try {
       LogWatch logWatch = k8sClient.batch()
+          .v1()
+          .jobs()
+          .inNamespace(namespace)
+          .withName(taskId.getK8sTaskId())

Review Comment:
   The taskId is not guaranteed to be unique, since we have to truncate the name to 63 characters. If two tasks end up with the same name, will we be watching logs from multiple peons?
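   To make the collision concrete, here is a sketch that assumes the Kubernetes name is derived by lower-casing the task ID, replacing unsupported characters with `-`, and truncating to 63 characters. `toK8sName` is a hypothetical stand-in; Druid's actual `K8sTaskId` conversion may differ.

   ```java
   import java.util.Locale;

   // Hypothetical stand-in for the task-id-to-name conversion; Druid's real
   // K8sTaskId logic may differ. It only models the 63-character limit.
   final class K8sNameCollision
   {
     static final int MAX_NAME_LENGTH = 63;

     /** Lower-cases the id, maps characters outside [a-z0-9-] to '-', then truncates. */
     static String toK8sName(String taskId)
     {
       String cleaned = taskId.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9-]", "-");
       return cleaned.length() <= MAX_NAME_LENGTH
              ? cleaned
              : cleaned.substring(0, MAX_NAME_LENGTH);
     }
   }
   ```

   Under these assumptions, two distinct task IDs that share their first 63 characters (after cleanup) map to the same name, so anything that selects a job by that name cannot tell the two peons apart.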



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -265,14 +268,32 @@ private TaskStatus getTaskStatus(long duration)
     return taskStatus.withDuration(duration);
   }
 
-  private void saveLogs()
+  protected void startWatchingLogs()
+  {
+    if (logWatch != null) {
+      log.debug("There is already a log watcher for %s", taskId.getOriginalTaskId());
+      return;
+    }
+    try {
+      Optional<LogWatch> maybeLogWatch = kubernetesClient.getPeonLogWatcher(taskId);
+      if (maybeLogWatch.isPresent()) {
+        logWatch = maybeLogWatch.get();
+      }
+    }
+    catch (Exception e) {
+      log.error(e, "Error watching logs from task: %s", taskId);
+    }
+  }
+
+  protected void saveLogs()

Review Comment:
   I see that saveLogs is only called on line 159, after `waitForPeonJobCompletion`, so the logs are not saved if `waitForPeonJobCompletion` throws an exception. Does that case need to be handled?
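   One way to cover the exceptional path is a `finally` block, so the logs are saved whether the wait returns or throws. This is a hypothetical sketch: `run`, the `Supplier` standing in for `waitForPeonJobCompletion`, and the `events` list are illustrative only.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Supplier;

   // Hypothetical sketch: the Supplier stands in for waitForPeonJobCompletion and
   // saveLogs() for the real log-saving step; neither is Druid's implementation.
   final class SaveLogsInFinally
   {
     // Records the order of steps so the behavior can be checked.
     final List<String> events = new ArrayList<>();

     String run(Supplier<String> waitForCompletion)
     {
       try {
         String status = waitForCompletion.get();
         events.add("completed");
         return status;
       }
       finally {
         // Runs on both the normal and the exceptional path, so logs are saved either way.
         saveLogs();
       }
     }

     void saveLogs()
     {
       events.add("saved logs");
     }
   }
   ```

   If `saveLogs` can itself throw, an exception raised in the `finally` block would replace the original one, so the real method may want to catch and log failures inside `saveLogs`.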



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java:
##########
@@ -120,22 +120,46 @@ public boolean deletePeonJob(K8sTaskId taskId)
     }
   }
 
-  public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
+  public Optional<LogWatch> getPeonLogWatcher(K8sTaskId taskId)
   {
     KubernetesClient k8sClient = clientApi.getClient();
     try {
       LogWatch logWatch = k8sClient.batch()
+          .v1()

Review Comment:
   nit: You could use Java's try-with-resources statement to make this code a little more concise:
   
   ```
   try (KubernetesClient k8sClient = clientApi.getClient()) {
     LogWatch logWatch = k8sClient.batch()
                                  .v1()
                                  .jobs()
                                  .inNamespace(namespace)
                                  .withName(taskId.getK8sTaskId())
                                  .inContainer("main")
                                  .watchLog();
     return Optional.fromNullable(logWatch);
   }
   catch (Exception e) {
     log.error(e, "Error watching logs from task: %s", taskId);
     return Optional.absent();
   }
   ```
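   Before adopting the suggestion, it may be worth checking how long the client needs to live: try-with-resources calls `close()` as soon as the block exits, including on a normal `return`. The sketch uses hypothetical `FakeClient`/`FakeWatch` classes (not the fabric8 types) to show that the returned object outlives the resource it came from. Whether a real `LogWatch` stops working once its `KubernetesClient` is closed is an assumption to verify against the fabric8 client, not something established in this thread.

   ```java
   // Hypothetical stand-ins for KubernetesClient and LogWatch, used only to show
   // when try-with-resources calls close(); they are not the fabric8 types.
   final class FakeClient implements AutoCloseable
   {
     boolean closed = false;

     FakeWatch watchLog()
     {
       return new FakeWatch(this);
     }

     @Override
     public void close()
     {
       closed = true;
     }
   }

   final class FakeWatch
   {
     private final FakeClient client;

     FakeWatch(FakeClient client)
     {
       this.client = client;
     }

     /** Models a watch that reads through its client and is usable only while it is open. */
     boolean isUsable()
     {
       return !client.closed;
     }
   }

   final class TryWithResourcesDemo
   {
     static FakeWatch openWatch(FakeClient client)
     {
       // close() runs as soon as this block exits, including on the normal return path.
       try (FakeClient c = client) {
         return c.watchLog();
       }
     }
   }
   ```

   After `openWatch` returns, the client is already closed while the caller still holds the watch.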



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -265,14 +268,32 @@ private TaskStatus getTaskStatus(long duration)
     return taskStatus.withDuration(duration);
   }
 
-  private void saveLogs()
+  protected void startWatchingLogs()
+  {
+    if (logWatch != null) {
+      log.debug("There is already a log watcher for %s", taskId.getOriginalTaskId());
+      return;
+    }
+    try {
+      Optional<LogWatch> maybeLogWatch = kubernetesClient.getPeonLogWatcher(taskId);
+      if (maybeLogWatch.isPresent()) {

Review Comment:
   Do you need to handle multi-threaded access here? I see that the `join` method is synchronized, but I'm not sure whether, or how, multiple threads access this class.
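   If `startWatchingLogs` can be reached from more than one thread, the `logWatch != null` check followed by the assignment is a check-then-act race: two callers could both see null and each open a watcher. A minimal sketch, assuming it is acceptable to hold the lock while the watcher is opened, is to synchronize the method; `LogWatchStarter` and the `Supplier` standing in for `getPeonLogWatcher` are hypothetical.

   ```java
   import java.util.function.Supplier;

   // Hypothetical sketch: openWatcher stands in for kubernetesClient.getPeonLogWatcher(taskId).
   final class LogWatchStarter
   {
     private final Supplier<Object> openWatcher;
     private Object logWatch; // guarded by "this"

     LogWatchStarter(Supplier<Object> openWatcher)
     {
       this.openWatcher = openWatcher;
     }

     /**
      * Holding the monitor makes the null check and the assignment one atomic step,
      * so concurrent callers cannot both open a watcher.
      */
     synchronized void startWatchingLogs()
     {
       if (logWatch != null) {
         return;
       }
       logWatch = openWatcher.get();
     }

     synchronized Object getLogWatch()
     {
       return logWatch;
     }
   }
   ```

   Synchronizing serializes concurrent callers while the watcher is being opened; for a call that is expected to happen once per task, that cost is usually small.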



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-help@druid.apache.org