You are viewing a plain-text version of this content; the canonical link to the original message is not preserved in this format.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/08 06:27:00 UTC
[incubator-streampark] branch dev updated: [Improve] Log archive retention for failed tasks (#1982)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 70308883c [Improve] Log archive retention for failed tasks (#1982)
70308883c is described below
commit 70308883c3d93ce7fede74d8acb72061c7da1ea9
Author: monster <60...@users.noreply.github.com>
AuthorDate: Tue Nov 8 14:26:54 2022 +0800
[Improve] Log archive retention for failed tasks (#1982)
* [Improve] Log archive retention for failed tasks #1956
* fix
---
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 43 ++++++++++++++++------
1 file changed, 32 insertions(+), 11 deletions(-)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index d4bd7e923..037e68258 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -17,10 +17,13 @@
package org.apache.streampark.flink.kubernetes.watcher
+import com.google.common.base.Charsets
+import com.google.common.io.{FileWriteMode, Files}
import org.apache.commons.collections.CollectionUtils
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.history.FsJobArchivist
-import org.apache.streampark.common.util.Logger
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson
+import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
@@ -34,7 +37,9 @@ import org.json4s.{DefaultFormats, JNothing, JNull}
import org.json4s.JsonAST.JArray
import org.json4s.jackson.JsonMethods.parse
+import java.io.File
import java.nio.charset.StandardCharsets
+import java.util
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import javax.annotation.Nonnull
import javax.annotation.concurrent.ThreadSafe
@@ -433,28 +438,44 @@ private[kubernetes] object FlinkHistoryArchives {
require(jobId != null, "[StreamPark] getJobStateFromArchiveFile: JobId cannot be null.")
val archivePath = new Path(Workspace.ARCHIVES_FILE_PATH, jobId)
val archivedJson = FsJobArchivist.getArchivedJsons(archivePath)
+ var state: String = FAILED_STATE
if (CollectionUtils.isNotEmpty(archivedJson)) {
archivedJson.foreach { a =>
- if (a.getPath == "/jobs/overview") {
+ if (a.getPath == s"/jobs/$jobId/exceptions") {
Try(parse(a.getJson)) match {
case Success(ok) =>
- ok \ "jobs" match {
- case JNothing | JNull => return FAILED_STATE
+ ok \ "root-exception" match {
+ case JNothing | JNull =>
case JArray(arr) =>
arr.foreach(x => {
- val jid = (x \ "jid").extractOpt[String].orNull
- if (jid == jobId) {
- return (x \ "state").extractOpt[String].orNull
- }
+ val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
+ val path = s"${projectPath}/${jobId}_err.log"
+ val file = new File(path)
+ val log = (x \ "root-exception").extractOpt[String].orNull
+ Files.asCharSink(file, Charsets.UTF_8).write(log)
})
- case _ => return FAILED_STATE
}
- case _ => return FAILED_STATE
+ }
+ } else if (a.getPath == "/jobs/overview") {
+ Try(parse(a.getJson)) match {
+ case Success(ok) =>
+ ok \ "jobs" match {
+ case JNothing | JNull =>
+ case JArray(arr) =>
+ arr.foreach(x => {
+ val jid = (x \ "jid").extractOpt[String].orNull
+ if (jid == jobId) {
+ state = (x \ "state").extractOpt[String].orNull
+ }
+ })
+ case _ =>
+ }
+ case Failure(_) =>
}
}
}
}
- FAILED_STATE
+ state
}.getOrElse(FAILED_STATE)
}