You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/14 08:17:16 UTC

[flink] 01/02: [FLINK-28928][tests] Add IT test for hybrid shuffle mode.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 641436d926c7f86eeff0cbe3a81ed56c81fb00bd
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Sep 5 16:02:59 2022 +0800

    [FLINK-28928][tests] Add IT test for hybrid shuffle mode.
---
 .../flink/test/runtime/BatchShuffleITCaseBase.java | 191 +++++++++++++++++++++
 .../flink/test/runtime/BlockingShuffleITCase.java  | 154 +++--------------
 .../flink/test/runtime/HybridShuffleITCase.java    |  72 ++++++++
 3 files changed, 286 insertions(+), 131 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java
new file mode 100644
index 00000000000..168dc70ba7a
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for batch shuffle related IT tests.
+ *
+ * <p>Provides a bounded {@code source -> rebalance -> map -> shuffle -> sink} job together with
+ * helpers to run it and to verify that the sinks received every record the sources emitted. The
+ * per-subtask record counters and the temporary directory are static, i.e. shared by all
+ * subclasses.
+ */
+class BatchShuffleITCaseBase {
+    private static final String RECORD = "batch shuffle test";
+
+    private static final int NUM_TASK_MANAGERS = 2;
+
+    private static final int NUM_SLOTS_PER_TASK_MANAGER = 10;
+
+    private static final int PARALLELISM = NUM_SLOTS_PER_TASK_MANAGER;
+
+    /** Number of records received by each sink subtask, indexed by subtask index. */
+    private static final int[] NUM_RECEIVED_RECORDS = new int[PARALLELISM];
+
+    /** Directory that {@link #getConfiguration()} sets as {@link CoreOptions#TMP_DIRS}. */
+    private static Path tmpDir;
+
+    @BeforeAll
+    static void setup(@TempDir Path path) throws Exception {
+        tmpDir = TempDirUtils.newFolder(path, UUID.randomUUID().toString()).toPath();
+    }
+
+    /**
+     * Creates the test job with {@code deletePartitionFile} disabled.
+     *
+     * @see #createJobGraph(int, boolean, boolean)
+     */
+    protected JobGraph createJobGraph(int numRecordsToSend, boolean failExecution) {
+        return createJobGraph(numRecordsToSend, failExecution, false);
+    }
+
+    /**
+     * Creates a job in which every source subtask emits {@code numRecordsToSend} records that
+     * are rebalanced to a map operator and then shuffled to the verifying sinks.
+     *
+     * @param numRecordsToSend number of records emitted by each source subtask
+     * @param failExecution whether the first attempt of sink subtask 0 throws when it is opened
+     * @param deletePartitionFile whether the first attempt of sink subtask 0 deletes all files
+     *     below the temporary directory when it is opened
+     * @return the job graph, with its job type set to {@link JobType#BATCH}
+     */
+    protected JobGraph createJobGraph(
+            int numRecordsToSend, boolean failExecution, boolean deletePartitionFile) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 0L));
+        env.setParallelism(PARALLELISM);
+
+        DataStream<String> source =
+                new DataStreamSource<>(
+                        env,
+                        BasicTypeInfo.STRING_TYPE_INFO,
+                        new StreamSource<>(new StringSource(numRecordsToSend)),
+                        true,
+                        "source",
+                        Boundedness.BOUNDED);
+        source.rebalance()
+                .map(value -> value)
+                .shuffle()
+                .addSink(new VerifySink(failExecution, deletePartitionFile));
+
+        StreamGraph streamGraph = env.getStreamGraph();
+        streamGraph.setJobType(JobType.BATCH);
+        return StreamingJobGraphGenerator.createJobGraph(streamGraph);
+    }
+
+    /**
+     * Returns a new configuration that uses the temporary directory of this test, the batch
+     * runtime mode and a maximum network request backoff of 100. Subclasses may override this
+     * method to add their own settings.
+     */
+    protected Configuration getConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(CoreOptions.TMP_DIRS, tmpDir.toString());
+        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        configuration.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 100);
+        return configuration;
+    }
+
+    /**
+     * Runs the job through {@link JobGraphRunningUtil} and then asserts that the sinks received
+     * {@code numRecordsToSend} records per source subtask.
+     *
+     * @param jobGraph job to execute
+     * @param configuration configuration passed along with the job
+     * @param numRecordsToSend number of records each source subtask was created to emit
+     * @throws Exception if executing the job fails
+     */
+    protected void executeJob(JobGraph jobGraph, Configuration configuration, int numRecordsToSend)
+            throws Exception {
+        JobGraphRunningUtil.execute(
+                jobGraph, configuration, NUM_TASK_MANAGERS, NUM_SLOTS_PER_TASK_MANAGER);
+        checkAllDataReceived(numRecordsToSend);
+    }
+
+    /** Asserts that all sink subtasks together received the records of all source subtasks. */
+    private void checkAllDataReceived(int numRecordsToSend) {
+        assertThat(Arrays.stream(NUM_RECEIVED_RECORDS).sum())
+                .isEqualTo(numRecordsToSend * PARALLELISM);
+    }
+
+    /** Source that emits {@code RECORD} a fixed number of times unless it is cancelled. */
+    private static class StringSource implements ParallelSourceFunction<String> {
+        private volatile boolean isRunning = true;
+        private int numRecordsToSend;
+
+        StringSource(int numRecordsToSend) {
+            this.numRecordsToSend = numRecordsToSend;
+        }
+
+        @Override
+        public void run(SourceContext<String> ctx) throws Exception {
+            while (isRunning && numRecordsToSend-- > 0) {
+                ctx.collect(RECORD);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isRunning = false;
+        }
+    }
+
+    /**
+     * Sink that counts the received records per subtask and checks their content. On its first
+     * attempt, subtask 0 optionally deletes the files below the temporary directory and/or
+     * fails.
+     */
+    private static class VerifySink extends RichSinkFunction<String> {
+        private final boolean failExecution;
+
+        private final boolean deletePartitionFile;
+
+        VerifySink(boolean failExecution, boolean deletePartitionFile) {
+            this.failExecution = failExecution;
+            this.deletePartitionFile = deletePartitionFile;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            // Reset this subtask's counter each time the sink is opened (e.g. after a restart).
+            NUM_RECEIVED_RECORDS[getRuntimeContext().getIndexOfThisSubtask()] = 0;
+            if (getRuntimeContext().getAttemptNumber() > 0
+                    || getRuntimeContext().getIndexOfThisSubtask() != 0) {
+                return;
+            }
+
+            if (deletePartitionFile) {
+                // Lock on this base class rather than on one particular subclass.
+                synchronized (BatchShuffleITCaseBase.class) {
+                    deleteFiles(tmpDir.toFile());
+                }
+            }
+
+            if (failExecution) {
+                throw new RuntimeException("expected exception.");
+            }
+        }
+
+        @Override
+        public void invoke(String value, Context context) throws Exception {
+            NUM_RECEIVED_RECORDS[getRuntimeContext().getIndexOfThisSubtask()]++;
+            assertThat(value).isEqualTo(RECORD);
+        }
+
+        /** Recursively deletes all regular files below {@code root}; directories are kept. */
+        private static void deleteFiles(File root) throws IOException {
+            File[] files = root.listFiles();
+            if (files == null || files.length == 0) {
+                return;
+            }
+
+            for (File file : files) {
+                if (!file.isDirectory()) {
+                    Files.deleteIfExists(file.toPath());
+                } else {
+                    deleteFiles(file);
+                }
+            }
+        }
+    }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index ac99299f866..f64d52b37d5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -18,75 +18,52 @@
 
 package org.apache.flink.test.runtime;
 
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.BatchShuffleMode;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobType;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-
-import static org.junit.Assert.assertEquals;
+import org.junit.jupiter.api.Test;
 
 /** Tests for blocking shuffle. */
-public class BlockingShuffleITCase {
-
-    private static final String RECORD = "hello, world!";
-
-    private final int numTaskManagers = 2;
-
-    private final int numSlotsPerTaskManager = 4;
-
-    @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+class BlockingShuffleITCase extends BatchShuffleITCaseBase {
 
+    /** Sends records with the sort-shuffle minimum parallelism raised to Integer.MAX_VALUE. */
     @Test
     public void testBoundedBlockingShuffle() throws Exception {
-        JobGraph jobGraph = createJobGraph(1000000, false);
+        final int numRecordsToSend = 1000000;
+        JobGraph jobGraph = createJobGraph(numRecordsToSend, false, false);
         Configuration configuration = getConfiguration();
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                 Integer.MAX_VALUE);
 
-        JobGraphRunningUtil.execute(
-                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+        executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
+    /** Runs the same configuration with sources that send no records. */
     @Test
     public void testBoundedBlockingShuffleWithoutData() throws Exception {
-        JobGraph jobGraph = createJobGraph(0, false);
+        JobGraph jobGraph = createJobGraph(0, false, false);
         Configuration configuration = getConfiguration();
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                 Integer.MAX_VALUE);
 
-        JobGraphRunningUtil.execute(
-                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+        executeJob(jobGraph, configuration, 0);
     }
 
+    /** Sends records with the sort-shuffle minimum number of buffers set to 64. */
     @Test
     public void testSortMergeBlockingShuffle() throws Exception {
+        final int numRecordsToSend = 1000000;
         Configuration configuration = getConfiguration();
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
-        JobGraph jobGraph = createJobGraph(1000000, false);
-        JobGraphRunningUtil.execute(
-                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+        JobGraph jobGraph = createJobGraph(numRecordsToSend, false, false);
+        executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
     @Test
@@ -95,9 +69,9 @@ public class BlockingShuffleITCase {
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
-        JobGraph jobGraph = createJobGraph(0, false);
-        JobGraphRunningUtil.execute(
-                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+        // Zero records are sent, so executeJob expects the sinks to have received nothing.
+        JobGraph jobGraph = createJobGraph(0, false, false);
+        executeJob(jobGraph, configuration, 0);
     }
 
     @Test
@@ -107,103 +80,25 @@ public class BlockingShuffleITCase {
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                 Integer.MAX_VALUE);
 
-        JobGraph jobGraph = createJobGraph(0, true);
-        JobGraphRunningUtil.execute(
-                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+        // With deletePartitionFile set, sink subtask 0 deletes the tmp files on its first attempt.
+        JobGraph jobGraph = createJobGraph(0, false, true);
+        executeJob(jobGraph, configuration, 0);
     }
 
     @Test
     public void testDeletePartitionFileOfSortMergeBlockingShuffle() throws Exception {
         Configuration configuration = getConfiguration();
-        JobGraph jobGraph = createJobGraph(0, true);
-        JobGraphRunningUtil.execute(
-                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+        // Same deletion scenario, without overriding NETWORK_SORT_SHUFFLE_MIN_PARALLELISM.
+        JobGraph jobGraph = createJobGraph(0, false, true);
+        executeJob(jobGraph, configuration, 0);
     }
 
-    private Configuration getConfiguration() {
-        Configuration configuration = new Configuration();
-        configuration.set(CoreOptions.TMP_DIRS, TEMP_FOLDER.getRoot().getAbsolutePath());
-        configuration.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 100);
+    /** Adds the ALL_EXCHANGES_BLOCKING batch shuffle mode to the base configuration. */
+    @Override
+    protected Configuration getConfiguration() {
+        Configuration configuration = super.getConfiguration();
+        configuration.set(
+                ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_BLOCKING);
         return configuration;
     }
-
-    private JobGraph createJobGraph(int numRecordsToSend, boolean deletePartitionFile) {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
-        env.setBufferTimeout(-1);
-        env.setParallelism(numTaskManagers * numSlotsPerTaskManager);
-        DataStream<String> source = env.addSource(new StringSource(numRecordsToSend));
-        source.rebalance()
-                .map((MapFunction<String, String>) value -> value)
-                .broadcast()
-                .addSink(new VerifySink(deletePartitionFile));
-
-        StreamGraph streamGraph = env.getStreamGraph();
-        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
-        // a scheduler supporting batch jobs is required for this job graph, because it contains
-        // blocking data exchanges.
-        // The scheduler is selected based on the JobType.
-        streamGraph.setJobType(JobType.BATCH);
-        return StreamingJobGraphGenerator.createJobGraph(streamGraph);
-    }
-
-    private static class StringSource implements ParallelSourceFunction<String> {
-        private volatile boolean isRunning = true;
-        private int numRecordsToSend;
-
-        StringSource(int numRecordsToSend) {
-            this.numRecordsToSend = numRecordsToSend;
-        }
-
-        @Override
-        public void run(SourceContext<String> ctx) throws Exception {
-            while (isRunning && numRecordsToSend-- > 0) {
-                ctx.collect(RECORD);
-            }
-        }
-
-        @Override
-        public void cancel() {
-            isRunning = false;
-        }
-    }
-
-    private static class VerifySink extends RichSinkFunction<String> {
-        private final boolean deletePartitionFile;
-
-        VerifySink(boolean deletePartitionFile) {
-            this.deletePartitionFile = deletePartitionFile;
-        }
-
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            if (!deletePartitionFile || getRuntimeContext().getAttemptNumber() > 0) {
-                return;
-            }
-
-            synchronized (BlockingShuffleITCase.class) {
-                deleteFiles(TEMP_FOLDER.getRoot());
-            }
-        }
-
-        @Override
-        public void invoke(String value, Context context) throws Exception {
-            assertEquals(RECORD, value);
-        }
-
-        private static void deleteFiles(File root) throws IOException {
-            File[] files = root.listFiles();
-            if (files == null || files.length == 0) {
-                return;
-            }
-
-            for (File file : files) {
-                if (!file.isDirectory()) {
-                    Files.deleteIfExists(file.toPath());
-                } else {
-                    deleteFiles(file);
-                }
-            }
-        }
-    }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
new file mode 100644
index 00000000000..27171bd78f1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.BatchShuffleMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for hybrid shuffle mode.
+ *
+ * <p>Each case runs the job of {@link BatchShuffleITCaseBase} with one of the hybrid batch shuffle
+ * modes, either undisturbed or with the first attempt of sink subtask 0 failing.
+ */
+class HybridShuffleITCase extends BatchShuffleITCaseBase {
+
+    @Test
+    void testHybridFullExchanges() throws Exception {
+        runJob(BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, 10000, false);
+    }
+
+    @Test
+    void testHybridSelectiveExchanges() throws Exception {
+        runJob(BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE, 10000, false);
+    }
+
+    @Test
+    void testHybridFullExchangesRestart() throws Exception {
+        runJob(BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, 10, true);
+    }
+
+    @Test
+    void testHybridSelectiveExchangesRestart() throws Exception {
+        runJob(BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE, 10, true);
+    }
+
+    /**
+     * Executes the test job with the given batch shuffle mode and verifies the received records.
+     *
+     * @param shuffleMode value to set for {@link ExecutionOptions#BATCH_SHUFFLE_MODE}
+     * @param recordsPerSource number of records each source subtask sends
+     * @param failFirstAttempt whether the job is created with {@code failExecution} enabled
+     */
+    private void runJob(
+            BatchShuffleMode shuffleMode, int recordsPerSource, boolean failFirstAttempt)
+            throws Exception {
+        Configuration conf = getConfiguration();
+        conf.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
+        JobGraph graph = createJobGraph(recordsPerSource, failFirstAttempt);
+        executeJob(graph, conf, recordsPerSource);
+    }
+}