You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/05 08:49:18 UTC

[flink] 01/04: [FLINK-24603][e2e] Simplify FlinkDistribution#submitJob

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e79710849706fd275a04beccfe24d4d61e1c38f9
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Oct 28 12:18:45 2021 +0200

    [FLINK-24603][e2e] Simplify FlinkDistribution#submitJob
    
    Either way, we wait for the process to complete (for the submission to finish when detached, or for the job to complete when attached, so that the JobID can be parsed), so we can simply run the submission process in a blocking fashion.
---
 .../flink/tests/util/flink/FlinkDistribution.java  | 24 +++++++---------------
 1 file changed, 7 insertions(+), 17 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index 0632f62..4f779d5 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -188,24 +188,14 @@ final class FlinkDistribution {
                     }
                 };
 
-        try (AutoClosableProcess flink =
-                AutoClosableProcess.create(commands.toArray(new String[0]))
-                        .setStdoutProcessor(stdoutProcessor)
-                        .runNonBlocking()) {
-            if (jobSubmission.isDetached()) {
-                try {
-                    flink.getProcess().waitFor();
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
-            }
+        AutoClosableProcess.create(commands.toArray(new String[0]))
+                .setStdoutProcessor(stdoutProcessor)
+                .runBlocking();
 
-            try {
-                return JobID.fromHexString(
-                        rawJobIdFuture.get(timeout.getSeconds(), TimeUnit.SECONDS));
-            } catch (Exception e) {
-                throw new IOException("Could not determine Job ID.", e);
-            }
+        try {
+            return JobID.fromHexString(rawJobIdFuture.get(timeout.getSeconds(), TimeUnit.SECONDS));
+        } catch (Exception e) {
+            throw new IOException("Could not determine Job ID.", e);
         }
     }