You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/03 09:30:13 UTC
[incubator-streampark] branch log updated: [Improve] Log archive retention for failed tasks
This is an automated email from the ASF dual-hosted git repository.
monster pushed a commit to branch log
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/log by this push:
new bb2e111a0 [Improve] Log archive retention for failed tasks
bb2e111a0 is described below
commit bb2e111a0f7822394e6b39ba0de51f9fd51e0d1e
Author: Monster <25...@qq.com>
AuthorDate: Thu Nov 3 17:29:49 2022 +0800
[Improve] Log archive retention for failed tasks
---
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 21 +++++++++++++++++++++
1 file changed, 21 insertions(+)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index acaec9594..6bf452ea2 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -17,6 +17,8 @@
package org.apache.streampark.flink.kubernetes.watcher
+import com.google.common.base.Charsets
+import com.google.common.io.{FileWriteMode, Files}
import org.apache.commons.collections.CollectionUtils
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.history.FsJobArchivist
@@ -34,6 +36,7 @@ import org.json4s.{DefaultFormats, JNothing, JNull}
import org.json4s.JsonAST.JArray
import org.json4s.jackson.JsonMethods.parse
+import java.io.File
import java.nio.charset.StandardCharsets
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import javax.annotation.Nonnull
@@ -434,6 +437,24 @@ private[kubernetes] object FlinkHistoryArchives {
val archivePath = new Path(Workspace.ARCHIVES_FILE_PATH, jobId)
val archivedJson = FsJobArchivist.getArchivedJsons(archivePath)
if (CollectionUtils.isNotEmpty(archivedJson)) {
+ archivedJson.stream
+ .filter( a => a.getPath.equals(s"/jobs/$jobId/exceptions"))
+ .map( a => {
+ Try(parse(a.getJson)) match {
+ case Success(ok) =>
+ ok \ "jobs" match {
+ case JNothing | JNull => _
+ case JArray(arr) =>
+ arr.foreach(x => {
+ val projectPath = new File("").getCanonicalPath
+ val path = s"$projectPath/$jobId.log"
+ val file = new File(path)
+ val log = (x \ "root-exception").extractOpt[String].orNull
+ Files.asCharSink(file, Charsets.UTF_8, FileWriteMode.APPEND).write(log)
+ })
+ }
+ }
+ })
archivedJson.foreach { a =>
if (a.getPath == "/jobs/overview") {
Try(parse(a.getJson)) match {