You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/14 08:17:15 UTC

[flink] branch release-1.16 updated (8fb13a80390 -> 00855438f2e)

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 8fb13a80390 [FLINK-29102] Fix unstable ChangelogLocalRecoveryITCase
     new 641436d926c [FLINK-28928][tests] Add IT test for hybrid shuffle mode.
     new 00855438f2e [FLINK-28928][tests] Add E2E test for hybrid shuffle mode.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/sql/tests/BatchSQLTestProgram.java       |  25 ++-
 flink-end-to-end-tests/run-nightly-tests.sh        |   4 +-
 .../test-scripts/test_batch_sql.sh                 |   2 +-
 .../flink/test/runtime/BatchShuffleITCaseBase.java | 191 +++++++++++++++++++++
 .../flink/test/runtime/BlockingShuffleITCase.java  | 154 +++--------------
 .../flink/test/runtime/HybridShuffleITCase.java    |  72 ++++++++
 6 files changed, 313 insertions(+), 135 deletions(-)
 create mode 100644 flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java
 create mode 100644 flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java


[flink] 02/02: [FLINK-28928][tests] Add E2E test for hybrid shuffle mode.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 00855438f2ee7ef4027076553357552157eb452f
Author: Weijie Guo <re...@163.com>
AuthorDate: Tue Sep 13 22:16:00 2022 +0800

    [FLINK-28928][tests] Add E2E test for hybrid shuffle mode.
    
    This closes #20750
---
 .../flink/sql/tests/BatchSQLTestProgram.java       | 25 ++++++++++++++++++++--
 flink-end-to-end-tests/run-nightly-tests.sh        |  4 +++-
 .../test-scripts/test_batch_sql.sh                 |  2 +-
 3 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
index 616d607d94f..bba9e1e9fa0 100644
--- a/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
+++ b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.sql.tests;
 
