You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@streampark.apache.org by GitBox <gi...@apache.org> on 2023/01/08 10:28:28 UTC

[GitHub] [incubator-streampark] wolfboys opened a new pull request, #2243: [Improve] CompletableFuture utils improvement

wolfboys opened a new pull request, #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243

   [Improve] CompletableFuture improvement
   
   
   ## What changes were proposed in this pull request
   
   Issue Number: close #xxx <!-- REMOVE this line if no issue to close -->
   
   <!--(For example: This pull request proposed to add checkstyle plugin).-->
   
   ## Brief change log
   
   <!--*(for example:)*
   - *Add maven-checkstyle-plugin to root pom.xml*
   -->
   
   ## Verifying this change
   
   <!--*(Please pick either of the following options)*-->
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   <!--*(example:)*
   - *Added integration tests for end-to-end.*
   - *Added *Test to verify the change.*
   - *Manually verified the change by testing locally.* -->
   
   ## Does this pull request potentially affect one of the following parts
    - Dependencies (does it add or upgrade a dependency): no


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1064248917


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -580,6 +584,36 @@ public List<String> historyUploadJars() {
         .collect(Collectors.toList());
   }
 
+  @Override
+  public String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception {
+    Application application = getById(id);
+    AssertUtils.state(application != null);
+    if (ExecutionMode.isKubernetesMode(application.getExecutionModeEnum())) {
+      CompletionStage<String> stage =
+          CompletableFuture.supplyAsync(
+                  () ->
+                      KubernetesDeploymentHelper.watchDeploymentLog(
+                          application.getK8sNamespace(),
+                          application.getJobName(),
+                          application.getJobId()))
+              .exceptionally(
+                  e -> {
+                    try {
+                      return String.format(
+                          "%s/%s_err.log", WebUtils.getAppTempDir(), application.getJobId());
+                    } catch (Exception ex) {
+                      throw new ApiDetailException(
+                          "Generate log path exception: " + ex.getMessage());
+                    }
+                  })
+              .thenApply(path -> logClient.rollViewLog(String.valueOf(path), offset, limit));
+      CompletableFuture<String> future = stage.toCompletableFuture();
+      return future.get(5, TimeUnit.SECONDS);

Review Comment:
   > Should we cancel the future after timeout?
   
   Thanks for your review, this code has been improved and some bugs have been solved, usually the feature does not need to be closed manually.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065696925


##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +150,73 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> successResult),
+                1,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+
+    Assertions.assertEquals(resp, successResult);
+  }
+
+  @Test
+  public void thenSupplyTimeout() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+    CompletableFuture<String> future =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return successResult;
+            });
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                future, 1, TimeUnit.SECONDS, success -> success, e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+    Assertions.assertEquals(resp, exceptionResult);
+  }
+
+  @Test
+  public void thenSupplyException() {
+    String expectedExceptionMessage = "Exception in exceptionally handler";
+    int processTime = 5000;
+    int timeOut = 1000;
+    CompletableFuture<String> future =
+        CompletableFutureUtils.supplyTimeout(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  try {
+                    Thread.sleep(processTime);
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return "success";
+                }),
+            timeOut,
+            TimeUnit.MILLISECONDS,
+            success -> success,
+            e -> {
+              throw new RuntimeException(expectedExceptionMessage);
+            });
+
+    assertThatThrownBy(future::get)
+        .hasMessageContaining(expectedExceptionMessage)
+        .hasCauseInstanceOf(RuntimeException.class)
+        .isInstanceOf(ExecutionException.class);

Review Comment:
   > 
   
   Cannot resolve method 'cause' in 'AbstractThrowableAssert'



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#issuecomment-1376801895

   
   > @wolfboys @1996fanrui When I run gh login, I can't check pr without a github login jump, so I can't test accordingly. I don't know if it's a problem with the local network or with github
   
   Let me help you, come on


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] MonsterChenzhuo commented on pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#issuecomment-1376793990

   @wolfboys @1996fanrui  When I run gh login, I can't check pr without a github login jump, so I can't test accordingly. I don't know if it's a problem with the local network or with github


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065703200


##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +150,73 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> successResult),
+                1,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+
+    Assertions.assertEquals(resp, successResult);
+  }
+
+  @Test
+  public void thenSupplyTimeout() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+    CompletableFuture<String> future =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return successResult;
+            });
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                future, 1, TimeUnit.SECONDS, success -> success, e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+    Assertions.assertEquals(resp, exceptionResult);
+  }
+
+  @Test
+  public void thenSupplyException() {
+    String expectedExceptionMessage = "Exception in exceptionally handler";
+    int processTime = 5000;
+    int timeOut = 1000;
+    CompletableFuture<String> future =
+        CompletableFutureUtils.supplyTimeout(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  try {
+                    Thread.sleep(processTime);
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return "success";
+                }),
+            timeOut,
+            TimeUnit.MILLISECONDS,
+            success -> success,
+            e -> {
+              throw new RuntimeException(expectedExceptionMessage);
+            });
+
+    assertThatThrownBy(future::get)
+        .hasMessageContaining(expectedExceptionMessage)
+        .hasCauseInstanceOf(RuntimeException.class)
+        .isInstanceOf(ExecutionException.class);

Review Comment:
   > .isInstanceOf(RuntimeException.class);
   
    assertThatThrownBy(future::get)
           .getCause()
           .as(expectedExceptionMessage)
           .isInstanceOf(RuntimeException.class);



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1064332532


