Posted to issues@flink.apache.org by "dmvk (via GitHub)" <gi...@apache.org> on 2023/03/02 08:51:54 UTC

[GitHub] [flink] dmvk commented on a diff in pull request #22022: [FLINK-31133][tests] Fix timeouts in PartiallyFinishedSourcesITCase

dmvk commented on code in PR #22022:
URL: https://github.com/apache/flink/pull/22022#discussion_r1122740515


##########
flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestEventSource.java:
##########
@@ -103,6 +110,14 @@ private void ack(TestCommand cmd) {
 
     @Override
     public void cancel() {
+        stop();
+    }
+
+    private void stop() {
+        commandQueue.unsubscribe(operatorID, commandExecutor);
         isRunning = false;
+        if (!scheduledCommands.isEmpty()) {

Review Comment:
   Should `scheduledCommands` be volatile as well? IIUC, `open` + `run` and `stop` could be executed from different threads.
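
A minimal sketch of the visibility concern behind this question, not the code from the PR. The class name, the `String` command type, and the use of `ConcurrentLinkedQueue` are assumptions made for illustration. It shows a `volatile` stop flag written by one thread and read by another. It also shows why `volatile` on a collection field may not be enough on its own: `volatile` only publishes the reference, so concurrent mutation still needs a thread-safe collection or other synchronization.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for a source whose run() and stop() may be called from different threads. */
class StoppableSourceSketch {
    // volatile: the write in stop() is guaranteed to become visible to the thread looping in run().
    private volatile boolean isRunning = true;

    // Thread-safe queue (an assumption of this sketch): a volatile field would only publish
    // the reference, not make concurrent mutations of a plain collection safe.
    private final Queue<String> scheduledCommands = new ConcurrentLinkedQueue<>();

    void schedule(String command) {
        scheduledCommands.add(command);
    }

    /** Executes queued commands until stop() is called; returns how many were executed. */
    int run() {
        int executed = 0;
        while (isRunning) {
            if (scheduledCommands.poll() != null) {
                executed++;
            } else {
                Thread.yield();
            }
        }
        return executed;
    }

    /** Stops the loop and drops pending commands; returns how many were dropped. */
    int stop() {
        isRunning = false;
        int dropped = 0;
        while (scheduledCommands.poll() != null) {
            dropped++;
        }
        return dropped;
    }

    public static void main(String[] args) throws InterruptedException {
        StoppableSourceSketch source = new StoppableSourceSketch();
        Thread runner = new Thread(source::run);
        runner.start();
        source.stop(); // called from the main thread, observed by the runner thread
        runner.join();
    }
}
```

Without `volatile` on `isRunning`, the Java memory model would not guarantee that the runner thread ever sees the update made by `stop()`.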



##########
flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java:
##########
@@ -164,22 +164,23 @@ private void waitForFailover(BlockingQueue<TestEvent> queue) throws Exception {
     }
 
     private void handleFailoverTimeout(TimeoutException e) throws Exception {
+        JobStatus jobStatus = miniClusterResource.getClusterClient().getJobStatus(jobID).get();
         String message =
                 String.format(
                         "Unable to failover the job: %s; job status: %s",
-                        e.getMessage(),
-                        miniClusterResource.getClusterClient().getJobStatus(jobID).get());
-        Optional<SerializedThrowable> throwable =
-                miniClusterResource
-                        .getClusterClient()
-                        .requestJobResult(jobID)
-                        .get()
-                        .getSerializedThrowable();
-        if (throwable.isPresent()) {
-            throw new RuntimeException(message, throwable.get());
-        } else {
-            throw new RuntimeException(message);
+                        e.getMessage(), jobStatus);
+        if (jobStatus.isGloballyTerminalState()) {
+            Optional<SerializedThrowable> throwable =

Review Comment:
   ```suggestion
               // Only searching for the throwable in case of a terminal state prevents timeouts when waiting for the job result.
               Optional<SerializedThrowable> maybeThrowable =
   ```
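
A minimal sketch of the reasoning in the suggested comment, under the assumption (taken from that comment) that waiting on the job result can block for as long as the job has not terminated. The `Status` enum, the `maybeFailureCause` helper, and the `CompletableFuture` standing in for the job result are all hypothetical and are not Flink APIs.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class TerminalStateGuardSketch {
    /** Hypothetical subset of job states; only the terminal flag matters here. */
    enum Status {
        RUNNING(false),
        FAILED(true);

        final boolean terminal;

        Status(boolean terminal) {
            this.terminal = terminal;
        }
    }

    /**
     * Returns the failure cause only if the job has terminated. For a non-terminal job the
     * result future is never waited on, so the caller cannot block on it.
     */
    static Optional<Throwable> maybeFailureCause(
            Status status, CompletableFuture<Optional<Throwable>> jobResult) {
        if (!status.terminal) {
            return Optional.empty();
        }
        return jobResult.join();
    }
}
```

Calling `maybeFailureCause(Status.RUNNING, new CompletableFuture<>())` returns immediately with an empty `Optional`, whereas an unguarded `join()` on that future would never return.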



##########
flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java:
##########
@@ -164,22 +164,23 @@ private void waitForFailover(BlockingQueue<TestEvent> queue) throws Exception {
     }
 
     private void handleFailoverTimeout(TimeoutException e) throws Exception {
+        JobStatus jobStatus = miniClusterResource.getClusterClient().getJobStatus(jobID).get();
         String message =
                 String.format(
                         "Unable to failover the job: %s; job status: %s",
-                        e.getMessage(),
-                        miniClusterResource.getClusterClient().getJobStatus(jobID).get());
-        Optional<SerializedThrowable> throwable =
-                miniClusterResource
-                        .getClusterClient()
-                        .requestJobResult(jobID)
-                        .get()
-                        .getSerializedThrowable();
-        if (throwable.isPresent()) {
-            throw new RuntimeException(message, throwable.get());
-        } else {
-            throw new RuntimeException(message);
+                        e.getMessage(), jobStatus);
+        if (jobStatus.isGloballyTerminalState()) {
+            Optional<SerializedThrowable> throwable =

Review Comment:
   Would it make sense to check for `jobStatus == JobStatus.FAILED` instead?
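
A minimal sketch of how the two checks differ, using a hypothetical `Status` enum rather than Flink's `JobStatus`. It assumes that `FAILED`, `CANCELED`, and `FINISHED` all count as globally terminal; that assumption should be verified against the actual enum.

```java
import java.util.EnumSet;
import java.util.Set;

class TerminalCheckSketch {
    /** Hypothetical subset of job states used only for this comparison. */
    enum Status {
        RUNNING,
        FAILED,
        CANCELED,
        FINISHED
    }

    // Assumption of this sketch: these three states are the globally terminal ones.
    static final Set<Status> GLOBALLY_TERMINAL =
            EnumSet.of(Status.FAILED, Status.CANCELED, Status.FINISHED);

    /** Mirrors a check in the style of isGloballyTerminalState(). */
    static boolean looksUpThrowableIfTerminal(Status status) {
        return GLOBALLY_TERMINAL.contains(status);
    }

    /** Mirrors the narrower check proposed in the comment. */
    static boolean looksUpThrowableIfFailed(Status status) {
        return status == Status.FAILED;
    }
}
```

Under that assumption, both checks agree for `RUNNING` and `FAILED`. They differ only for `CANCELED` and `FINISHED`, where the narrower check skips the throwable lookup entirely.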


