You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2021/12/22 10:38:42 UTC
[flink] branch release-1.14 updated: [FLINK-25271][clients] Harden the ApplicationDispatcherBootstrapITCase.
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 68148d9 [FLINK-25271][clients] Harden the ApplicationDispatcherBootstrapITCase.
68148d9 is described below
commit 68148d99b528bfb5f8d641ee34ad9e083653803d
Author: David Moravek <dm...@apache.org>
AuthorDate: Fri Dec 17 10:49:57 2021 +0100
[FLINK-25271][clients] Harden the ApplicationDispatcherBootstrapITCase.
---
.../ApplicationDispatcherBootstrapITCase.java | 35 ++++++++++++++++++----
.../apache/flink/client/testjar/BlockingJob.java | 20 ++++++++++---
2 files changed, 45 insertions(+), 10 deletions(-)
diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
index 6ceb95d..98700c5 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
@@ -21,18 +21,21 @@ package org.apache.flink.client.deployment.application;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.testjar.BlockingJob;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
@@ -45,9 +48,11 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
import java.time.Duration;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
@@ -81,6 +86,7 @@ public class ApplicationDispatcherBootstrapITCase extends TestLogger {
final Configuration configuration = new Configuration();
configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
+ configuration.set(ClientOptions.CLIENT_RETRY_PERIOD, Duration.ofMillis(100));
final TestingMiniClusterConfiguration clusterConfiguration =
TestingMiniClusterConfiguration.newBuilder()
.setConfiguration(configuration)
@@ -106,8 +112,15 @@ public class ApplicationDispatcherBootstrapITCase extends TestLogger {
JobStatus.RUNNING,
deadline);
- // revoke & re-grant dispatcher leadership
+ // make sure the operator is actually running
+ BlockingJob.awaitRunning(blockId);
+
+ final CompletableFuture<JobResult> firstJobResult =
+ cluster.requestJobResult(ApplicationDispatcherBootstrap.ZERO_JOB_ID);
haServices.revokeDispatcherLeadership();
+ // make sure the leadership is revoked to avoid race conditions
+ Assertions.assertEquals(
+ ApplicationStatus.UNKNOWN, firstJobResult.get().getApplicationStatus());
haServices.grantDispatcherLeadership();
// job is suspended, wait until it's running
@@ -121,14 +134,24 @@ public class ApplicationDispatcherBootstrapITCase extends TestLogger {
BlockingJob.unblock(blockId);
// and wait for it to actually finish
- awaitJobStatus(
- cluster,
- ApplicationDispatcherBootstrap.ZERO_JOB_ID,
- JobStatus.FINISHED,
- deadline);
+ final CompletableFuture<JobResult> secondJobResult =
+ cluster.requestJobResult(ApplicationDispatcherBootstrap.ZERO_JOB_ID);
+ Assertions.assertTrue(secondJobResult.get().isSuccess());
+ Assertions.assertEquals(
+ ApplicationStatus.SUCCEEDED, secondJobResult.get().getApplicationStatus());
+
+ // the cluster should shut down automatically once the application completes
+ awaitClusterStopped(cluster, deadline);
+ } finally {
+ BlockingJob.cleanUp(blockId);
}
}
+ private static void awaitClusterStopped(MiniCluster cluster, Deadline deadline)
+ throws Exception {
+ CommonTestUtils.waitUntilCondition(() -> !cluster.isRunning(), deadline);
+ }
+
private static void awaitJobStatus(
MiniCluster cluster, JobID jobId, JobStatus status, Deadline deadline)
throws Exception {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/BlockingJob.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/BlockingJob.java
index 3291674..69fcfb5 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/BlockingJob.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/BlockingJob.java
@@ -35,11 +35,12 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
/**
- * A testing job whith blocks processing until it gets manually {@link #unblock(String) unblocked}.
+ * A testing job which blocks processing until it gets manually {@link #unblock(String) unblocked}.
*/
public class BlockingJob {
- private static final ConcurrentMap<String, CountDownLatch> BLOCKING = new ConcurrentHashMap<>();
+ private static final ConcurrentMap<String, CountDownLatch> RUNNING = new ConcurrentHashMap<>();
+ private static final ConcurrentMap<String, CountDownLatch> BLOCKED = new ConcurrentHashMap<>();
public static PackagedProgram getProgram(String blockId) throws FlinkException {
try {
@@ -57,8 +58,17 @@ public class BlockingJob {
}
}
+ public static void cleanUp(String blockId) {
+ RUNNING.remove(blockId);
+ BLOCKED.remove(blockId);
+ }
+
+ public static void awaitRunning(String blockId) throws InterruptedException {
+ RUNNING.computeIfAbsent(blockId, ignored -> new CountDownLatch(1)).await();
+ }
+
public static void unblock(String blockId) {
- BLOCKING.computeIfAbsent(blockId, ignored -> new CountDownLatch(1)).countDown();
+ BLOCKED.computeIfAbsent(blockId, ignored -> new CountDownLatch(1)).countDown();
}
public static void main(String[] args) throws Exception {
@@ -68,7 +78,9 @@ public class BlockingJob {
.map(element -> element + 1)
.map(
element -> {
- BLOCKING.computeIfAbsent(blockId, ignored -> new CountDownLatch(1))
+ RUNNING.computeIfAbsent(blockId, ignored -> new CountDownLatch(1))
+ .countDown();
+ BLOCKED.computeIfAbsent(blockId, ignored -> new CountDownLatch(1))
.await();
return element;
})