You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/01 04:03:19 UTC
[incubator-streampark] branch dev updated: [improve] get jobState from Archivefile improvement (#1937)
This is an automated email from the ASF dual-hosted git repository.
monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 32d12bea6 [improve] get jobState from Archivefile improvement (#1937)
32d12bea6 is described below
commit 32d12bea61088c235cad8dea4a61f9bd04e2d576
Author: benjobs <be...@apache.org>
AuthorDate: Tue Nov 1 12:03:14 2022 +0800
[improve] get jobState from Archivefile improvement (#1937)
---
.../apache/streampark/archives/FetchArchives.java | 52 ----------------------
.../java/org/apache/streampark/archives/Jobs.java | 36 ---------------
.../org/apache/streampark/archives/Overview.java | 43 ------------------
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 45 ++++++++++++++++++-
4 files changed, 43 insertions(+), 133 deletions(-)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java
deleted file mode 100644
index aa31ab5ac..000000000
--- a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.archives;
-
-import org.apache.streampark.common.conf.Workspace;
-import org.apache.streampark.common.util.JsonUtils;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.history.FsJobArchivist;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-
-import java.util.List;
-import java.util.Objects;
-
-public class FetchArchives {
-
- private static final String FAILED_STATE = "FAILED";
-
- public static String getJobStateFromArchiveFile(String jobId) {
- try {
- Objects.requireNonNull(jobId, "JobId cannot be empty.");
- Path archiveFilePath = new Path(String.format("%s/%s", Workspace.ARCHIVES_FILE_PATH()), jobId);
- for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(archiveFilePath)) {
- String path = archive.getPath();
- if (path.equals("/jobs/overview")) {
- String json = archive.getJson();
- Jobs jobs = JsonUtils.read(json, Jobs.class);
- List<Overview> overviews = jobs.getJobs();
- return overviews.stream().filter(x -> x.getJid().equals(jobId)).map(Overview::getState).findFirst().orElse(FAILED_STATE);
- }
- }
- return FAILED_STATE;
- } catch (Exception e) {
- return FAILED_STATE;
- }
- }
-}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Jobs.java b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Jobs.java
deleted file mode 100644
index ab7f5de5a..000000000
--- a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Jobs.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.archives;
-
-import lombok.Data;
-
-import java.util.List;
-
-@Data
-public class Jobs {
-
- private List<Overview> jobs;
-
- public List<Overview> getJobs() {
- return jobs;
- }
-
- public void setJobs(List<Overview> jobs) {
- this.jobs = jobs;
- }
-}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Overview.java b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Overview.java
deleted file mode 100644
index c54a4f29c..000000000
--- a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Overview.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.archives;
-
-import lombok.Data;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-@Data
-public class Overview {
-
- private String jid;
-
- private String name;
-
- private String state;
-
- @JsonProperty("start-time")
- private Long startTime;
-
- @JsonProperty("end-time")
- private Long endTime;
-
- private Integer duration;
-
- @JsonProperty("last-modification")
- private Long lastModification;
-
-}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 47ccc5a84..acaec9594 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -17,6 +17,9 @@
package org.apache.streampark.flink.kubernetes.watcher
+import org.apache.commons.collections.CollectionUtils
+import org.apache.flink.core.fs.Path
+import org.apache.flink.runtime.history.FsJobArchivist
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
@@ -26,7 +29,7 @@ import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkTrackControl
import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
import org.apache.hc.client5.http.fluent.Request
import org.apache.hc.core5.util.Timeout
-import org.apache.streampark.archives.FetchArchives
+import org.apache.streampark.common.conf.Workspace
import org.json4s.{DefaultFormats, JNothing, JNull}
import org.json4s.JsonAST.JArray
import org.json4s.jackson.JsonMethods.parse
@@ -39,6 +42,7 @@ import scala.concurrent.duration.DurationLong
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.language.{implicitConversions, postfixOps}
import scala.util.{Failure, Success, Try}
+import scala.collection.JavaConversions._
/**
* Watcher for continuously monitor flink job status on kubernetes-mode,
@@ -277,7 +281,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
}
} else if (isConnection) {
logger.info("The deployment is deleted and enters the task failure process.")
- FlinkJobState.of(FetchArchives.getJobStateFromArchiveFile(trackId.jobId))
+ FlinkJobState.of(FlinkHistoryArchives.getJobStateFromArchiveFile(trackId.jobId))
} else {
inferSilentOrLostFromPreCache(latest)
}
@@ -417,3 +421,40 @@ private[kubernetes] object JobDetails {
}
}
+
+private[kubernetes] object FlinkHistoryArchives {
+
+ @transient
+ implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+
+ private[this] val FAILED_STATE = "FAILED"
+
+ def getJobStateFromArchiveFile(jobId: String): String = Try {
+ require(jobId != null, "[StreamPark] getJobStateFromArchiveFile: JobId cannot be null.")
+ val archivePath = new Path(Workspace.ARCHIVES_FILE_PATH, jobId)
+ val archivedJson = FsJobArchivist.getArchivedJsons(archivePath)
+ if (CollectionUtils.isNotEmpty(archivedJson)) {
+ archivedJson.foreach { a =>
+ if (a.getPath == "/jobs/overview") {
+ Try(parse(a.getJson)) match {
+ case Success(ok) =>
+ ok \ "jobs" match {
+ case JNothing | JNull => return FAILED_STATE
+ case JArray(arr) =>
+ arr.foreach(x => {
+ val jid = (x \ "jid").extractOpt[String].orNull
+ if (jid == jobId) {
+ return (x \ "state").extractOpt[String].orNull
+ }
+ })
+ case _ => return FAILED_STATE
+ }
+ case _ => return FAILED_STATE
+ }
+ }
+ }
+ }
+ FAILED_STATE
+ }.getOrElse(FAILED_STATE)
+
+}