You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/05/07 07:53:32 UTC
[flink] 02/02: [FLINK-22406][coordination][tests] Stabilize ReactiveModeITCase
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 271ac1b8bac100b613c313e11773109db15015e5
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Apr 23 09:55:25 2021 +0200
[FLINK-22406][coordination][tests] Stabilize ReactiveModeITCase
---
.../flink/test/scheduling/ReactiveModeITCase.java | 144 +++++++--------------
1 file changed, 49 insertions(+), 95 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
index 438439d..9859a87 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
@@ -18,18 +18,22 @@
package org.apache.flink.test.scheduling;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
@@ -47,7 +51,7 @@ public class ReactiveModeITCase extends TestLogger {
private static final Configuration configuration = getReactiveModeConfiguration();
@Rule
- public final MiniClusterResource miniClusterResource =
+ public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(configuration)
@@ -76,49 +80,60 @@ public class ReactiveModeITCase extends TestLogger {
env.addSource(new FailOnParallelExecutionSource()).setMaxParallelism(1);
input.addSink(new DiscardingSink<>());
- env.executeAsync();
+ final JobClient jobClient = env.executeAsync();
- FailOnParallelExecutionSource.waitForScaleUpToParallelism(1);
+ waitUntilParallelismForVertexReached(
+ miniClusterResource.getRestClusterClient(), jobClient.getJobID(), 1);
}
/** Test that a job scales up when a TaskManager gets added to the cluster. */
@Test
public void testScaleUpOnAdditionalTaskManager() throws Exception {
- ParallelismTrackingSource.resetParallelismTracker();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- final DataStream<String> input = env.addSource(new ParallelismTrackingSource());
+ final DataStream<String> input = env.addSource(new DummySource());
input.addSink(new DiscardingSink<>());
- env.executeAsync();
+ final JobClient jobClient = env.executeAsync();
- ParallelismTrackingSource.waitForScaleUpToParallelism(
+ waitUntilParallelismForVertexReached( // first wait: source parallelism must equal the slots of the initially running TaskManagers
+ miniClusterResource.getRestClusterClient(),
+ jobClient.getJobID(),
NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
// scale up to 2 TaskManagers:
miniClusterResource.getMiniCluster().startTaskManager();
- ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+
+ waitUntilParallelismForVertexReached( // second wait: parallelism must grow by the slots of the TaskManager started above
+ miniClusterResource.getRestClusterClient(),
+ jobClient.getJobID(),
+ NUMBER_SLOTS_PER_TASK_MANAGER * (INITIAL_NUMBER_TASK_MANAGERS + 1));
}
@Test
public void testScaleDownOnTaskManagerLoss() throws Exception {
- ParallelismTrackingSource.resetParallelismTracker();
// test preparation: ensure we have 2 TaskManagers running
startAdditionalTaskManager();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure exactly one restart to avoid restart loops in error cases
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
- final DataStream<String> input = env.addSource(new ParallelismTrackingSource());
+ final DataStream<String> input = env.addSource(new DummySource());
input.addSink(new DiscardingSink<>());
- env.executeAsync();
+ final JobClient jobClient = env.executeAsync();
- ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+ waitUntilParallelismForVertexReached( // precondition: the job uses the slots of both TaskManagers
+ miniClusterResource.getRestClusterClient(),
+ jobClient.getJobID(),
+ NUMBER_SLOTS_PER_TASK_MANAGER * (INITIAL_NUMBER_TASK_MANAGERS + 1));
// scale down to 1 TaskManagers:
miniClusterResource.getMiniCluster().terminateTaskManager(0).get();
- ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER);
+ waitUntilParallelismForVertexReached( // after the loss only the initial TaskManager count remains, so expect slots-per-TM times that count
+ miniClusterResource.getRestClusterClient(),
+ jobClient.getJobID(),
+ NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
}
private int getNumberOfConnectedTaskManagers() throws ExecutionException, InterruptedException {
@@ -136,36 +151,11 @@ public class ReactiveModeITCase extends TestLogger {
Deadline.fromNow(Duration.ofMillis(10_000L)));
}
- /**
- * This source is tracking its parallelism internally. We can not use a CountDownLatch with a
- * predefined parallelism. When scheduling this source on more than one TaskManager in Reactive
- * Mode, it can happen that the source gets scheduled once the first TaskManager registers. In
- * this execution, the source would count down the latch by one already, but Reactive Mode would
- * trigger a restart once the next TaskManager arrives, ultimately breaking the count of the
- * latch.
- *
- * <p>This approach is a compromise that just tracks the number of running instances and allows
- * the test to wait for a parallelism to be reached. To avoid accidentally reaching the scale
- * while deallocating source instances, the {@link InstanceParallelismTracker} is only notifying
- * the wait method when new instances are added, not when they are removed.
- */
- private static class ParallelismTrackingSource extends RichParallelSourceFunction<String> {
+ private static class DummySource implements SourceFunction<String> {
private volatile boolean running = true;
- private static final InstanceParallelismTracker tracker = new InstanceParallelismTracker();
-
- public static void waitForScaleUpToParallelism(int parallelism)
- throws InterruptedException {
- tracker.waitForScaleUpToParallelism(parallelism);
- }
-
- public static void resetParallelismTracker() {
- tracker.reset();
- }
-
@Override
public void run(SourceContext<String> ctx) throws Exception {
- tracker.reportNewInstance();
while (running) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect("test");
@@ -178,61 +168,11 @@ public class ReactiveModeITCase extends TestLogger {
public void cancel() {
running = false;
}
-
- @Override
- public void close() throws Exception {
- tracker.reportStoppedInstance();
- }
- }
-
- private static class InstanceParallelismTracker {
- // only notify this lock on scale-up
- private final Object lock = new Object();
-
- private int instances = 0;
-
- public void reportStoppedInstance() {
- synchronized (lock) {
- instances--;
- }
- }
-
- public void reportNewInstance() {
- synchronized (lock) {
- instances++;
- lock.notifyAll();
- }
- }
-
- public void waitForScaleUpToParallelism(int parallelism) throws InterruptedException {
- synchronized (lock) {
- while (instances != parallelism) {
- lock.wait();
- }
- }
- }
-
- public void reset() {
- synchronized (lock) {
- instances = 0;
- }
- }
}
private static class FailOnParallelExecutionSource extends RichParallelSourceFunction<String> {
private volatile boolean running = true;
- private static final InstanceParallelismTracker tracker = new InstanceParallelismTracker();
-
- public static void waitForScaleUpToParallelism(int parallelism)
- throws InterruptedException {
- tracker.waitForScaleUpToParallelism(parallelism);
- }
-
- public static void resetParallelismTracker() {
- tracker.reset();
- }
-
@Override
public void open(Configuration parameters) throws Exception {
if (getRuntimeContext().getNumberOfParallelSubtasks() > 1) {
@@ -243,7 +183,6 @@ public class ReactiveModeITCase extends TestLogger {
@Override
public void run(SourceContext<String> ctx) throws Exception {
- tracker.reportNewInstance();
while (running) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect("test");
@@ -256,10 +195,25 @@ public class ReactiveModeITCase extends TestLogger {
public void cancel() {
running = false;
}
+ }
- @Override
- public void close() throws Exception {
- tracker.reportStoppedInstance();
- }
+ public static void waitUntilParallelismForVertexReached( // waits until a source vertex of the given job reports targetParallelism via REST
+ RestClusterClient<?> restClusterClient, JobID jobId, int targetParallelism)
+ throws Exception {
+
+ CommonTestUtils.waitUntilCondition( // NOTE(review): presumably re-evaluates the condition until it returns true or the deadline expires - confirm in CommonTestUtils
+ () -> {
+ JobDetailsInfo detailsInfo = restClusterClient.getJobDetails(jobId).get(); // blocks on the REST response for every evaluation
+
+ for (JobDetailsInfo.JobVertexDetailsInfo jobVertexInfo :
+ detailsInfo.getJobVertexInfos()) {
+ if (jobVertexInfo.getName().contains("Source:") // only vertices whose name contains "Source:" are checked
+ && jobVertexInfo.getParallelism() == targetParallelism) {
+ return true;
+ }
+ }
+ return false; // no matching vertex reports the target parallelism yet
+ },
+ Deadline.fromNow(Duration.ofSeconds(10))); // the wait is bounded by a 10 second deadline
}
}