You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/25 07:08:55 UTC
[incubator-streampark] 01/01: [Bug] flink-job archivefile parse bug fixed
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch yarn-session
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit a199c211ac0fa50131e8e110fb60bdf98ca2de58
Author: benjobs <be...@apache.org>
AuthorDate: Fri Nov 25 15:08:36 2022 +0800
[Bug] flink-job archivefile parse bug fixed
---
.../common/util/SystemPropertyUtils.scala | 3 ++
.../helper/KubernetesDeploymentHelper.scala | 16 ++++--
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 36 +++++--------
.../d933fa6c785f0db6dccc6cc05dd43bab.json | 1 +
.../flink/kubernetes/FlinkRestJsonTest.scala | 62 ++++++++++++++++++++++
.../flink/core/FlinkStreamingInitializer.scala | 2 -
6 files changed, 92 insertions(+), 28 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
index 211e04683..ce1f7c94b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
@@ -119,4 +119,7 @@ object SystemPropertyUtils extends Logger {
SystemPropertyUtils.set(key, appHome)
}
}
+
+ /**
+  * Looks up the `java.io.tmpdir` system property through this object's `get`,
+  * passing `"temp"` as the second argument (presumably the fallback used when the
+  * property is not set -- `get` is defined outside this hunk, confirm there).
+  *
+  * @return whatever `get("java.io.tmpdir", "temp")` yields
+  */
+ def getTmpdir(): String = {
+   val tmpDirKey = "java.io.tmpdir"
+   get(tmpDirKey, "temp")
+ }
+
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index e9922385b..994fdf220 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -83,8 +83,7 @@ object KubernetesDeploymentHelper extends Logger {
def watchDeploymentLog(nameSpace: String, jobName: String, jobId: String): String = {
tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
- val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
- val path = s"$projectPath/$jobId.log"
+ val path = KubernetesDeploymentHelper.getJobLog(jobId)
val file = new File(path)
val log = client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog
Files.asCharSink(file, Charsets.UTF_8).write(log)
@@ -96,8 +95,7 @@ object KubernetesDeploymentHelper extends Logger {
tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
Try {
val podName = getPods(nameSpace, jobName).head.getMetadata.getName
- val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
- val path = s"$projectPath/${jobId}_err.log"
+ val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
val file = new File(path)
val log = client.pods.inNamespace(nameSpace).withName(podName).terminated().withPrettyOutput.getLog
Files.asCharSink(file, Charsets.UTF_8).write(log)
@@ -118,4 +116,14 @@ object KubernetesDeploymentHelper extends Logger {
}
}
+ /**
+  * Builds the path of the log file for a job: `<tmpdir>/<jobId>.log`, where
+  * `<tmpdir>` is the value returned by `SystemPropertyUtils.getTmpdir()`.
+  *
+  * @param jobId job id, used verbatim as the base name of the file
+  * @return the path as a plain string, joined with "/"
+  */
+ private[kubernetes] def getJobLog(jobId: String): String =
+   s"${SystemPropertyUtils.getTmpdir()}/$jobId.log"
+
+ /**
+  * Builds the path of the error-log file for a job: `<tmpdir>/<jobId>_err.log`,
+  * where `<tmpdir>` is the value returned by `SystemPropertyUtils.getTmpdir()`.
+  *
+  * @param jobId job id, used verbatim as the prefix of the file name
+  * @return the path as a plain string, joined with "/"
+  */
+ private[kubernetes] def getJobErrorLog(jobId: String): String =
+   s"${SystemPropertyUtils.getTmpdir()}/${jobId}_err.log"
+
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 037e68258..7a12aa4f2 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -18,12 +18,11 @@
package org.apache.streampark.flink.kubernetes.watcher
import com.google.common.base.Charsets
-import com.google.common.io.{FileWriteMode, Files}
+import com.google.common.io.Files
import org.apache.commons.collections.CollectionUtils
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.history.FsJobArchivist
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson
-import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
+import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
@@ -39,7 +38,6 @@ import org.json4s.jackson.JsonMethods.parse
import java.io.File
import java.nio.charset.StandardCharsets
-import java.util
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import javax.annotation.Nonnull
import javax.annotation.concurrent.ThreadSafe
@@ -96,7 +94,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
* single flink job status tracking task
*/
override def doWatch(): Unit = {
- this.synchronized{
+ this.synchronized {
logInfo("[FlinkJobStatusWatcher]: Status monitoring process begins - " + Thread.currentThread().getName)
// get all legal tracking ids
val trackIds = Try(trackController.collectAllTrackIds()).filter(_.nonEmpty).getOrElse(return)
@@ -444,17 +442,11 @@ private[kubernetes] object FlinkHistoryArchives {
if (a.getPath == s"/jobs/$jobId/exceptions") {
Try(parse(a.getJson)) match {
case Success(ok) =>
- ok \ "root-exception" match {
- case JNothing | JNull =>
- case JArray(arr) =>
- arr.foreach(x => {
- val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
- val path = s"${projectPath}/${jobId}_err.log"
- val file = new File(path)
- val log = (x \ "root-exception").extractOpt[String].orNull
- Files.asCharSink(file, Charsets.UTF_8).write(log)
- })
- }
+ val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+ val file = new File(path)
+ val log = (ok \ "root-exception").extractOpt[String].orNull
+ Files.asCharSink(file, Charsets.UTF_8).write(log)
+ case _ =>
}
} else if (a.getPath == "/jobs/overview") {
Try(parse(a.getJson)) match {
@@ -462,12 +454,12 @@ private[kubernetes] object FlinkHistoryArchives {
ok \ "jobs" match {
case JNothing | JNull =>
case JArray(arr) =>
- arr.foreach(x => {
- val jid = (x \ "jid").extractOpt[String].orNull
- if (jid == jobId) {
- state = (x \ "state").extractOpt[String].orNull
- }
- })
+ arr.foreach(x => {
+ val jid = (x \ "jid").extractOpt[String].orNull
+ if (jid == jobId) {
+ state = (x \ "state").extractOpt[String].orNull
+ }
+ })
case _ =>
}
case Failure(_) =>
diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json b/streampark-flink/streampark-flink-kubernetes/src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json
new file mode 100644
index 000000000..af2c1f6b6
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json
@@ -0,0 +1 @@
+{"archive":[{"path":"/jobs/overview","json":"{\"jobs\":[{\"jid\":\"d933fa6c785f0db6dccc6cc05dd43bab\",\"name\":\"test555\",\"state\":\"FAILED\",\"start-time\":1669348683915,\"end-time\":1669348702523,\"duration\":18608,\"last-modification\":1669348702523,\"tasks\":{\"total\":1,\"created\":0,\"scheduled\":0,\"deploying\":0,\"running\":0,\"finished\":0,\"canceling\":0,\"canceled\":0,\"failed\":1,\"reconciling\":0,\"initializing\":0}}]}"},{"path":"/jobs/d933fa6c785f0db6dccc6cc05dd43bab/conf [...]
\ No newline at end of file
diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
index 300989d20..b0d3ed301 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
@@ -16,8 +16,22 @@
*/
package org.apache.streampark.flink.kubernetes
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.apache.commons.collections.CollectionUtils
+import org.apache.flink.core.fs.Path
+import org.apache.flink.runtime.history.FsJobArchivist
+import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
import org.apache.streampark.flink.kubernetes.watcher.{Checkpoint, FlinkRestJmConfigItem, FlinkRestOverview, JobDetails}
+import org.json4s.DefaultFormats
import org.junit.jupiter.api.Test
+import org.json4s.{JNothing, JNull}
+import org.json4s.JsonAST.JArray
+import org.json4s.jackson.JsonMethods.parse
+
+import java.io.File
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
+import scala.util.{Failure, Success, Try}
// scalastyle:off println
class FlinkRestJsonTest {
@@ -280,4 +294,52 @@ class FlinkRestJsonTest {
println(ingressMeta.get)
}
+ /**
+  * Reads the archived-job fixture under `src/test/resources`, replays the same
+  * path dispatch used for history archives (`/jobs/<jobId>/exceptions` and
+  * `/jobs/overview`) and checks that the job state found in the overview entry
+  * is `FAILED`, which is what the fixture records for this job id.
+  *
+  * Unlike a print-only check, nothing here defaults to "FAILED": a missing
+  * archive, an unparsable entry or an absent job makes the test fail instead of
+  * silently producing the expected value.
+  */
+ @Test def testHistoryArchives(): Unit = {
+
+   implicit val formats: DefaultFormats.type = DefaultFormats
+
+   val jobId = "d933fa6c785f0db6dccc6cc05dd43bab"
+   val archivePath = new Path(s"src/test/resources/$jobId.json")
+   val archivedJson = FsJobArchivist.getArchivedJsons(archivePath)
+   assert(CollectionUtils.isNotEmpty(archivedJson), s"no archived json found in $archivePath")
+
+   // Starts empty so that the final assertion can only pass when the overview
+   // entry was actually parsed and contained this job.
+   var state: Option[String] = None
+
+   archivedJson.foreach { a =>
+     if (a.getPath == s"/jobs/$jobId/exceptions") {
+       Try(parse(a.getJson)) match {
+         case Success(ok) =>
+           val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+           // Write the error log only when a root exception is present: handing a
+           // null CharSequence to CharSink.write is rejected by Guava.
+           (ok \ "root-exception").extractOpt[String].filter(_ != null).foreach { log =>
+             Files.asCharSink(new File(path), Charsets.UTF_8).write(log)
+             println(" error path: " + path)
+           }
+         case Failure(e) =>
+           throw new AssertionError(s"cannot parse archived json for ${a.getPath}", e)
+       }
+     } else if (a.getPath == "/jobs/overview") {
+       Try(parse(a.getJson)) match {
+         case Success(ok) =>
+           ok \ "jobs" match {
+             case JNothing | JNull =>
+             case JArray(arr) =>
+               arr.foreach { x =>
+                 if ((x \ "jid").extractOpt[String].contains(jobId)) {
+                   state = (x \ "state").extractOpt[String]
+                 }
+               }
+             case _ =>
+           }
+         case Failure(e) =>
+           throw new AssertionError(s"cannot parse archived json for ${a.getPath}", e)
+       }
+     }
+   }
+
+   println(state.orNull)
+   assert(state.contains("FAILED"), s"expected state FAILED for job $jobId but got $state")
+ }
+
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 8189a034c..3e9676d54 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -81,9 +81,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
def initParameter(): FlinkConfiguration = {
val argsMap = ParameterTool.fromArgs(args)
val config = argsMap.get(KEY_APP_CONF(), null) match {
- // scalastyle:off throwerror
case null | "" => throw new ExceptionInInitializerError("[StreamPark] Usage:can't fond config,please set \"--conf $path \" in main arguments")
- // scalastyle:on throwerror
case file => file
}
val configMap = parseConfig(config)