You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/01 04:03:19 UTC

[incubator-streampark] branch dev updated: [improve] get jobState from Archivefile improvement (#1937)

This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 32d12bea6 [improve] get jobState from Archivefile improvement (#1937)
32d12bea6 is described below

commit 32d12bea61088c235cad8dea4a61f9bd04e2d576
Author: benjobs <be...@apache.org>
AuthorDate: Tue Nov 1 12:03:14 2022 +0800

    [improve] get jobState from Archivefile improvement (#1937)
---
 .../apache/streampark/archives/FetchArchives.java  | 52 ----------------------
 .../java/org/apache/streampark/archives/Jobs.java  | 36 ---------------
 .../org/apache/streampark/archives/Overview.java   | 43 ------------------
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 45 ++++++++++++++++++-
 4 files changed, 43 insertions(+), 133 deletions(-)

diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java
deleted file mode 100644
index aa31ab5ac..000000000
--- a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.archives;
-
-import org.apache.streampark.common.conf.Workspace;
-import org.apache.streampark.common.util.JsonUtils;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.history.FsJobArchivist;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-
-import java.util.List;
-import java.util.Objects;
-
-public class FetchArchives {
-
-    private static final String FAILED_STATE = "FAILED";
-
-    public static String getJobStateFromArchiveFile(String jobId) {
-        try {
-            Objects.requireNonNull(jobId, "JobId cannot be empty.");
-            Path archiveFilePath = new Path(String.format("%s/%s", Workspace.ARCHIVES_FILE_PATH()), jobId);
-            for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(archiveFilePath)) {
-                String path = archive.getPath();
-                if (path.equals("/jobs/overview")) {
-                    String json = archive.getJson();
-                    Jobs jobs = JsonUtils.read(json, Jobs.class);
-                    List<Overview> overviews = jobs.getJobs();
-                    return overviews.stream().filter(x -> x.getJid().equals(jobId)).map(Overview::getState).findFirst().orElse(FAILED_STATE);
-                }
-            }
-            return FAILED_STATE;
-        } catch (Exception e) {
-            return FAILED_STATE;
-        }
-    }
-}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Jobs.java b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Jobs.java
deleted file mode 100644
index ab7f5de5a..000000000
--- a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Jobs.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.archives;
-
-import lombok.Data;
-
-import java.util.List;
-
-@Data
-public class Jobs {
-
-    private List<Overview> jobs;
-
-    public List<Overview> getJobs() {
-        return jobs;
-    }
-
-    public void setJobs(List<Overview> jobs) {
-        this.jobs = jobs;
-    }
-}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Overview.java b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Overview.java
deleted file mode 100644
index c54a4f29c..000000000
--- a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Overview.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.archives;
-
-import lombok.Data;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-@Data
-public class Overview {
-
-    private String jid;
-
-    private String name;
-
-    private String state;
-
-    @JsonProperty("start-time")
-    private Long startTime;
-
-    @JsonProperty("end-time")
-    private Long endTime;
-
-    private Integer duration;
-
-    @JsonProperty("last-modification")
-    private Long lastModification;
-
-}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 47ccc5a84..acaec9594 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -17,6 +17,9 @@
 
 package org.apache.streampark.flink.kubernetes.watcher
 
+import org.apache.commons.collections.CollectionUtils
+import org.apache.flink.core.fs.Path
+import org.apache.flink.runtime.history.FsJobArchivist
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
@@ -26,7 +29,7 @@ import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkTrackControl
 import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
 import org.apache.hc.client5.http.fluent.Request
 import org.apache.hc.core5.util.Timeout
-import org.apache.streampark.archives.FetchArchives
+import org.apache.streampark.common.conf.Workspace
 import org.json4s.{DefaultFormats, JNothing, JNull}
 import org.json4s.JsonAST.JArray
 import org.json4s.jackson.JsonMethods.parse
@@ -39,6 +42,7 @@ import scala.concurrent.duration.DurationLong
 import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
 import scala.language.{implicitConversions, postfixOps}
 import scala.util.{Failure, Success, Try}
+import scala.collection.JavaConversions._
 
 /**
  * Watcher for continuously monitor flink job status on kubernetes-mode,
@@ -277,7 +281,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
           }
         } else if (isConnection) {
           logger.info("The deployment is deleted and enters the task failure process.")
-          FlinkJobState.of(FetchArchives.getJobStateFromArchiveFile(trackId.jobId))
+          FlinkJobState.of(FlinkHistoryArchives.getJobStateFromArchiveFile(trackId.jobId))
         } else {
           inferSilentOrLostFromPreCache(latest)
         }
@@ -417,3 +421,40 @@ private[kubernetes] object JobDetails {
   }
 
 }
+
+private[kubernetes] object FlinkHistoryArchives {
+
+  @transient
+  implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+
+  private[this] val FAILED_STATE = "FAILED"
+
+  def getJobStateFromArchiveFile(jobId: String): String = Try {
+    require(jobId != null, "[StreamPark] getJobStateFromArchiveFile: JobId cannot be null.")
+    val archivePath = new Path(Workspace.ARCHIVES_FILE_PATH, jobId)
+    val archivedJson = FsJobArchivist.getArchivedJsons(archivePath)
+    if (CollectionUtils.isNotEmpty(archivedJson)) {
+      archivedJson.foreach { a =>
+        if (a.getPath == "/jobs/overview") {
+          Try(parse(a.getJson)) match {
+            case Success(ok) =>
+              ok \ "jobs" match {
+                case JNothing | JNull => return FAILED_STATE
+                case JArray(arr) =>
+                  arr.foreach(x => {
+                    val jid = (x \ "jid").extractOpt[String].orNull
+                    if (jid == jobId) {
+                      return (x \ "state").extractOpt[String].orNull
+                    }
+                  })
+                case _ => return FAILED_STATE
+              }
+            case _ => return FAILED_STATE
+          }
+        }
+      }
+    }
+    FAILED_STATE
+  }.getOrElse(FAILED_STATE)
+
+}