You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/25 07:08:54 UTC
[incubator-streampark] branch yarn-session created (now a199c211a)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a change to branch yarn-session
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
at a199c211a [Bug] flink-job archivefile parse bug fixed
This branch includes the following new commits:
new a199c211a [Bug] flink-job archivefile parse bug fixed
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[incubator-streampark] 01/01: [Bug] flink-job archivefile parse bug fixed
Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch yarn-session
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit a199c211ac0fa50131e8e110fb60bdf98ca2de58
Author: benjobs <be...@apache.org>
AuthorDate: Fri Nov 25 15:08:36 2022 +0800
[Bug] flink-job archivefile parse bug fixed
---
.../common/util/SystemPropertyUtils.scala | 3 ++
.../helper/KubernetesDeploymentHelper.scala | 16 ++++--
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 36 +++++--------
.../d933fa6c785f0db6dccc6cc05dd43bab.json | 1 +
.../flink/kubernetes/FlinkRestJsonTest.scala | 62 ++++++++++++++++++++++
.../flink/core/FlinkStreamingInitializer.scala | 2 -
6 files changed, 92 insertions(+), 28 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
index 211e04683..ce1f7c94b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
@@ -119,4 +119,7 @@ object SystemPropertyUtils extends Logger {
SystemPropertyUtils.set(key, appHome)
}
}
+
+ def getTmpdir() : String = get("java.io.tmpdir", "temp")
+
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index e9922385b..994fdf220 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -83,8 +83,7 @@ object KubernetesDeploymentHelper extends Logger {
def watchDeploymentLog(nameSpace: String, jobName: String, jobId: String): String = {
tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
- val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
- val path = s"$projectPath/$jobId.log"
+ val path = KubernetesDeploymentHelper.getJobLog(jobId)
val file = new File(path)
val log = client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog
Files.asCharSink(file, Charsets.UTF_8).write(log)
@@ -96,8 +95,7 @@ object KubernetesDeploymentHelper extends Logger {
tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
Try {
val podName = getPods(nameSpace, jobName).head.getMetadata.getName
- val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
- val path = s"$projectPath/${jobId}_err.log"
+ val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
val file = new File(path)
val log = client.pods.inNamespace(nameSpace).withName(podName).terminated().withPrettyOutput.getLog
Files.asCharSink(file, Charsets.UTF_8).write(log)
@@ -118,4 +116,14 @@ object KubernetesDeploymentHelper extends Logger {
}
}
+ private[kubernetes] def getJobLog(jobId: String): String = {
+ val tmpPath = SystemPropertyUtils.getTmpdir()
+ s"$tmpPath/$jobId.log"
+ }
+
+ private[kubernetes] def getJobErrorLog(jobId: String): String = {
+ val tmpPath = SystemPropertyUtils.getTmpdir()
+ s"$tmpPath/${jobId}_err.log"
+ }
+
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 037e68258..7a12aa4f2 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -18,12 +18,11 @@
package org.apache.streampark.flink.kubernetes.watcher
import com.google.common.base.Charsets
-import com.google.common.io.{FileWriteMode, Files}
+import com.google.common.io.Files
import org.apache.commons.collections.CollectionUtils
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.history.FsJobArchivist
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson
-import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
+import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
@@ -39,7 +38,6 @@ import org.json4s.jackson.JsonMethods.parse
import java.io.File
import java.nio.charset.StandardCharsets
-import java.util
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import javax.annotation.Nonnull
import javax.annotation.concurrent.ThreadSafe
@@ -96,7 +94,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
* single flink job status tracking task
*/
override def doWatch(): Unit = {
- this.synchronized{
+ this.synchronized {
logInfo("[FlinkJobStatusWatcher]: Status monitoring process begins - " + Thread.currentThread().getName)
// get all legal tracking ids
val trackIds = Try(trackController.collectAllTrackIds()).filter(_.nonEmpty).getOrElse(return)
@@ -444,17 +442,11 @@ private[kubernetes] object FlinkHistoryArchives {
if (a.getPath == s"/jobs/$jobId/exceptions") {
Try(parse(a.getJson)) match {
case Success(ok) =>
- ok \ "root-exception" match {
- case JNothing | JNull =>
- case JArray(arr) =>
- arr.foreach(x => {
- val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
- val path = s"${projectPath}/${jobId}_err.log"
- val file = new File(path)
- val log = (x \ "root-exception").extractOpt[String].orNull
- Files.asCharSink(file, Charsets.UTF_8).write(log)
- })
- }
+ val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+ val file = new File(path)
+ val log = (ok \ "root-exception").extractOpt[String].orNull
+ Files.asCharSink(file, Charsets.UTF_8).write(log)
+ case _ =>
}
} else if (a.getPath == "/jobs/overview") {
Try(parse(a.getJson)) match {
@@ -462,12 +454,12 @@ private[kubernetes] object FlinkHistoryArchives {
ok \ "jobs" match {
case JNothing | JNull =>
case JArray(arr) =>
- arr.foreach(x => {
- val jid = (x \ "jid").extractOpt[String].orNull
- if (jid == jobId) {
- state = (x \ "state").extractOpt[String].orNull
- }
- })
+ arr.foreach(x => {
+ val jid = (x \ "jid").extractOpt[String].orNull
+ if (jid == jobId) {
+ state = (x \ "state").extractOpt[String].orNull
+ }
+ })
case _ =>
}
case Failure(_) =>
diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json b/streampark-flink/streampark-flink-kubernetes/src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json
new file mode 100644
index 000000000..af2c1f6b6
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json
@@ -0,0 +1 @@
+{"archive":[{"path":"/jobs/overview","json":"{\"jobs\":[{\"jid\":\"d933fa6c785f0db6dccc6cc05dd43bab\",\"name\":\"test555\",\"state\":\"FAILED\",\"start-time\":1669348683915,\"end-time\":1669348702523,\"duration\":18608,\"last-modification\":1669348702523,\"tasks\":{\"total\":1,\"created\":0,\"scheduled\":0,\"deploying\":0,\"running\":0,\"finished\":0,\"canceling\":0,\"canceled\":0,\"failed\":1,\"reconciling\":0,\"initializing\":0}}]}"},{"path":"/jobs/d933fa6c785f0db6dccc6cc05dd43bab/conf [...]
\ No newline at end of file
diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
index 300989d20..b0d3ed301 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
@@ -16,8 +16,22 @@
*/
package org.apache.streampark.flink.kubernetes
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.apache.commons.collections.CollectionUtils
+import org.apache.flink.core.fs.Path
+import org.apache.flink.runtime.history.FsJobArchivist
+import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
import org.apache.streampark.flink.kubernetes.watcher.{Checkpoint, FlinkRestJmConfigItem, FlinkRestOverview, JobDetails}
+import org.json4s.DefaultFormats
import org.junit.jupiter.api.Test
+import org.json4s.{JNothing, JNull}
+import org.json4s.JsonAST.JArray
+import org.json4s.jackson.JsonMethods.parse
+
+import java.io.File
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
+import scala.util.{Failure, Success, Try}
// scalastyle:off println
class FlinkRestJsonTest {
@@ -280,4 +294,52 @@ class FlinkRestJsonTest {
println(ingressMeta.get)
}
+ @Test def testHistoryArchives(): Unit = {
+
+ @transient
+ implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+
+
+ val state = Try {
+ val archivePath = new Path("src/test/resources/d933fa6c785f0db6dccc6cc05dd43bab.json")
+ val jobId = "d933fa6c785f0db6dccc6cc05dd43bab"
+ val archivedJson = FsJobArchivist.getArchivedJsons(archivePath)
+ var state: String = "FAILED"
+ if (CollectionUtils.isNotEmpty(archivedJson)) {
+ archivedJson.foreach { a =>
+ if (a.getPath == s"/jobs/$jobId/exceptions") {
+ Try(parse(a.getJson)) match {
+ case Success(ok) =>
+ val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+ val file = new File(path)
+ val log = (ok \ "root-exception").extractOpt[String].orNull
+ Files.asCharSink(file, Charsets.UTF_8).write(log)
+ println(" error path: " + path)
+ case _ =>
+ }
+ } else if (a.getPath == "/jobs/overview") {
+ Try(parse(a.getJson)) match {
+ case Success(ok) =>
+ ok \ "jobs" match {
+ case JNothing | JNull =>
+ case JArray(arr) =>
+ arr.foreach(x => {
+ val jid = (x \ "jid").extractOpt[String].orNull
+ if (jid == jobId) {
+ state = (x \ "state").extractOpt[String].orNull
+ }
+ })
+ case _ =>
+ }
+ case Failure(_) =>
+ }
+ }
+ }
+ }
+ state
+ }.getOrElse("FAILED")
+
+ println(state)
+ }
+
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 8189a034c..3e9676d54 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -81,9 +81,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
def initParameter(): FlinkConfiguration = {
val argsMap = ParameterTool.fromArgs(args)
val config = argsMap.get(KEY_APP_CONF(), null) match {
- // scalastyle:off throwerror
case null | "" => throw new ExceptionInInitializerError("[StreamPark] Usage:can't fond config,please set \"--conf $path \" in main arguments")
- // scalastyle:on throwerror
case file => file
}
val configMap = parseConfig(config)