##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +147,64 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> "success"),
+                3,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> "error")
+            .thenApply(r -> r)
+            .get();
+    System.out.println(resp);

Review Comment:
   resolved



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#issuecomment-1376776390

   Hi @wolfboys , just to managed the expectation, as I said before: `Sorry, I'm not familiar with flink on k8s related code, so I just feedback some technical comments.`
   
   My comments are just code specification, I didn't review the core logical of this PR. So I cannot approve this PR after my comments are resolved. It still needs some k8s reviewers.
   
   cc @MonsterChenzhuo 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#issuecomment-1376816097

   > Hi @wolfboys , just to managed the expectation, as I said before: `Sorry, I'm not familiar with flink on k8s related code, so I just feedback some technical comments.`
   > 
   > My comments are just code specification, I didn't review the core logical of this PR. So I cannot approve this PR after my comments are resolved. It still needs some k8s reviewers.
   > 
   > cc @MonsterChenzhuo
   
   Thanks for your professional opinions. very good 👏👏👏


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065390828


##########
streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala:
##########
@@ -56,7 +56,12 @@ object CompletableFutureUtils {
       unit: TimeUnit,
       handle: JavaFunc[T, T],
       exceptionally: JavaFunc[Throwable, T]): CompletableFuture[T] = {
-    future.applyToEither(setTimeout(timeout, unit), handle).exceptionally(exceptionally)
+    future
+      .applyToEither(setTimeout(timeout, unit), handle)
+      .exceptionally(exceptionally)
+      .whenComplete(new BiConsumer[T, Throwable]() {
+        override def accept(t: T, u: Throwable): Unit = future.cancel(true)

Review Comment:
   > Could you please add some unit tests to check the future is canceled after timeout?
   
   hi @1996fanrui :
   
   whenComplete is the completion of the call, run after future successful and exceptionally(Include timeout)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065385774


##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +147,77 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> successResult),
+                3,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+
+    Assertions.assertEquals(resp, successResult);
+  }
+
+  @Test
+  public void thenSupplyTimeout() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+    CompletableFuture<String> future =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return successResult;
+            });
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                future, 1, TimeUnit.SECONDS, success -> success, e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+    Assertions.assertEquals(resp, exceptionResult);
+  }
+
+  @Test
+  public void thenSupplyException() {
+    String resp;
+    String exResult = "exception";
+    try {
+      resp = exceptionally();
+    } catch (Exception e) {
+      resp = exResult;
+    }
+    Assertions.assertEquals(resp, exResult);
+  }
+
+  private String exceptionally() throws Exception {
+    return CompletableFutureUtils.supplyTimeout(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  try {
+                    Thread.sleep(5000);
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return "success";
+                }),
+            2,
+            TimeUnit.SECONDS,
+            success -> success,
+            e -> {
+              throw new RuntimeException("exception");
+            })
+        .thenApply(r -> r)
+        .get();
+  }

Review Comment:
   We can catch this exception instead of actually throw this exception, otherwise the test ci verification failed will fail



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1064162273


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -580,6 +584,36 @@ public List<String> historyUploadJars() {
         .collect(Collectors.toList());
   }
 
+  @Override
+  public String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception {
+    Application application = getById(id);
+    AssertUtils.state(application != null);
+    if (ExecutionMode.isKubernetesMode(application.getExecutionModeEnum())) {
+      CompletionStage<String> stage =
+          CompletableFuture.supplyAsync(
+                  () ->
+                      KubernetesDeploymentHelper.watchDeploymentLog(
+                          application.getK8sNamespace(),
+                          application.getJobName(),
+                          application.getJobId()))
+              .exceptionally(
+                  e -> {
+                    try {
+                      return String.format(
+                          "%s/%s_err.log", WebUtils.getAppTempDir(), application.getJobId());
+                    } catch (Exception ex) {
+                      throw new ApiDetailException(
+                          "Generate log path exception: " + ex.getMessage());
+                    }
+                  })
+              .thenApply(path -> logClient.rollViewLog(String.valueOf(path), offset, limit));
+      CompletableFuture<String> future = stage.toCompletableFuture();

Review Comment:
   ```suggestion
         CompletableFuture<String> future =
             CompletableFuture.supplyAsync(
                     () ->
                         KubernetesDeploymentHelper.watchDeploymentLog(
                             application.getK8sNamespace(),
                             application.getJobName(),
                             application.getJobId()))
                 .exceptionally(
                     e -> {
                       try {
                         return String.format(
                             "%s/%s_err.log", WebUtils.getAppTempDir(), application.getJobId());
                       } catch (Exception ex) {
                         throw new ApiDetailException(
                             "Generate log path exception: " + ex.getMessage());
                       }
                     })
                 .thenApply(path -> logClient.rollViewLog(String.valueOf(path), offset, limit));
   ```



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -580,6 +584,36 @@ public List<String> historyUploadJars() {
         .collect(Collectors.toList());
   }
 
+  @Override
+  public String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception {
+    Application application = getById(id);
+    AssertUtils.state(application != null);
+    if (ExecutionMode.isKubernetesMode(application.getExecutionModeEnum())) {
+      CompletionStage<String> stage =
+          CompletableFuture.supplyAsync(
+                  () ->
+                      KubernetesDeploymentHelper.watchDeploymentLog(
+                          application.getK8sNamespace(),
+                          application.getJobName(),
+                          application.getJobId()))
+              .exceptionally(
+                  e -> {
+                    try {
+                      return String.format(
+                          "%s/%s_err.log", WebUtils.getAppTempDir(), application.getJobId());
+                    } catch (Exception ex) {
+                      throw new ApiDetailException(
+                          "Generate log path exception: " + ex.getMessage());
+                    }
+                  })
+              .thenApply(path -> logClient.rollViewLog(String.valueOf(path), offset, limit));
+      CompletableFuture<String> future = stage.toCompletableFuture();
+      return future.get(5, TimeUnit.SECONDS);

Review Comment:
   Should we cancel the future after timeout?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065385774


##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +147,77 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> successResult),
+                3,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+
+    Assertions.assertEquals(resp, successResult);
+  }
+
+  @Test
+  public void thenSupplyTimeout() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+    CompletableFuture<String> future =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return successResult;
+            });
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                future, 1, TimeUnit.SECONDS, success -> success, e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+    Assertions.assertEquals(resp, exceptionResult);
+  }
+
+  @Test
+  public void thenSupplyException() {
+    String resp;
+    String exResult = "exception";
+    try {
+      resp = exceptionally();
+    } catch (Exception e) {
+      resp = exResult;
+    }
+    Assertions.assertEquals(resp, exResult);
+  }
+
+  private String exceptionally() throws Exception {
+    return CompletableFutureUtils.supplyTimeout(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  try {
+                    Thread.sleep(5000);
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return "success";
+                }),
+            2,
+            TimeUnit.SECONDS,
+            success -> success,
+            e -> {
+              throw new RuntimeException("exception");
+            })
+        .thenApply(r -> r)
+        .get();
+  }

