You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/25 07:08:54 UTC

[incubator-streampark] branch yarn-session created (now a199c211a)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a change to branch yarn-session
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


      at a199c211a [Bug] flink-job archivefile parse bug fixed

This branch includes the following new commits:

     new a199c211a [Bug] flink-job archivefile parse bug fixed

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-streampark] 01/01: [Bug] flink-job archivefile parse bug fixed

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch yarn-session
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit a199c211ac0fa50131e8e110fb60bdf98ca2de58
Author: benjobs <be...@apache.org>
AuthorDate: Fri Nov 25 15:08:36 2022 +0800

    [Bug] flink-job archivefile parse bug fixed
---
 .../common/util/SystemPropertyUtils.scala          |  3 ++
 .../helper/KubernetesDeploymentHelper.scala        | 16 ++++--
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 36 +++++--------
 .../d933fa6c785f0db6dccc6cc05dd43bab.json          |  1 +
 .../flink/kubernetes/FlinkRestJsonTest.scala       | 62 ++++++++++++++++++++++
 .../flink/core/FlinkStreamingInitializer.scala     |  2 -
 6 files changed, 92 insertions(+), 28 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
index 211e04683..ce1f7c94b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
@@ -119,4 +119,7 @@ object SystemPropertyUtils extends Logger {
       SystemPropertyUtils.set(key, appHome)
     }
   }
