Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/25 07:08:55 UTC

[incubator-streampark] 01/01: [Bug] flink-job archivefile parse bug fixed

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch yarn-session
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit a199c211ac0fa50131e8e110fb60bdf98ca2de58
Author: benjobs <be...@apache.org>
AuthorDate: Fri Nov 25 15:08:36 2022 +0800

    [Bug] flink-job archivefile parse bug fixed
---
 .../common/util/SystemPropertyUtils.scala          |  3 ++
 .../helper/KubernetesDeploymentHelper.scala        | 16 ++++--
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 36 +++++--------
 .../d933fa6c785f0db6dccc6cc05dd43bab.json          |  1 +
 .../flink/kubernetes/FlinkRestJsonTest.scala       | 62 ++++++++++++++++++++++
 .../flink/core/FlinkStreamingInitializer.scala     |  2 -
 6 files changed, 92 insertions(+), 28 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
index 211e04683..ce1f7c94b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
@@ -119,4 +119,7 @@ object SystemPropertyUtils extends Logger {
       SystemPropertyUtils.set(key, appHome)
     }
   }
+
+  def getTmpdir(): String = get("java.io.tmpdir", "temp")
+
 }
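
The new helper gives every call site one entry point for the JVM temp-directory lookup instead of repeating the property key and fallback string. A minimal, self-contained sketch of the pattern (the method name and the "temp" fallback mirror the patch; the wrapper object is illustrative):

    object TmpDirSketch {
      // Read a system property, falling back to a default when it is unset.
      def get(key: String, default: String): String =
        Option(System.getProperty(key)).getOrElse(default)

      // Single source of truth for the temp directory, as in SystemPropertyUtils.
      def getTmpdir(): String = get("java.io.tmpdir", "temp")
    }
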
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index e9922385b..994fdf220 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -83,8 +83,7 @@ object KubernetesDeploymentHelper extends Logger {
 
   def watchDeploymentLog(nameSpace: String, jobName: String, jobId: String): String = {
     tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
-      val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
-      val path = s"$projectPath/$jobId.log"
+      val path = KubernetesDeploymentHelper.getJobLog(jobId)
       val file = new File(path)
       val log = client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog
       Files.asCharSink(file, Charsets.UTF_8).write(log)
@@ -96,8 +95,7 @@ object KubernetesDeploymentHelper extends Logger {
     tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
       Try {
         val podName = getPods(nameSpace, jobName).head.getMetadata.getName
-        val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
-        val path = s"$projectPath/${jobId}_err.log"
+        val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
         val file = new File(path)
         val log = client.pods.inNamespace(nameSpace).withName(podName).terminated().withPrettyOutput.getLog
         Files.asCharSink(file, Charsets.UTF_8).write(log)
@@ -118,4 +116,14 @@ object KubernetesDeploymentHelper extends Logger {
     }
   }
 
+  private[kubernetes] def getJobLog(jobId: String): String = {
+    val tmpPath = SystemPropertyUtils.getTmpdir()
+    s"$tmpPath/$jobId.log"
+  }
+
+  private[kubernetes] def getJobErrorLog(jobId: String): String = {
+    val tmpPath = SystemPropertyUtils.getTmpdir()
+    s"$tmpPath/${jobId}_err.log"
+  }
+
 }
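
With the path construction consolidated into getJobLog/getJobErrorLog, callers derive both log locations from the job id alone. A rough usage sketch under that assumption (writeJobLog is a hypothetical caller; the Guava asCharSink call matches the one used in the patch):

    import com.google.common.base.Charsets
    import com.google.common.io.Files
    import java.io.File

    object LogSinkSketch {
      // Hypothetical caller: persist a captured log under the derived path.
      def writeJobLog(jobId: String, log: String): String = {
        val path = s"${System.getProperty("java.io.tmpdir", "temp")}/$jobId.log" // mirrors getJobLog
        Files.asCharSink(new File(path), Charsets.UTF_8).write(log)
        path
      }
    }
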
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 037e68258..7a12aa4f2 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -18,12 +18,11 @@
 package org.apache.streampark.flink.kubernetes.watcher
 
 import com.google.common.base.Charsets
-import com.google.common.io.{FileWriteMode, Files}
+import com.google.common.io.Files
 import org.apache.commons.collections.CollectionUtils
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.history.FsJobArchivist
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson
-import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
+import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
 import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
@@ -39,7 +38,6 @@ import org.json4s.jackson.JsonMethods.parse
 
 import java.io.File
 import java.nio.charset.StandardCharsets
-import java.util
 import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
 import javax.annotation.Nonnull
 import javax.annotation.concurrent.ThreadSafe
@@ -96,7 +94,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
    * single flink job status tracking task
    */
   override def doWatch(): Unit = {
-    this.synchronized{
+    this.synchronized {
       logInfo("[FlinkJobStatusWatcher]: Status monitoring process begins - " + Thread.currentThread().getName)
       // get all legal tracking ids
       val trackIds = Try(trackController.collectAllTrackIds()).filter(_.nonEmpty).getOrElse(return)
@@ -444,17 +442,11 @@ private[kubernetes] object FlinkHistoryArchives {
         if (a.getPath == s"/jobs/$jobId/exceptions") {
           Try(parse(a.getJson)) match {
             case Success(ok) =>
-              ok \ "root-exception" match {
-                case JNothing | JNull =>
-                case JArray(arr) =>
-                  arr.foreach(x => {
-                    val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
-                    val path = s"${projectPath}/${jobId}_err.log"
-                    val file = new File(path)
-                    val log = (x \ "root-exception").extractOpt[String].orNull
-                    Files.asCharSink(file, Charsets.UTF_8).write(log)
-                  })
-              }
+              val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+              val file = new File(path)
+              val log = (ok \ "root-exception").extractOpt[String].orNull
+              Files.asCharSink(file, Charsets.UTF_8).write(log)
+            case _ =>
           }
         } else if (a.getPath == "/jobs/overview") {
           Try(parse(a.getJson)) match {
@@ -462,12 +454,12 @@ private[kubernetes] object FlinkHistoryArchives {
               ok \ "jobs" match {
                 case JNothing | JNull =>
                 case JArray(arr) =>
-                    arr.foreach(x => {
-                      val jid = (x \ "jid").extractOpt[String].orNull
-                      if (jid == jobId) {
-                        state = (x \ "state").extractOpt[String].orNull
-                      }
-                    })
+                  arr.foreach(x => {
+                    val jid = (x \ "jid").extractOpt[String].orNull
+                    if (jid == jobId) {
+                      state = (x \ "state").extractOpt[String].orNull
+                    }
+                  })
                 case _ =>
               }
             case Failure(_) =>
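
The parse fix above reflects that the archived /jobs/<jobId>/exceptions payload carries "root-exception" as a plain string field, so the old match against JArray never fired; the patch extracts the string directly. A minimal json4s sketch of that extraction (the sample payload is illustrative):

    import org.json4s.DefaultFormats
    import org.json4s.jackson.JsonMethods.parse

    object RootExceptionSketch extends App {
      implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
      val json = parse("""{"root-exception":"java.lang.RuntimeException: boom"}""")
      // extractOpt yields None when the field is absent, so orNull can hand a null
      // to the asCharSink write above -- a real caller may want to guard that.
      val rootException = (json \ "root-exception").extractOpt[String].orNull
      println(rootException)
    }
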
diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json b/streampark-flink/streampark-flink-kubernetes/src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json
new file mode 100644
index 000000000..af2c1f6b6
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json
@@ -0,0 +1 @@
+{"archive":[{"path":"/jobs/overview","json":"{\"jobs\":[{\"jid\":\"d933fa6c785f0db6dccc6cc05dd43bab\",\"name\":\"test555\",\"state\":\"FAILED\",\"start-time\":1669348683915,\"end-time\":1669348702523,\"duration\":18608,\"last-modification\":1669348702523,\"tasks\":{\"total\":1,\"created\":0,\"scheduled\":0,\"deploying\":0,\"running\":0,\"finished\":0,\"canceling\":0,\"canceled\":0,\"failed\":1,\"reconciling\":0,\"initializing\":0}}]}"},{"path":"/jobs/d933fa6c785f0db6dccc6cc05dd43bab/conf [...]
\ No newline at end of file
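
The fixture follows the layout FsJobArchivist.getArchivedJsons expects: a top-level "archive" array whose entries pair a REST path with the JSON body that endpoint returned. Schematically (values abbreviated, jobId standing in for the real id):

    {"archive": [
      {"path": "/jobs/overview",         "json": "{\"jobs\":[{\"jid\":\"jobId\",\"state\":\"FAILED\"}]}"},
      {"path": "/jobs/jobId/exceptions", "json": "{\"root-exception\":\"...\"}"}
    ]}
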
diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
index 300989d20..b0d3ed301 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
@@ -16,8 +16,22 @@
  */
 package org.apache.streampark.flink.kubernetes
 
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.apache.commons.collections.CollectionUtils
+import org.apache.flink.core.fs.Path
+import org.apache.flink.runtime.history.FsJobArchivist
+import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
 import org.apache.streampark.flink.kubernetes.watcher.{Checkpoint, FlinkRestJmConfigItem, FlinkRestOverview, JobDetails}
+import org.json4s.DefaultFormats
 import org.junit.jupiter.api.Test
+import org.json4s.{JNothing, JNull}
+import org.json4s.JsonAST.JArray
+import org.json4s.jackson.JsonMethods.parse
+
+import java.io.File
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
+import scala.util.{Failure, Success, Try}
 
 // scalastyle:off println
 class FlinkRestJsonTest {
@@ -280,4 +294,52 @@ class FlinkRestJsonTest {
     println(ingressMeta.get)
   }
 
+  @Test def testHistoryArchives(): Unit = {
+
+    @transient
+    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+
+
+    val state = Try {
+      val archivePath = new Path("src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json")
+      val jobId = "d933fa6c785f0db6dccc6cc05dd43bab"
+      val archivedJson = FsJobArchivist.getArchivedJsons(archivePath)
+      var state: String = "FAILED"
+      if (CollectionUtils.isNotEmpty(archivedJson)) {
+        archivedJson.foreach { a =>
+          if (a.getPath == s"/jobs/$jobId/exceptions") {
+            Try(parse(a.getJson)) match {
+              case Success(ok) =>
+                val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+                val file = new File(path)
+                val log = (ok \ "root-exception").extractOpt[String].orNull
+                Files.asCharSink(file, Charsets.UTF_8).write(log)
+                println(" error path: " + path)
+              case _ =>
+            }
+          } else if (a.getPath == "/jobs/overview") {
+            Try(parse(a.getJson)) match {
+              case Success(ok) =>
+                ok \ "jobs" match {
+                  case JNothing | JNull =>
+                  case JArray(arr) =>
+                    arr.foreach(x => {
+                      val jid = (x \ "jid").extractOpt[String].orNull
+                      if (jid == jobId) {
+                        state = (x \ "state").extractOpt[String].orNull
+                      }
+                    })
+                  case _ =>
+                }
+              case Failure(_) =>
+            }
+          }
+        }
+      }
+      state
+    }.getOrElse("FAILED")
+
+    println(state)
+  }
+
 }
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 8189a034c..3e9676d54 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -81,9 +81,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
   def initParameter(): FlinkConfiguration = {
     val argsMap = ParameterTool.fromArgs(args)
     val config = argsMap.get(KEY_APP_CONF(), null) match {
-      // scalastyle:off throwerror
      case null | "" => throw new ExceptionInInitializerError("[StreamPark] Usage: can't find config, please set \"--conf $path\" in main arguments")
-      // scalastyle:on throwerror
       case file => file
     }
     val configMap = parseConfig(config)
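
For context, ParameterTool.fromArgs turns "--key value" pairs from the main arguments into a lookup map, which is how "--conf" reaches initParameter. A minimal sketch (the literal "conf" key stands in for the patch's KEY_APP_CONF() constant):

    import org.apache.flink.api.java.utils.ParameterTool

    object ConfArgSketch extends App {
      val cliArgs = Array("--conf", "/path/to/application.yml")
      val argsMap = ParameterTool.fromArgs(cliArgs)
      // get returns the default (null here) when --conf was never supplied,
      // which is what triggers the ExceptionInInitializerError above.
      val config = argsMap.get("conf", null)
      println(config)
    }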