You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/14 08:17:17 UTC
[flink] 02/02: [FLINK-28928][tests] Add E2E test for hybrid shuffle mode.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 00855438f2ee7ef4027076553357552157eb452f
Author: Weijie Guo <re...@163.com>
AuthorDate: Tue Sep 13 22:16:00 2022 +0800
[FLINK-28928][tests] Add E2E test for hybrid shuffle mode.
This closes #20750
---
.../flink/sql/tests/BatchSQLTestProgram.java | 25 ++++++++++++++++++++--
flink-end-to-end-tests/run-nightly-tests.sh | 4 +++-
.../test-scripts/test_batch_sql.sh | 2 +-
3 files changed, 27 insertions(+), 4 deletions(-)
diff --git a/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
index 616d607d94f..bba9e1e9fa0 100644
--- a/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
+++ b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
@@ -18,11 +18,13 @@
package org.apache.flink.sql.tests;
+import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.IteratorInputFormat;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
@@ -55,9 +57,10 @@ public class BatchSQLTestProgram {
ParameterTool params = ParameterTool.fromArgs(args);
String outputPath = params.getRequired("outputPath");
String sqlStatement = params.getRequired("sqlStatement");
-
+ String shuffleType = params.getRequired("shuffleType");
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
-
+ BatchShuffleMode shuffleMode = checkAndGetShuffleMode(shuffleType);
+ tEnv.getConfig().set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
((TableEnvironmentInternal) tEnv)
.registerTableSourceInternal("table1", new GeneratorTableSource(10, 100, 60, 0));
((TableEnvironmentInternal) tEnv)
@@ -75,6 +78,24 @@ public class BatchSQLTestProgram {
result.getJobClient().get().getJobExecutionResult().get();
}
+ /**
+  * Maps the {@code shuffleType} program argument to a {@link BatchShuffleMode}.
+  *
+  * <p>Matching is case-insensitive. The recognized values are {@code blocking}, {@code
+  * hybrid_full} and {@code hybrid_selective}.
+  *
+  * @param shuffleType name of the shuffle type to resolve
+  * @return the {@link BatchShuffleMode} selected by {@code shuffleType}
+  * @throws IllegalArgumentException if {@code shuffleType} is not one of the recognized values
+  */
+ private static BatchShuffleMode checkAndGetShuffleMode(String shuffleType) {
+     // Lower-case with a fixed locale. The default-locale variant of toLowerCase() maps 'I' to a
+     // dotless i under e.g. a Turkish default locale, so "HYBRID_FULL" would match no label.
+     switch (shuffleType.toLowerCase(java.util.Locale.ROOT)) {
+         case "blocking":
+             return BatchShuffleMode.ALL_EXCHANGES_BLOCKING;
+         case "hybrid_full":
+             return BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL;
+         case "hybrid_selective":
+             return BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE;
+         default:
+             throw new IllegalArgumentException("unsupported shuffle type : " + shuffleType);
+     }
+ }
+
/** TableSource for generated data. */
public static class GeneratorTableSource extends InputFormatTableSource<Row> {
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index a9a78c92aef..cbedaf87b0b 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -184,7 +184,9 @@ function run_group_2 {
run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh" "skip_check_exceptions"
run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_allround.sh"
- run_test "Batch SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh"
+ run_test "Batch SQL end-to-end test using blocking shuffle" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh blocking"
+ run_test "Batch SQL end-to-end test using hybrid full shuffle" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh hybrid_full"
+ run_test "Batch SQL end-to-end test using hybrid selective shuffle" "$END_TO_END_DIR/test-scripts/test_batch_sql.sh hybrid_selective"
run_test "Streaming SQL end-to-end test using planner loader" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh" "skip_check_exceptions"
run_test "Streaming SQL end-to-end test using planner with Scala version" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh scala-planner" "skip_check_exceptions"
diff --git a/flink-end-to-end-tests/test-scripts/test_batch_sql.sh b/flink-end-to-end-tests/test-scripts/test_batch_sql.sh
index b7e45e739f4..56af85370cc 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_sql.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_sql.sh
@@ -73,7 +73,7 @@ start_cluster
# The task has total 2 x (1 + 1 + 1 + 1) + 1 = 9 slots
$FLINK_DIR/bin/flink run -p 2 $TEST_PROGRAM_JAR -outputPath "file://${OUTPUT_FILE_PATH}" -sqlStatement \
- "INSERT INTO sinkTable $(sqlJobQuery)"
+ "INSERT INTO sinkTable $(sqlJobQuery)" -shuffleType $1
# check result:
#1980,1970-01-01 00:00:00.0