+
+  def getTmpdir() : String = get("java.io.tmpdir", "temp")
+
 }
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index e9922385b..994fdf220 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -83,8 +83,7 @@ object KubernetesDeploymentHelper extends Logger {
 
   def watchDeploymentLog(nameSpace: String, jobName: String, jobId: String): String = {
     tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
-      val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
-      val path = s"$projectPath/$jobId.log"
+      val path = KubernetesDeploymentHelper.getJobLog(jobId)
       val file = new File(path)
       val log = client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog
       Files.asCharSink(file, Charsets.UTF_8).write(log)
@@ -96,8 +95,7 @@ object KubernetesDeploymentHelper extends Logger {
     tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
       Try {
         val podName = getPods(nameSpace, jobName).head.getMetadata.getName
-        val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
-        val path = s"$projectPath/${jobId}_err.log"
+        val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
         val file = new File(path)
         val log = client.pods.inNamespace(nameSpace).withName(podName).terminated().withPrettyOutput.getLog
         Files.asCharSink(file, Charsets.UTF_8).write(log)
@@ -118,4 +116,14 @@ object KubernetesDeploymentHelper extends Logger {
     }
   }
 
+  private[kubernetes] def getJobLog(jobId: String): String = {
+    val tmpPath = SystemPropertyUtils.getTmpdir()
+    s"$tmpPath/$jobId.log"
+  }
+
+  private[kubernetes] def getJobErrorLog(jobId: String): String = {
+    val tmpPath = SystemPropertyUtils.getTmpdir()
+    s"$tmpPath/${jobId}_err.log"
+  }
+
 }
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 037e68258..7a12aa4f2 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -18,12 +18,11 @@
 package org.apache.streampark.flink.kubernetes.watcher
 
 import com.google.common.base.Charsets
-import com.google.common.io.{FileWriteMode, Files}
+import com.google.common.io.Files
 import org.apache.commons.collections.CollectionUtils
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.history.FsJobArchivist
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson
-import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
+import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
 import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
@@ -39,7 +38,6 @@ import org.json4s.jackson.JsonMethods.parse
 
 import java.io.File
 import java.nio.charset.StandardCharsets
-import java.util
 import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
 import javax.annotation.Nonnull
 import javax.annotation.concurrent.ThreadSafe
@@ -96,7 +94,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
    * single flink job status tracking task
    */
   override def doWatch(): Unit = {
-    this.synchronized{
+    this.synchronized {
       logInfo("[FlinkJobStatusWatcher]: Status monitoring process begins - " + Thread.currentThread().getName)
       // get all legal tracking ids
       val trackIds = Try(trackController.collectAllTrackIds()).filter(_.nonEmpty).getOrElse(return)
@@ -444,17 +442,11 @@ private[kubernetes] object FlinkHistoryArchives {
         if (a.getPath == s"/jobs/$jobId/exceptions") {
           Try(parse(a.getJson)) match {
             case Success(ok) =>
-              ok \ "root-exception" match {
-                case JNothing | JNull =>
-                case JArray(arr) =>
-                  arr.foreach(x => {
-                    val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
-                    val path = s"${projectPath}/${jobId}_err.log"
-                    val file = new File(path)
-                    val log = (x \ "root-exception").extractOpt[String].orNull
-                    Files.asCharSink(file, Charsets.UTF_8).write(log)
-                  })
-              }
+              val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+              val file = new File(path)
+              val log = (ok \ "root-exception").extractOpt[String].orNull
+              Files.asCharSink(file, Charsets.UTF_8).write(log)
+            case _ =>
           }
         } else if (a.getPath == "/jobs/overview") {
           Try(parse(a.getJson)) match {
@@ -462,12 +454,12 @@ private[kubernetes] object FlinkHistoryArchives {
               ok \ "jobs" match {
                 case JNothing | JNull =>
                 case JArray(arr) =>
-                    arr.foreach(x => {
-                      val jid = (x \ "jid").extractOpt[String].orNull
-                      if (jid == jobId) {
-                        state = (x \ "state").extractOpt[String].orNull
-                      }
-                    })
+                  arr.foreach(x => {
+                    val jid = (x \ "jid").extractOpt[String].orNull
+                    if (jid == jobId) {
+                      state = (x \ "state").extractOpt[String].orNull
+                    }
+                  })
                 case _ =>
               }
             case Failure(_) =>
diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json b/streampark-flink/streampark-flink-kubernetes/src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json
new file mode 100644
index 000000000..af2c1f6b6
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json
@@ -0,0 +1 @@
+{"archive":[{"path":"/jobs/overview","json":"{\"jobs\":[{\"jid\":\"d933fa6c785f0db6dccc6cc05dd43bab\",\"name\":\"test555\",\"state\":\"FAILED\",\"start-time\":1669348683915,\"end-time\":1669348702523,\"duration\":18608,\"last-modification\":1669348702523,\"tasks\":{\"total\":1,\"created\":0,\"scheduled\":0,\"deploying\":0,\"running\":0,\"finished\":0,\"canceling\":0,\"canceled\":0,\"failed\":1,\"reconciling\":0,\"initializing\":0}}]}"},{"path":"/jobs/d933fa6c785f0db6dccc6cc05dd43bab/conf [...]
\ No newline at end of file
diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
index 300989d20..b0d3ed301 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
@@ -16,8 +16,22 @@
  */
 package org.apache.streampark.flink.kubernetes
 
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.apache.commons.collections.CollectionUtils
+import org.apache.flink.core.fs.Path
+import org.apache.flink.runtime.history.FsJobArchivist
+import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
 import org.apache.streampark.flink.kubernetes.watcher.{Checkpoint, FlinkRestJmConfigItem, FlinkRestOverview, JobDetails}
+import org.json4s.DefaultFormats
 import org.junit.jupiter.api.Test
+import org.json4s.{JNothing, JNull}
+import org.json4s.JsonAST.JArray
+import org.json4s.jackson.JsonMethods.parse
+
+import java.io.File
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
+import scala.util.{Failure, Success, Try}
 
 // scalastyle:off println
 class FlinkRestJsonTest {
@@ -280,4 +294,52 @@ class FlinkRestJsonTest {
     println(ingressMeta.get)
   }
 
+  @Test def testHistoryArchives(): Unit = {
+
+    @transient
+    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+
+
+    val state = Try {
+      val archivePath = new Path("src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json")
+      val jobId = "d933fa6c785f0db6dccc6cc05dd43bab"
+      val archivedJson = FsJobArchivist.getArchivedJsons(archivePath)
+      var state: String = "FAILED"
+      if (CollectionUtils.isNotEmpty(archivedJson)) {
+        archivedJson.foreach { a =>
+          if (a.getPath == s"/jobs/$jobId/exceptions") {
+            Try(parse(a.getJson)) match {
+              case Success(ok) =>
+                val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+                val file = new File(path)
+                val log = (ok \ "root-exception").extractOpt[String].orNull
+                Files.asCharSink(file, Charsets.UTF_8).write(log)
+                println(" error path: " + path)
+              case _ =>
+            }
+          } else if (a.getPath == "/jobs/overview") {
+            Try(parse(a.getJson)) match {
+              case Success(ok) =>
+                ok \ "jobs" match {
+                  case JNothing | JNull =>
+                  case JArray(arr) =>
+                    arr.foreach(x => {
+                      val jid = (x \ "jid").extractOpt[String].orNull
+                      if (jid == jobId) {
+                        state = (x \ "state").extractOpt[String].orNull
+                      }
+                    })
+                  case _ =>
+                }
+              case Failure(_) =>
+            }
+          }
+        }
+      }
+      state
+    }.getOrElse("FAILED")
+
+    println(state)
+  }
+
 }
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 8189a034c..3e9676d54 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -81,9 +81,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
   def initParameter(): FlinkConfiguration = {
     val argsMap = ParameterTool.fromArgs(args)
     val config = argsMap.get(KEY_APP_CONF(), null) match {
-      // scalastyle:off throwerror
       case null | "" => throw new ExceptionInInitializerError("[StreamPark] Usage:can't fond config,please set \"--conf $path \" in main arguments")
-      // scalastyle:on throwerror
       case file => file
     }
     val configMap = parseConfig(config)