Review Comment:
   We can catch this exception instead of actually throw this exception, otherwise the test ci verification will fail



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1064297037


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LogClientService.java:
##########
@@ -33,10 +33,10 @@
 @Slf4j
 @Component
 public class LogClientService {
-  public String rollViewLog(String path, int skipLineNum, int limit) {
+  public String rollViewLog(String path, int offset, int limit) {
     String result = "";
     try {
-      List<String> lines = readPartFileContent(path, skipLineNum, limit);
+      List<String> lines = readPartFileContent(path, offset, limit);

Review Comment:
   The result can be removed, and return `""` in the end of this method directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1064332261


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LogClientService.java:
##########
@@ -33,10 +33,10 @@
 @Slf4j
 @Component
 public class LogClientService {
-  public String rollViewLog(String path, int skipLineNum, int limit) {
+  public String rollViewLog(String path, int offset, int limit) {
     String result = "";
     try {
-      List<String> lines = readPartFileContent(path, skipLineNum, limit);
+      List<String> lines = readPartFileContent(path, offset, limit);

Review Comment:
   > The result can be removed, and return `""` in the end of this method directly.
   
   thanks for your review, done.



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -580,6 +583,39 @@ public List<String> historyUploadJars() {
         .collect(Collectors.toList());
   }
 
+  @Override
+  public String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception {
+    Application application = getById(id);
+    AssertUtils.state(application != null);
+    if (ExecutionMode.isKubernetesMode(application.getExecutionModeEnum())) {
+      CompletableFutureUtils.supplyTimeout(
+              CompletableFuture.supplyAsync(
+                  () ->
+                      KubernetesDeploymentHelper.watchDeploymentLog(
+                          application.getK8sNamespace(),
+                          application.getJobName(),
+                          application.getJobId())),
+              5,
+              TimeUnit.SECONDS,
+              success -> success,
+              exception -> {
+                String errorLog =
+                    String.format(
+                        "%s/%s_err.log", WebUtils.getAppTempDir(), application.getJobId());
+                File file = new File(errorLog);
+                if (file.exists()) {
+                  return errorLog;
+                } else {
+                  throw new ApiDetailException("get k8s job log failed: " + exception.getMessage());
+                }
+              })

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065338938


##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +147,77 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> successResult),
+                3,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+
+    Assertions.assertEquals(resp, successResult);
+  }
+
+  @Test
+  public void thenSupplyTimeout() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+    CompletableFuture<String> future =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return successResult;
+            });
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                future, 1, TimeUnit.SECONDS, success -> success, e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+    Assertions.assertEquals(resp, exceptionResult);
+  }
+
+  @Test
+  public void thenSupplyException() {
+    String resp;
+    String exResult = "exception";
+    try {
+      resp = exceptionally();

Review Comment:
   ```suggestion
         resp = exceptionally();
         fail("It should be called.");
   ```



##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +147,77 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> successResult),
+                3,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+
+    Assertions.assertEquals(resp, successResult);
+  }
+
+  @Test
+  public void thenSupplyTimeout() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+    CompletableFuture<String> future =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return successResult;
+            });
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                future, 1, TimeUnit.SECONDS, success -> success, e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+    Assertions.assertEquals(resp, exceptionResult);
+  }
+
+  @Test
+  public void thenSupplyException() {
+    String resp;
+    String exResult = "exception";
+    try {
+      resp = exceptionally();
+    } catch (Exception e) {
+      resp = exResult;

Review Comment:
   We should check the exception type here instead of check the `exResult`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065348284


##########
streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala:
##########
@@ -56,7 +56,12 @@ object CompletableFutureUtils {
       unit: TimeUnit,
       handle: JavaFunc[T, T],
       exceptionally: JavaFunc[Throwable, T]): CompletableFuture[T] = {
-    future.applyToEither(setTimeout(timeout, unit), handle).exceptionally(exceptionally)
+    future
+      .applyToEither(setTimeout(timeout, unit), handle)
+      .exceptionally(exceptionally)
+      .whenComplete(new BiConsumer[T, Throwable]() {
+        override def accept(t: T, u: Throwable): Unit = future.cancel(true)

Review Comment:
   Could you please add some unit tests to check the future is canceled after timeout?



##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +147,77 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> successResult),
+                3,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+
+    Assertions.assertEquals(resp, successResult);
+  }
+
+  @Test
+  public void thenSupplyTimeout() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+    CompletableFuture<String> future =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return successResult;
+            });
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                future, 1, TimeUnit.SECONDS, success -> success, e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+    Assertions.assertEquals(resp, exceptionResult);
+  }
+
+  @Test
+  public void thenSupplyException() {
+    String resp;
+    String exResult = "exception";
+    try {
+      resp = exceptionally();

Review Comment:
   ```suggestion
         resp = exceptionally();
         fail("It should be called.");
   ```



##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +147,77 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> successResult),
+                3,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+
+    Assertions.assertEquals(resp, successResult);
+  }
+
+  @Test
+  public void thenSupplyTimeout() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+    CompletableFuture<String> future =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return successResult;
+            });
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                future, 1, TimeUnit.SECONDS, success -> success, e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+    Assertions.assertEquals(resp, exceptionResult);
+  }
+
+  @Test
+  public void thenSupplyException() {
+    String resp;
+    String exResult = "exception";
+    try {
+      resp = exceptionally();
+    } catch (Exception e) {
+      resp = exResult;

Review Comment:
   We should check the exception type here instead of check the `exResult`.



##########
streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala:
##########
@@ -75,36 +80,47 @@ object CompletableFutureUtils {
       unit: TimeUnit,
       handle: Consumer[T],
       exceptionally: Consumer[Throwable]): CompletableFuture[Unit] = {
-    future.applyToEither(
+
+    val unitFuture = future.applyToEither(
       setTimeout(timeout, unit),
-      new JavaFunc[T, Unit] {
+      new JavaFunc[T, Unit]() {
         override def apply(t: T): Unit = {
           if (handle != null) {
             handle.accept(t)
           }
         }
-      }).exceptionally(new JavaFunc[Throwable, Unit] {
+      }).exceptionally(new JavaFunc[Throwable, Unit]() {
       override def apply(t: Throwable): Unit = {
         if (exceptionally != null) {
           exceptionally.accept(t)
         }
       }
     })
+    cancelUnitFuture(unitFuture)
   }
 
   def runTimeout[T](future: CompletableFuture[T], timeout: Long, unit: TimeUnit): CompletableFuture[Unit] = {
-    runTimeout(
+    val unitFuture = runTimeout(
       future,
       timeout,
       unit,
       null,
-      new Consumer[Throwable] {
+      new Consumer[Throwable]() {
         override def accept(t: Throwable): Unit = {
           if (!future.isDone) {
             future.cancel(true)
           }
         }
       })
+    cancelUnitFuture(unitFuture)

Review Comment:
   The unitFuture is canceled three times:
   
   <img width="881" alt="image" src="https://user-images.githubusercontent.com/38427477/211469510-d7627789-c893-4067-bccb-6d199667db24.png">
   



##########
streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala:
##########
@@ -75,36 +80,47 @@ object CompletableFutureUtils {
       unit: TimeUnit,
       handle: Consumer[T],
       exceptionally: Consumer[Throwable]): CompletableFuture[Unit] = {
-    future.applyToEither(
+
+    val unitFuture = future.applyToEither(
       setTimeout(timeout, unit),
-      new JavaFunc[T, Unit] {
+      new JavaFunc[T, Unit]() {
         override def apply(t: T): Unit = {
           if (handle != null) {
             handle.accept(t)
           }
         }
-      }).exceptionally(new JavaFunc[Throwable, Unit] {
+      }).exceptionally(new JavaFunc[Throwable, Unit]() {
       override def apply(t: Throwable): Unit = {
         if (exceptionally != null) {
           exceptionally.accept(t)
         }
       }
     })
+    cancelUnitFuture(unitFuture)
   }
 
   def runTimeout[T](future: CompletableFuture[T], timeout: Long, unit: TimeUnit): CompletableFuture[Unit] = {
-    runTimeout(
+    val unitFuture = runTimeout(
       future,
       timeout,
       unit,
       null,
-      new Consumer[Throwable] {
+      new Consumer[Throwable]() {
         override def accept(t: Throwable): Unit = {
           if (!future.isDone) {
             future.cancel(true)
           }
         }
       })
+    cancelUnitFuture(unitFuture)
+  }
+
+  private[this] def cancelUnitFuture(future: CompletableFuture[Unit]): CompletableFuture[Unit] = {
+    future.whenComplete(new BiConsumer[Unit, Throwable]() {
+      override def accept(t: Unit, u: Throwable): Unit = {
+        future.cancel(true)

Review Comment:
   Check `!future.isDone` first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065403146


##########
streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala:
##########
@@ -75,36 +80,47 @@ object CompletableFutureUtils {
       unit: TimeUnit,
       handle: Consumer[T],
       exceptionally: Consumer[Throwable]): CompletableFuture[Unit] = {
-    future.applyToEither(
+
+    val unitFuture = future.applyToEither(
       setTimeout(timeout, unit),
-      new JavaFunc[T, Unit] {
+      new JavaFunc[T, Unit]() {
         override def apply(t: T): Unit = {
           if (handle != null) {
             handle.accept(t)
           }
         }
-      }).exceptionally(new JavaFunc[Throwable, Unit] {
+      }).exceptionally(new JavaFunc[Throwable, Unit]() {
       override def apply(t: Throwable): Unit = {
         if (exceptionally != null) {
           exceptionally.accept(t)
         }
       }
     })
+    cancelUnitFuture(unitFuture)
   }
 
   def runTimeout[T](future: CompletableFuture[T], timeout: Long, unit: TimeUnit): CompletableFuture[Unit] = {
-    runTimeout(
+    val unitFuture = runTimeout(
       future,
       timeout,
       unit,
       null,
-      new Consumer[Throwable] {
+      new Consumer[Throwable]() {
         override def accept(t: Throwable): Unit = {
           if (!future.isDone) {
             future.cancel(true)
           }
         }
       })
+    cancelUnitFuture(unitFuture)
+  }
+
+  private[this] def cancelUnitFuture(future: CompletableFuture[Unit]): CompletableFuture[Unit] = {
+    future.whenComplete(new BiConsumer[Unit, Throwable]() {
+      override def accept(t: Unit, u: Throwable): Unit = {
+        future.cancel(true)

Review Comment:
   > I know it can be executed multiple times, I mean we should not execute some useless code, and we just cancel the future when it needs to be canceled.
   > 
   > I see you updated the `!future.isCanceled`, it doesn't make sense. When future is finished before timeout, we don't need cancel the future, however `future.isCanceled == false`.
   > 
   > That's why I said, we should check `!future.isDone` instead of `!future.isCanceled`
   
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1064282729


##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +147,64 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> "success"),
+                3,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> "error")
+            .thenApply(r -> r)
+            .get();
+    System.out.println(resp);

Review Comment:
   All tests should check some conditions instead of `System.out.println`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#issuecomment-1375219986

   cc @MonsterChenzhuo PTAL if your have free time. thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1066861534


##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +150,73 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> successResult),
+                1,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+
+    Assertions.assertEquals(resp, successResult);
+  }
+
+  @Test
+  public void thenSupplyTimeout() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+    CompletableFuture<String> future =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return successResult;
+            });
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                future, 1, TimeUnit.SECONDS, success -> success, e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+    Assertions.assertEquals(resp, exceptionResult);
+  }
+
+  @Test
+  public void thenSupplyException() {
+    String expectedExceptionMessage = "Exception in exceptionally handler";
+    int processTime = 5000;
+    int timeOut = 1000;
+    CompletableFuture<String> future =
+        CompletableFutureUtils.supplyTimeout(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  try {
+                    Thread.sleep(processTime);
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return "success";
+                }),
+            timeOut,
+            TimeUnit.MILLISECONDS,
+            success -> success,
+            e -> {
+              throw new RuntimeException(expectedExceptionMessage);
+            });
+
+    assertThatThrownBy(future::get)
+        .hasMessageContaining(expectedExceptionMessage)
+        .hasCauseInstanceOf(RuntimeException.class)
+        .isInstanceOf(ExecutionException.class);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065363850


##########
streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala:
##########
@@ -75,36 +80,47 @@ object CompletableFutureUtils {
       unit: TimeUnit,
       handle: Consumer[T],
       exceptionally: Consumer[Throwable]): CompletableFuture[Unit] = {
-    future.applyToEither(
+
+    val unitFuture = future.applyToEither(
       setTimeout(timeout, unit),
-      new JavaFunc[T, Unit] {
+      new JavaFunc[T, Unit]() {
         override def apply(t: T): Unit = {
           if (handle != null) {
             handle.accept(t)
           }
         }
-      }).exceptionally(new JavaFunc[Throwable, Unit] {
+      }).exceptionally(new JavaFunc[Throwable, Unit]() {
       override def apply(t: Throwable): Unit = {
         if (exceptionally != null) {
           exceptionally.accept(t)
         }
       }
     })
+    cancelUnitFuture(unitFuture)
   }
 
   def runTimeout[T](future: CompletableFuture[T], timeout: Long, unit: TimeUnit): CompletableFuture[Unit] = {
-    runTimeout(
+    val unitFuture = runTimeout(
       future,
       timeout,
       unit,
       null,
-      new Consumer[Throwable] {
+      new Consumer[Throwable]() {
         override def accept(t: Throwable): Unit = {
           if (!future.isDone) {
             future.cancel(true)
           }
         }
       })
+    cancelUnitFuture(unitFuture)
+  }
+
+  private[this] def cancelUnitFuture(future: CompletableFuture[Unit]): CompletableFuture[Unit] = {
+    future.whenComplete(new BiConsumer[Unit, Throwable]() {
+      override def accept(t: Unit, u: Throwable): Unit = {
+        future.cancel(true)

Review Comment:
   > Check `!future.isDone` first.
   
   In fact, this judgment is not needed, future.cancel can be executed multiple times, and the judgment is made in the internal implementation. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065400229


##########
streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala:
##########
@@ -75,36 +80,47 @@ object CompletableFutureUtils {
       unit: TimeUnit,
       handle: Consumer[T],
       exceptionally: Consumer[Throwable]): CompletableFuture[Unit] = {
-    future.applyToEither(
+
+    val unitFuture = future.applyToEither(
       setTimeout(timeout, unit),
-      new JavaFunc[T, Unit] {
+      new JavaFunc[T, Unit]() {
         override def apply(t: T): Unit = {
           if (handle != null) {
             handle.accept(t)
           }
         }
-      }).exceptionally(new JavaFunc[Throwable, Unit] {
+      }).exceptionally(new JavaFunc[Throwable, Unit]() {
       override def apply(t: Throwable): Unit = {
         if (exceptionally != null) {
           exceptionally.accept(t)
         }
       }
     })
+    cancelUnitFuture(unitFuture)
   }
 
   def runTimeout[T](future: CompletableFuture[T], timeout: Long, unit: TimeUnit): CompletableFuture[Unit] = {
-    runTimeout(
+    val unitFuture = runTimeout(
       future,
       timeout,
       unit,
       null,
-      new Consumer[Throwable] {
+      new Consumer[Throwable]() {
         override def accept(t: Throwable): Unit = {
           if (!future.isDone) {
             future.cancel(true)
           }
         }
       })
+    cancelUnitFuture(unitFuture)
+  }
+
+  private[this] def cancelUnitFuture(future: CompletableFuture[Unit]): CompletableFuture[Unit] = {
+    future.whenComplete(new BiConsumer[Unit, Throwable]() {
+      override def accept(t: Unit, u: Throwable): Unit = {
+        future.cancel(true)

Review Comment:
   I know it can be executed multiple times, I mean we should not execute some useless code, and we just cancel the future when it needs to be canceled. 
   
   I see you updated the `!future.isCanceled`, it doesn't make sense. When future is finished before timeout, we don't need cancel the future, however `future.isCanceled == false`.
   
   That's why I said, we should check `!future.isDone` instead of `!future.isCanceled`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065399497


##########
streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala:
##########
@@ -56,7 +56,12 @@ object CompletableFutureUtils {
       unit: TimeUnit,
       handle: JavaFunc[T, T],
       exceptionally: JavaFunc[Throwable, T]): CompletableFuture[T] = {
-    future.applyToEither(setTimeout(timeout, unit), handle).exceptionally(exceptionally)
+    future
+      .applyToEither(setTimeout(timeout, unit), handle)
+      .exceptionally(exceptionally)
+      .whenComplete(new BiConsumer[T, Throwable]() {
+        override def accept(t: T, u: Throwable): Unit = future.cancel(true)

Review Comment:
   > I understand the code, I just mean we should add a unit test.
   
   okay. I provide this testcase



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065401951


##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +147,77 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> successResult),
+                3,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+
+    Assertions.assertEquals(resp, successResult);
+  }
+
+  @Test
+  public void thenSupplyTimeout() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+    CompletableFuture<String> future =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return successResult;
+            });
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                future, 1, TimeUnit.SECONDS, success -> success, e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+    Assertions.assertEquals(resp, exceptionResult);
+  }
+
+  @Test
+  public void thenSupplyException() {
+    String resp;
+    String exResult = "exception";
+    try {
+      resp = exceptionally();
+    } catch (Exception e) {
+      resp = exResult;
+    }
+    Assertions.assertEquals(resp, exResult);
+  }
+
+  private String exceptionally() throws Exception {
+    return CompletableFutureUtils.supplyTimeout(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  try {
+                    Thread.sleep(5000);
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return "success";
+                }),
+            2,
+            TimeUnit.SECONDS,
+            success -> success,
+            e -> {
+              throw new RuntimeException("exception");
+            })
+        .thenApply(r -> r)
+        .get();
+  }

Review Comment:
   You can take a look how to use the  `assertThatThrownBy`. Apache flink use it as well.



##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +147,77 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> successResult),
+                3,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+
+    Assertions.assertEquals(resp, successResult);
+  }
+
+  @Test
+  public void thenSupplyTimeout() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+    CompletableFuture<String> future =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return successResult;
+            });
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                future, 1, TimeUnit.SECONDS, success -> success, e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+    Assertions.assertEquals(resp, exceptionResult);
+  }
+
+  @Test
+  public void thenSupplyException() {
+    String resp;
+    String exResult = "exception";
+    try {
+      resp = exceptionally();
+    } catch (Exception e) {
+      resp = exResult;
+    }
+    Assertions.assertEquals(resp, exResult);
+  }
+
+  private String exceptionally() throws Exception {
+    return CompletableFutureUtils.supplyTimeout(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  try {
+                    Thread.sleep(5000);
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return "success";
+                }),
+            2,
+            TimeUnit.SECONDS,
+            success -> success,
+            e -> {
+              throw new RuntimeException("exception");
+            })
+        .thenApply(r -> r)
+        .get();
+  }

Review Comment:
   You can take a look how to use the  `assertThatThrownBy`. Apache flink uses it as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065361014


##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +147,77 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> successResult),
+                3,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+
+    Assertions.assertEquals(resp, successResult);
+  }
+
+  @Test
+  public void thenSupplyTimeout() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+    CompletableFuture<String> future =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return successResult;
+            });
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                future, 1, TimeUnit.SECONDS, success -> success, e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+    Assertions.assertEquals(resp, exceptionResult);
+  }
+
+  @Test
+  public void thenSupplyException() {
+    String resp;
+    String exResult = "exception";
+    try {
+      resp = exceptionally();
+    } catch (Exception e) {
+      resp = exResult;
+    }
+    Assertions.assertEquals(resp, exResult);
+  }
+
+  private String exceptionally() throws Exception {
+    return CompletableFutureUtils.supplyTimeout(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  try {
+                    Thread.sleep(5000);
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return "success";
+                }),
+            2,
+            TimeUnit.SECONDS,
+            success -> success,
+            e -> {
+              throw new RuntimeException("exception");
+            })
+        .thenApply(r -> r)
+        .get();
+  }

Review Comment:
   ```suggestion
   
     @Test
     public void thenSupplyException() {
       String expectedExceptionMessage = "Exception in exceptionally handler";
       CompletableFuture<String> future =
           CompletableFutureUtils.supplyTimeout(
               CompletableFuture.supplyAsync(
                   () -> {
                     try {
                       Thread.sleep(5000);
                     } catch (InterruptedException e) {
                       throw new RuntimeException(e);
                     }
                     return "success";
                   }),
               2,
               TimeUnit.SECONDS,
               success -> success,
               e -> {
                 throw new RuntimeException(expectedExceptionMessage);
               });
       assertThatThrownBy(future::get)
           .cause()
           .as(expectedExceptionMessage)
           .isInstanceOf(RuntimeException.class);
     }
   ```
   
   Using the `assertThatThrownBy` instead of try catch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065638118


##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +150,73 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> successResult),
+                1,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+
+    Assertions.assertEquals(resp, successResult);
+  }
+
+  @Test
+  public void thenSupplyTimeout() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+    CompletableFuture<String> future =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return successResult;
+            });
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                future, 1, TimeUnit.SECONDS, success -> success, e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+    Assertions.assertEquals(resp, exceptionResult);
+  }
+
+  @Test
+  public void thenSupplyException() {
+    String expectedExceptionMessage = "Exception in exceptionally handler";
+    int processTime = 5000;
+    int timeOut = 1000;
+    CompletableFuture<String> future =
+        CompletableFutureUtils.supplyTimeout(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  try {
+                    Thread.sleep(processTime);
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return "success";
+                }),
+            timeOut,
+            TimeUnit.MILLISECONDS,
+            success -> success,
+            e -> {
+              throw new RuntimeException(expectedExceptionMessage);
+            });
+
+    assertThatThrownBy(future::get)
+        .hasMessageContaining(expectedExceptionMessage)
+        .hasCauseInstanceOf(RuntimeException.class)
+        .isInstanceOf(ExecutionException.class);

Review Comment:
   ```suggestion
       assertThatThrownBy(future::get)
           .cause()
           .as(expectedExceptionMessage)
           .isInstanceOf(RuntimeException.class);
   ```



##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -15,16 +15,23 @@
  * limitations under the License.
  */
 
-package java.util.concurrent;
+package org.apache.org.apache.streampark.common.util;

Review Comment:
   The `org.apache` is repeated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1064298652


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java:
##########
@@ -580,6 +583,39 @@ public List<String> historyUploadJars() {
         .collect(Collectors.toList());
   }
 
+  @Override
+  public String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception {
+    Application application = getById(id);
+    AssertUtils.state(application != null);
+    if (ExecutionMode.isKubernetesMode(application.getExecutionModeEnum())) {
+      CompletableFutureUtils.supplyTimeout(
+              CompletableFuture.supplyAsync(
+                  () ->
+                      KubernetesDeploymentHelper.watchDeploymentLog(
+                          application.getK8sNamespace(),
+                          application.getJobName(),
+                          application.getJobId())),
+              5,
+              TimeUnit.SECONDS,
+              success -> success,
+              exception -> {
+                String errorLog =
+                    String.format(
+                        "%s/%s_err.log", WebUtils.getAppTempDir(), application.getJobId());
+                File file = new File(errorLog);
+                if (file.exists()) {
+                  return errorLog;
+                } else {
+                  throw new ApiDetailException("get k8s job log failed: " + exception.getMessage());
+                }
+              })

Review Comment:
   ```suggestion
   
         CompletableFuture future = CompletableFuture.supplyAsync(
                     () ->
                         KubernetesDeploymentHelper.watchDeploymentLog(
                             application.getK8sNamespace(),
                             application.getJobName(),
                             application.getJobId()));
         CompletableFutureUtils.supplyTimeout(future,
                 5,
                 TimeUnit.SECONDS,
                 success -> success,
                 exception -> {
                   String errorLog =
                       String.format(
                           "%s/%s_err.log", WebUtils.getAppTempDir(), application.getJobId());
                   File file = new File(errorLog);
                   if (file.exists()) {
                     return errorLog;
                   } else {
                     throw new ApiDetailException("get k8s job log failed: " + exception.getMessage());
                   }
                   if (!future.isDone()) {
                     future.cancel(true)
                   }
                 })
   ```
   
   The future need to be cancel after timeout.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#issuecomment-1374836048

   cc @MonsterChenzhuo @1996fanrui  PTAL in your free time, thanks~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065397250


##########
streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala:
##########
@@ -56,7 +56,12 @@ object CompletableFutureUtils {
       unit: TimeUnit,
       handle: JavaFunc[T, T],
       exceptionally: JavaFunc[Throwable, T]): CompletableFuture[T] = {
-    future.applyToEither(setTimeout(timeout, unit), handle).exceptionally(exceptionally)
+    future
+      .applyToEither(setTimeout(timeout, unit), handle)
+      .exceptionally(exceptionally)
+      .whenComplete(new BiConsumer[T, Throwable]() {
+        override def accept(t: T, u: Throwable): Unit = future.cancel(true)

Review Comment:
   I understand the code, I just mean we should add a unit test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065401951


##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +147,77 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> successResult),
+                3,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+
+    Assertions.assertEquals(resp, successResult);
+  }
+
+  @Test
+  public void thenSupplyTimeout() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+    CompletableFuture<String> future =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return successResult;
+            });
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                future, 1, TimeUnit.SECONDS, success -> success, e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+    Assertions.assertEquals(resp, exceptionResult);
+  }
+
+  @Test
+  public void thenSupplyException() {
+    String resp;
+    String exResult = "exception";
+    try {
+      resp = exceptionally();
+    } catch (Exception e) {
+      resp = exResult;
+    }
+    Assertions.assertEquals(resp, exResult);
+  }
+
+  private String exceptionally() throws Exception {
+    return CompletableFutureUtils.supplyTimeout(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  try {
+                    Thread.sleep(5000);
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return "success";
+                }),
+            2,
+            TimeUnit.SECONDS,
+            success -> success,
+            e -> {
+              throw new RuntimeException("exception");
+            })
+        .thenApply(r -> r)
+        .get();
+  }

Review Comment:
   You can take a look how to use the  `assertThatThrownBy`,  `assertThatThrownBy` will catch the exception and it can check exception type and exception message. Apache flink uses it as well.
   
   This is flink community doc about junit5:
   
   https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU/edit#heading=h.we2n5aifcjfq
   
   <img width="765" alt="image" src="https://user-images.githubusercontent.com/38427477/211485257-74f68a0a-82bd-41a2-9c23-1ef8e1ce4716.png">
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1066860830


##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -15,16 +15,23 @@
  * limitations under the License.
  */
 
-package java.util.concurrent;
+package org.apache.org.apache.streampark.common.util;

Review Comment:
   > The `org.apache` is repeated
   
   resolved



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] MonsterChenzhuo commented on pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#issuecomment-1378571638

   LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys merged pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
wolfboys merged PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2243: [Improve] read flink on k8s deploy log improvement

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2243:
URL: https://github.com/apache/incubator-streampark/pull/2243#discussion_r1065361014


##########
streampark-console/streampark-console-service/src/test/java/org/apache/org/apache/streampark/common/util/CompletableFutureUtilsTest.java:
##########
@@ -143,4 +147,77 @@ private String runStart(int sec) {
     }
     return "start successful";
   }
+
+  @Test
+  public void thenSupplyNormally() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                CompletableFuture.supplyAsync(() -> successResult),
+                3,
+                TimeUnit.SECONDS,
+                success -> success,
+                e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+
+    Assertions.assertEquals(resp, successResult);
+  }
+
+  @Test
+  public void thenSupplyTimeout() throws Exception {
+    String successResult = "success";
+    String exceptionResult = "error";
+    CompletableFuture<String> future =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return successResult;
+            });
+    String resp =
+        CompletableFutureUtils.supplyTimeout(
+                future, 1, TimeUnit.SECONDS, success -> success, e -> exceptionResult)
+            .thenApply(r -> r)
+            .get();
+    Assertions.assertEquals(resp, exceptionResult);
+  }
+
+  @Test
+  public void thenSupplyException() {
+    String resp;
+    String exResult = "exception";
+    try {
+      resp = exceptionally();
+    } catch (Exception e) {
+      resp = exResult;
+    }
+    Assertions.assertEquals(resp, exResult);
+  }
+
+  private String exceptionally() throws Exception {
+    return CompletableFutureUtils.supplyTimeout(
+            CompletableFuture.supplyAsync(
+                () -> {
+                  try {
+                    Thread.sleep(5000);
+                  } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return "success";
+                }),
+            2,
+            TimeUnit.SECONDS,
+            success -> success,
+            e -> {
+              throw new RuntimeException("exception");
+            })
+        .thenApply(r -> r)
+        .get();
+  }

Review Comment:
   ```suggestion
   
     @Test
     public void thenSupplyException() {
       String expectedExceptionMessage = "Exception in exceptionally handler";
       CompletableFuture<String> future =
           CompletableFutureUtils.supplyTimeout(
               CompletableFuture.supplyAsync(
                   () -> {
                     try {
                       Thread.sleep(5000);
                     } catch (InterruptedException e) {
                       throw new RuntimeException(e);
                     }
                     return "success";
                   }),
               2,
               TimeUnit.SECONDS,
               success -> success,
               e -> {
                 throw new RuntimeException(expectedExceptionMessage);
               });
       assertThatThrownBy(future::get)
           .as(expectedExceptionMessage)
           .isInstanceOf(RuntimeException.class);
     }
   ```
   
   Using the `assertThatThrownBy` instead of try catch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org