You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/26 03:55:42 UTC
[flink] branch release-1.16 updated: [hotfix] fix the problem that BatchShuffleItCase is not subject to configuration.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new 074635cb813 [hotfix] fix the problem that BatchShuffleItCase is not subject to configuration.
074635cb813 is described below
commit 074635cb813e4a4a3b1bc1363ca5232fbe4fd40b
Author: Weijie Guo <re...@163.com>
AuthorDate: Fri Sep 16 00:19:01 2022 +0800
[hotfix] fix the problem that BatchShuffleItCase is not subject to configuration.
This closes #20857
---
.../apache/flink/test/runtime/BatchShuffleITCaseBase.java | 13 +++++++++----
.../apache/flink/test/runtime/BlockingShuffleITCase.java | 14 ++++++--------
.../org/apache/flink/test/runtime/HybridShuffleITCase.java | 8 ++++----
3 files changed, 19 insertions(+), 16 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java
index 168dc70ba7a..c9595e9aa83 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java
@@ -69,13 +69,18 @@ class BatchShuffleITCaseBase {
tmpDir = TempDirUtils.newFolder(path, UUID.randomUUID().toString()).toPath();
}
- protected JobGraph createJobGraph(int numRecordsToSend, boolean failExecution) {
- return createJobGraph(numRecordsToSend, failExecution, false);
+ protected JobGraph createJobGraph(
+ int numRecordsToSend, boolean failExecution, Configuration configuration) {
+ return createJobGraph(numRecordsToSend, failExecution, false, configuration);
}
protected JobGraph createJobGraph(
- int numRecordsToSend, boolean failExecution, boolean deletePartitionFile) {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ int numRecordsToSend,
+ boolean failExecution,
+ boolean deletePartitionFile,
+ Configuration configuration) {
+ StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 0L));
env.setParallelism(NUM_SLOTS_PER_TASK_MANAGER);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index f64d52b37d5..21e13cd396a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -32,23 +32,21 @@ class BlockingShuffleITCase extends BatchShuffleITCaseBase {
@Test
public void testBoundedBlockingShuffle() throws Exception {
final int numRecordsToSend = 1000000;
- JobGraph jobGraph = createJobGraph(1000000, false, false);
Configuration configuration = getConfiguration();
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
Integer.MAX_VALUE);
-
+ JobGraph jobGraph = createJobGraph(numRecordsToSend, false, false, configuration);
executeJob(jobGraph, configuration, numRecordsToSend);
}
@Test
public void testBoundedBlockingShuffleWithoutData() throws Exception {
- JobGraph jobGraph = createJobGraph(0, false, false);
Configuration configuration = getConfiguration();
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
Integer.MAX_VALUE);
-
+ JobGraph jobGraph = createJobGraph(0, false, false, configuration);
executeJob(jobGraph, configuration, 0);
}
@@ -59,7 +57,7 @@ class BlockingShuffleITCase extends BatchShuffleITCaseBase {
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
- JobGraph jobGraph = createJobGraph(1000000, false, false);
+ JobGraph jobGraph = createJobGraph(numRecordsToSend, false, false, configuration);
executeJob(jobGraph, configuration, numRecordsToSend);
}
@@ -69,7 +67,7 @@ class BlockingShuffleITCase extends BatchShuffleITCaseBase {
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
- JobGraph jobGraph = createJobGraph(0, false, false);
+ JobGraph jobGraph = createJobGraph(0, false, false, configuration);
executeJob(jobGraph, configuration, 0);
}
@@ -80,14 +78,14 @@ class BlockingShuffleITCase extends BatchShuffleITCaseBase {
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
Integer.MAX_VALUE);
- JobGraph jobGraph = createJobGraph(0, false, true);
+ JobGraph jobGraph = createJobGraph(0, false, true, configuration);
executeJob(jobGraph, configuration, 0);
}
@Test
public void testDeletePartitionFileOfSortMergeBlockingShuffle() throws Exception {
Configuration configuration = getConfiguration();
- JobGraph jobGraph = createJobGraph(0, false, true);
+ JobGraph jobGraph = createJobGraph(0, false, true, configuration);
executeJob(jobGraph, configuration, 0);
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
index 27171bd78f1..2df7fd93fa2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
@@ -34,7 +34,7 @@ class HybridShuffleITCase extends BatchShuffleITCaseBase {
Configuration configuration = getConfiguration();
configuration.set(
ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
- JobGraph jobGraph = createJobGraph(numRecordsToSend, false);
+ JobGraph jobGraph = createJobGraph(numRecordsToSend, false, configuration);
executeJob(jobGraph, configuration, numRecordsToSend);
}
@@ -45,7 +45,7 @@ class HybridShuffleITCase extends BatchShuffleITCaseBase {
configuration.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
- JobGraph jobGraph = createJobGraph(numRecordsToSend, false);
+ JobGraph jobGraph = createJobGraph(numRecordsToSend, false, configuration);
executeJob(jobGraph, configuration, numRecordsToSend);
}
@@ -55,7 +55,7 @@ class HybridShuffleITCase extends BatchShuffleITCaseBase {
Configuration configuration = getConfiguration();
configuration.set(
ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
- JobGraph jobGraph = createJobGraph(numRecordsToSend, true);
+ JobGraph jobGraph = createJobGraph(numRecordsToSend, true, configuration);
executeJob(jobGraph, configuration, numRecordsToSend);
}
@@ -66,7 +66,7 @@ class HybridShuffleITCase extends BatchShuffleITCaseBase {
configuration.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
- JobGraph jobGraph = createJobGraph(numRecordsToSend, true);
+ JobGraph jobGraph = createJobGraph(numRecordsToSend, true, configuration);
executeJob(jobGraph, configuration, numRecordsToSend);
}
}