You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/05/07 07:58:32 UTC

[flink] branch release-1.13 updated (d5ce4c2 -> 5b7e87f)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d5ce4c2  [hotfix][docs][python] Add introduction about the open method in Python DataStream API
     new d1df147  [FLINK-22406][tests] Add RestClusterClient to MiniClusterWithClientResource
     new 5b7e87f  [FLINK-22406][coordination][tests] Stabilize ReactiveModeITCase

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../test/util/MiniClusterWithClientResource.java   |  29 +++++
 .../flink/test/scheduling/ReactiveModeITCase.java  | 144 +++++++--------------
 2 files changed, 78 insertions(+), 95 deletions(-)

[flink] 01/02: [FLINK-22406][tests] Add RestClusterClient to MiniClusterWithClientResource

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d1df147b2f737c60b57c8f6d267596e11e5e06e8
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed May 5 11:59:55 2021 +0200

    [FLINK-22406][tests] Add RestClusterClient to MiniClusterWithClientResource
---
 .../test/util/MiniClusterWithClientResource.java   | 29 ++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
index 62e0318..07ab415 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java
@@ -20,9 +20,11 @@ package org.apache.flink.test.util;
 
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.ExceptionUtils;
 
 /**
  * Starts a Flink mini cluster as a resource and registers the respective ExecutionEnvironment and
@@ -31,6 +33,7 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
 public class MiniClusterWithClientResource extends MiniClusterResource {
 
     private ClusterClient<?> clusterClient;
+    private RestClusterClient<MiniClusterClient.MiniClusterId> restClusterClient;
 
     private TestEnvironment executionEnvironment;
 
@@ -43,6 +46,15 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
         return clusterClient;
     }
 
+    /**
+     * Returns a {@link RestClusterClient} that can be used to communicate with this mini cluster.
+     * Only use this if the client returned via {@link #getClusterClient()} does not fulfill your
+     * needs.
+     */
+    public RestClusterClient<?> getRestClusterClient() throws Exception {
+        return restClusterClient;
+    }
+
     public TestEnvironment getTestEnvironment() {
         return executionEnvironment;
     }
@@ -52,6 +64,7 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
         super.before();
 
         clusterClient = createMiniClusterClient();
+        restClusterClient = createRestClusterClient();
 
         executionEnvironment = new TestEnvironment(getMiniCluster(), getNumberSlots(), false);
         executionEnvironment.setAsContext();
@@ -76,6 +89,16 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
 
         clusterClient = null;
 
