Posted to commits@flink.apache.org by ch...@apache.org on 2021/05/07 07:53:32 UTC

[flink] 02/02: [FLINK-22406][coordination][tests] Stabilize ReactiveModeITCase

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 271ac1b8bac100b613c313e11773109db15015e5
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Apr 23 09:55:25 2021 +0200

    [FLINK-22406][coordination][tests] Stabilize ReactiveModeITCase
---
 .../flink/test/scheduling/ReactiveModeITCase.java  | 144 +++++++--------------
 1 file changed, 49 insertions(+), 95 deletions(-)
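
The test enables Reactive Mode through a getReactiveModeConfiguration() helper that is referenced in the diff below but whose body is not part of any hunk. As a hedged sketch (not part of this commit), such a helper presumably reduces to setting the scheduler execution mode option that the test class imports:

    // Sketch only: the real helper lives outside the hunks shown below.
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.JobManagerOptions;
    import org.apache.flink.configuration.SchedulerExecutionMode;

    final class ReactiveModeConfigurationSketch {
        static Configuration getReactiveModeConfiguration() {
            final Configuration conf = new Configuration();
            // Reactive Mode is the REACTIVE execution mode of the adaptive scheduler.
            conf.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);
            return conf;
        }
    }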

diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
index 438439d..9859a87 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
@@ -18,18 +18,22 @@
 
 package org.apache.flink.test.scheduling;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
@@ -47,7 +51,7 @@ public class ReactiveModeITCase extends TestLogger {
     private static final Configuration configuration = getReactiveModeConfiguration();
 
     @Rule
-    public final MiniClusterResource miniClusterResource =
+    public final MiniClusterWithClientResource miniClusterResource =
             new MiniClusterWithClientResource(
                     new MiniClusterResourceConfiguration.Builder()
                             .setConfiguration(configuration)
@@ -76,49 +80,60 @@ public class ReactiveModeITCase extends TestLogger {
                 env.addSource(new FailOnParallelExecutionSource()).setMaxParallelism(1);
         input.addSink(new DiscardingSink<>());
 
-        env.executeAsync();
+        final JobClient jobClient = env.executeAsync();
 
-        FailOnParallelExecutionSource.waitForScaleUpToParallelism(1);
+        waitUntilParallelismForVertexReached(
+                miniClusterResource.getRestClusterClient(), jobClient.getJobID(), 1);
     }
 
     /** Test that a job scales up when a TaskManager gets added to the cluster. */
     @Test
     public void testScaleUpOnAdditionalTaskManager() throws Exception {
-        ParallelismTrackingSource.resetParallelismTracker();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        final DataStream<String> input = env.addSource(new ParallelismTrackingSource());
+        final DataStream<String> input = env.addSource(new DummySource());
         input.addSink(new DiscardingSink<>());
 
-        env.executeAsync();
+        final JobClient jobClient = env.executeAsync();
 
-        ParallelismTrackingSource.waitForScaleUpToParallelism(
+        waitUntilParallelismForVertexReached(
+                miniClusterResource.getRestClusterClient(),
+                jobClient.getJobID(),
                 NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
 
         // scale up to 2 TaskManagers:
         miniClusterResource.getMiniCluster().startTaskManager();
-        ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+
+        waitUntilParallelismForVertexReached(
+                miniClusterResource.getRestClusterClient(),
+                jobClient.getJobID(),
+                NUMBER_SLOTS_PER_TASK_MANAGER * (INITIAL_NUMBER_TASK_MANAGERS + 1));
     }
 
     @Test
     public void testScaleDownOnTaskManagerLoss() throws Exception {
-        ParallelismTrackingSource.resetParallelismTracker();
         // test preparation: ensure we have 2 TaskManagers running
         startAdditionalTaskManager();
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         // configure exactly one restart to avoid restart loops in error cases
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
-        final DataStream<String> input = env.addSource(new ParallelismTrackingSource());
+        final DataStream<String> input = env.addSource(new DummySource());
         input.addSink(new DiscardingSink<>());
 
-        env.executeAsync();
+        final JobClient jobClient = env.executeAsync();
 
-        ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+        waitUntilParallelismForVertexReached(
+                miniClusterResource.getRestClusterClient(),
+                jobClient.getJobID(),
+                NUMBER_SLOTS_PER_TASK_MANAGER * (INITIAL_NUMBER_TASK_MANAGERS + 1));
 
         // scale down to 1 TaskManager:
         miniClusterResource.getMiniCluster().terminateTaskManager(0).get();
 
-        ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER);
+        waitUntilParallelismForVertexReached(
+                miniClusterResource.getRestClusterClient(),
+                jobClient.getJobID(),
+                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
     }
 
     private int getNumberOfConnectedTaskManagers() throws ExecutionException, InterruptedException {
@@ -136,36 +151,11 @@ public class ReactiveModeITCase extends TestLogger {
                 Deadline.fromNow(Duration.ofMillis(10_000L)));
     }
 
-    /**
-     * This source is tracking its parallelism internally. We can not use a CountDownLatch with a
-     * predefined parallelism. When scheduling this source on more than one TaskManager in Reactive
-     * Mode, it can happen that the source gets scheduled once the first TaskManager registers. In
-     * this execution, the source would count down the latch by one already, but Reactive Mode would
-     * trigger a restart once the next TaskManager arrives, ultimately breaking the count of the
-     * latch.
-     *
-     * <p>This approach is a compromise that just tracks the number of running instances and allows
-     * the test to wait for a parallelism to be reached. To avoid accidentally reaching the scale
-     * while deallocating source instances, the {@link InstanceParallelismTracker} is only notifying
-     * the wait method when new instances are added, not when they are removed.
-     */
-    private static class ParallelismTrackingSource extends RichParallelSourceFunction<String> {
+    private static class DummySource implements SourceFunction<String> {
         private volatile boolean running = true;
 
-        private static final InstanceParallelismTracker tracker = new InstanceParallelismTracker();
-
-        public static void waitForScaleUpToParallelism(int parallelism)
-                throws InterruptedException {
-            tracker.waitForScaleUpToParallelism(parallelism);
-        }
-
-        public static void resetParallelismTracker() {
-            tracker.reset();
-        }
-
         @Override
         public void run(SourceContext<String> ctx) throws Exception {
-            tracker.reportNewInstance();
             while (running) {
                 synchronized (ctx.getCheckpointLock()) {
                     ctx.collect("test");
@@ -178,61 +168,11 @@ public class ReactiveModeITCase extends TestLogger {
         public void cancel() {
             running = false;
         }
-
-        @Override
-        public void close() throws Exception {
-            tracker.reportStoppedInstance();
-        }
-    }
-
-    private static class InstanceParallelismTracker {
-        // only notify this lock on scale-up
-        private final Object lock = new Object();
-
-        private int instances = 0;
-
-        public void reportStoppedInstance() {
-            synchronized (lock) {
-                instances--;
-            }
-        }
-
-        public void reportNewInstance() {
-            synchronized (lock) {
-                instances++;
-                lock.notifyAll();
-            }
-        }
-
-        public void waitForScaleUpToParallelism(int parallelism) throws InterruptedException {
-            synchronized (lock) {
-                while (instances != parallelism) {
-                    lock.wait();
-                }
-            }
-        }
-
-        public void reset() {
-            synchronized (lock) {
-                instances = 0;
-            }
-        }
     }
 
     private static class FailOnParallelExecutionSource extends RichParallelSourceFunction<String> {
         private volatile boolean running = true;
 
-        private static final InstanceParallelismTracker tracker = new InstanceParallelismTracker();
-
-        public static void waitForScaleUpToParallelism(int parallelism)
-                throws InterruptedException {
-            tracker.waitForScaleUpToParallelism(parallelism);
-        }
-
-        public static void resetParallelismTracker() {
-            tracker.reset();
-        }
-
         @Override
         public void open(Configuration parameters) throws Exception {
             if (getRuntimeContext().getNumberOfParallelSubtasks() > 1) {
@@ -243,7 +183,6 @@ public class ReactiveModeITCase extends TestLogger {
 
         @Override
         public void run(SourceContext<String> ctx) throws Exception {
-            tracker.reportNewInstance();
             while (running) {
                 synchronized (ctx.getCheckpointLock()) {
                     ctx.collect("test");
@@ -256,10 +195,25 @@ public class ReactiveModeITCase extends TestLogger {
         public void cancel() {
             running = false;
         }
+    }
 
-        @Override
-        public void close() throws Exception {
-            tracker.reportStoppedInstance();
-        }
+    public static void waitUntilParallelismForVertexReached(
+            RestClusterClient<?> restClusterClient, JobID jobId, int targetParallelism)
+            throws Exception {
+
+        CommonTestUtils.waitUntilCondition(
+                () -> {
+                    JobDetailsInfo detailsInfo = restClusterClient.getJobDetails(jobId).get();
+
+                    for (JobDetailsInfo.JobVertexDetailsInfo jobVertexInfo :
+                            detailsInfo.getJobVertexInfos()) {
+                        if (jobVertexInfo.getName().contains("Source:")
+                                && jobVertexInfo.getParallelism() == targetParallelism) {
+                            return true;
+                        }
+                    }
+                    return false;
+                },
+                Deadline.fromNow(Duration.ofSeconds(10)));
     }
 }
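
For context, a hedged usage sketch (not part of the commit) of the new REST-based helper inside a test body. It reuses names introduced by the patch above (DummySource, miniClusterResource, waitUntilParallelismForVertexReached) plus the class constants NUMBER_SLOTS_PER_TASK_MANAGER and INITIAL_NUMBER_TASK_MANAGERS, which are defined outside the hunks shown:

    // Sketch, assuming the JUnit rule, constants, and helper from the patch above.
    @Test
    public void testScaleUpSketch() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new DummySource()).addSink(new DiscardingSink<>());

        // Submit asynchronously so the test can observe the job while it runs.
        final JobClient jobClient = env.executeAsync();

        // Poll the REST API until the source vertex reports the expected parallelism,
        // instead of relying on static state inside the source function, which a
        // Reactive Mode restart can reset or double-count.
        waitUntilParallelismForVertexReached(
                miniClusterResource.getRestClusterClient(),
                jobClient.getJobID(),
                NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
    }

Observing the parallelism through RestClusterClient#getJobDetails keeps the assertion independent of how often the job restarts while TaskManagers register, which is the flakiness FLINK-22406 addresses.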