You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/03 09:30:13 UTC

[incubator-streampark] branch log updated: [Improve] Log archive retention for failed tasks

This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch log
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/log by this push:
     new bb2e111a0 [Improve] Log archive retention for failed tasks
bb2e111a0 is described below

commit bb2e111a0f7822394e6b39ba0de51f9fd51e0d1e
Author: Monster <25...@qq.com>
AuthorDate: Thu Nov 3 17:29:49 2022 +0800

    [Improve] Log archive retention for failed tasks
---
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala  | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index acaec9594..6bf452ea2 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -17,6 +17,8 @@
 
 package org.apache.streampark.flink.kubernetes.watcher
 
+import com.google.common.base.Charsets
+import com.google.common.io.{FileWriteMode, Files}
 import org.apache.commons.collections.CollectionUtils
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.history.FsJobArchivist
@@ -34,6 +36,7 @@ import org.json4s.{DefaultFormats, JNothing, JNull}
 import org.json4s.JsonAST.JArray
 import org.json4s.jackson.JsonMethods.parse
 
+import java.io.File
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
 import javax.annotation.Nonnull
@@ -434,6 +437,24 @@ private[kubernetes] object FlinkHistoryArchives {
     val archivePath = new Path(Workspace.ARCHIVES_FILE_PATH, jobId)
     val archivedJson = FsJobArchivist.getArchivedJsons(archivePath)
     if (CollectionUtils.isNotEmpty(archivedJson)) {
+      archivedJson.stream
+        .filter( a => a.getPath.equals(s"/jobs/$jobId/exceptions"))
+        .map( a => {
+          Try(parse(a.getJson)) match {
+            case Success(ok) =>
+              ok \ "jobs" match {
+                case JNothing | JNull => _
+                case JArray(arr) =>
+                  arr.foreach(x => {
+                    val projectPath = new File("").getCanonicalPath
+                    val path = s"$projectPath/$jobId.log"
+                    val file = new File(path)
+                    val log = (x \ "root-exception").extractOpt[String].orNull
+                    Files.asCharSink(file, Charsets.UTF_8, FileWriteMode.APPEND).write(log)
+                  })
+              }
+          }
+        })
       archivedJson.foreach { a =>
         if (a.getPath == "/jobs/overview") {
           Try(parse(a.getJson)) match {