You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/14 08:17:17 UTC

[flink] 02/02: [FLINK-28928][tests] Add E2E test for hybrid shuffle mode.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 00855438f2ee7ef4027076553357552157eb452f
Author: Weijie Guo <re...@163.com>
AuthorDate: Tue Sep 13 22:16:00 2022 +0800

    [FLINK-28928][tests] Add E2E test for hybrid shuffle mode.
    
    This closes #20750
---
 .../flink/sql/tests/BatchSQLTestProgram.java       | 25 ++++++++++++++++++++--
 flink-end-to-end-tests/run-nightly-tests.sh        |  4 +++-
 .../test-scripts/test_batch_sql.sh                 |  2 +-
 3 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
index 616d607d94f..bba9e1e9fa0 100644
--- a/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
+++ b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.sql.tests;
 
+import org.apache.flink.api.common.BatchShuffleMode;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.io.IteratorInputFormat;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
@@ -55,9 +57,10 @@ public class BatchSQLTestProgram {
         ParameterTool params = ParameterTool.fromArgs(args);
         String outputPath = params.getRequired("outputPath");
         String sqlStatement = params.getRequired("sqlStatement");
-
+        String shuffleType = params.getRequired("shuffleType");
         TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
-
+        BatchShuffleMode shuffleMode = checkAndGetShuffleMode(shuffleType);
+        tEnv.getConfig().set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
         ((TableEnvironmentInternal) tEnv)
                 .registerTableSourceInternal("table1", new GeneratorTableSource(10, 100, 60, 0));
         ((TableEnvironmentInternal) tEnv)
@@ -75,6 +78,24 @@ public class BatchSQLTestProgram {
         result.getJobClient().get().getJobExecutionResult().get();
     }
 
+    private static BatchShuffleMode checkAndGetShuffleMode(String shuffleType) {
+        BatchShuffleMode shuffleMode;
+        switch (shuffleType.toLowerCase()) {
+            case "blocking":
+                shuffleMode = BatchShuffleMode.ALL_EXCHANGES_BLOCKING;
+                break;
+            case "hybrid_full":
+                shuffleMode = BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL;
+                break;
+            case "hybrid_selective":
+                shuffleMode = BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE;
+                break;
+            default:
+                throw new IllegalArgumentException("unsupported shuffle type : " + shuffleType);
+        }
+        return shuffleMode;
+    }
+
     /** TableSource for generated data. */
     public static class GeneratorTableSource extends InputFormatTableSource<Row> {
 
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index a9a78c92aef..cbedaf87b0b 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -184,7 +184,9 @@ function run_group_2 {
     run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"
 
     run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
-    run_test "Batch SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh"
+    run_test "Batch SQL end-to-end test using blocking shuffle" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh blocking"
+    run_test "Batch SQL end-to-end test using hybrid full shuffle" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh hybrid_full"
+    run_test "Batch SQL end-to-end test using hybrid selective shuffle" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh hybrid_selective"
     run_test "Streaming SQL end-to-end test using planner loader" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh" "skip_check_exceptions"
     run_test "Streaming SQL end-to-end test using planner with Scala version" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh scala-planner" "skip_check_exceptions"
 
diff --git a/flink-end-to-end-tests/test-scripts/test_batch_sql.sh b/flink-end-to-end-tests/test-scripts/test_batch_sql.sh
index b7e45e739f4..56af85370cc 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_sql.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_sql.sh
@@ -73,7 +73,7 @@ start_cluster
 
 # The task has total 2 x (1 + 1 + 1 + 1) + 1 = 9 slots
 $FLINK_DIR/bin/flink run -p 2 $TEST_PROGRAM_JAR -outputPath "file://${OUTPUT_FILE_PATH}" -sqlStatement \
-    "INSERT INTO sinkTable $(sqlJobQuery)"
+    "INSERT INTO sinkTable $(sqlJobQuery)" -shuffleType $1
 
 # check result:
 #1980,1970-01-01 00:00:00.0