+        if (restClusterClient != null) {
+            try {
+                restClusterClient.close();
+            } catch (Exception e) {
+                exception = ExceptionUtils.firstOrSuppressed(e, exception);
+            }
+        }
+
+        restClusterClient = null;
+
         super.after();
 
         if (exception != null) {
@@ -86,4 +109,10 @@ public class MiniClusterWithClientResource extends MiniClusterResource {
     private MiniClusterClient createMiniClusterClient() {
         return new MiniClusterClient(getClientConfiguration(), getMiniCluster());
     }
+
+    private RestClusterClient<MiniClusterClient.MiniClusterId> createRestClusterClient()
+            throws Exception {
+        return new RestClusterClient<>(
+                getClientConfiguration(), MiniClusterClient.MiniClusterId.INSTANCE);
+    }
 }

[flink] 02/02: [FLINK-22406][coordination][tests] Stabilize ReactiveModeITCase

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b7e87f7c4c39bbc13c459be97b7d7b4102d0f45
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Apr 23 09:55:25 2021 +0200

    [FLINK-22406][coordination][tests] Stabilize ReactiveModeITCase
---
 .../flink/test/scheduling/ReactiveModeITCase.java  | 144 +++++++--------------
 1 file changed, 49 insertions(+), 95 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
index d0ca6e0..3a6c314 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/ReactiveModeITCase.java
@@ -18,19 +18,23 @@
 
 package org.apache.flink.test.scheduling;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.testutils.MiniClusterResource;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
@@ -51,7 +55,7 @@ public class ReactiveModeITCase extends TestLogger {
     private static final Configuration configuration = getReactiveModeConfiguration();
 
     @Rule
-    public final MiniClusterResource miniClusterResource =
+    public final MiniClusterWithClientResource miniClusterResource =
             new MiniClusterWithClientResource(
                     new MiniClusterResourceConfiguration.Builder()
                             .setConfiguration(configuration)
@@ -85,49 +89,60 @@ public class ReactiveModeITCase extends TestLogger {
                 env.addSource(new FailOnParallelExecutionSource()).setMaxParallelism(1);
         input.addSink(new DiscardingSink<>());
 
-        env.executeAsync();
+        final JobClient jobClient = env.executeAsync();
 
-        FailOnParallelExecutionSource.waitForScaleUpToParallelism(1);
+        waitUntilParallelismForVertexReached(
+                miniClusterResource.getRestClusterClient(), jobClient.getJobID(), 1);
     }
 
     /** Test that a job scales up when a TaskManager gets added to the cluster. */
     @Test
     public void testScaleUpOnAdditionalTaskManager() throws Exception {
-        ParallelismTrackingSource.resetParallelismTracker();
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        final DataStream<String> input = env.addSource(new ParallelismTrackingSource());
+        final DataStream<String> input = env.addSource(new DummySource());
         input.addSink(new DiscardingSink<>());
 
-        env.executeAsync();
+        final JobClient jobClient = env.executeAsync();
 
-        ParallelismTrackingSource.waitForScaleUpToParallelism(
+        waitUntilParallelismForVertexReached(
+                miniClusterResource.getRestClusterClient(),
+                jobClient.getJobID(),
                 NUMBER_SLOTS_PER_TASK_MANAGER * INITIAL_NUMBER_TASK_MANAGERS);
 
         // scale up to 2 TaskManagers:
         miniClusterResource.getMiniCluster().startTaskManager();
-        ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+
+        waitUntilParallelismForVertexReached(
+                miniClusterResource.getRestClusterClient(),
+                jobClient.getJobID(),
+                NUMBER_SLOTS_PER_TASK_MANAGER * (INITIAL_NUMBER_TASK_MANAGERS + 1));
     }
 
     @Test
     public void testScaleDownOnTaskManagerLoss() throws Exception {
-        ParallelismTrackingSource.resetParallelismTracker();
         // test preparation: ensure we have 2 TaskManagers running
         startAdditionalTaskManager();
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         // configure exactly one restart to avoid restart loops in error cases
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
-        final DataStream<String> input = env.addSource(new ParallelismTrackingSource());
+        final DataStream<String> input = env.addSource(new DummySource());
         input.addSink(new DiscardingSink<>());
 
-        env.executeAsync();
+        final JobClient jobClient = env.executeAsync();
 
-        ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER * 2);
+        waitUntilParallelismForVertexReached(
+                miniClusterResource.getRestClusterClient(),
+                jobClient.getJobID(),
+                NUMBER_SLOTS_PER_TASK_MANAGER * (INITIAL_NUMBER_TASK_MANAGERS + 1));
 
         // scale down to 1 TaskManagers:
         miniClusterResource.getMiniCluster().terminateTaskManager(0).get();
 
-        ParallelismTrackingSource.waitForScaleUpToParallelism(NUMBER_SLOTS_PER_TASK_MANAGER);
+        waitUntilParallelismForVertexReached(
+                miniClusterResource.getRestClusterClient(),
+                jobClient.getJobID(),
+                NUMBER_SLOTS_PER_TASK_MANAGER);
     }
 
     private int getNumberOfConnectedTaskManagers() throws ExecutionException, InterruptedException {
@@ -145,36 +160,11 @@ public class ReactiveModeITCase extends TestLogger {
                 Deadline.fromNow(Duration.ofMillis(10_000L)));
     }
 
-    /**
-     * This source is tracking its parallelism internally. We can not use a CountDownLatch with a
-     * predefined parallelism. When scheduling this source on more than one TaskManager in Reactive
-     * Mode, it can happen that the source gets scheduled once the first TaskManager registers. In
-     * this execution, the source would count down the latch by one already, but Reactive Mode would
-     * trigger a restart once the next TaskManager arrives, ultimately breaking the count of the
-     * latch.
-     *
-     * <p>This approach is a compromise that just tracks the number of running instances and allows
-     * the test to wait for a parallelism to be reached. To avoid accidentally reaching the scale
-     * while deallocating source instances, the {@link InstanceParallelismTracker} is only notifying
-     * the wait method when new instances are added, not when they are removed.
-     */
-    private static class ParallelismTrackingSource extends RichParallelSourceFunction<String> {
+    private static class DummySource implements SourceFunction<String> {
         private volatile boolean running = true;
 
-        private static final InstanceParallelismTracker tracker = new InstanceParallelismTracker();
-
-        public static void waitForScaleUpToParallelism(int parallelism)
-                throws InterruptedException {
-            tracker.waitForScaleUpToParallelism(parallelism);
-        }
-
-        public static void resetParallelismTracker() {
-            tracker.reset();
-        }
-
         @Override
         public void run(SourceContext<String> ctx) throws Exception {
-            tracker.reportNewInstance();
             while (running) {
                 synchronized (ctx.getCheckpointLock()) {
                     ctx.collect("test");
@@ -187,61 +177,11 @@ public class ReactiveModeITCase extends TestLogger {
         public void cancel() {
             running = false;
         }
-
-        @Override
-        public void close() throws Exception {
-            tracker.reportStoppedInstance();
-        }
-    }
-
-    private static class InstanceParallelismTracker {
-        // only notify this lock on scale-up
-        private final Object lock = new Object();
-
-        private int instances = 0;
-
-        public void reportStoppedInstance() {
-            synchronized (lock) {
-                instances--;
-            }
-        }
-
-        public void reportNewInstance() {
-            synchronized (lock) {
-                instances++;
-                lock.notifyAll();
-            }
-        }
-
-        public void waitForScaleUpToParallelism(int parallelism) throws InterruptedException {
-            synchronized (lock) {
-                while (instances != parallelism) {
-                    lock.wait();
-                }
-            }
-        }
-
-        public void reset() {
-            synchronized (lock) {
-                instances = 0;
-            }
-        }
     }
 
     private static class FailOnParallelExecutionSource extends RichParallelSourceFunction<String> {
         private volatile boolean running = true;
 
-        private static final InstanceParallelismTracker tracker = new InstanceParallelismTracker();
-
-        public static void waitForScaleUpToParallelism(int parallelism)
-                throws InterruptedException {
-            tracker.waitForScaleUpToParallelism(parallelism);
-        }
-
-        public static void resetParallelismTracker() {
-            tracker.reset();
-        }
-
         @Override
         public void open(Configuration parameters) throws Exception {
             if (getRuntimeContext().getNumberOfParallelSubtasks() > 1) {
@@ -252,7 +192,6 @@ public class ReactiveModeITCase extends TestLogger {
 
         @Override
         public void run(SourceContext<String> ctx) throws Exception {
-            tracker.reportNewInstance();
             while (running) {
                 synchronized (ctx.getCheckpointLock()) {
                     ctx.collect("test");
@@ -265,10 +204,25 @@ public class ReactiveModeITCase extends TestLogger {
         public void cancel() {
             running = false;
         }
+    }
 
-        @Override
-        public void close() throws Exception {
-            tracker.reportStoppedInstance();
-        }
+    public static void waitUntilParallelismForVertexReached(
+            RestClusterClient<?> restClusterClient, JobID jobId, int targetParallelism)
+            throws Exception {
+
+        CommonTestUtils.waitUntilCondition(
+                () -> {
+                    JobDetailsInfo detailsInfo = restClusterClient.getJobDetails(jobId).get();
+
+                    for (JobDetailsInfo.JobVertexDetailsInfo jobVertexInfo :
+                            detailsInfo.getJobVertexInfos()) {
+                        if (jobVertexInfo.getName().contains("Source:")
+                                && jobVertexInfo.getParallelism() == targetParallelism) {
+                            return true;
+                        }
+                    }
+                    return false;
+                },
+                Deadline.fromNow(Duration.ofSeconds(10)));
     }
 }