You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/30 14:04:59 UTC
[incubator-streampark] branch dev updated: [Bug] k8s jobstatus bug fixed (#2112)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 61289105b [Bug] k8s jobstatus bug fixed (#2112)
61289105b is described below
commit 61289105bd491129863ecf47f14a71e6b9c5327d
Author: benjobs <be...@apache.org>
AuthorDate: Wed Nov 30 22:04:54 2022 +0800
[Bug] k8s jobstatus bug fixed (#2112)
---
.../flink/kubernetes/watcher/FlinkJobStatusWatcher.scala | 8 +++++---
.../apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala | 10 ++++++----
2 files changed, 11 insertions(+), 7 deletions(-)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 7a12aa4f2..e3a77f2f6 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -442,10 +442,12 @@ private[kubernetes] object FlinkHistoryArchives {
if (a.getPath == s"/jobs/$jobId/exceptions") {
Try(parse(a.getJson)) match {
case Success(ok) =>
- val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
- val file = new File(path)
val log = (ok \ "root-exception").extractOpt[String].orNull
- Files.asCharSink(file, Charsets.UTF_8).write(log)
+ if (log != null) {
+ val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+ val file = new File(path)
+ Files.asCharSink(file, Charsets.UTF_8).write(log)
+ }
case _ =>
}
} else if (a.getPath == "/jobs/overview") {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
index 3e0dd61c3..98a3e76e6 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
@@ -310,11 +310,13 @@ class FlinkRestJsonTest {
if (a.getPath == s"/jobs/$jobId/exceptions") {
Try(parse(a.getJson)) match {
case Success(ok) =>
- val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
- val file = new File(path)
val log = (ok \ "root-exception").extractOpt[String].orNull
- Files.asCharSink(file, Charsets.UTF_8).write(log)
- println(" error path: " + path)
+ if (log != null) {
+ val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+ val file = new File(path)
+ Files.asCharSink(file, Charsets.UTF_8).write(log)
+ println(" error path: " + path)
+ }
case _ =>
}
} else if (a.getPath == "/jobs/overview") {