You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/30 14:04:59 UTC

[incubator-streampark] branch dev updated: [Bug] k8s jobstatus bug fixed (#2112)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 61289105b [Bug] k8s jobstatus bug fixed (#2112)
61289105b is described below

commit 61289105bd491129863ecf47f14a71e6b9c5327d
Author: benjobs <be...@apache.org>
AuthorDate: Wed Nov 30 22:04:54 2022 +0800

    [Bug] k8s jobstatus bug fixed (#2112)
---
 .../flink/kubernetes/watcher/FlinkJobStatusWatcher.scala       |  8 +++++---
 .../apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala | 10 ++++++----
 2 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 7a12aa4f2..e3a77f2f6 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -442,10 +442,12 @@ private[kubernetes] object FlinkHistoryArchives {
         if (a.getPath == s"/jobs/$jobId/exceptions") {
           Try(parse(a.getJson)) match {
             case Success(ok) =>
-              val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
-              val file = new File(path)
               val log = (ok \ "root-exception").extractOpt[String].orNull
-              Files.asCharSink(file, Charsets.UTF_8).write(log)
+              if (log != null) {
+                val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+                val file = new File(path)
+                Files.asCharSink(file, Charsets.UTF_8).write(log)
+              }
             case _ =>
           }
         } else if (a.getPath == "/jobs/overview") {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
index 3e0dd61c3..98a3e76e6 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
@@ -310,11 +310,13 @@ class FlinkRestJsonTest {
           if (a.getPath == s"/jobs/$jobId/exceptions") {
             Try(parse(a.getJson)) match {
               case Success(ok) =>
-                val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
-                val file = new File(path)
                 val log = (ok \ "root-exception").extractOpt[String].orNull
-                Files.asCharSink(file, Charsets.UTF_8).write(log)
-                println(" error path: " + path)
+                if (log != null) {
+                  val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+                  val file = new File(path)
+                  Files.asCharSink(file, Charsets.UTF_8).write(log)
+                  println(" error path: " + path)
+                }
               case _ =>
             }
           } else if (a.getPath == "/jobs/overview") {