Posted to commits@druid.apache.org by "georgew5656 (via GitHub)" <gi...@apache.org> on 2023/05/15 21:50:19 UTC

[GitHub] [druid] georgew5656 opened a new pull request, #14285: Fix log streaming

georgew5656 opened a new pull request, #14285:
URL: https://github.com/apache/druid/pull/14285

   Fix issues with task log streaming and with pushing logs when a task is cancelled
   
   ### Description
   I was trying to fix the bug where task logs are not pushed if a task is cancelled (the job is deleted before the main thread has a chance to grab the logs), but while doing this I noticed that there is an issue with streaming logs.
   
   The fabric8 LogWatch class doesn't really play well with the way we stream logs. It seems to keep the stream open for as long as the LogWatch itself is open, so when a request to stream logs reaches the K8sTaskRunner, it just hangs: as long as the LogWatch is still open, there can technically be more lines in the stream.
   
   The LogWatch only seems to get closed if close() is called explicitly or if the underlying k8s condition is satisfied (I think it waits for the container to complete).
   
   The regular stream returned by fabric8's getLogInputStream works well enough for streaming task logs (it just captures a snapshot of the k8s logs at a moment in time), since the Druid console periodically calls the OverlordResource to refresh the logs anyway.
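   To make the difference concrete, here is a stdlib-only sketch. It is not fabric8 code: `LogSnapshot` and `readAvailable` are hypothetical names, and a `PipedInputStream` stands in for a log stream whose producer is still running. Draining such a stream to EOF would block until the producer closes it, much like reading from an open LogWatch, while copying only the bytes that `available()` reports returns right away with a point-in-time snapshot:

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.nio.charset.StandardCharsets;

   final class LogSnapshot
   {
     private LogSnapshot()
     {
     }

     /**
      * Hypothetical helper: copies only the bytes that available() reports as readable
      * without blocking, so it returns even if the producer never closes the stream.
      */
     static String readAvailable(InputStream in) throws IOException
     {
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       byte[] buf = new byte[8192];
       int ready;
       while ((ready = in.available()) > 0) {
         int n = in.read(buf, 0, Math.min(ready, buf.length));
         if (n < 0) {
           break; // end of stream
         }
         out.write(buf, 0, n);
       }
       return new String(out.toByteArray(), StandardCharsets.UTF_8);
     }
   }
   ```

   `available()` is only an estimate for most stream types, so this illustrates the snapshot-versus-follow distinction rather than how fabric8 builds its streams.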
   
   It would probably be good if we actually used the offset parameter of streamTaskLog in KubernetesTaskRunner instead of just returning the whole stream; I can look into that in a future PR.
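   If the offset parameter is wired up later, one option is to apply it to the captured bytes. This is a minimal sketch, assuming the convention Druid's file-based task logs follow (a non-negative offset skips that many bytes from the start; a negative offset returns only the last |offset| bytes) and assuming the whole log is already in memory. `LogOffsets.applyOffset` is a hypothetical helper:

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.InputStream;

   final class LogOffsets
   {
     private LogOffsets()
     {
     }

     /** Hypothetical helper: returns a stream over logBytes positioned according to offset. */
     static InputStream applyOffset(byte[] logBytes, long offset)
     {
       int length = logBytes.length;
       int start;
       if (offset >= 0) {
         // Skip `offset` bytes from the beginning, clamped to the end of the log.
         start = (int) Math.min(offset, length);
       } else {
         // Negative offset: keep only the last |offset| bytes (the tail of the log).
         start = (int) Math.max(0L, length + offset);
       }
       return new ByteArrayInputStream(logBytes, start, length - start);
     }
   }
   ```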
   
   I also included the work to push logs from cancelled tasks in this PR: the shutdown thread now tries to push logs before shutting down the job, and the log push logic no longer pushes if there is no log stream (so that the existing log doesn't get overwritten with an empty file).
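   A minimal sketch of that shutdown ordering, under stated assumptions: `ShutdownSketch`, `LogPusher`, and the `fetchLogs`/`deleteJob` callbacks are hypothetical, and `java.util.Optional` is used instead of the Guava `Optional` in the extension to keep the example self-contained. Logs are pushed only when a stream exists, and the job is deleted afterwards even if the push fails:

   ```java
   import java.io.InputStream;
   import java.util.Optional;
   import java.util.function.Supplier;

   final class ShutdownSketch
   {
     /** Hypothetical stand-in for whatever uploads a task log to deep storage. */
     interface LogPusher
     {
       void push(String taskId, InputStream logs);
     }

     private ShutdownSketch()
     {
     }

     /** Pushes only when a stream exists, so an existing log is never replaced by an empty file. */
     static boolean pushLogsIfPresent(String taskId, Optional<InputStream> logs, LogPusher pusher)
     {
       if (!logs.isPresent()) {
         return false;
       }
       pusher.push(taskId, logs.get());
       return true;
     }

     /** Grab and push logs first, then delete the job (after which the logs are gone). */
     static void shutdown(
         String taskId,
         Supplier<Optional<InputStream>> fetchLogs,
         LogPusher pusher,
         Runnable deleteJob
     )
     {
       try {
         pushLogsIfPresent(taskId, fetchLogs.get(), pusher);
       }
       finally {
         deleteJob.run();
       }
     }
   }
   ```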
   
   It would be better if we could somehow mark a job as failed, grab the logs in our regular logic, and then delete the job, but I'm not aware of a way to do this; I can look into it more later.
   
   #### Release note
   - Fix bugs in K8s task scheduling around log streaming and logging for cancelled tasks.
   
   
   <hr>
   
   ##### Key changed/added classes in this PR
   Changes are only to the kubernetes-overlord-extensions module.
   
   This PR has:
   
   - [x] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] a release note entry in the PR description.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [x] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] suneet-s commented on a diff in pull request #14285: Fix log streaming

Posted by "suneet-s (via GitHub)" <gi...@apache.org>.
suneet-s commented on code in PR #14285:
URL: https://github.com/apache/druid/pull/14285#discussion_r1198201183


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java:
##########
@@ -120,22 +120,46 @@ public boolean deletePeonJob(K8sTaskId taskId)
     }
   }
 
-  public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
+  public Optional<LogWatch> getPeonLogWatcher(K8sTaskId taskId)
   {
     KubernetesClient k8sClient = clientApi.getClient();
     try {
       LogWatch logWatch = k8sClient.batch()
+          .v1()
+          .jobs()
+          .inNamespace(namespace)
+          .withName(taskId.getK8sTaskId())

Review Comment:
   A separate PR sounds good to me.





[GitHub] [druid] georgew5656 commented on a diff in pull request #14285: Fix log streaming

Posted by "georgew5656 (via GitHub)" <gi...@apache.org>.
georgew5656 commented on code in PR #14285:
URL: https://github.com/apache/druid/pull/14285#discussion_r1198126384


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -265,14 +268,32 @@ private TaskStatus getTaskStatus(long duration)
     return taskStatus.withDuration(duration);
   }
 
-  private void saveLogs()
+  protected void startWatchingLogs()
+  {
+    if (logWatch != null) {
+      log.debug("There is already a log watcher for %s", taskId.getOriginalTaskId());
+      return;
+    }
+    try {
+      Optional<LogWatch> maybeLogWatch = kubernetesClient.getPeonLogWatcher(taskId);
+      if (maybeLogWatch.isPresent()) {

Review Comment:
   This method would only ever be called by the one thread running join on that taskId. It's possible that another thread (the one running shutdown commands) also runs startWatchingLogs, but I don't think that would be a problem, since logWatch would just get set twice in succession.
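   If the shutdown thread and the join thread can both reach startWatchingLogs, setting logWatch twice would likely leave one of the two LogWatch instances unclosed. One way to make the field set-once is a compare-and-set that closes the losing watcher. This is a sketch with hypothetical names, using a generic AutoCloseable in place of fabric8's LogWatch:

   ```java
   import java.util.concurrent.atomic.AtomicReference;
   import java.util.function.Supplier;

   /** Hypothetical set-once holder for a watcher such as a LogWatch. */
   final class SetOnceWatcher<W extends AutoCloseable>
   {
     private final AtomicReference<W> current = new AtomicReference<>();

     /**
      * Returns the existing watcher, or opens one. If another thread wins the race to
      * set it, the watcher opened here is closed instead of being silently dropped.
      */
     W startWatching(Supplier<W> opener) throws Exception
     {
       W existing = current.get();
       if (existing != null) {
         return existing;
       }
       W opened = opener.get();
       if (current.compareAndSet(null, opened)) {
         return opened;
       }
       opened.close(); // lost the race: avoid leaking a second open watcher
       return current.get();
     }
   }
   ```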





[GitHub] [druid] suneet-s merged pull request #14285: Fix log streaming

Posted by "suneet-s (via GitHub)" <gi...@apache.org>.
suneet-s merged PR #14285:
URL: https://github.com/apache/druid/pull/14285




[GitHub] [druid] georgew5656 commented on a diff in pull request #14285: Fix log streaming

Posted by "georgew5656 (via GitHub)" <gi...@apache.org>.
georgew5656 commented on code in PR #14285:
URL: https://github.com/apache/druid/pull/14285#discussion_r1198132410


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -265,14 +268,32 @@ private TaskStatus getTaskStatus(long duration)
     return taskStatus.withDuration(duration);
   }
 
-  private void saveLogs()
+  protected void startWatchingLogs()
+  {
+    if (logWatch != null) {
+      log.debug("There is already a log watcher for %s", taskId.getOriginalTaskId());
+      return;
+    }
+    try {
+      Optional<LogWatch> maybeLogWatch = kubernetesClient.getPeonLogWatcher(taskId);
+      if (maybeLogWatch.isPresent()) {
+        logWatch = maybeLogWatch.get();
+      }
+    }
+    catch (Exception e) {
+      log.error(e, "Error watching logs from task: %s", taskId);
+    }
+  }
+
+  protected void saveLogs()

Review Comment:
   Hmm, yeah, I guess this makes sense: if the k8s API throws an error or something, we would probably still want to try pushing logs.
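   The review question here is that logs are not saved if `waitForPeonJobCompletion` throws. A minimal sketch of handling that is to run the save step in a `finally` block. `JoinSketch` and its callbacks are hypothetical stand-ins for the lifecycle methods, and a failure while saving is caught so it cannot mask the original exception:

   ```java
   import java.util.concurrent.Callable;

   final class JoinSketch
   {
     private JoinSketch()
     {
     }

     /**
      * Hypothetical shape of join(): wait for the peon job, and save logs in a finally
      * block so they are pushed even when waiting throws (e.g. a k8s API error).
      */
     static <T> T waitThenSaveLogs(Callable<T> waitForCompletion, Runnable saveLogs) throws Exception
     {
       try {
         return waitForCompletion.call();
       }
       finally {
         try {
           saveLogs.run();
         }
         catch (RuntimeException e) {
           // A failed log push should not mask the job's own result or exception.
           System.err.println("Failed to save logs: " + e);
         }
       }
     }
   }
   ```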





[GitHub] [druid] georgew5656 commented on a diff in pull request #14285: Fix log streaming

Posted by "georgew5656 (via GitHub)" <gi...@apache.org>.
georgew5656 commented on code in PR #14285:
URL: https://github.com/apache/druid/pull/14285#discussion_r1198141850


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java:
##########
@@ -120,22 +120,46 @@ public boolean deletePeonJob(K8sTaskId taskId)
     }
   }
 
-  public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
+  public Optional<LogWatch> getPeonLogWatcher(K8sTaskId taskId)
   {
     KubernetesClient k8sClient = clientApi.getClient();
     try {
       LogWatch logWatch = k8sClient.batch()
+          .v1()
+          .jobs()
+          .inNamespace(namespace)
+          .withName(taskId.getK8sTaskId())

Review Comment:
   This is an overall potential problem with the implementation (since we always look up jobs running in Kubernetes by their shortened task-ID name). Currently I don't think it's much of an issue, because stripping punctuation doesn't affect the uniqueness of task IDs. The shortening can, although all the task IDs I've seen are either shorter than 63 characters or left-pack their uniqueness (the distinguishing part is at the beginning of the task ID).
   
   I'll try to think of a solution to this, but I think it'll have to be a separate PR.
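   To illustrate the collision risk, here is a hypothetical conversion that lowercases, strips non-alphanumeric characters, and keeps the first 63 characters. The extension's actual conversion in K8sTaskId may differ in details; the point is only that IDs which differ after character 63 map to the same name, while IDs whose unique part comes first do not:

   ```java
   import java.util.Locale;

   final class K8sNames
   {
     /** The length limit the extension applies to names (per the review discussion). */
     static final int MAX_LENGTH = 63;

     private K8sNames()
     {
     }

     /** Hypothetical conversion: lowercase, strip punctuation, keep only the first 63 chars. */
     static String toK8sName(String taskId)
     {
       String stripped = taskId.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9]", "");
       return stripped.length() <= MAX_LENGTH ? stripped : stripped.substring(0, MAX_LENGTH);
     }
   }
   ```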





[GitHub] [druid] georgew5656 commented on a diff in pull request #14285: Fix log streaming

Posted by "georgew5656 (via GitHub)" <gi...@apache.org>.
georgew5656 commented on code in PR #14285:
URL: https://github.com/apache/druid/pull/14285#discussion_r1198285119


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java:
##########
@@ -120,22 +120,46 @@ public boolean deletePeonJob(K8sTaskId taskId)
     }
   }
 
-  public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
+  public Optional<LogWatch> getPeonLogWatcher(K8sTaskId taskId)
   {
     KubernetesClient k8sClient = clientApi.getClient();
     try {
       LogWatch logWatch = k8sClient.batch()
+          .v1()

Review Comment:
   @suneet-s for now I'm going to push a version of this that loads everything into memory again, so that this at least works properly.
   
   I personally think it would be okay for us to keep a longer-lived KubernetesClient instance around somewhere to facilitate streaming, but I'll have to think about that some more.
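   A minimal sketch of the in-memory approach, assuming a client that must be closed (the hypothetical `LogClient` interface stands in for KubernetesClient): the log is copied into a byte array inside try-with-resources, so the returned stream stays readable after the client is closed. The trade-off is that the whole log is held on the heap:

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.util.Optional;

   final class InMemoryLogs
   {
     /** Hypothetical stand-in for a client that must be closed, e.g. a KubernetesClient. */
     interface LogClient extends AutoCloseable
     {
       InputStream openLogStream() throws IOException;

       @Override
       void close();
     }

     private InMemoryLogs()
     {
     }

     /** Copies the whole log into memory before the client closes, so callers can read it later. */
     static Optional<InputStream> readLogs(LogClient client)
     {
       try (LogClient c = client; InputStream in = c.openLogStream()) {
         ByteArrayOutputStream buffer = new ByteArrayOutputStream();
         byte[] chunk = new byte[8192];
         int n;
         while ((n = in.read(chunk)) != -1) {
           buffer.write(chunk, 0, n);
         }
         return Optional.of(new ByteArrayInputStream(buffer.toByteArray()));
       }
       catch (IOException e) {
         return Optional.empty();
       }
     }
   }
   ```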



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] suneet-s commented on a diff in pull request #14285: Fix log streaming

Posted by "suneet-s (via GitHub)" <gi...@apache.org>.
suneet-s commented on code in PR #14285:
URL: https://github.com/apache/druid/pull/14285#discussion_r1197955503


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -78,6 +79,8 @@
   private final KubernetesPeonClient kubernetesClient;
   private final ObjectMapper mapper;
 
+  private LogWatch logWatch;

Review Comment:
   Add annotation to verify that it is set only once
   ```suggestion
     @MonotonicNonNull
     private LogWatch logWatch;
   ```



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java:
##########
@@ -120,22 +120,46 @@ public boolean deletePeonJob(K8sTaskId taskId)
     }
   }
 
-  public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
+  public Optional<LogWatch> getPeonLogWatcher(K8sTaskId taskId)
   {
     KubernetesClient k8sClient = clientApi.getClient();
     try {
       LogWatch logWatch = k8sClient.batch()
+          .v1()
+          .jobs()
+          .inNamespace(namespace)
+          .withName(taskId.getK8sTaskId())

Review Comment:
   The taskId is not guaranteed to be unique, since we have to limit the name to 63 characters. If two tasks collide, will we be watching logs from multiple peons?



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -265,14 +268,32 @@ private TaskStatus getTaskStatus(long duration)
     return taskStatus.withDuration(duration);
   }
 
-  private void saveLogs()
+  protected void startWatchingLogs()
+  {
+    if (logWatch != null) {
+      log.debug("There is already a log watcher for %s", taskId.getOriginalTaskId());
+      return;
+    }
+    try {
+      Optional<LogWatch> maybeLogWatch = kubernetesClient.getPeonLogWatcher(taskId);
+      if (maybeLogWatch.isPresent()) {
+        logWatch = maybeLogWatch.get();
+      }
+    }
+    catch (Exception e) {
+      log.error(e, "Error watching logs from task: %s", taskId);
+    }
+  }
+
+  protected void saveLogs()

Review Comment:
   I see saveLogs is only called on line 159 after `waitForPeonJobCompletion` but we do not save the logs if `waitForPeonJobCompletion` throws an exception. Does that need to be handled?



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java:
##########
@@ -120,22 +120,46 @@ public boolean deletePeonJob(K8sTaskId taskId)
     }
   }
 
-  public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
+  public Optional<LogWatch> getPeonLogWatcher(K8sTaskId taskId)
   {
     KubernetesClient k8sClient = clientApi.getClient();
     try {
       LogWatch logWatch = k8sClient.batch()
+          .v1()

Review Comment:
   nit: You could use the try-with-resources pattern in Java to make this code a little more concise
   
   ```java
   try (KubernetesClient k8sClient = clientApi.getClient()) {
     LogWatch logWatch = k8sClient.batch()
                                  .v1()
                                  .jobs()
                                  .inNamespace(namespace)
                                  .withName(taskId.getK8sTaskId())
                                  .inContainer("main")
                                  .watchLog();
     return Optional.fromNullable(logWatch);
   }
   catch (Exception e) {
     log.error(e, "Error watching logs from task: %s", taskId);
     return Optional.absent();
   }
   ```



##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java:
##########
@@ -265,14 +268,32 @@ private TaskStatus getTaskStatus(long duration)
     return taskStatus.withDuration(duration);
   }
 
-  private void saveLogs()
+  protected void startWatchingLogs()
+  {
+    if (logWatch != null) {
+      log.debug("There is already a log watcher for %s", taskId.getOriginalTaskId());
+      return;
+    }
+    try {
+      Optional<LogWatch> maybeLogWatch = kubernetesClient.getPeonLogWatcher(taskId);
+      if (maybeLogWatch.isPresent()) {

Review Comment:
   Do you need to handle multi-threaded access here? I see the `join` method is synchronized, but I'm not sure how this class is being accessed by many threads





[GitHub] [druid] georgew5656 commented on a diff in pull request #14285: Fix log streaming

Posted by "georgew5656 (via GitHub)" <gi...@apache.org>.
georgew5656 commented on code in PR #14285:
URL: https://github.com/apache/druid/pull/14285#discussion_r1198249895


##########
extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java:
##########
@@ -120,22 +120,46 @@ public boolean deletePeonJob(K8sTaskId taskId)
     }
   }
 
-  public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
+  public Optional<LogWatch> getPeonLogWatcher(K8sTaskId taskId)
   {
     KubernetesClient k8sClient = clientApi.getClient();
     try {
       LogWatch logWatch = k8sClient.batch()
+          .v1()

Review Comment:
   Adding this caused issues because the client (and hence the stream) gets closed before the requester reads the stream.
   The current logic is no good either, though, because it leaves the KubernetesClient open. I'm going to try to come up with another solution, hopefully one that doesn't involve reading the entire stream into memory.
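   A stdlib-only reproduction of the failure mode described here (`FakeClient` is hypothetical and simply refuses reads once closed): returning a stream from inside try-with-resources closes its owner before the caller gets to read it.

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.FilterInputStream;
   import java.io.IOException;
   import java.io.InputStream;

   final class ClosedBeforeReadDemo
   {
     /** Hypothetical client whose streams stop working once the client is closed. */
     static final class FakeClient implements AutoCloseable
     {
       private boolean closed;

       private void ensureOpen() throws IOException
       {
         if (closed) {
           throw new IOException("client closed");
         }
       }

       InputStream logStream(byte[] data)
       {
         return new FilterInputStream(new ByteArrayInputStream(data))
         {
           @Override
           public int read() throws IOException
           {
             ensureOpen();
             return super.read();
           }

           @Override
           public int read(byte[] b, int off, int len) throws IOException
           {
             ensureOpen();
             return super.read(b, off, len);
           }
         };
       }

       @Override
       public void close()
       {
         closed = true;
       }
     }

     private ClosedBeforeReadDemo()
     {
     }

     /** The pitfall: try-with-resources closes the client as this method returns. */
     static InputStream openThenReturn(byte[] data)
     {
       try (FakeClient client = new FakeClient()) {
         return client.logStream(data);
       }
     }
   }
   ```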