+import org.apache.flink.api.common.BatchShuffleMode;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.io.IteratorInputFormat;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
@@ -55,9 +57,10 @@ public class BatchSQLTestProgram {
         ParameterTool params = ParameterTool.fromArgs(args);
         String outputPath = params.getRequired("outputPath");
         String sqlStatement = params.getRequired("sqlStatement");
-
+        String shuffleType = params.getRequired("shuffleType");
         TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
-
+        BatchShuffleMode shuffleMode = checkAndGetShuffleMode(shuffleType);
+        tEnv.getConfig().set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
         ((TableEnvironmentInternal) tEnv)
                 .registerTableSourceInternal("table1", new GeneratorTableSource(10, 100, 60, 0));
         ((TableEnvironmentInternal) tEnv)
@@ -75,6 +78,24 @@ public class BatchSQLTestProgram {
         result.getJobClient().get().getJobExecutionResult().get();
     }
 
+    private static BatchShuffleMode checkAndGetShuffleMode(String shuffleType) {
+        BatchShuffleMode shuffleMode;
+        switch (shuffleType.toLowerCase()) {
+            case "blocking":
+                shuffleMode = BatchShuffleMode.ALL_EXCHANGES_BLOCKING;
+                break;
+            case "hybrid_full":
+                shuffleMode = BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL;
+                break;
+            case "hybrid_selective":
+                shuffleMode = BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE;
+                break;
+            default:
+                throw new IllegalArgumentException("unsupported shuffle type : " + shuffleType);
+        }
+        return shuffleMode;
+    }
+
     /** TableSource for generated data. */
     public static class GeneratorTableSource extends InputFormatTableSource<Row> {
 
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index a9a78c92aef..cbedaf87b0b 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -184,7 +184,9 @@ function run_group_2 {
     run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"
 
     run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
-    run_test "Batch SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh"
+    run_test "Batch SQL end-to-end test using blocking shuffle" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh blocking"
+    run_test "Batch SQL end-to-end test using hybrid full shuffle" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh hybrid_full"
+    run_test "Batch SQL end-to-end test using hybrid selective shuffle" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh hybrid_selective"
     run_test "Streaming SQL end-to-end test using planner loader" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh" "skip_check_exceptions"
     run_test "Streaming SQL end-to-end test using planner with Scala version" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh scala-planner" "skip_check_exceptions"
 
diff --git a/flink-end-to-end-tests/test-scripts/test_batch_sql.sh b/flink-end-to-end-tests/test-scripts/test_batch_sql.sh
index b7e45e739f4..56af85370cc 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_sql.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_sql.sh
@@ -73,7 +73,7 @@ start_cluster
 
 # The task has total 2 x (1 + 1 + 1 + 1) + 1 = 9 slots
 $FLINK_DIR/bin/flink run -p 2 $TEST_PROGRAM_JAR -outputPath "file://${OUTPUT_FILE_PATH}" -sqlStatement \
-    "INSERT INTO sinkTable $(sqlJobQuery)"
+    "INSERT INTO sinkTable $(sqlJobQuery)" -shuffleType $1
 
 # check result:
 #1980,1970-01-01 00:00:00.0


[flink] 01/02: [FLINK-28928][tests] Add IT test for hybrid shuffle mode.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 641436d926c7f86eeff0cbe3a81ed56c81fb00bd
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Sep 5 16:02:59 2022 +0800

    [FLINK-28928][tests] Add IT test for hybrid shuffle mode.
---
 .../flink/test/runtime/BatchShuffleITCaseBase.java | 191 +++++++++++++++++++++
 .../flink/test/runtime/BlockingShuffleITCase.java  | 154 +++--------------
 .../flink/test/runtime/HybridShuffleITCase.java    |  72 ++++++++
 3 files changed, 286 insertions(+), 131 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java
new file mode 100644
index 00000000000..168dc70ba7a
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base class for batch shuffle related IT tests. */
+class BatchShuffleITCaseBase {
+    private static final String RECORD = "batch shuffle test";
+
+    private static final int NUM_TASK_MANAGERS = 2;
+
+    private static final int NUM_SLOTS_PER_TASK_MANAGER = 10;
+
+    private static final int PARALLELISM = NUM_SLOTS_PER_TASK_MANAGER;
+
+    private static final int[] NUM_RECEIVED_RECORDS = new int[PARALLELISM];
+
+    private static Path tmpDir;
+
+    @BeforeAll
+    static void setup(@TempDir Path path) throws Exception {
+        tmpDir = TempDirUtils.newFolder(path, UUID.randomUUID().toString()).toPath();
+    }
+
+    protected JobGraph createJobGraph(int numRecordsToSend, boolean failExecution) {
+        return createJobGraph(numRecordsToSend, failExecution, false);
+    }
+
+    protected JobGraph createJobGraph(
+            int numRecordsToSend, boolean failExecution, boolean deletePartitionFile) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 0L));
+        env.setParallelism(NUM_SLOTS_PER_TASK_MANAGER);
+
+        DataStream<String> source =
+                new DataStreamSource<>(
+                        env,
+                        BasicTypeInfo.STRING_TYPE_INFO,
+                        new StreamSource<>(new StringSource(numRecordsToSend)),
+                        true,
+                        "source",
+                        Boundedness.BOUNDED);
+        source.rebalance()
+                .map(value -> value)
+                .shuffle()
+                .addSink(new VerifySink(failExecution, deletePartitionFile));
+
+        StreamGraph streamGraph = env.getStreamGraph();
+        streamGraph.setJobType(JobType.BATCH);
+        return StreamingJobGraphGenerator.createJobGraph(streamGraph);
+    }
+
+    protected Configuration getConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(CoreOptions.TMP_DIRS, tmpDir.toString());
+        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        configuration.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 100);
+        return configuration;
+    }
+
+    protected void executeJob(JobGraph jobGraph, Configuration configuration, int numRecordsToSend)
+            throws Exception {
+        JobGraphRunningUtil.execute(
+                jobGraph, configuration, NUM_TASK_MANAGERS, NUM_SLOTS_PER_TASK_MANAGER);
+        checkAllDataReceived(numRecordsToSend);
+    }
+
+    private void checkAllDataReceived(int numRecordsToSend) {
+        assertThat(Arrays.stream(NUM_RECEIVED_RECORDS).sum())
+                .isEqualTo(numRecordsToSend * PARALLELISM);
+    }
+
+    private static class StringSource implements ParallelSourceFunction<String> {
+        private volatile boolean isRunning = true;
+        private int numRecordsToSend;
+
+        StringSource(int numRecordsToSend) {
+            this.numRecordsToSend = numRecordsToSend;
+        }
+
+        @Override
+        public void run(SourceContext<String> ctx) throws Exception {
+            while (isRunning && numRecordsToSend-- > 0) {
+                ctx.collect(RECORD);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isRunning = false;
+        }
+    }
+
+    private static class VerifySink extends RichSinkFunction<String> {
+        private final boolean failExecution;
+
+        private final boolean deletePartitionFile;
+
+        VerifySink(boolean failExecution, boolean deletePartitionFile) {
+            this.failExecution = failExecution;
+            this.deletePartitionFile = deletePartitionFile;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            NUM_RECEIVED_RECORDS[getRuntimeContext().getIndexOfThisSubtask()] = 0;
+            if (getRuntimeContext().getAttemptNumber() > 0
+                    || getRuntimeContext().getIndexOfThisSubtask() != 0) {
+                return;
+            }
+
+            if (deletePartitionFile) {
+                synchronized (BlockingShuffleITCase.class) {
+                    deleteFiles(tmpDir.toFile());
+                }
+            }
+
+            if (failExecution) {
+                throw new RuntimeException("expected exception.");
+            }
+        }
+
+        @Override
+        public void invoke(String value, Context context) throws Exception {
+            NUM_RECEIVED_RECORDS[getRuntimeContext().getIndexOfThisSubtask()]++;
+            assertThat(value).isEqualTo(RECORD);
+        }
+
+        private static void deleteFiles(File root) throws IOException {
+            File[] files = root.listFiles();
+            if (files == null || files.length == 0) {
+                return;
+            }
+
+            for (File file : files) {
+                if (!file.isDirectory()) {
+                    Files.deleteIfExists(file.toPath());
+                } else {
+                    deleteFiles(file);
+                }
+            }
+        }
+    }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index ac99299f866..f64d52b37d5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -18,75 +18,49 @@
 
 package org.apache.flink.test.runtime;
 
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.BatchShuffleMode;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobType;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-
-import static org.junit.Assert.assertEquals;
+import org.junit.jupiter.api.Test;
 
 /** Tests for blocking shuffle. */
-public class BlockingShuffleITCase {
-
-    private static final String RECORD = "hello, world!";
-
-    private final int numTaskManagers = 2;
-
-    private final int numSlotsPerTaskManager = 4;
-
-    @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+class BlockingShuffleITCase extends BatchShuffleITCaseBase {
 
     @Test
     public void testBoundedBlockingShuffle() throws Exception {
-        JobGraph jobGraph = createJobGraph(1000000, false);
+        final int numRecordsToSend = 1000000;
+        JobGraph jobGraph = createJobGraph(1000000, false, false);
         Configuration configuration = getConfiguration();
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                 Integer.MAX_VALUE);
 
-        JobGraphRunningUtil.execute(
-                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+        executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
     @Test
     public void testBoundedBlockingShuffleWithoutData() throws Exception {
-        JobGraph jobGraph = createJobGraph(0, false);
+        JobGraph jobGraph = createJobGraph(0, false, false);
         Configuration configuration = getConfiguration();
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                 Integer.MAX_VALUE);
 
-        JobGraphRunningUtil.execute(
-                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+        executeJob(jobGraph, configuration, 0);
     }
 
     @Test
     public void testSortMergeBlockingShuffle() throws Exception {
+        final int numRecordsToSend = 1000000;
         Configuration configuration = getConfiguration();
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
-        JobGraph jobGraph = createJobGraph(1000000, false);
-        JobGraphRunningUtil.execute(
-                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+        JobGraph jobGraph = createJobGraph(1000000, false, false);
+        executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
     @Test
@@ -95,9 +69,8 @@ public class BlockingShuffleITCase {
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
-        JobGraph jobGraph = createJobGraph(0, false);
-        JobGraphRunningUtil.execute(
-                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+        JobGraph jobGraph = createJobGraph(0, false, false);
+        executeJob(jobGraph, configuration, 0);
     }
 
     @Test
@@ -107,103 +80,22 @@ public class BlockingShuffleITCase {
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                 Integer.MAX_VALUE);
 
-        JobGraph jobGraph = createJobGraph(0, true);
-        JobGraphRunningUtil.execute(
-                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+        JobGraph jobGraph = createJobGraph(0, false, true);
+        executeJob(jobGraph, configuration, 0);
     }
 
     @Test
     public void testDeletePartitionFileOfSortMergeBlockingShuffle() throws Exception {
         Configuration configuration = getConfiguration();
-        JobGraph jobGraph = createJobGraph(0, true);
-        JobGraphRunningUtil.execute(
-                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+        JobGraph jobGraph = createJobGraph(0, false, true);
+        executeJob(jobGraph, configuration, 0);
     }
 
-    private Configuration getConfiguration() {
-        Configuration configuration = new Configuration();
-        configuration.set(CoreOptions.TMP_DIRS, TEMP_FOLDER.getRoot().getAbsolutePath());
-        configuration.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 100);
+    @Override
+    protected Configuration getConfiguration() {
+        Configuration configuration = super.getConfiguration();
+        configuration.set(
+                ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_BLOCKING);
         return configuration;
     }
-
-    private JobGraph createJobGraph(int numRecordsToSend, boolean deletePartitionFile) {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
-        env.setBufferTimeout(-1);
-        env.setParallelism(numTaskManagers * numSlotsPerTaskManager);
-        DataStream<String> source = env.addSource(new StringSource(numRecordsToSend));
-        source.rebalance()
-                .map((MapFunction<String, String>) value -> value)
-                .broadcast()
-                .addSink(new VerifySink(deletePartitionFile));
-
-        StreamGraph streamGraph = env.getStreamGraph();
-        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
-        // a scheduler supporting batch jobs is required for this job graph, because it contains
-        // blocking data exchanges.
-        // The scheduler is selected based on the JobType.
-        streamGraph.setJobType(JobType.BATCH);
-        return StreamingJobGraphGenerator.createJobGraph(streamGraph);
-    }
-
-    private static class StringSource implements ParallelSourceFunction<String> {
-        private volatile boolean isRunning = true;
-        private int numRecordsToSend;
-
-        StringSource(int numRecordsToSend) {
-            this.numRecordsToSend = numRecordsToSend;
-        }
-
-        @Override
-        public void run(SourceContext<String> ctx) throws Exception {
-            while (isRunning && numRecordsToSend-- > 0) {
-                ctx.collect(RECORD);
-            }
-        }
-
-        @Override
-        public void cancel() {
-            isRunning = false;
-        }
-    }
-
-    private static class VerifySink extends RichSinkFunction<String> {
-        private final boolean deletePartitionFile;
-
-        VerifySink(boolean deletePartitionFile) {
-            this.deletePartitionFile = deletePartitionFile;
-        }
-
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            if (!deletePartitionFile || getRuntimeContext().getAttemptNumber() > 0) {
-                return;
-            }
-
-            synchronized (BlockingShuffleITCase.class) {
-                deleteFiles(TEMP_FOLDER.getRoot());
-            }
-        }
-
-        @Override
-        public void invoke(String value, Context context) throws Exception {
-            assertEquals(RECORD, value);
-        }
-
-        private static void deleteFiles(File root) throws IOException {
-            File[] files = root.listFiles();
-            if (files == null || files.length == 0) {
-                return;
-            }
-
-            for (File file : files) {
-                if (!file.isDirectory()) {
-                    Files.deleteIfExists(file.toPath());
-                } else {
-                    deleteFiles(file);
-                }
-            }
-        }
-    }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
new file mode 100644
index 00000000000..27171bd78f1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.BatchShuffleMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.junit.jupiter.api.Test;
+
+/** Tests for hybrid shuffle mode. */
+class HybridShuffleITCase extends BatchShuffleITCaseBase {
+
+    @Test
+    void testHybridFullExchanges() throws Exception {
+        final int numRecordsToSend = 10000;
+        Configuration configuration = getConfiguration();
+        configuration.set(
+                ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
+        JobGraph jobGraph = createJobGraph(numRecordsToSend, false);
+        executeJob(jobGraph, configuration, numRecordsToSend);
+    }
+
+    @Test
+    void testHybridSelectiveExchanges() throws Exception {
+        final int numRecordsToSend = 10000;
+        Configuration configuration = getConfiguration();
+        configuration.set(
+                ExecutionOptions.BATCH_SHUFFLE_MODE,
+                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
+        JobGraph jobGraph = createJobGraph(numRecordsToSend, false);
+        executeJob(jobGraph, configuration, numRecordsToSend);
+    }
+
+    @Test
+    void testHybridFullExchangesRestart() throws Exception {
+        final int numRecordsToSend = 10;
+        Configuration configuration = getConfiguration();
+        configuration.set(
+                ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
+        JobGraph jobGraph = createJobGraph(numRecordsToSend, true);
+        executeJob(jobGraph, configuration, numRecordsToSend);
+    }
+
+    @Test
+    void testHybridSelectiveExchangesRestart() throws Exception {
+        final int numRecordsToSend = 10;
+        Configuration configuration = getConfiguration();
+        configuration.set(
+                ExecutionOptions.BATCH_SHUFFLE_MODE,
+                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
+        JobGraph jobGraph = createJobGraph(numRecordsToSend, true);
+        executeJob(jobGraph, configuration, numRecordsToSend);
+    }
+}