Posted to commits@flink.apache.org by ma...@apache.org on 2021/12/22 10:38:42 UTC

[flink] branch release-1.14 updated: [FLINK-25271][clients] Harden the ApplicationDispatcherBootstrapITCase.

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 68148d9  [FLINK-25271][clients] Harden the ApplicationDispatcherBootstrapITCase.
68148d9 is described below

commit 68148d99b528bfb5f8d641ee34ad9e083653803d
Author: David Moravek <dm...@apache.org>
AuthorDate: Fri Dec 17 10:49:57 2021 +0100

    [FLINK-25271][clients] Harden the ApplicationDispatcherBootstrapITCase.
---
 .../ApplicationDispatcherBootstrapITCase.java      | 35 ++++++++++++++++++----
 .../apache/flink/client/testjar/BlockingJob.java   | 20 ++++++++++---
 2 files changed, 45 insertions(+), 10 deletions(-)

diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
index 6ceb95d..98700c5 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
@@ -21,18 +21,21 @@ package org.apache.flink.client.deployment.application;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.cli.ClientOptions;
 import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.testjar.BlockingJob;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
 import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
 import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.TestingMiniCluster;
@@ -45,9 +48,11 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 
 import java.time.Duration;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 
@@ -81,6 +86,7 @@ public class ApplicationDispatcherBootstrapITCase extends TestLogger {
         final Configuration configuration = new Configuration();
         configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
         configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
+        configuration.set(ClientOptions.CLIENT_RETRY_PERIOD, Duration.ofMillis(100));
         final TestingMiniClusterConfiguration clusterConfiguration =
                 TestingMiniClusterConfiguration.newBuilder()
                         .setConfiguration(configuration)
@@ -106,8 +112,15 @@ public class ApplicationDispatcherBootstrapITCase extends TestLogger {
                     JobStatus.RUNNING,
                     deadline);
 
-            // revoke & re-grant dispatcher leadership
+            // make sure the operator is actually running
+            BlockingJob.awaitRunning(blockId);
+
+            final CompletableFuture<JobResult> firstJobResult =
+                    cluster.requestJobResult(ApplicationDispatcherBootstrap.ZERO_JOB_ID);
             haServices.revokeDispatcherLeadership();
+            // make sure the leadership is revoked to avoid race conditions
+            Assertions.assertEquals(
+                    ApplicationStatus.UNKNOWN, firstJobResult.get().getApplicationStatus());
             haServices.grantDispatcherLeadership();
 
             // job is suspended, wait until it's running
@@ -121,14 +134,24 @@ public class ApplicationDispatcherBootstrapITCase extends TestLogger {
             BlockingJob.unblock(blockId);
 
             // and wait for it to actually finish
-            awaitJobStatus(
-                    cluster,
-                    ApplicationDispatcherBootstrap.ZERO_JOB_ID,
-                    JobStatus.FINISHED,
-                    deadline);
+            final CompletableFuture<JobResult> secondJobResult =
+                    cluster.requestJobResult(ApplicationDispatcherBootstrap.ZERO_JOB_ID);
+            Assertions.assertTrue(secondJobResult.get().isSuccess());
+            Assertions.assertEquals(
+                    ApplicationStatus.SUCCEEDED, secondJobResult.get().getApplicationStatus());
+
+            // the cluster should shut down automatically once the application completes
+            awaitClusterStopped(cluster, deadline);
+        } finally {
+            BlockingJob.cleanUp(blockId);
         }
     }
 
+    private static void awaitClusterStopped(MiniCluster cluster, Deadline deadline)
+            throws Exception {
+        CommonTestUtils.waitUntilCondition(() -> !cluster.isRunning(), deadline);
+    }
+
     private static void awaitJobStatus(
             MiniCluster cluster, JobID jobId, JobStatus status, Deadline deadline)
             throws Exception {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/BlockingJob.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/BlockingJob.java
index 3291674..69fcfb5 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/BlockingJob.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/BlockingJob.java
@@ -35,11 +35,12 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 
 /**
- * A testing job whith blocks processing until it gets manually {@link #unblock(String) unblocked}.
+ * A testing job which blocks processing until it gets manually {@link #unblock(String) unblocked}.
  */
 public class BlockingJob {
 
-    private static final ConcurrentMap<String, CountDownLatch> BLOCKING = new ConcurrentHashMap<>();
+    private static final ConcurrentMap<String, CountDownLatch> RUNNING = new ConcurrentHashMap<>();
+    private static final ConcurrentMap<String, CountDownLatch> BLOCKED = new ConcurrentHashMap<>();
 
     public static PackagedProgram getProgram(String blockId) throws FlinkException {
         try {
@@ -57,8 +58,17 @@ public class BlockingJob {
         }
     }
 
+    public static void cleanUp(String blockId) {
+        RUNNING.remove(blockId);
+        BLOCKED.remove(blockId);
+    }
+
+    public static void awaitRunning(String blockId) throws InterruptedException {
+        RUNNING.computeIfAbsent(blockId, ignored -> new CountDownLatch(1)).await();
+    }
+
     public static void unblock(String blockId) {
-        BLOCKING.computeIfAbsent(blockId, ignored -> new CountDownLatch(1)).countDown();
+        BLOCKED.computeIfAbsent(blockId, ignored -> new CountDownLatch(1)).countDown();
     }
 
     public static void main(String[] args) throws Exception {
@@ -68,7 +78,9 @@ public class BlockingJob {
                 .map(element -> element + 1)
                 .map(
                         element -> {
-                            BLOCKING.computeIfAbsent(blockId, ignored -> new CountDownLatch(1))
+                            RUNNING.computeIfAbsent(blockId, ignored -> new CountDownLatch(1))
+                                    .countDown();
+                            BLOCKED.computeIfAbsent(blockId, ignored -> new CountDownLatch(1))
                                     .await();
                             return element;
                         })