You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/08 06:27:00 UTC

[incubator-streampark] branch dev updated: [Improve] Log archive retention for failed tasks (#1982)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 70308883c  [Improve] Log archive retention for failed tasks (#1982)
70308883c is described below

commit 70308883c3d93ce7fede74d8acb72061c7da1ea9
Author: monster <60...@users.noreply.github.com>
AuthorDate: Tue Nov 8 14:26:54 2022 +0800

     [Improve] Log archive retention for failed tasks (#1982)
    
    * [Improve] Log archive retention for failed tasks #1956
    
    * fix
---
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 43 ++++++++++++++++------
 1 file changed, 32 insertions(+), 11 deletions(-)

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index d4bd7e923..037e68258 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -17,10 +17,13 @@
 
 package org.apache.streampark.flink.kubernetes.watcher
 
+import com.google.common.base.Charsets
+import com.google.common.io.{FileWriteMode, Files}
 import org.apache.commons.collections.CollectionUtils
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.history.FsJobArchivist
-import org.apache.streampark.common.util.Logger
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson
+import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
 import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
 import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
@@ -34,7 +37,9 @@ import org.json4s.{DefaultFormats, JNothing, JNull}
 import org.json4s.JsonAST.JArray
 import org.json4s.jackson.JsonMethods.parse
 
+import java.io.File
 import java.nio.charset.StandardCharsets
+import java.util
 import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
 import javax.annotation.Nonnull
 import javax.annotation.concurrent.ThreadSafe
@@ -433,28 +438,44 @@ private[kubernetes] object FlinkHistoryArchives {
     require(jobId != null, "[StreamPark] getJobStateFromArchiveFile: JobId cannot be null.")
     val archivePath = new Path(Workspace.ARCHIVES_FILE_PATH, jobId)
     val archivedJson = FsJobArchivist.getArchivedJsons(archivePath)
+    var state: String = FAILED_STATE
     if (CollectionUtils.isNotEmpty(archivedJson)) {
       archivedJson.foreach { a =>
-        if (a.getPath == "/jobs/overview") {
+        if (a.getPath == s"/jobs/$jobId/exceptions") {
           Try(parse(a.getJson)) match {
             case Success(ok) =>
-              ok \ "jobs" match {
-                case JNothing | JNull => return FAILED_STATE
+              ok \ "root-exception" match {
+                case JNothing | JNull =>
                 case JArray(arr) =>
                   arr.foreach(x => {
-                    val jid = (x \ "jid").extractOpt[String].orNull
-                    if (jid == jobId) {
-                      return (x \ "state").extractOpt[String].orNull
-                    }
+                    val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
+                    val path = s"${projectPath}/${jobId}_err.log"
+                    val file = new File(path)
+                    val log = (x \ "root-exception").extractOpt[String].orNull
+                    Files.asCharSink(file, Charsets.UTF_8).write(log)
                   })
-                case _ => return FAILED_STATE
               }
-            case _ => return FAILED_STATE
+          }
+        } else if (a.getPath == "/jobs/overview") {
+          Try(parse(a.getJson)) match {
+            case Success(ok) =>
+              ok \ "jobs" match {
+                case JNothing | JNull =>
+                case JArray(arr) =>
+                    arr.foreach(x => {
+                      val jid = (x \ "jid").extractOpt[String].orNull
+                      if (jid == jobId) {
+                        state = (x \ "state").extractOpt[String].orNull
+                      }
+                    })
+                case _ =>
+              }
+            case Failure(_) =>
           }
         }
       }
     }
-    FAILED_STATE
+    state
   }.getOrElse(FAILED_STATE)
 
 }