You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by fa...@apache.org on 2022/10/31 09:18:37 UTC
[incubator-streampark] branch dev updated: [improve]Precision state judgment based on archive logs (#1910)
This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 7aef25964 [improve]Precision state judgment based on archive logs (#1910)
7aef25964 is described below
commit 7aef25964e0fae49818f8b4d3b94f7c70b1a520c
Author: monster <60...@users.noreply.github.com>
AuthorDate: Mon Oct 31 17:18:29 2022 +0800
[improve]Precision state judgment based on archive logs (#1910)
---
.../apache/streampark/common/conf/Workspace.scala | 19 +++++---
.../streampark/console/core/entity/Project.java | 2 +-
.../console/core/runner/EnvInitializer.java | 8 ++--
.../core/service/impl/ApplicationServiceImpl.java | 5 ++-
.../core/service/impl/ProjectServiceImpl.java | 2 +-
.../streampark-flink-kubernetes/pom.xml | 23 ++++++++++
.../apache/streampark/archives/FetchArchives.java | 52 ++++++++++++++++++++++
.../java/org/apache/streampark/archives/Jobs.java | 36 +++++++++++++++
.../org/apache/streampark/archives/Overview.java | 43 ++++++++++++++++++
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 3 +-
.../streampark/flink/packer/maven/MavenTool.scala | 2 +-
11 files changed, 179 insertions(+), 16 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
index 8206a2c6a..a8add22a8 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
@@ -41,22 +41,27 @@ object Workspace {
/**
* dirPath of the maven local repository with built-in compilation process
*/
- lazy val MAVEN_LOCAL_DIR = s"$localWorkspace/mvnrepo"
+ lazy val MAVEN_LOCAL_PATH = s"$localWorkspace/mvnrepo"
/**
- * local sourceCode dir.(for git...)
+ * local sourceCode path.(for git...)
*/
- lazy val PROJECT_LOCAL_DIR = s"$localWorkspace/project"
+ lazy val PROJECT_LOCAL_PATH = s"$localWorkspace/project"
/**
- * local log dir.
+ * local log path.
*/
- lazy val LOG_LOCAL_DIR = s"$localWorkspace/logs"
+ lazy val LOG_LOCAL_PATH = s"$localWorkspace/logs"
/**
- * project build log dir.
+ * project build log path.
*/
- lazy val PROJECT_BUILD_LOG_DIR = s"$LOG_LOCAL_DIR/build_logs"
+ lazy val PROJECT_BUILD_LOG_PATH = s"$LOG_LOCAL_PATH/build_logs"
+
+ /**
+ * project archives path
+ */
+ lazy val ARCHIVES_FILE_PATH = s"${remote.WORKSPACE}/historyserver/archive"
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
index 99afed078..fc1b99bed 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Project.java
@@ -113,7 +113,7 @@ public class Project implements Serializable {
@JsonIgnore
public File getAppSource() {
if (appSource == null) {
- appSource = Workspace.PROJECT_LOCAL_DIR();
+ appSource = Workspace.PROJECT_LOCAL_PATH();
}
File sourcePath = new File(appSource);
if (!sourcePath.exists()) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 139da7aed..5ac81ba3c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -74,9 +74,9 @@ public class EnvInitializer implements ApplicationRunner {
String appHome = WebUtils.getAppHome();
if (appHome == null) {
throw new ExceptionInInitializerError(String.format("[StreamPark] System initialization check failed," +
- " The system initialization check failed. If started local for development and debugging," +
- " please ensure the -D%s parameter is clearly specified," +
- " more detail: https://streampark.apache.org/docs/user-guide/development",
+ " The system initialization check failed. If started local for development and debugging," +
+ " please ensure the -D%s parameter is clearly specified," +
+ " more detail: https://streampark.apache.org/docs/user-guide/development",
ConfigConst.KEY_APP_HOME()));
}
@@ -231,7 +231,7 @@ public class EnvInitializer implements ApplicationRunner {
// 2.4) create maven local repository dir
- String localMavenRepo = Workspace.MAVEN_LOCAL_DIR();
+ String localMavenRepo = Workspace.MAVEN_LOCAL_PATH();
if (FsOperator.lfs().exists(localMavenRepo)) {
FsOperator.lfs().mkdirs(localMavenRepo);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 9e1909b7b..8efc9d4c6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -108,6 +108,7 @@ import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.springframework.beans.factory.annotation.Autowired;
@@ -1241,6 +1242,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
String appConf;
String flinkUserJar = null;
+ String jobId = new JobID().toHexString();
ApplicationLog applicationLog = new ApplicationLog();
applicationLog.setAppId(application.getId());
applicationLog.setOptionTime(new Date());
@@ -1362,6 +1364,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
} else {
if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
AssertUtils.state(buildResult != null);
+ properties.put(JobManagerOptions.ARCHIVE_DIR.key(), Workspace.ARCHIVES_FILE_PATH());
DockerImageBuildResponse result = buildResult.as(DockerImageBuildResponse.class);
String ingressTemplates = application.getIngressTemplate();
String domainName = application.getDefaultModeIngress();
@@ -1396,7 +1399,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
ExecutionMode.of(application.getExecutionMode()),
resolveOrder,
application.getId(),
- new JobID().toHexString(),
+ jobId,
application.getJobName(),
appConf,
application.getApplicationType(),
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index aa347e44b..47e04cf3e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -322,7 +322,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
}
private String getBuildLogPath(Long projectId) {
- return String.format("%s/%s/build.log", Workspace.PROJECT_BUILD_LOG_DIR(), projectId);
+ return String.format("%s/%s/build.log", Workspace.PROJECT_BUILD_LOG_PATH(), projectId);
}
}
diff --git a/streampark-flink/streampark-flink-kubernetes/pom.xml b/streampark-flink/streampark-flink-kubernetes/pom.xml
index e7bc68a97..83cc9130e 100644
--- a/streampark-flink/streampark-flink-kubernetes/pom.xml
+++ b/streampark-flink/streampark-flink-kubernetes/pom.xml
@@ -96,6 +96,29 @@
<version>${snakeyaml.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+
</dependencies>
<build>
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java
new file mode 100644
index 000000000..aa31ab5ac
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.util.JsonUtils;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+
+import java.util.List;
+import java.util.Objects;
+
+/** Looks up a Flink job's final state in the JobManager archive stored under ARCHIVES_FILE_PATH. */
+public class FetchArchives {
+
+  private static final String FAILED_STATE = "FAILED";
+
+  /** Returns the archived state of {@code jobId}, or "FAILED" if it cannot be determined. */
+  public static String getJobStateFromArchiveFile(String jobId) {
+    try {
+      Objects.requireNonNull(jobId, "JobId cannot be empty.");
+      // jobId must be the second format argument; it was previously handed to Path instead, so
+      // String.format threw MissingFormatArgumentException and every lookup returned FAILED.
+      Path archiveFilePath = new Path(String.format("%s/%s", Workspace.ARCHIVES_FILE_PATH(), jobId));
+      for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(archiveFilePath)) {
+        if ("/jobs/overview".equals(archive.getPath())) {
+          List<Overview> overviews = JsonUtils.read(archive.getJson(), Jobs.class).getJobs();
+          return overviews.stream().filter(x -> jobId.equals(x.getJid())).map(Overview::getState).findFirst().orElse(FAILED_STATE);
+        }
+      }
+      return FAILED_STATE;
+    } catch (Exception e) { // deliberate best-effort: missing/unreadable archive => FAILED
+      return FAILED_STATE;
+    }
+  }
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Jobs.java b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Jobs.java
new file mode 100644
index 000000000..ab7f5de5a
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Jobs.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * Deserialization target for the JSON archived under "/jobs/overview" (read by FetchArchives).
+ *
+ * <p>Lombok's {@code @Data} already generates getJobs/setJobs plus equals, hashCode and toString,
+ * so no hand-written accessors are needed.
+ */
+@Data
+public class Jobs {
+
+  /** Job summaries deserialized from the "jobs" array. */
+  private List<Overview> jobs;
+
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Overview.java b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Overview.java
new file mode 100644
index 000000000..c54a4f29c
--- /dev/null
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Overview.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** One job summary from the JSON archived under "/jobs/overview" (read by FetchArchives). */
+@Data
+public class Overview {
+
+  private String jid;
+  private String name;
+  /** State name such as "FAILED"; the status watcher parses it with FlinkJobState.of. */
+  private String state;
+
+  // NOTE(review): @JsonProperty here is Flink's *shaded* Jackson annotation; confirm the mapper
+  // behind JsonUtils honors it, otherwise the hyphenated fields below silently stay null.
+  @JsonProperty("start-time")
+  private Long startTime;
+  @JsonProperty("end-time")
+  private Long endTime;
+  // Long, not Integer: Flink emits duration as a long (ms); an Integer overflows after ~24.8 days
+  // and the deserializer may then reject the whole archive, reporting long-running jobs as FAILED.
+  private Long duration;
+  @JsonProperty("last-modification")
+  private Long lastModification;
+}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 50f6fd96d..47ccc5a84 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -26,6 +26,7 @@ import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkTrackControl
import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
import org.apache.hc.client5.http.fluent.Request
import org.apache.hc.core5.util.Timeout
+import org.apache.streampark.archives.FetchArchives
import org.json4s.{DefaultFormats, JNothing, JNull}
import org.json4s.JsonAST.JArray
import org.json4s.jackson.JsonMethods.parse
@@ -276,7 +277,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
}
} else if (isConnection) {
logger.info("The deployment is deleted and enters the task failure process.")
- FlinkJobState.FAILED
+ FlinkJobState.of(FetchArchives.getJobStateFromArchiveFile(trackId.jobId))
} else {
inferSilentOrLostFromPreCache(latest)
}
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index fc99e98fc..0302309a2 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -187,7 +187,7 @@ object MavenTool extends Logger {
/**
* default maven local repository
*/
- lazy val localRepo = new LocalRepository(Workspace.MAVEN_LOCAL_DIR)
+ lazy val localRepo = new LocalRepository(Workspace.MAVEN_LOCAL_PATH)
def newRepoSystem(): RepositorySystem = {