Posted to issues@streampark.apache.org by GitBox <gi...@apache.org> on 2022/10/30 03:02:32 UTC

[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1910: [improve]Precision state judgment based on archive logs

wolfboys commented on code in PR #1910:
URL: https://github.com/apache/incubator-streampark/pull/1910#discussion_r1008784400


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -1361,6 +1369,10 @@ public void start(Application appParam, boolean auto) throws Exception {
         } else {
             if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                 AssertUtils.state(buildResult != null);
+                ParameterTool parameter = ParameterTool.fromPropertiesFile(String.format("%s/conf/flink-conf.yaml", flinkEnv.getFlinkHome()));
+                String fsPath = Optional.ofNullable(properties.get(JobManagerOptions.ARCHIVE_DIR.key())).isPresent() ? properties.get(JobManagerOptions.ARCHIVE_DIR.key()) : Optional.ofNullable(parameter.get(JobManagerOptions.ARCHIVE_DIR.key())).orElse(String.format("%s/%s/", basePath, "completed-jobs"));

Review Comment:
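   You can make this code clearer and more readable: the nested `Optional.ofNullable(...).isPresent() ? ... : ...` expression is hard to follow and looks up the same key twice.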
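
One possible way to unroll that expression is sketched below. It is only an illustration under stated assumptions: `ArchiveDirResolver` and `resolveArchiveDir` are hypothetical names that do not exist in the PR, both configuration sources are modelled as plain `Map<String, String>` values, and the key is written out as the string behind `JobManagerOptions.ARCHIVE_DIR` so the snippet compiles without Flink on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of StreamPark: resolves the archive directory
// with an explicit priority order instead of a nested ternary.
class ArchiveDirResolver {

    // String key behind JobManagerOptions.ARCHIVE_DIR in Flink.
    static final String ARCHIVE_DIR_KEY = "jobmanager.archive.fs.dir";

    /**
     * Priority: job-level properties, then flink-conf.yaml values,
     * then a "completed-jobs" directory under basePath.
     * The Map<String, String> parameter types are an assumption for this sketch.
     */
    static String resolveArchiveDir(Map<String, String> jobProperties,
                                    Map<String, String> flinkConf,
                                    String basePath) {
        String fromJob = jobProperties.get(ARCHIVE_DIR_KEY);
        if (fromJob != null) {
            return fromJob;
        }
        String fromConf = flinkConf.get(ARCHIVE_DIR_KEY);
        if (fromConf != null) {
            return fromConf;
        }
        return String.format("%s/%s/", basePath, "completed-jobs");
    }

    public static void main(String[] args) {
        Map<String, String> jobProperties = new HashMap<>();
        Map<String, String> flinkConf = new HashMap<>();
        // Neither source defines the key, so the default under basePath is used.
        System.out.println(resolveArchiveDir(jobProperties, flinkConf, "/streampark"));
    }
}
```

Each source is read exactly once, and the fallback order is visible at a glance, which is the readability gain being asked for.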



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -1361,6 +1369,10 @@ public void start(Application appParam, boolean auto) throws Exception {
         } else {
             if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                 AssertUtils.state(buildResult != null);
+                ParameterTool parameter = ParameterTool.fromPropertiesFile(String.format("%s/conf/flink-conf.yaml", flinkEnv.getFlinkHome()));

Review Comment:
   Hi MonsterChenzhuo,
   In the console layer we currently avoid using the Flink API directly. You can refer to the `expire` method of `SavePointServiceImpl.java`:
   ```
   FlinkEnv flinkEnv = flinkEnvService.getByAppId(application.getAppId());
   AssertUtils.state(flinkEnv != null);
   // Read the archive directory from the parsed flink-conf.yaml, falling back to a default.
   String fsPath = flinkEnv.convertFlinkYamlAsMap()
       .getOrDefault(JobManagerOptions.ARCHIVE_DIR.key(), "...");
   ```
   ...
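
To illustrate why a map-based lookup is enough here, the sketch below reads flat `key: value` lines, the layout used by the legacy `flink-conf.yaml`, into a `Map` and then calls `getOrDefault`. It is a hypothetical stand-in only: `FlatYamlConf` and `parse` are made-up names, the real `convertFlinkYamlAsMap` implementation is not shown in this thread and may differ, and the fallback path in `main` is an arbitrary example value.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a "flink-conf.yaml as Map" helper; not StreamPark code.
class FlatYamlConf {

    /** Parses flat "key: value" lines, skipping blank lines and '#' comments. */
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            // Split on the first ": " so values such as "hdfs:///dir" stay intact.
            int sep = line.indexOf(": ");
            if (sep <= 0) {
                continue;
            }
            conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
        }
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = parse(Arrays.asList(
            "# archive settings",
            "jobmanager.archive.fs.dir: hdfs:///completed-jobs/"));
        // The second argument is an arbitrary example fallback, not a StreamPark default.
        System.out.println(conf.getOrDefault("jobmanager.archive.fs.dir", "/tmp/completed-jobs/"));
    }
}
```

With the configuration already available as a map, the console code needs no `ParameterTool` call to read a single key.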


