Posted to issues@streampark.apache.org by GitBox <gi...@apache.org> on 2022/10/26 09:53:36 UTC

[GitHub] [incubator-streampark] MonsterChenzhuo opened a new pull request, #1910: [improve]Precision state judgment based on archive logs

MonsterChenzhuo opened a new pull request, #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910

   
   ## What changes were proposed in this pull request
   
   Issue Number: close #1423 
   
   
   ## Brief change log
   
   
   ## Verifying this change
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts
    - Dependencies (does it add or upgrade a dependency): yes
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1007073467


##########
streampark-console/streampark-console-service/src/main/resources/application.yml:
##########
@@ -91,6 +91,8 @@ streampark:
   workspace:
     local: /opt/streampark_workspace
     remote: hdfs://hdfscluster/streampark   # support hdfs:///streampark/ 、 /streampark 、hdfs://host:ip/streampark/
+  archive:
+    path: hdfs:///streampark/completed-jobs/

Review Comment:
   If so, we shouldn't introduce a new directory every time we need remote storage. A more reasonable solution: `workspace.remote` should support various storage backends, and `archive.path` should live under `workspace.remote`.
   
   In summary: we should use `workspace.remote` uniformly and, in the future, do lifecycle management for all directories under `workspace.local` and `workspace.remote`.
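
A minimal sketch of deriving the archive directory from the remote workspace instead of configuring it separately. `ArchivePaths` and `under` are hypothetical names used only for illustration; the `historyserver/archive` suffix is the one proposed elsewhere in this PR.

```java
/** Hypothetical helper, not part of StreamPark. */
final class ArchivePaths {

    // Sub-directory name taken from the Workspace.scala change in this PR.
    static final String ARCHIVE_SUBDIR = "historyserver/archive";

    private ArchivePaths() {}

    /** Builds the archive path under the given remote workspace root. */
    static String under(String remoteWorkspace) {
        // Drop a single trailing slash so the result never contains "//" at the join.
        String base = remoteWorkspace.endsWith("/")
            ? remoteWorkspace.substring(0, remoteWorkspace.length() - 1)
            : remoteWorkspace;
        return base + "/" + ARCHIVE_SUBDIR;
    }
}
```

With this shape, `ArchivePaths.under("hdfs:///streampark/")` and `ArchivePaths.under("hdfs:///streampark")` resolve to the same directory, so only `workspace.remote` needs to be configured.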





[GitHub] [incubator-streampark] wolfboys commented on pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
wolfboys commented on PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#issuecomment-1296433090

   Hi MonsterChenzhuo,
   Please sync the latest code and resolve the conflicting files. Thanks.




[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1007065388


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -1361,6 +1368,11 @@ public void start(Application appParam, boolean auto) throws Exception {
         } else {
             if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                 AssertUtils.state(buildResult != null);
+                String flinkConfPath = String.format("%s/conf/flink-conf.yaml", flinkEnv.getFlinkHome());
+                ParameterTool parameter = ParameterTool.fromPropertiesFile(flinkConfPath);

Review Comment:
   If the user sets `JobManagerOptions.ARCHIVE_DIR.key()` in the `-D` properties, should we check that as well?
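
A minimal sketch of how such a check could look, assuming the `-D` properties and the parsed `flink-conf.yaml` are both available as plain maps. `ArchiveDirResolver` and `resolve` are hypothetical names, not StreamPark or Flink APIs; only the key `jobmanager.archive.fs.dir` comes from the diff.

```java
import java.util.Map;

/** Hypothetical helper, not part of StreamPark or Flink. */
final class ArchiveDirResolver {

    // Configuration key as it appears in the diff under review.
    static final String ARCHIVE_DIR_KEY = "jobmanager.archive.fs.dir";

    private ArchiveDirResolver() {}

    /**
     * Resolves the archive directory with this precedence:
     * per-job dynamic (-D) properties, then flink-conf.yaml, then the default.
     */
    static String resolve(Map<String, String> dynamicProps,
                          Map<String, String> flinkConf,
                          String defaultPath) {
        String value = dynamicProps.get(ARCHIVE_DIR_KEY);
        if (value == null) {
            value = flinkConf.get(ARCHIVE_DIR_KEY);
        }
        return value != null ? value : defaultPath;
    }
}
```

The precedence order here is an assumption: it treats a per-job `-D` setting as more specific than the cluster-wide configuration file.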





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1006567047


##########
streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala:
##########
@@ -276,7 +277,9 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
           }
         } else if (isConnection) {
           logger.info("The deployment is deleted and enters the task failure process.")
-          FlinkJobState.FAILED
+          val jobId = trackId.jobId
+          val archivePath = trackController.flinkArchives.get(jobId)
+          FlinkJobState.of(FetchArchives.fetchArchives(trackId.jobId, archivePath))

Review Comment:
   1. Yes, the cache should be cleared.
   2. After a restart, the state is guaranteed by the `state` field of the `t_flink_app` table in MySQL.





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1007984995


##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Overview.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+public class Overview {
+
+    public String getJid() {
+        return jid;
+    }
+
+    public void setJid(String jid) {
+        this.jid = jid;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getState() {
+        return state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+    }
+
+    public Long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(Long startTime) {
+        this.startTime = startTime;
+    }
+
+    public Long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(Long endTime) {
+        this.endTime = endTime;
+    }
+
+    public Integer getDuration() {
+        return duration;
+    }
+
+    public void setDuration(Integer duration) {
+        this.duration = duration;
+    }
+
+    public Long getLastModification() {
+        return lastModification;
+    }
+
+    public void setLastModification(Long lastModification) {
+        this.lastModification = lastModification;
+    }
+
+    public Task getTasks() {

Review Comment:
   <img width="778" alt="image" src="https://user-images.githubusercontent.com/60029759/198581849-708a9df4-b0b6-436a-bc6d-19885b3a4ffa.png">
   Parsing the JSON involves these objects.





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1008818143


##########
streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala:
##########
@@ -102,6 +102,11 @@ case class Workspace(storageType: StorageType) {
    */
   lazy val APP_JARS = s"$WORKSPACE/jars"
 
+  /**
+   * project archives path
+   */
+  lazy val ARCHIVES_FILE_PATH = s"$WORKSPACE/historyserver/archive"

Review Comment:
     lazy val ARCHIVES_FILE_PATH = s"${Workspace.remote.WORKSPACE}/historyserver/archive"





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1006542844


##########
streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkTrackController.scala:
##########
@@ -236,3 +240,19 @@ object MetricCache {
   def build(): MetricCache = new MetricCache()
 
 }
+
+class ArchivesCache {
+  def put(k: String, v: String): Unit = cache.put(k, v)
+
+  def get(k: String): String = cache.getIfPresent(k)
+
+  def asMap(): Map[String, String] = cache.asMap().toMap
+
+  def cleanUp(): Unit = cache.cleanUp()
+
+  val cache: Cache[String, String] = Caffeine.newBuilder.build()

Review Comment:
   The `FlinkJobStatusWatcher.dowatch()` method needs the archive path of the task when it is scheduled. The archive path is put into the cache when the job is started and read back from the cache when the job is scheduled.





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1006558138


##########
streampark-flink/streampark-flink-kubernetes/pom.xml:
##########
@@ -96,6 +96,24 @@
             <version>${snakeyaml.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <optional>true</optional>
+        </dependency>

Review Comment:
   My design defaults to HDFS. If you use another storage system, you need to add the relevant dependencies to the lib directory yourself.





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1007990499


##########
streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkTrackController.scala:
##########
@@ -236,3 +240,19 @@ object MetricCache {
   def build(): MetricCache = new MetricCache()
 
 }
+
+class ArchivesCache {
+  def put(k: String, v: String): Unit = cache.put(k, v)
+
+  def get(k: String): String = cache.getIfPresent(k)
+
+  def asMap(): Map[String, String] = cache.asMap().toMap
+
+  def cleanUp(): Unit = cache.cleanUp()
+
+  val cache: Cache[String, String] = Caffeine.newBuilder.build()

Review Comment:
   Caffeine is backed by a map underneath. I am reusing the logic of the existing cache here. To control cache expiration I added:
   def invalidate(key: ClusterKey): Unit = cache.invalidate(key)
   





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1009076305


##########
streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala:
##########
@@ -41,22 +41,27 @@ object Workspace {
   /**
    * dirPath of the maven local repository with built-in compilation process
    */
-  lazy val MAVEN_LOCAL_DIR = s"$localWorkspace/mvnrepo"
+  lazy val MAVEN_LOCAL_PATH = s"$localWorkspace/mvnrepo"
 
   /**
-   * local sourceCode dir.(for git...)
+   * local sourceCode path.(for git...)
    */
-  lazy val PROJECT_LOCAL_DIR = s"$localWorkspace/project"
+  lazy val PROJECT_LOCAL_PATH = s"$localWorkspace/project"
 
   /**
-   * local log dir.
+   * local log path.
    */
-  lazy val LOG_LOCAL_DIR = s"$localWorkspace/logs"
+  lazy val LOG_LOCAL_PATH = s"$localWorkspace/logs"
 
   /**
-   * project build log dir.
+   * project build log path.
    */
-  lazy val PROJECT_BUILD_LOG_DIR = s"$LOG_LOCAL_DIR/build_logs"
+  lazy val PROJECT_BUILD_LOG_PATH = s"$LOG_LOCAL_PATH/build_logs"
+
+  /**
+   * project archives path
+   */
+  lazy val ARCHIVES_FILE_PATH = s"${Workspace.remote.WORKSPACE}/historyserver/archive"

Review Comment:
   you can change it to:
   ```  lazy val ARCHIVES_FILE_PATH = s"${remote.WORKSPACE}/historyserver/archive"```





[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1005806252


##########
streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkTrackController.scala:
##########
@@ -236,3 +240,19 @@ object MetricCache {
   def build(): MetricCache = new MetricCache()
 
 }
+
+class ArchivesCache {
+  def put(k: String, v: String): Unit = cache.put(k, v)
+
+  def get(k: String): String = cache.getIfPresent(k)
+
+  def asMap(): Map[String, String] = cache.asMap().toMap
+
+  def cleanUp(): Unit = cache.cleanUp()
+
+  val cache: Cache[String, String] = Caffeine.newBuilder.build()

Review Comment:
   Neither a cache size nor an expiry policy is set, so why use a cache here?
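
To illustrate what a bounded cache buys over an unbounded one, here is a sketch using only the JDK's `LinkedHashMap` as a stand-in. It is not Caffeine (whose builder offers its own size and expiry settings), and `BoundedArchiveCache` is a hypothetical name rather than code from this PR.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical LRU-bounded map of jobId to archive path (JDK only, not Caffeine). */
final class BoundedArchiveCache {

    private final Map<String, String> entries;

    BoundedArchiveCache(final int maxEntries) {
        // accessOrder = true keeps entries in least-recently-used order.
        this.entries = Collections.synchronizedMap(
            new LinkedHashMap<String, String>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                    // Evict the least recently used entry once the bound is exceeded.
                    return size() > maxEntries;
                }
            });
    }

    void put(String jobId, String archivePath) {
        entries.put(jobId, archivePath);
    }

    /** Returns the cached path, or null if absent or already evicted. */
    String get(String jobId) {
        return entries.get(jobId);
    }

    void invalidate(String jobId) {
        entries.remove(jobId);
    }

    int size() {
        return entries.size();
    }
}
```

Without a bound or an expiry policy, entries for jobs that are never looked up again stay in memory for the lifetime of the service, which is the concern raised above.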



##########
streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala:
##########
@@ -276,7 +277,9 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
           }
         } else if (isConnection) {
           logger.info("The deployment is deleted and enters the task failure process.")
-          FlinkJobState.FAILED
+          val jobId = trackId.jobId
+          val archivePath = trackController.flinkArchives.get(jobId)
+          FlinkJobState.of(FetchArchives.fetchArchives(trackId.jobId, archivePath))

Review Comment:
   1. Should we remove the jobId from `flinkArchives` after getting the job state?
   2. If the StreamPark backend service is restarted, how do we get the archivePath?
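
For the first point, a minimal sketch of a lookup that removes the entry as it is read, so paths of finished jobs do not accumulate. `ArchivePathRegistry` and its methods are hypothetical names for illustration, not the `flinkArchives` implementation in this PR.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory registry of jobId to archive path. */
final class ArchivePathRegistry {

    private final ConcurrentMap<String, String> paths = new ConcurrentHashMap<>();

    /** Records the archive path when a job is started. */
    void register(String jobId, String archivePath) {
        paths.put(jobId, archivePath);
    }

    /** Atomically returns and removes the path for the given job, if present. */
    Optional<String> take(String jobId) {
        return Optional.ofNullable(paths.remove(jobId));
    }

    int size() {
        return paths.size();
    }
}
```

One trade-off of removing on read: if reading the archive fails afterwards, the caller has to re-register the path before retrying. Being purely in memory, this sketch also does not address the restart case in the second point.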





[GitHub] [incubator-streampark] wolfboys commented on pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
wolfboys commented on PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#issuecomment-1296577804

   cc @1996fanrui PTAL




[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1009072616


##########
streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala:
##########
@@ -41,22 +41,27 @@ object Workspace {
   /**
    * dirPath of the maven local repository with built-in compilation process
    */
-  lazy val MAVEN_LOCAL_DIR = s"$localWorkspace/mvnrepo"
+  lazy val MAVEN_LOCAL_PATH = s"$localWorkspace/mvnrepo"
 
   /**
-   * local sourceCode dir.(for git...)
+   * local sourceCode path.(for git...)
    */
-  lazy val PROJECT_LOCAL_DIR = s"$localWorkspace/project"
+  lazy val PROJECT_LOCAL_PATH = s"$localWorkspace/project"
 
   /**
-   * local log dir.
+   * local log path.
    */
-  lazy val LOG_LOCAL_DIR = s"$localWorkspace/logs"
+  lazy val LOG_LOCAL_PATH = s"$localWorkspace/logs"
 
   /**
-   * project build log dir.
+   * project build log path.
    */
-  lazy val PROJECT_BUILD_LOG_DIR = s"$LOG_LOCAL_DIR/build_logs"
+  lazy val PROJECT_BUILD_LOG_PATH = s"$LOG_LOCAL_PATH/build_logs"
+
+  /**
+   * project archives path
+   */
+  lazy val ARCHIVES_FILE_PATH = s"$localWorkspace/historyserver/archive"

Review Comment:
   > @wolfboys why do you change it to the `localWorkspace`? As I understand, it should be the `remoteWorkspace`?
   
   Oh, sorry, I misread it as local. It really should be the remote workspace.





[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1005738232


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -1361,6 +1368,11 @@ public void start(Application appParam, boolean auto) throws Exception {
         } else {
             if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                 AssertUtils.state(buildResult != null);
+                String flinkConfPath = String.format("%s/conf/flink-conf.yaml", flinkEnv.getFlinkHome());
+                ParameterTool parameter = ParameterTool.fromPropertiesFile(flinkConfPath);

Review Comment:
   Only `flink-conf.yaml` is checked? Should we check the properties as well?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -1361,6 +1368,11 @@ public void start(Application appParam, boolean auto) throws Exception {
         } else {
             if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                 AssertUtils.state(buildResult != null);
+                String flinkConfPath = String.format("%s/conf/flink-conf.yaml", flinkEnv.getFlinkHome());
+                ParameterTool parameter = ParameterTool.fromPropertiesFile(flinkConfPath);
+                String fsPath = Optional.ofNullable(parameter.get("jobmanager.archive.fs.dir")).orElse(archivePath);

Review Comment:
   Should we use `JobManagerOptions.ARCHIVE_DIR.key()` here?



##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class FetchArchives {
+
+    public static String fetchArchives(String jobId, String archivePath) {

Review Comment:
   How about renaming `fetchArchives` to `getJobStateFromArchiveFile`?



##########
streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala:
##########
@@ -112,6 +112,8 @@ object ConfigConst {
 
   val KEY_FLINK_DEPLOYMENT_OPTION_PREFIX = "flink.deployment.option."
 
+  val KEY_JOBMANAGER_ARCHIVE_FS_DIR = "jobmanager.archive.fs.dir"

Review Comment:
   This code can be deleted, we can use `JobManagerOptions.ARCHIVE_DIR.key()` directly.



##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class FetchArchives {
+
+    public static String fetchArchives(String jobId, String archivePath) {
+        try {
+            Objects.requireNonNull(jobId, "JobId cannot be empty.");
+            Objects.requireNonNull(archivePath, "archivePath cannot be empty.");
+            Path refreshPath = new Path(archivePath);
+            FileSystem refreshFS = refreshPath.getFileSystem();
+            RefreshLocation refreshLocation = new RefreshLocation(refreshPath, refreshFS);
+            Path refreshDir = refreshLocation.getPath();
+            // contents of /:refreshDir
+            FileStatus[] jobArchives;
+            jobArchives = listArchives(refreshLocation.getFs(), refreshDir);
+            Path jobArchivePath = Arrays.stream(jobArchives)
+                .map(FileStatus::getPath)
+                .filter(path -> path.getName().equals(jobId)).findFirst().orElse(null);
+
+            Objects.requireNonNull(jobArchivePath, "Archive Log Path Exception");
+            for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchivePath)) {
+                String path = archive.getPath();
+                System.out.println(path);
+                if (path.equals("/jobs/overview")) {
+                    String json = archive.getJson();
+                    ObjectMapper objectMapper = new ObjectMapper();
+                    Jobs jobs = objectMapper.readValue(json, Jobs.class);
+                    System.out.println(json);
+                    List<Overview> overviews = jobs.getJobs();
+                    return overviews.get(0).getState();

Review Comment:
   In session mode, `jobs.getJobs()` may return several jobs.
   
   In general, we should check `overview.getJid` here.
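
A sketch of selecting the entry by job ID instead of taking the first element. `JobOverview` is a hypothetical stand-in for the `Overview` POJO in this PR, reduced to the two fields needed here, and `ArchivedStateLookup` is likewise an illustrative name.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for the Overview POJO (only jid and state). */
final class JobOverview {
    final String jid;
    final String state;

    JobOverview(String jid, String state) {
        this.jid = jid;
        this.state = state;
    }
}

/** Hypothetical helper that picks the state of one job out of an overview list. */
final class ArchivedStateLookup {

    private ArchivedStateLookup() {}

    /** Returns the state of the entry whose jid equals jobId, or empty if none matches. */
    static Optional<String> stateOf(List<JobOverview> overviews, String jobId) {
        return overviews.stream()
            // Skip entries for other jobs and entries without a state.
            .filter(o -> jobId.equals(o.jid) && o.state != null)
            .map(o -> o.state)
            .findFirst();
    }
}
```

Returning an `Optional` leaves it to the caller to decide what a missing entry means, for example falling back to a failed state.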



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -1361,6 +1368,11 @@ public void start(Application appParam, boolean auto) throws Exception {
         } else {
             if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                 AssertUtils.state(buildResult != null);
+                String flinkConfPath = String.format("%s/conf/flink-conf.yaml", flinkEnv.getFlinkHome());

Review Comment:
   You can refactor `FlinkEnv.doSetFlinkConf()`. After the refactor, you can call `flinkEnv.getFlinkConf()` directly.



##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class FetchArchives {
+
+    public static String fetchArchives(String jobId, String archivePath) {
+        try {
+            Objects.requireNonNull(jobId, "JobId cannot be empty.");
+            Objects.requireNonNull(archivePath, "archivePath cannot be empty.");
+            Path refreshPath = new Path(archivePath);
+            FileSystem refreshFS = refreshPath.getFileSystem();
+            RefreshLocation refreshLocation = new RefreshLocation(refreshPath, refreshFS);
+            Path refreshDir = refreshLocation.getPath();
+            // contents of /:refreshDir
+            FileStatus[] jobArchives;
+            jobArchives = listArchives(refreshLocation.getFs(), refreshDir);
+            Path jobArchivePath = Arrays.stream(jobArchives)
+                .map(FileStatus::getPath)
+                .filter(path -> path.getName().equals(jobId)).findFirst().orElse(null);
+
+            Objects.requireNonNull(jobArchivePath, "Archive Log Path Exception");
+            for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchivePath)) {
+                String path = archive.getPath();
+                System.out.println(path);
+                if (path.equals("/jobs/overview")) {
+                    String json = archive.getJson();
+                    ObjectMapper objectMapper = new ObjectMapper();
+                    Jobs jobs = objectMapper.readValue(json, Jobs.class);
+                    System.out.println(json);

Review Comment:
   This line can be deleted.



##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class FetchArchives {
+
+    public static String fetchArchives(String jobId, String archivePath) {
+        try {
+            Objects.requireNonNull(jobId, "JobId cannot be empty.");
+            Objects.requireNonNull(archivePath, "archivePath cannot be empty.");
+            Path refreshPath = new Path(archivePath);
+            FileSystem refreshFS = refreshPath.getFileSystem();
+            RefreshLocation refreshLocation = new RefreshLocation(refreshPath, refreshFS);
+            Path refreshDir = refreshLocation.getPath();
+            // contents of /:refreshDir
+            FileStatus[] jobArchives;
+            jobArchives = listArchives(refreshLocation.getFs(), refreshDir);
+            Path jobArchivePath = Arrays.stream(jobArchives)
+                .map(FileStatus::getPath)
+                .filter(path -> path.getName().equals(jobId)).findFirst().orElse(null);
+
+            Objects.requireNonNull(jobArchivePath, "Archive Log Path Exception");
+            for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchivePath)) {
+                String path = archive.getPath();
+                System.out.println(path);

Review Comment:
   This line can be deleted.



##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class FetchArchives {
+
+    public static String fetchArchives(String jobId, String archivePath) {
+        try {
+            Objects.requireNonNull(jobId, "JobId cannot be empty.");
+            Objects.requireNonNull(archivePath, "archivePath cannot be empty.");
+            Path refreshPath = new Path(archivePath);
+            FileSystem refreshFS = refreshPath.getFileSystem();
+            RefreshLocation refreshLocation = new RefreshLocation(refreshPath, refreshFS);
+            Path refreshDir = refreshLocation.getPath();
+            // contents of /:refreshDir
+            FileStatus[] jobArchives;
+            jobArchives = listArchives(refreshLocation.getFs(), refreshDir);
+            Path jobArchivePath = Arrays.stream(jobArchives)
+                .map(FileStatus::getPath)
+                .filter(path -> path.getName().equals(jobId)).findFirst().orElse(null);
+
+            Objects.requireNonNull(jobArchivePath, "Archive Log Path Exception");
+            for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchivePath)) {
+                String path = archive.getPath();
+                System.out.println(path);
+                if (path.equals("/jobs/overview")) {
+                    String json = archive.getJson();
+                    ObjectMapper objectMapper = new ObjectMapper();
+                    Jobs jobs = objectMapper.readValue(json, Jobs.class);
+                    System.out.println(json);
+                    List<Overview> overviews = jobs.getJobs();
+                    return overviews.get(0).getState();
+                }
+                return "FAILED";
+            }
+            return "FAILED";
+        } catch (Exception e) {
+            return "FAILED";

Review Comment:
   There are too many `"FAILED"` literals; they can be extracted into a static field.
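
   A minimal sketch of that extraction, for illustration only. The class name `ArchiveStateSketch` and the helper `stateOrFailed` are hypothetical and not part of the PR; the real method would still read the archive, whereas here the lookup result is simply passed in.

```java
// Hypothetical sketch: names are illustrative, not taken from the PR.
final class ArchiveStateSketch {

    /** Single definition of the fallback state, replacing the repeated literals. */
    static final String FAILED = "FAILED";

    /** Returns the archived state, or FAILED when no state could be read. */
    static String stateOrFailed(String archivedState) {
        return archivedState == null || archivedState.isEmpty() ? FAILED : archivedState;
    }

    public static void main(String[] args) {
        System.out.println(stateOrFailed("FINISHED"));
        System.out.println(stateOrFailed(null));
    }
}
```

   With one constant, every early return and the `catch` branch can refer to `FAILED` instead of repeating the string.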



##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Task.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+public class Task {

Review Comment:
   `Task` isn't used anywhere, so can we remove it in this version?
   
   We can add it back if it's needed in the future.



##########
streampark-flink/streampark-flink-kubernetes/pom.xml:
##########
@@ -96,6 +96,24 @@
             <version>${snakeyaml.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <optional>true</optional>
+        </dependency>

Review Comment:
   Are all of these dependencies necessary?
   
   You use HDFS as the archive storage in your Kubernetes environment, so you added these dependencies, right?
   
   If so, users who use S3 will be missing some JARs, right?



##########
streampark-console/streampark-console-service/src/main/resources/application.yml:
##########
@@ -91,6 +91,8 @@ streampark:
   workspace:
     local: /opt/streampark_workspace
     remote: hdfs://hdfscluster/streampark   # support hdfs:///streampark/ 、 /streampark 、hdfs://host:ip/streampark/
+  archive:
+    path: hdfs:///streampark/completed-jobs/

Review Comment:
   The archive path shouldn't be configured separately. It should be under `workspace.remote`. What do you think?



##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class FetchArchives {
+
+    public static String fetchArchives(String jobId, String archivePath) {
+        try {
+            Objects.requireNonNull(jobId, "JobId cannot be empty.");
+            Objects.requireNonNull(archivePath, "archivePath cannot be empty.");
+            Path refreshPath = new Path(archivePath);
+            FileSystem refreshFS = refreshPath.getFileSystem();
+            RefreshLocation refreshLocation = new RefreshLocation(refreshPath, refreshFS);
+            Path refreshDir = refreshLocation.getPath();
+            // contents of /:refreshDir
+            FileStatus[] jobArchives;
+            jobArchives = listArchives(refreshLocation.getFs(), refreshDir);

Review Comment:
   These two lines can be merged into one.



##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class FetchArchives {
+
+    public static String fetchArchives(String jobId, String archivePath) {
+        try {
+            Objects.requireNonNull(jobId, "JobId cannot be empty.");
+            Objects.requireNonNull(archivePath, "archivePath cannot be empty.");
+            Path refreshPath = new Path(archivePath);
+            FileSystem refreshFS = refreshPath.getFileSystem();
+            RefreshLocation refreshLocation = new RefreshLocation(refreshPath, refreshFS);
+            Path refreshDir = refreshLocation.getPath();
+            // contents of /:refreshDir
+            FileStatus[] jobArchives;
+            jobArchives = listArchives(refreshLocation.getFs(), refreshDir);
+            Path jobArchivePath = Arrays.stream(jobArchives)
+                .map(FileStatus::getPath)
+                .filter(path -> path.getName().equals(jobId)).findFirst().orElse(null);

Review Comment:
   How about accessing the `archivePath/jobId` dir directly?
   
   I don't think listing `archivePath` is necessary.
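
   A rough sketch of the idea, assuming a local file system. It uses `java.nio.file` only so that it runs on its own; the PR works with Flink's `Path` and `FileSystem`, and `java.nio.file` would not handle an `hdfs://` URI without an extra provider. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

// Hypothetical sketch: java.nio.file stands in for Flink's Path/FileSystem.
final class DirectArchiveLookupSketch {

    /** Resolves archivePath/jobId directly instead of listing archivePath and filtering. */
    static Optional<Path> findJobArchive(String archivePath, String jobId) {
        Path candidate = Paths.get(archivePath).resolve(jobId);
        return Files.exists(candidate) ? Optional.of(candidate) : Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("archives");
        Files.createFile(dir.resolve("job-1"));
        System.out.println(findJobArchive(dir.toString(), "job-1").isPresent());
        System.out.println(findJobArchive(dir.toString(), "job-2").isPresent());
    }
}
```

   This replaces a directory listing plus a filter with a single existence check on the expected path.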



##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class FetchArchives {
+
+    public static String fetchArchives(String jobId, String archivePath) {
+        try {
+            Objects.requireNonNull(jobId, "JobId cannot be empty.");
+            Objects.requireNonNull(archivePath, "archivePath cannot be empty.");
+            Path refreshPath = new Path(archivePath);
+            FileSystem refreshFS = refreshPath.getFileSystem();
+            RefreshLocation refreshLocation = new RefreshLocation(refreshPath, refreshFS);

Review Comment:
   Why is `RefreshLocation` needed?
   
   `refreshPath.getFileSystem()` can be used directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1008784781


##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Task.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+public class Task {
+
+    private Integer total;
+
+    private Integer created;
+
+    private Integer scheduled;
+
+    private Integer deploying;
+
+    private Integer running;
+
+    private Integer finished;
+
+    private Integer canceling;
+
+    private Integer canceled;
+
+    private Integer failed;
+
+    private Integer reconciling;
+
+    public Integer getTotal() {

Review Comment:
   You can use Lombok to replace the getters and setters.





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1006524627


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -1361,6 +1368,11 @@ public void start(Application appParam, boolean auto) throws Exception {
         } else {
             if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                 AssertUtils.state(buildResult != null);
+                String flinkConfPath = String.format("%s/conf/flink-conf.yaml", flinkEnv.getFlinkHome());
+                ParameterTool parameter = ParameterTool.fromPropertiesFile(flinkConfPath);

Review Comment:
                   String fsPath = Optional.ofNullable(parameter.get(JobManagerOptions.ARCHIVE_DIR.key())).orElse(archivePath);
                   
   If `flink-conf.yaml` does not configure this option, it falls back to the default via `orElse(archivePath)`.
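
   The fallback can be sketched on its own as follows. This is only an illustration: `java.util.Properties` stands in for Flink's `ParameterTool`, the class and method names are hypothetical, and the key string is, to my understanding, the one behind `JobManagerOptions.ARCHIVE_DIR`.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical sketch: Properties stands in for ParameterTool.
final class ArchiveDirFallbackSketch {

    // Assumed to match the key of Flink's JobManagerOptions.ARCHIVE_DIR.
    static final String ARCHIVE_DIR_KEY = "jobmanager.archive.fs.dir";

    /** Returns the configured archive dir, or the default when the key is absent. */
    static String resolveArchiveDir(Properties flinkConf, String defaultArchivePath) {
        return Optional.ofNullable(flinkConf.getProperty(ARCHIVE_DIR_KEY))
            .orElse(defaultArchivePath);
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        System.out.println(resolveArchiveDir(conf, "hdfs:///streampark/completed-jobs/"));
        conf.setProperty(ARCHIVE_DIR_KEY, "hdfs:///flink/archives");
        System.out.println(resolveArchiveDir(conf, "hdfs:///streampark/completed-jobs/"));
    }
}
```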





[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1007092797


##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Overview.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+public class Overview {
+
+    public String getJid() {
+        return jid;
+    }
+
+    public void setJid(String jid) {
+        this.jid = jid;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getState() {
+        return state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+    }
+
+    public Long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(Long startTime) {
+        this.startTime = startTime;
+    }
+
+    public Long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(Long endTime) {
+        this.endTime = endTime;
+    }
+
+    public Integer getDuration() {
+        return duration;
+    }
+
+    public void setDuration(Integer duration) {
+        this.duration = duration;
+    }
+
+    public Long getLastModification() {
+        return lastModification;
+    }
+
+    public void setLastModification(Long lastModification) {
+        this.lastModification = lastModification;
+    }
+
+    public Task getTasks() {

Review Comment:
   Why not delete these 2 methods and the `Task` class?





[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1008724635


##########
streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkTrackController.scala:
##########
@@ -236,3 +240,19 @@ object MetricCache {
   def build(): MetricCache = new MetricCache()
 
 }
+
+class ArchivesCache {
+  def put(k: String, v: String): Unit = cache.put(k, v)
+
+  def get(k: String): String = cache.getIfPresent(k)
+
+  def asMap(): Map[String, String] = cache.asMap().toMap
+
+  def cleanUp(): Unit = cache.cleanUp()
+
+  val cache: Cache[String, String] = Caffeine.newBuilder.build()

Review Comment:
   Map supports `put`, `get` and `invalidate`, why use Cache?
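
   For comparison, a plain map-backed version could look roughly like this. It is written in Java rather than Scala purely for illustration, the class name is hypothetical, and invalidation maps onto `Map.remove`. Note that `ConcurrentHashMap` rejects null keys and values, and that a plain map gives up any size or expiry policy that a Caffeine builder could be configured with.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a thread-safe map in place of an unbounded cache.
final class ArchivesStateMapSketch {

    private final Map<String, String> states = new ConcurrentHashMap<>();

    void put(String jobId, String state) {
        states.put(jobId, state);
    }

    /** Returns the stored state, or null when the job is unknown. */
    String get(String jobId) {
        return states.get(jobId);
    }

    /** Counterpart of a cache invalidation. */
    void invalidate(String jobId) {
        states.remove(jobId);
    }

    public static void main(String[] args) {
        ArchivesStateMapSketch cache = new ArchivesStateMapSketch();
        cache.put("job-1", "FINISHED");
        System.out.println(cache.get("job-1"));
        cache.invalidate("job-1");
        System.out.println(cache.get("job-1"));
    }
}
```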





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1008872869


##########
streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala:
##########
@@ -102,6 +102,11 @@ case class Workspace(storageType: StorageType) {
    */
   lazy val APP_JARS = s"$WORKSPACE/jars"
 
+  /**
+   * project archives path
+   */
+  lazy val ARCHIVES_FILE_PATH = s"$WORKSPACE/historyserver/archive"

Review Comment:
   This is how I call it: `Workspace.remote().ARCHIVES_FILE_PATH()`





[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1007075297


##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class FetchArchives {
+
+    public static String fetchArchives(String jobId, String archivePath) {
+        try {
+            Objects.requireNonNull(jobId, "JobId cannot be empty.");
+            Objects.requireNonNull(archivePath, "archivePath cannot be empty.");
+            Path refreshPath = new Path(archivePath);
+            FileSystem refreshFS = refreshPath.getFileSystem();
+            RefreshLocation refreshLocation = new RefreshLocation(refreshPath, refreshFS);
+            Path refreshDir = refreshLocation.getPath();
+            // contents of /:refreshDir
+            FileStatus[] jobArchives;
+            jobArchives = listArchives(refreshLocation.getFs(), refreshDir);
+            Path jobArchivePath = Arrays.stream(jobArchives)
+                .map(FileStatus::getPath)
+                .filter(path -> path.getName().equals(jobId)).findFirst().orElse(null);
+
+            Objects.requireNonNull(jobArchivePath, "Archive Log Path Exception");
+            for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchivePath)) {
+                String path = archive.getPath();
+                System.out.println(path);
+                if (path.equals("/jobs/overview")) {
+                    String json = archive.getJson();
+                    ObjectMapper objectMapper = new ObjectMapper();
+                    Jobs jobs = objectMapper.readValue(json, Jobs.class);
+                    System.out.println(json);
+                    List<Overview> overviews = jobs.getJobs();
+                    return overviews.get(0).getState();

Review Comment:
   > If session mode is used, `jobs.getJobs()` may return several jobs.
   
   That is, `List<Overview> overviews = jobs.getJobs();` may contain several elements. We should find the correct one in `overviews` based on `jobId`.
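
   A minimal sketch of that lookup. `JobOverview` is a cut-down, hypothetical stand-in for the PR's `Overview` class (only `jid` and `state`), and the `"FAILED"` fallback in `main` mirrors the PR's default.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: JobOverview stands in for the PR's Overview class.
final class OverviewLookupSketch {

    static final class JobOverview {
        final String jid;
        final String state;

        JobOverview(String jid, String state) {
            this.jid = jid;
            this.state = state;
        }
    }

    /** Picks the state of the entry whose jid matches jobId, instead of taking index 0. */
    static Optional<String> findState(List<JobOverview> overviews, String jobId) {
        return overviews.stream()
            .filter(o -> jobId.equals(o.jid) && o.state != null)
            .map(o -> o.state)
            .findFirst();
    }

    public static void main(String[] args) {
        List<JobOverview> overviews = Arrays.asList(
            new JobOverview("a1", "FINISHED"),
            new JobOverview("b2", "CANCELED"));
        System.out.println(findState(overviews, "b2").orElse("FAILED"));
    }
}
```

   Returning an `Optional` leaves the choice of fallback to the caller, so a missing job is not silently confused with the first entry in the list.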





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1008878486


##########
streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala:
##########
@@ -102,6 +102,11 @@ case class Workspace(storageType: StorageType) {
    */
   lazy val APP_JARS = s"$WORKSPACE/jars"
 
+  /**
+   * project archives path
+   */
+  lazy val ARCHIVES_FILE_PATH = s"$WORKSPACE/historyserver/archive"

Review Comment:
   > lazy val ARCHIVES_FILE_PATH = s"${Workspace.remote.WORKSPACE}/historyserver/archive"
   
   see: https://github.com/apache/incubator-streampark/pull/1928





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1006557146


##########
streampark-console/streampark-console-service/src/main/resources/application.yml:
##########
@@ -91,6 +91,8 @@ streampark:
   workspace:
     local: /opt/streampark_workspace
     remote: hdfs://hdfscluster/streampark   # support hdfs:///streampark/ 、 /streampark 、hdfs://host:ip/streampark/
+  archive:
+    path: hdfs:///streampark/completed-jobs/

Review Comment:
   About writing them together: what if the storage is OSS? This `remote` setting seems to support only HDFS, but we support a variety of storage media.





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1007554681


##########
streampark-console/streampark-console-service/src/main/resources/application.yml:
##########
@@ -91,6 +91,8 @@ streampark:
   workspace:
     local: /opt/streampark_workspace
     remote: hdfs://hdfscluster/streampark   # support hdfs:///streampark/ 、 /streampark 、hdfs://host:ip/streampark/
+  archive:
+    path: hdfs:///streampark/completed-jobs/

Review Comment:
   I'd like to leave this part of the PR unchanged for now and modify it later in a separate PR, because I'm not sure whether my changes here would cause other problems.





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1006535025


##########
streampark-flink/streampark-flink-kubernetes/pom.xml:
##########
@@ -96,6 +96,24 @@
             <version>${snakeyaml.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <optional>true</optional>
+        </dependency>

Review Comment:
   Different storage media need different dependencies. The range here is limited to HDFS and S3. Or is there a better way?
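
One way to picture the question above: the storage backend can often be inferred from the URI scheme of the configured path, so that only the matching client library has to be present at runtime. The following is a minimal sketch using only the JDK. The class name `StorageSchemeSketch`, the `Backend` enum, and the scheme mapping are assumptions made for illustration; they are not StreamPark or Flink API.

```java
import java.net.URI;
import java.util.Locale;

final class StorageSchemeSketch {

    // Hypothetical categories for illustration; not StreamPark's StorageType.
    enum Backend { HDFS, S3, LOCAL, DEFAULT_FS, UNKNOWN }

    // Maps a configured path to a backend by looking only at its URI scheme.
    static Backend backendOf(String path) {
        String scheme = URI.create(path).getScheme();
        if (scheme == null) {
            // No scheme (e.g. "/streampark"): defer to the cluster's default file system.
            return Backend.DEFAULT_FS;
        }
        switch (scheme.toLowerCase(Locale.ROOT)) {
            case "hdfs":
                return Backend.HDFS;
            case "s3":
            case "s3a":
            case "s3n":
                return Backend.S3;
            case "file":
                return Backend.LOCAL;
            default:
                return Backend.UNKNOWN;
        }
    }

    public static void main(String[] args) {
        System.out.println(backendOf("hdfs:///streampark/completed-jobs/"));
        System.out.println(backendOf("s3a://bucket/streampark"));
        System.out.println(backendOf("/streampark"));
    }
}
```

A caller could use the result to decide which optional dependency it expects to find, and fail with a clear message when that dependency is missing.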





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1006540449


##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class FetchArchives {
+
+    public static String fetchArchives(String jobId, String archivePath) {
+        try {
+            Objects.requireNonNull(jobId, "JobId cannot be empty.");
+            Objects.requireNonNull(archivePath, "archivePath cannot be empty.");
+            Path refreshPath = new Path(archivePath);
+            FileSystem refreshFS = refreshPath.getFileSystem();
+            RefreshLocation refreshLocation = new RefreshLocation(refreshPath, refreshFS);
+            Path refreshDir = refreshLocation.getPath();
+            // contents of /:refreshDir
+            FileStatus[] jobArchives;
+            jobArchives = listArchives(refreshLocation.getFs(), refreshDir);
+            Path jobArchivePath = Arrays.stream(jobArchives)
+                .map(FileStatus::getPath)
+                .filter(path -> path.getName().equals(jobId)).findFirst().orElse(null);
+
+            Objects.requireNonNull(jobArchivePath, "Archive Log Path Exception");
+            for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchivePath)) {
+                String path = archive.getPath();
+                System.out.println(path);
+                if (path.equals("/jobs/overview")) {
+                    String json = archive.getJson();
+                    ObjectMapper objectMapper = new ObjectMapper();
+                    Jobs jobs = objectMapper.readValue(json, Jobs.class);
+                    System.out.println(json);
+                    List<Overview> overviews = jobs.getJobs();
+                    return overviews.get(0).getState();

Review Comment:
   I don't quite understand what you mean by





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1008785049


##########
streampark-flink/streampark-flink-kubernetes/pom.xml:
##########
@@ -96,6 +96,24 @@
             <version>${snakeyaml.version}</version>
         </dependency>
 
+        <dependency>

Review Comment:
   I don't see where these jars are used in the new code. Why do we need to add them?





[GitHub] [incubator-streampark] MonsterChenzhuo commented on pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#issuecomment-1296276486

   @1996fanrui @wolfboys Thanks for the thorough review. I have addressed all your suggestions.




[GitHub] [incubator-streampark] MonsterChenzhuo commented on pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#issuecomment-1296609823

   @1996fanrui @wolfboys Thanks for the thorough review. I have addressed all your suggestions.




[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1008803923


##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.streampark.common.conf.CommonConfig;
+import org.apache.streampark.common.util.JsonUtils;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+
+import java.util.List;
+import java.util.Objects;
+
+public class FetchArchives {
+
+    private static final String FAILED_STATE = "FAILED";
+
+    public static String getJobStateFromArchiveFile(String jobId) {
+        try {
+            Objects.requireNonNull(jobId, "JobId cannot be empty.");
+            Path archiveFilePath = new Path(CommonConfig.STREAMPARK_WORKSPACE_REMOTE().defaultValue().toString());

Review Comment:
   The `defaultValue` may be wrong.
   
   1. You can refer to `Workspace.remote()`.
   2. The `archiveFilePath` should be `$remote/historyserver/archive`.
   3. When starting an app, we should add `historyserver.archive.fs.dir=$remote/historyserver/archive` to the properties (that is, `-D`).
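
The three points above can be sketched as follows: derive the archive directory from the remote workspace, then add it to the job's properties as a `-D` option. This is a minimal, JDK-only illustration. The class `ArchiveDirPropertySketch` and its helper methods are hypothetical, and the key name is taken verbatim from the review comment. Flink's documentation describes `jobmanager.archive.fs.dir` as the directory the JobManager uploads archives to and `historyserver.archive.fs.dir` as the directories the History Server monitors, so it may be worth checking which key the submitted job actually needs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ArchiveDirPropertySketch {

    // Sub-path under the remote workspace, as proposed in the review.
    static final String ARCHIVE_SUBDIR = "historyserver/archive";

    // Key name copied from the review comment (see the caveat in the lead-in).
    static final String ARCHIVE_DIR_KEY = "historyserver.archive.fs.dir";

    // Joins the remote workspace and the archive sub-path, tolerating trailing slashes.
    static String archiveDir(String remoteWorkspace) {
        String base = remoteWorkspace.replaceAll("/+$", "");
        return base + "/" + ARCHIVE_SUBDIR;
    }

    // Returns a copy of the given properties with the archive directory added.
    static Map<String, String> withArchiveDir(Map<String, String> props, String remoteWorkspace) {
        Map<String, String> copy = new LinkedHashMap<>(props);
        copy.put(ARCHIVE_DIR_KEY, archiveDir(remoteWorkspace));
        return copy;
    }

    // Renders one property in the -Dkey=value form.
    static String toDynamicOption(String key, String value) {
        return "-D" + key + "=" + value;
    }

    public static void main(String[] args) {
        Map<String, String> props =
            withArchiveDir(new LinkedHashMap<>(), "hdfs://hdfscluster/streampark/");
        props.forEach((k, v) -> System.out.println(toDynamicOption(k, v)));
    }
}
```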





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1008872869


##########
streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala:
##########
@@ -102,6 +102,11 @@ case class Workspace(storageType: StorageType) {
    */
   lazy val APP_JARS = s"$WORKSPACE/jars"
 
+  /**
+   * project archives path
+   */
+  lazy val ARCHIVES_FILE_PATH = s"$WORKSPACE/historyserver/archive"

Review Comment:
   This is how I call it: `Workspace.remote().ARCHIVES_FILE_PATH()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] MonsterChenzhuo commented on pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#issuecomment-1296295789

   > Hi @MonsterChenzhuo , thanks for your hard work.
   > 
   > The CI failed, please address it first.
   
   resolved




[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1008724840


##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Overview.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+public class Overview {
+
+    public String getJid() {
+        return jid;
+    }
+
+    public void setJid(String jid) {
+        this.jid = jid;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getState() {
+        return state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+    }
+
+    public Long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(Long startTime) {
+        this.startTime = startTime;
+    }
+
+    public Long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(Long endTime) {
+        this.endTime = endTime;
+    }
+
+    public Integer getDuration() {
+        return duration;
+    }
+
+    public void setDuration(Integer duration) {
+        this.duration = duration;
+    }
+
+    public Long getLastModification() {
+        return lastModification;
+    }
+
+    public void setLastModification(Long lastModification) {
+        this.lastModification = lastModification;
+    }
+
+    public Task getTasks() {

Review Comment:
   The JSON includes `tasks`, but we don't use it.
   
   I mean we don't need to define `Task` in `Overview`; we only need to define the fields we actually use.





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1008784647


##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.List;
+import java.util.Objects;
+
+public class FetchArchives {
+
+    private static final String FAILED_STATE = "FAILED";
+
+    public static String getJobStateFromArchiveFile(String jobId, String archivePath) {
+        try {
+            Objects.requireNonNull(jobId, "JobId cannot be empty.");
+            Objects.requireNonNull(archivePath, "archivePath cannot be empty.");
+            Path archiveFilePath = new Path(String.format("%s/%s", archivePath, jobId));
+            for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(archiveFilePath)) {
+                String path = archive.getPath();
+                if (path.equals("/jobs/overview")) {
+                    String json = archive.getJson();
+                    ObjectMapper objectMapper = new ObjectMapper();

Review Comment:
   Why create a new `ObjectMapper` every time instead of using `JsonUtils` to deserialize?





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1006522928


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -1361,6 +1368,11 @@ public void start(Application appParam, boolean auto) throws Exception {
         } else {
             if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                 AssertUtils.state(buildResult != null);
+                String flinkConfPath = String.format("%s/conf/flink-conf.yaml", flinkEnv.getFlinkHome());
+                ParameterTool parameter = ParameterTool.fromPropertiesFile(flinkConfPath);

Review Comment:
   A properties file? Is this expecting a `flink-conf.properties` file?
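
Some background on why reading a YAML file through a properties loader can appear to work: `java.util.Properties.load` accepts `:` as a key/value separator, so flat `key: value` lines parse as expected, while nested YAML loses its parent key. The sketch below demonstrates this with the JDK only. The class name `YamlAsPropertiesSketch` and the sample keys and values are illustrative assumptions, and it does not claim anything about how `ParameterTool` itself is implemented.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class YamlAsPropertiesSketch {

    // Parses the given text with the standard properties parser.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Flat "key: value" lines: the first ':' ends the key, the rest is the value.
        String flat = "# sample config\n"
            + "historyserver.archive.fs.dir: hdfs:///completed-jobs/\n";
        System.out.println(load(flat).getProperty("historyserver.archive.fs.dir"));

        // Nested YAML: the parent key is not prepended, so the dotted key is missing.
        String nested = "historyserver:\n"
            + "  archive.fs.dir: hdfs:///completed-jobs/\n";
        System.out.println(load(nested).getProperty("historyserver.archive.fs.dir"));
        System.out.println(load(nested).getProperty("archive.fs.dir"));
    }
}
```

The properties format also treats backslashes as escape characters, so a dedicated YAML or Flink configuration loader is the safer choice when the file is not guaranteed to stay flat.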





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1008818392


##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/Jobs.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import java.util.List;
+
+public class Jobs {
+
+    private List<Overview> jobs;
+
+    public List<Overview> getJobs() {

Review Comment:
   You can use Lombok's `@Data` to replace the getters and setters.





[GitHub] [incubator-streampark] 1996fanrui commented on pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#issuecomment-1296278394

   Hi @MonsterChenzhuo , thanks for your hard work.
   
   The CI failed, please address it first.




[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1007076154


##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/RefreshLocation.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+/**
+ * Container for the {@link Path} and {@link FileSystem} of a refresh directory.
+ */
+class RefreshLocation {

Review Comment:
   This class can be deleted.



##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class FetchArchives {
+
+    public static String fetchArchives(String jobId, String archivePath) {
+        try {
+            Objects.requireNonNull(jobId, "JobId cannot be empty.");
+            Objects.requireNonNull(archivePath, "archivePath cannot be empty.");
+            Path refreshPath = new Path(archivePath);
+            FileSystem refreshFS = refreshPath.getFileSystem();
+            RefreshLocation refreshLocation = new RefreshLocation(refreshPath, refreshFS);
+            Path refreshDir = refreshLocation.getPath();
+            // contents of /:refreshDir
+            FileStatus[] jobArchives;
+            jobArchives = listArchives(refreshLocation.getFs(), refreshDir);
+            Path jobArchivePath = Arrays.stream(jobArchives)
+                .map(FileStatus::getPath)
+                .filter(path -> path.getName().equals(jobId)).findFirst().orElse(null);

Review Comment:
   You list all jobs from `archivePath`, then filter the jobId from all jobs.
   
   Why don't you list `archivePath/jobId` directly?
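
The difference between the two lookups can be sketched with `java.nio.file` standing in for Flink's `FileSystem`. That substitution is an assumption made to keep the example self-contained, and the class `ArchiveLookupSketch` and its methods are hypothetical. Listing and filtering touches every entry in the archive directory, while resolving `archivePath/jobId` checks a single path.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.stream.Stream;

final class ArchiveLookupSketch {

    // Approach in the reviewed code: list every archive, then filter by name.
    static Optional<Path> findByListing(Path archiveDir, String jobId) {
        try (Stream<Path> entries = Files.list(archiveDir)) {
            return entries
                .filter(p -> p.getFileName().toString().equals(jobId))
                .findFirst();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Suggested approach: build archiveDir/jobId and check that one path.
    static Optional<Path> findDirectly(Path archiveDir, String jobId) {
        Path candidate = archiveDir.resolve(jobId);
        return Files.exists(candidate) ? Optional.of(candidate) : Optional.empty();
    }

    // Creates a temporary directory holding one empty file per job id (demo helper).
    static Path demoDir(String... jobIds) {
        try {
            Path dir = Files.createTempDirectory("archives");
            for (String id : jobIds) {
                Files.createFile(dir.resolve(id));
            }
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = demoDir("job-a", "job-b");
        System.out.println(findByListing(dir, "job-b"));
        System.out.println(findDirectly(dir, "job-b"));
    }
}
```

Both methods return the same path when the archive exists; the direct form simply avoids enumerating the directory, which matters more on remote file systems with many completed jobs.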



##########
streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkTrackController.scala:
##########
@@ -236,3 +240,19 @@ object MetricCache {
   def build(): MetricCache = new MetricCache()
 
 }
+
+class ArchivesCache {
+  def put(k: String, v: String): Unit = cache.put(k, v)
+
+  def get(k: String): String = cache.getIfPresent(k)
+
+  def asMap(): Map[String, String] = cache.asMap().toMap
+
+  def cleanUp(): Unit = cache.cleanUp()
+
+  val cache: Cache[String, String] = Caffeine.newBuilder.build()

Review Comment:
   I mean that neither a cache size nor an expiry policy is set. Why not use a `Map` directly?
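
To make the trade-off concrete: a cache configured without a size limit or expiry holds entries indefinitely, much like a concurrent map, so a cache type only pays off once a bound is actually set. The sketch below contrasts the two using JDK collections only. `ArchivePathStoreSketch` is a hypothetical name, and the access-ordered `LinkedHashMap` is a stand-in for a bounded cache, not the Caffeine API.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ArchivePathStoreSketch {

    // Plain concurrent map: no eviction, which is what an unconfigured cache amounts to.
    static Map<String, String> unbounded() {
        return new ConcurrentHashMap<>();
    }

    // Size-bounded LRU map: the least recently accessed entry is dropped on overflow.
    static Map<String, String> boundedLru(int maxEntries) {
        return Collections.synchronizedMap(
            new LinkedHashMap<String, String>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                    return size() > maxEntries;
                }
            });
    }

    public static void main(String[] args) {
        Map<String, String> lru = boundedLru(2);
        lru.put("job-a", "/archive/a");
        lru.put("job-b", "/archive/b");
        lru.get("job-a");               // touch job-a so job-b becomes the eldest
        lru.put("job-c", "/archive/c"); // exceeds the bound, evicting job-b
        System.out.println(lru.keySet());
    }
}
```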



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1008726233


##########
streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala:
##########
@@ -276,7 +277,9 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
           }
         } else if (isConnection) {
           logger.info("The deployment is deleted and enters the task failure process.")
-          FlinkJobState.FAILED
+          val jobId = trackId.jobId
+          val archivePath = trackController.flinkArchives.get(jobId)
+          FlinkJobState.of(FetchArchives.fetchArchives(trackId.jobId, archivePath))

Review Comment:
   `val archivePath = trackController.flinkArchives.get(jobId)`
   
   We get `archivePath` from the in-memory cache; I didn't see `archivePath` being stored in MySQL.
   
   Please correct me if I'm wrong.





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1009021494


##########
streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala:
##########
@@ -125,5 +125,10 @@ case class Workspace(storageType: StorageType) {
    */
   lazy val APP_JARS = s"$WORKSPACE/jars"
 
+  /**
+   * project archives path
+   */
+  lazy val ARCHIVES_FILE_PATH = s"${Workspace.remote.WORKSPACE}/historyserver/archive"

Review Comment:
   This is currently defined in `case class Workspace`; it needs to move to `object Workspace`.





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1006564518


##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class FetchArchives {
+
+    public static String fetchArchives(String jobId, String archivePath) {
+        try {
+            Objects.requireNonNull(jobId, "JobId cannot be empty.");
+            Objects.requireNonNull(archivePath, "archivePath cannot be empty.");
+            Path refreshPath = new Path(archivePath);
+            FileSystem refreshFS = refreshPath.getFileSystem();
+            RefreshLocation refreshLocation = new RefreshLocation(refreshPath, refreshFS);
+            Path refreshDir = refreshLocation.getPath();
+            // contents of /:refreshDir
+            FileStatus[] jobArchives;
+            jobArchives = listArchives(refreshLocation.getFs(), refreshDir);
+            Path jobArchivePath = Arrays.stream(jobArchives)
+                .map(FileStatus::getPath)
+                .filter(path -> path.getName().equals(jobId)).findFirst().orElse(null);

Review Comment:
   Sorry, I didn't understand what you said





[GitHub] [incubator-streampark] 1996fanrui merged pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
1996fanrui merged PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910




[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1008784400


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -1361,6 +1369,10 @@ public void start(Application appParam, boolean auto) throws Exception {
         } else {
             if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                 AssertUtils.state(buildResult != null);
+                ParameterTool parameter = ParameterTool.fromPropertiesFile(String.format("%s/conf/flink-conf.yaml", flinkEnv.getFlinkHome()));
+                String fsPath = Optional.ofNullable(properties.get(JobManagerOptions.ARCHIVE_DIR.key())).isPresent() ? properties.get(JobManagerOptions.ARCHIVE_DIR.key()) : Optional.ofNullable(parameter.get(JobManagerOptions.ARCHIVE_DIR.key())).orElse(String.format("%s/%s/", basePath, "completed-jobs"));

Review Comment:
   This line could be made clearer and more readable.
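
   One possible way to untangle the lookup, shown only as a minimal sketch: it assumes both the application properties and the parsed Flink configuration are available as `Map<String, String>`, and that the key is the value of `JobManagerOptions.ARCHIVE_DIR.key()` (`jobmanager.archive.fs.dir`). The class and method names (`ArchiveDirResolver`, `resolveArchiveDir`) are hypothetical and not part of the PR.

   ```java
   import java.util.Map;
   import java.util.Optional;

   // Hypothetical helper, not part of StreamPark: resolves the archive directory
   // with the precedence "application properties > flink-conf.yaml > default".
   final class ArchiveDirResolver {

       // Assumed to match JobManagerOptions.ARCHIVE_DIR.key() in Flink.
       static final String ARCHIVE_DIR_KEY = "jobmanager.archive.fs.dir";

       static String resolveArchiveDir(Map<String, String> appProperties,
                                       Map<String, String> flinkConf,
                                       String basePath) {
           // 1. A value set on the application wins.
           return Optional.ofNullable(appProperties.get(ARCHIVE_DIR_KEY))
               // 2. Otherwise fall back to the value from flink-conf.yaml.
               .orElseGet(() -> Optional.ofNullable(flinkConf.get(ARCHIVE_DIR_KEY))
                   // 3. Otherwise use the default under the base path.
                   .orElse(String.format("%s/%s/", basePath, "completed-jobs")));
       }
   }
   ```

   Pulling the precedence into one small method keeps `start(...)` short and makes each fallback step visible on its own line.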



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -1361,6 +1369,10 @@ public void start(Application appParam, boolean auto) throws Exception {
         } else {
             if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                 AssertUtils.state(buildResult != null);
+                ParameterTool parameter = ParameterTool.fromPropertiesFile(String.format("%s/conf/flink-conf.yaml", flinkEnv.getFlinkHome()));

Review Comment:
   Hi MonsterChenzhuo,
   In the console layer we currently avoid using the Flink API directly. You can refer to the `expire` method of `SavePointServiceImpl.java`:
   ```
   FlinkEnv flinkEnv = flinkEnvService.getByAppId(application.getAppId());
   AssertUtils.state(flinkEnv != null);
   // The archive dir is a plain String path read from the flink-conf.yaml map.
   String fsPath = flinkEnv.convertFlinkYamlAsMap()
       .getOrDefault(JobManagerOptions.ARCHIVE_DIR.key(), "...");
   ```
   ...
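
   To illustrate the idea of reading the configuration as a plain map instead of going through `ParameterTool`, here is a rough sketch of a flat `key: value` parser. It is not the implementation of `convertFlinkYamlAsMap`; the class name `FlatYamlConf` is hypothetical, and it assumes a flat `flink-conf.yaml` with one `key: value` pair per line and `#` comments.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   // Hypothetical sketch, not StreamPark code: parses flat "key: value" lines
   // into a map, skipping blank lines and '#' comments.
   final class FlatYamlConf {

       static Map<String, String> parse(String content) {
           Map<String, String> conf = new LinkedHashMap<>();
           for (String line : content.split("\\R")) {
               String trimmed = line.trim();
               if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                   continue;
               }
               // Split on the first ": " so values such as "hdfs:///path" stay intact.
               int idx = trimmed.indexOf(": ");
               if (idx <= 0) {
                   continue;
               }
               conf.put(trimmed.substring(0, idx).trim(), trimmed.substring(idx + 2).trim());
           }
           return conf;
       }
   }
   ```

   With such a map in hand, the lookup reduces to a single `getOrDefault` call on the archive directory key.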





[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1008878037


##########
streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala:
##########
@@ -102,6 +102,11 @@ case class Workspace(storageType: StorageType) {
    */
   lazy val APP_JARS = s"$WORKSPACE/jars"
 
+  /**
+   * project archives path
+   */
+  lazy val ARCHIVES_FILE_PATH = s"$WORKSPACE/historyserver/archive"

Review Comment:
   > lazy val ARCHIVES_FILE_PATH = s"${Workspace.remote.WORKSPACE}/historyserver/archive"
   
   The design of this class is flawed; I'm working on improving it.





[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1009057980


##########
streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala:
##########
@@ -41,22 +41,27 @@ object Workspace {
   /**
    * dirPath of the maven local repository with built-in compilation process
    */
-  lazy val MAVEN_LOCAL_DIR = s"$localWorkspace/mvnrepo"
+  lazy val MAVEN_LOCAL_PATH = s"$localWorkspace/mvnrepo"
 
   /**
-   * local sourceCode dir.(for git...)
+   * local sourceCode path.(for git...)
    */
-  lazy val PROJECT_LOCAL_DIR = s"$localWorkspace/project"
+  lazy val PROJECT_LOCAL_PATH = s"$localWorkspace/project"
 
   /**
-   * local log dir.
+   * local log path.
    */
-  lazy val LOG_LOCAL_DIR = s"$localWorkspace/logs"
+  lazy val LOG_LOCAL_PATH = s"$localWorkspace/logs"
 
   /**
-   * project build log dir.
+   * project build log path.
    */
-  lazy val PROJECT_BUILD_LOG_DIR = s"$LOG_LOCAL_DIR/build_logs"
+  lazy val PROJECT_BUILD_LOG_PATH = s"$LOG_LOCAL_PATH/build_logs"
+
+  /**
+   * project archives path
+   */
+  lazy val ARCHIVES_FILE_PATH = s"$localWorkspace/historyserver/archive"

Review Comment:
   @wolfboys why did you change it to `localWorkspace`? As I understand it, it should be `remoteWorkspace`.





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1006521763


##########
streampark-flink/streampark-flink-kubernetes/src/main/java/org/apache/streampark/archives/FetchArchives.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.archives;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+public class FetchArchives {
+
+    public static String fetchArchives(String jobId, String archivePath) {

Review Comment:
   good idea





[GitHub] [incubator-streampark] MonsterChenzhuo commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1006522928


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -1361,6 +1368,11 @@ public void start(Application appParam, boolean auto) throws Exception {
         } else {
             if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                 AssertUtils.state(buildResult != null);
+                String flinkConfPath = String.format("%s/conf/flink-conf.yaml", flinkEnv.getFlinkHome());
+                ParameterTool parameter = ParameterTool.fromPropertiesFile(flinkConfPath);

Review Comment:
   properties? I didn't quite understand


