You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by fa...@apache.org on 2022/10/31 09:18:37 UTC

[incubator-streampark] branch dev updated: [improve]Precision state judgment based on archive logs (#1910)

This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7aef25964 [improve]Precision state judgment based on archive logs (#1910)
7aef25964 is described below

commit 7aef25964e0fae49818f8b4d3b94f7c70b1a520c
Author: monster <60...@users.noreply.github.com>
AuthorDate: Mon Oct 31 17:18:29 2022 +0800

    [improve]Precision state judgment based on archive logs (#1910)
---
 .../apache/streampark/common/conf/Workspace.scala  | 19 +++++---
 .../streampark/console/core/entity/Project.java    |  2 +-
 .../console/core/runner/EnvInitializer.java        |  8 ++--
 .../core/service/impl/ApplicationServiceImpl.java  |  5 ++-
 .../core/service/impl/ProjectServiceImpl.java      |  2 +-
 .../streampark-flink-kubernetes/pom.xml            | 23 ++++++++++
 .../apache/streampark/archives/FetchArchives.java  | 52 ++++++++++++++++++++++
 .../java/org/apache/streampark/archives/Jobs.java  | 36 +++++++++++++++
 .../org/apache/streampark/archives/Overview.java   | 43 ++++++++++++++++++
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala |  3 +-
 .../streampark/flink/packer/maven/MavenTool.scala  |  2 +-
 11 files changed, 179 insertions(+), 16 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
index 8206a2c6a..a8add22a8 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
@@ -41,22 +41,27 @@ object Workspace {
   /**
    * dirPath of the maven local repository with built-in compilation process
    */
-  lazy val MAVEN_LOCAL_DIR = s"$localWorkspace/mvnrepo"
+  lazy val MAVEN_LOCAL_PATH = s"$localWorkspace/mvnrepo"
 
   /**
-   * local sourceCode dir.(for git...)
+   * local sourceCode path.(for git...)
    */
-  lazy val PROJECT_LOCAL_DIR = s"$localWorkspace/project"
+  lazy val PROJECT_LOCAL_PATH = s"$localWorkspace/project"
 
   /**
-   * local log dir.
+   * local log path.
    */
-  lazy val LOG_LOCAL_DIR = s"$localWorkspace/logs"
+  lazy val LOG_LOCAL_PATH = s"$localWorkspace/logs"
 
   /**
-   * project build log dir.
+   * project build log path.
    */
-  lazy val PROJECT_BUILD_LOG_DIR = s"$LOG_LOCAL_DIR/build_logs"
+  lazy val PROJECT_BUILD_LOG_PATH = s"$LOG_LOCAL_PATH/build_logs"
+
+  /**
+   * project archives path
+   */
+  lazy val ARCHIVES_FILE_PATH = s"${remote.WORKSPACE}/historyserver/archive"
 
 }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
index 99afed078..fc1b99bed 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
@@ -113,7 +113,7 @@ public class Project implements Serializable {
     @JsonIgnore
     public File getAppSource() {
         if (appSource == null) {
-            appSource = Workspace.PROJECT_LOCAL_DIR();
+            appSource = Workspace.PROJECT_LOCAL_PATH();
         }
         File sourcePath = new File(appSource);
         if (!sourcePath.exists()) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 139da7aed..5ac81ba3c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -74,9 +74,9 @@ public class EnvInitializer implements ApplicationRunner {
         String appHome = WebUtils.getAppHome();
         if (appHome == null) {
             throw new ExceptionInInitializerError(String.format("[StreamPark] System initialization check failed," +
-                " The system initialization check failed. If started local for development and debugging," +
-                " please ensure the -D%s parameter is clearly specified," +
-                " more detail: https://streampark.apache.org/docs/user-guide/development",
+                    " The system initialization check failed. If started local for development and debugging," +
+                    " please ensure the -D%s parameter is clearly specified," +
+                    " more detail: https://streampark.apache.org/docs/user-guide/development",
                 ConfigConst.KEY_APP_HOME()));
         }
 
@@ -231,7 +231,7 @@ public class EnvInitializer implements ApplicationRunner {
 
             // 2.4) create maven local repository dir
 
-            String localMavenRepo = Workspace.MAVEN_LOCAL_DIR();
+            String localMavenRepo = Workspace.MAVEN_LOCAL_PATH();
             if (FsOperator.lfs().exists(localMavenRepo)) {
                 FsOperator.lfs().mkdirs(localMavenRepo);
             }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 9e1909b7b..8efc9d4c6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -108,6 +108,7 @@ import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -1241,6 +1242,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
         String appConf;
         String flinkUserJar = null;
+        String jobId = new JobID().toHexString();
         ApplicationLog applicationLog = new ApplicationLog();
         applicationLog.setAppId(application.getId());
         applicationLog.setOptionTime(new Date());
@@ -1362,6 +1364,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         } else {
             if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                 AssertUtils.state(buildResult != null);
+                properties.put(JobManagerOptions.ARCHIVE_DIR.key(), Workspace.ARCHIVES_FILE_PATH());
                 DockerImageBuildResponse result = buildResult.as(DockerImageBuildResponse.class);
                 String ingressTemplates = application.getIngressTemplate();
                 String domainName = application.getDefaultModeIngress();
@@ -1396,7 +1399,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             ExecutionMode.of(application.getExecutionMode()),
             resolveOrder,
             application.getId(),
-            new JobID().toHexString(),
+            jobId,
             application.getJobName(),
             appConf,
             application.getApplicationType(),
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index aa347e44b..47e04cf3e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -322,7 +322,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
     }
 
     private String getBuildLogPath(Long projectId) {
-        return String.format("%s/%s/build.log", Workspace.PROJECT_BUILD_LOG_DIR(), projectId);
+        return String.format("%s/%s/build.log", Workspace.PROJECT_BUILD_LOG_PATH(), projectId);
     }
 
 }
diff --git a/streampark-flink/streampark-flink-kubernetes/pom.xml b/streampark-flink/streampark-flink-kubernetes/pom.xml
index e7bc68a97..83cc9130e 100644
--- a/streampark-flink/streampark-flink-kubernetes/pom.xml
+++ b/streampark-flink/streampark-flink-kubernetes/pom.xml
@@ -96,6 +96,29 @@
             <version>${snakeyaml.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java
new file mode 100644
index 000000000..aa31ab5ac
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.util.JsonUtils;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Reads the final state of a Flink job from the archive written under
+ * {@code Workspace.ARCHIVES_FILE_PATH()}.
+ */
+public final class FetchArchives {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FetchArchives.class);
+
+    /** State reported whenever the archived state cannot be determined. */
+    private static final String FAILED_STATE = "FAILED";
+
+    /** Archived REST path whose JSON lists the job overviews. */
+    private static final String JOBS_OVERVIEW_PATH = "/jobs/overview";
+
+    private FetchArchives() {
+    }
+
+    /**
+     * Returns the state recorded for {@code jobId} in its archive file.
+     *
+     * <p>Best effort: a null id, a missing or unreadable archive, malformed JSON, or an archive
+     * that does not list the job all yield {@code "FAILED"} rather than an exception.
+     *
+     * @param jobId hex id of the Flink job; its archive is read from {@code <archive dir>/<jobId>}
+     * @return the archived state string, or {@code "FAILED"} if it cannot be determined
+     */
+    public static String getJobStateFromArchiveFile(String jobId) {
+        if (jobId == null) {
+            LOG.warn("JobId cannot be empty, reporting job state {}.", FAILED_STATE);
+            return FAILED_STATE;
+        }
+        try {
+            // Both the archive dir and the job id belong to the format string: Flink stores each
+            // job's archive as a file named after the job id inside the archive dir.
+            String archiveFile = String.format("%s/%s", Workspace.ARCHIVES_FILE_PATH(), jobId);
+            Path archiveFilePath = new Path(archiveFile);
+            for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(archiveFilePath)) {
+                if (JOBS_OVERVIEW_PATH.equals(archive.getPath())) {
+                    return stateOf(jobId, JsonUtils.read(archive.getJson(), Jobs.class));
+                }
+            }
+            LOG.warn("No {} entry in archive {}, reporting job state {}.",
+                JOBS_OVERVIEW_PATH, archiveFilePath, FAILED_STATE);
+            return FAILED_STATE;
+        } catch (Exception e) {
+            LOG.warn("Failed to read the archive of job {}, reporting job state {}.",
+                jobId, FAILED_STATE, e);
+            return FAILED_STATE;
+        }
+    }
+
+    /** Picks the state of {@code jobId} from a parsed overview, falling back to FAILED. */
+    private static String stateOf(String jobId, Jobs jobs) {
+        List<Overview> overviews = jobs == null ? null : jobs.getJobs();
+        if (overviews == null) {
+            return FAILED_STATE;
+        }
+        return overviews.stream()
+            .filter(Objects::nonNull)
+            .filter(overview -> jobId.equals(overview.getJid()))
+            .map(Overview::getState)
+            .filter(Objects::nonNull)
+            .findFirst()
+            .orElse(FAILED_STATE);
+    }
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Jobs.java b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Jobs.java
new file mode 100644
index 000000000..ab7f5de5a
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Jobs.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * Root of the archived {@code /jobs/overview} JSON, which wraps the job overviews in a
+ * {@code jobs} array.
+ *
+ * <p>Getters, setters, {@code equals}/{@code hashCode} and {@code toString} are generated by
+ * Lombok's {@code @Data}; hand-written accessors would only duplicate them.
+ */
+@Data
+public class Jobs {
+
+    /** Overview entries listed in the archive; null when the JSON has no {@code jobs} field. */
+    private List<Overview> jobs;
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Overview.java b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Overview.java
new file mode 100644
index 000000000..c54a4f29c
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Overview.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
+/**
+ * One entry of the {@code jobs} array in Flink's archived {@code /jobs/overview} JSON.
+ *
+ * <p>The Jackson annotations come from the unshaded {@code com.fasterxml.jackson} package so that
+ * the mapper behind {@code JsonUtils} (assumed to be unshaded Jackson) honors them; Flink-shaded
+ * annotations are invisible to it. Fields not modeled here, such as {@code tasks}, are skipped.
+ */
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Overview {
+
+    private String jid;
+
+    private String name;
+
+    /** Job status name as reported by Flink, e.g. {@code FINISHED} or {@code CANCELED}. */
+    private String state;
+
+    @JsonProperty("start-time")
+    private Long startTime;
+
+    @JsonProperty("end-time")
+    private Long endTime;
+
+    /**
+     * Job duration. A {@code Long} because Flink reports milliseconds, which exceed
+     * {@code Integer.MAX_VALUE} after roughly 24.8 days and would fail deserialization.
+     */
+    private Long duration;
+
+    @JsonProperty("last-modification")
+    private Long lastModification;
+
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 50f6fd96d..47ccc5a84 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -26,6 +26,7 @@ import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkTrackControl
 import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
 import org.apache.hc.client5.http.fluent.Request
 import org.apache.hc.core5.util.Timeout
+import org.apache.streampark.archives.FetchArchives
 import org.json4s.{DefaultFormats, JNothing, JNull}
 import org.json4s.JsonAST.JArray
 import org.json4s.jackson.JsonMethods.parse
@@ -276,7 +277,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
           }
         } else if (isConnection) {
           logger.info("The deployment is deleted and enters the task failure process.")
-          FlinkJobState.FAILED
+          FlinkJobState.of(FetchArchives.getJobStateFromArchiveFile(trackId.jobId))
         } else {
           inferSilentOrLostFromPreCache(latest)
         }
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index fc99e98fc..0302309a2 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -187,7 +187,7 @@ object MavenTool extends Logger {
     /**
      * default maven local repository
      */
-    lazy val localRepo = new LocalRepository(Workspace.MAVEN_LOCAL_DIR)
+    lazy val localRepo = new LocalRepository(Workspace.MAVEN_LOCAL_PATH)
 
 
     def newRepoSystem(): RepositorySystem = {