Posted to commits@flink.apache.org by ch...@apache.org on 2021/05/07 07:53:30 UTC

[flink] branch master updated (323c662 -> 271ac1b)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 323c662  [FLINK-21095][ci] Remove legacy slot management profile
     new a1c1846  [FLINK-22406][tests] Add RestClusterClient to MiniClusterWithClientResource
     new 271ac1b  [FLINK-22406][coordination][tests] Stabilize ReactiveModeITCase

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../test/util/MiniClusterWithClientResource.java   |  29 +++++
 .../flink/test/scheduling/ReactiveModeITCase.java  | 144 +++++++--------------
 2 files changed, 78 insertions(+), 95 deletions(-)

[flink] 01/02: [FLINK-22406][tests] Add RestClusterClient to MiniClusterWithClientResource

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a1c184631a6b4d90bddaa352b1bc4a0ac11eb50d
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 5 11:59:55 2021 +0200

    [FLINK-22406][tests] Add RestClusterClient to MiniClusterWithClientResource
---
 .../test/util/MiniClusterWithClientResource.java   | 29 ++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
index 62e0318..07ab415 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
@@ -20,9 +20,11 @@ package org.apache.flink.test.util;
 
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.ExceptionUtils;
 
 /**
  * Starts a Flink mini cluster as a resource and registers the respective ExecutionEnvironment and
@@ -31,6 +33,7 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
 public class MiniClusterWithClientResource extends MiniClusterResource {
 
     private ClusterClient<?> clusterClient;
+    private RestClusterClient<MiniClusterClient.MiniClusterId> restClusterClient;
 
     private TestEnvironment executionEnvironment;
 
@@ -43,6 +46,15 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
         return clusterClient;
     }
 
+    /**
+     * Returns a {@link RestClusterClient} that can be used to communicate with this mini cluster.
+     * Only use this if the client returned via {@link #getClusterClient()} does not fulfill your
+     * needs.
+     */
+    public RestClusterClient<?> getRestClusterClient() throws Exception {
+        return restClusterClient;
+    }
+
     public TestEnvironment getTestEnvironment() {
         return executionEnvironment;
     }
@@ -52,6 +64,7 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
         super.before();
 
         clusterClient = createMiniClusterClient();
+        restClusterClient = createRestClusterClient();
 
         executionEnvironment = new TestEnvironment(getMiniCluster(), getNumberSlots(), false);
         executionEnvironment.setAsContext();
@@ -76,6 +89,16 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
 
         clusterClient = null;
 
+        if (restClusterClient != null) {
+            try {
+                restClusterClient.close();
+            } catch (Exception e) {
+                exception = ExceptionUtils.firstOrSuppressed(e, exception);
+            }
+        }
+
+        restClusterClient = null;
+
         super.after();
 
         if (exception != null) {
@@ -86,4 +109,10 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
     private MiniClusterClient createMiniClusterClient() {
         return new MiniClusterClient(getClientConfiguration(), getMiniCluster());
     }
+
+    private RestClusterClient<MiniClusterClient.MiniClusterId> createRestClusterClient()
+            throws Exception {
+        return new RestClusterClient<>(
+                getClientConfiguration(), MiniClusterClient.MiniClusterId.INSTANCE);
+    }
 }
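
For context, a minimal sketch of how a test could use the accessor added in this commit; the test class, the job it submits, and the status query below are illustrative assumptions and not part of the commit:

// Illustrative only: retrieves the RestClusterClient exposed by
// MiniClusterWithClientResource and queries the status of a submitted job.
// Class name, job graph, and the status check are hypothetical.
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import org.junit.Rule;
import org.junit.Test;

public class RestClusterClientUsageITCase {

    @Rule
    public final MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder().build());

    @Test
    public void testQueryJobStatusViaRest() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).addSink(new DiscardingSink<>());

        final JobClient jobClient = env.executeAsync();

        // Use the REST client only when getClusterClient() is not sufficient,
        // e.g. for REST-specific queries such as job details.
        final RestClusterClient<?> restClusterClient =
                miniClusterResource.getRestClusterClient();
        final JobStatus status =
                restClusterClient.getJobStatus(jobClient.getJobID()).get();
        // ... assert on the status or poll until the desired state is reached ...
    }
}
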

[flink] 02/02: [FLINK-22406][coordination][tests] Stabilize ReactiveModeITCase

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 271ac1b8bac100b613c313e11773109db15015e5
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Apr 23 09:55:25 2021 +0200

    [FLINK-22406][coordination][tests] Stabilize ReactiveModeITCase
---
 .../flink/test/scheduling/ReactiveModeITCase.java  | 144 +++++++--------------
 1 file changed, 49 insertions(+), 95 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
index 438439d..9859a87 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
@@ -18,18 +18,22 @@
 
 package org.apache.flink.test.scheduling;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
@@ -47,7 +51,7 @@ public class ReactiveModeITCase extends TestLogger {
     private static final Configuration configuration = getReactiveModeConfiguration();
 
     @Rule
-    public final MiniClusterResource miniClusterResource =
+    public final MiniClusterWithClientResource miniClusterResource =
             new MiniClusterWithClientResource(
                     new MiniClusterResourceConfiguration.Builder()
                             .setConfiguration(configuration)
@@ -76,49 +80,60 @@ public class ReactiveModeITCase extends TestLogger {
                 env.addSource(new FailOnParallelExecutionSource()).setMaxParallelism(1);
         input.addSink(new DiscardingSink<>());
 
-        env.executeAsync();
+        final JobClient jobClient = env.executeAsync();
 
-        FailOnParallelExecutionSource.waitForScaleUpToParallelism(1);
+        waitUntilParallelismForVertexReached(
+                miniClusterResource.getRestClusterClient(), jobClient.getJobID(), 1);
     }
 
     /** Test that a job scales up when a TaskManager gets added to the cluster. */
     @Test
     public void testScaleUpOnAdditionalTaskManager() throws Exception {
-        ParallelismTrackingSource.resetParallelismTracker();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        final DataStream<String> input = env.addSource(new ParallelismTrackingSource());
+        final DataStream<String> input = env.addSource(new DummySource());
         input.addSink(new DiscardingSink<>());
 
-        env.executeAsync();
+        final JobClient jobClient = env.executeAsync();
 
-        ParallelismTrackingSource.waitForScaleUpToParallelism(
+        waitUntilParallelismForVertexReached(
+                miniClusterResource.getRestClusterClient(),
+                jobClient.getJobID(),
                 NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
 
         // scale up to 2 TaskManagers:
         miniClusterResource.getMiniCluster().startTaskManager();
-        ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+
+        waitUntilParallelismForVertexReached(
+                miniClusterResource.getRestClusterClient(),
+                jobClient.getJobID(),
+                NUMBER_SLOTS_PER_TASK_MANAGER * (INITIAL_NUMBER_TASK_MANAGERS + 1));
     }
 
     @Test
     public void testScaleDownOnTaskManagerLoss() throws Exception {
-        ParallelismTrackingSource.resetParallelismTracker();
         // test preparation: ensure we have 2 TaskManagers running
         startAdditionalTaskManager();
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         // configure exactly one restart to avoid restart loops in error cases
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
-        final DataStream<String> input = env.addSource(new ParallelismTrackingSource());
+        final DataStream<String> input = env.addSource(new DummySource());
         input.addSink(new DiscardingSink<>());
 
-        env.executeAsync();
+        final JobClient jobClient = env.executeAsync();
 
-        ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+        waitUntilParallelismForVertexReached(
+                miniClusterResource.getRestClusterClient(),
+                jobClient.getJobID(),
+                NUMBER_SLOTS_PER_TASK_MANAGER * (INITIAL_NUMBER_TASK_MANAGERS + 1));
 
         // scale down to 1 TaskManagers:
         miniClusterResource.getMiniCluster().terminateTaskManager(0).get();
 
-        ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER);
+        waitUntilParallelismForVertexReached(
+                miniClusterResource.getRestClusterClient(),
+                jobClient.getJobID(),
+                NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_SLOTS_PER_TASK_MANAGER);
     }
 
     private int getNumberOfConnectedTaskManagers() throws ExecutionException, InterruptedException {
@@ -136,36 +151,11 @@ public class ReactiveModeITCase extends TestLogger {
                 Deadline.fromNow(Duration.ofMillis(10_000L)));
     }
 
-    /**
-     * This source is tracking its parallelism internally. We can not use a CountDownLatch with a
-     * predefined parallelism. When scheduling this source on more than one TaskManager in Reactive
-     * Mode, it can happen that the source gets scheduled once the first TaskManager registers. In
-     * this execution, the source would count down the latch by one already, but Reactive Mode would
-     * trigger a restart once the next TaskManager arrives, ultimately breaking the count of the
-     * latch.
-     *
-     * <p>This approach is a compromise that just tracks the number of running instances and allows
-     * the test to wait for a parallelism to be reached. To avoid accidentally reaching the scale
-     * while deallocating source instances, the {@link InstanceParallelismTracker} is only notifying
-     * the wait method when new instances are added, not when they are removed.
-     */
-    private static class ParallelismTrackingSource extends RichParallelSourceFunction<String> {
+    private static class DummySource implements SourceFunction<String> {
         private volatile boolean running = true;
 
-        private static final InstanceParallelismTracker tracker = new InstanceParallelismTracker();
-
-        public static void waitForScaleUpToParallelism(int parallelism)
-                throws InterruptedException {
-            tracker.waitForScaleUpToParallelism(parallelism);
-        }
-
-        public static void resetParallelismTracker() {
-            tracker.reset();
-        }
-
         @Override
         public void run(SourceContext<String> ctx) throws Exception {
-            tracker.reportNewInstance();
             while (running) {
                 synchronized (ctx.getCheckpointLock()) {
                     ctx.collect("test");
@@ -178,61 +168,11 @@ public class ReactiveModeITCase extends TestLogger {
         public void cancel() {
             running = false;
         }
-
-        @Override
-        public void close() throws Exception {
-            tracker.reportStoppedInstance();
-        }
-    }
-
-    private static class InstanceParallelismTracker {
-        // only notify this lock on scale-up
-        private final Object lock = new Object();
-
-        private int instances = 0;
-
-        public void reportStoppedInstance() {
-            synchronized (lock) {
-                instances--;
-            }
-        }
-
-        public void reportNewInstance() {
-            synchronized (lock) {
-                instances++;
-                lock.notifyAll();
-            }
-        }
-
-        public void waitForScaleUpToParallelism(int parallelism) throws InterruptedException {
-            synchronized (lock) {
-                while (instances != parallelism) {
-                    lock.wait();
-                }
-            }
-        }
-
-        public void reset() {
-            synchronized (lock) {
-                instances = 0;
-            }
-        }
     }
 
     private static class FailOnParallelExecutionSource extends RichParallelSourceFunction<String> {
         private volatile boolean running = true;
 
-        private static final InstanceParallelismTracker tracker = new InstanceParallelismTracker();
-
-        public static void waitForScaleUpToParallelism(int parallelism)
-                throws InterruptedException {
-            tracker.waitForScaleUpToParallelism(parallelism);
-        }
-
-        public static void resetParallelismTracker() {
-            tracker.reset();
-        }
-
         @Override
         public void open(Configuration parameters) throws Exception {
             if (getRuntimeContext().getNumberOfParallelSubtasks() > 1) {
@@ -243,7 +183,6 @@ public class ReactiveModeITCase extends TestLogger {
 
         @Override
         public void run(SourceContext<String> ctx) throws Exception {
-            tracker.reportNewInstance();
             while (running) {
                 synchronized (ctx.getCheckpointLock()) {
                     ctx.collect("test");
@@ -256,10 +195,25 @@ public class ReactiveModeITCase extends TestLogger {
         public void cancel() {
             running = false;
         }
+    }
 
-        @Override
-        public void close() throws Exception {
-            tracker.reportStoppedInstance();
-        }
+    public static void waitUntilParallelismForVertexReached(
+            RestClusterClient<?> restClusterClient, JobID jobId, int targetParallelism)
+            throws Exception {
+
+        CommonTestUtils.waitUntilCondition(
+                () -> {
+                    JobDetailsInfo detailsInfo = restClusterClient.getJobDetails(jobId).get();
+
+                    for (JobDetailsInfo.JobVertexDetailsInfo jobVertexInfo :
+                            detailsInfo.getJobVertexInfos()) {
+                        if (jobVertexInfo.getName().contains("Source:")
+                                && jobVertexInfo.getParallelism() == targetParallelism) {
+                            return true;
+                        }
+                    }
+                    return false;
+                },
+                Deadline.fromNow(Duration.ofSeconds(10)));
     }
 }
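
The REST-based waiting pattern introduced above is not limited to vertex parallelism; as a hedged sketch, an analogous helper could poll the job status the same way (the helper name and the RUNNING check are assumptions, not part of this commit):

    // Illustrative variant of the REST-polling approach used in this commit:
    // wait until the job reports RUNNING via the REST API. Helper name is
    // hypothetical; it reuses the imports already present in ReactiveModeITCase,
    // plus org.apache.flink.api.common.JobStatus.
    private static void waitUntilJobIsRunning(
            RestClusterClient<?> restClusterClient, JobID jobId) throws Exception {
        CommonTestUtils.waitUntilCondition(
                () -> restClusterClient.getJobStatus(jobId).get() == JobStatus.RUNNING,
                Deadline.fromNow(Duration.ofSeconds(10)));
    }
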