You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/26 03:55:42 UTC

[flink] branch release-1.16 updated: [hotfix] fix the problem that BatchShuffleITCase is not subject to configuration.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new 074635cb813 [hotfix] fix the problem that BatchShuffleITCase is not subject to configuration.
074635cb813 is described below

commit 074635cb813e4a4a3b1bc1363ca5232fbe4fd40b
Author: Weijie Guo <re...@163.com>
AuthorDate: Fri Sep 16 00:19:01 2022 +0800

    [hotfix] fix the problem that BatchShuffleITCase is not subject to configuration.
    
    This closes #20857
---
 .../apache/flink/test/runtime/BatchShuffleITCaseBase.java  | 13 +++++++++----
 .../apache/flink/test/runtime/BlockingShuffleITCase.java   | 14 ++++++--------
 .../org/apache/flink/test/runtime/HybridShuffleITCase.java |  8 ++++----
 3 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java
index 168dc70ba7a..c9595e9aa83 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java
@@ -69,13 +69,18 @@ class BatchShuffleITCaseBase {
         tmpDir = TempDirUtils.newFolder(path, UUID.randomUUID().toString()).toPath();
     }
 
-    protected JobGraph createJobGraph(int numRecordsToSend, boolean failExecution) {
-        return createJobGraph(numRecordsToSend, failExecution, false);
+    protected JobGraph createJobGraph(
+            int numRecordsToSend, boolean failExecution, Configuration configuration) {
+        return createJobGraph(numRecordsToSend, failExecution, false, configuration);
     }
 
     protected JobGraph createJobGraph(
-            int numRecordsToSend, boolean failExecution, boolean deletePartitionFile) {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            int numRecordsToSend,
+            boolean failExecution,
+            boolean deletePartitionFile,
+            Configuration configuration) {
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 0L));
         env.setParallelism(NUM_SLOTS_PER_TASK_MANAGER);
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index f64d52b37d5..21e13cd396a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -32,23 +32,21 @@ class BlockingShuffleITCase extends BatchShuffleITCaseBase {
     @Test
     public void testBoundedBlockingShuffle() throws Exception {
         final int numRecordsToSend = 1000000;
-        JobGraph jobGraph = createJobGraph(1000000, false, false);
         Configuration configuration = getConfiguration();
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                 Integer.MAX_VALUE);
-
+        JobGraph jobGraph = createJobGraph(numRecordsToSend, false, false, configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
     @Test
     public void testBoundedBlockingShuffleWithoutData() throws Exception {
-        JobGraph jobGraph = createJobGraph(0, false, false);
         Configuration configuration = getConfiguration();
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                 Integer.MAX_VALUE);
-
+        JobGraph jobGraph = createJobGraph(0, false, false, configuration);
         executeJob(jobGraph, configuration, 0);
     }
 
@@ -59,7 +57,7 @@ class BlockingShuffleITCase extends BatchShuffleITCaseBase {
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
-        JobGraph jobGraph = createJobGraph(1000000, false, false);
+        JobGraph jobGraph = createJobGraph(numRecordsToSend, false, false, configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
@@ -69,7 +67,7 @@ class BlockingShuffleITCase extends BatchShuffleITCaseBase {
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
-        JobGraph jobGraph = createJobGraph(0, false, false);
+        JobGraph jobGraph = createJobGraph(0, false, false, configuration);
         executeJob(jobGraph, configuration, 0);
     }
 
@@ -80,14 +78,14 @@ class BlockingShuffleITCase extends BatchShuffleITCaseBase {
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                 Integer.MAX_VALUE);
 
-        JobGraph jobGraph = createJobGraph(0, false, true);
+        JobGraph jobGraph = createJobGraph(0, false, true, configuration);
         executeJob(jobGraph, configuration, 0);
     }
 
     @Test
     public void testDeletePartitionFileOfSortMergeBlockingShuffle() throws Exception {
         Configuration configuration = getConfiguration();
-        JobGraph jobGraph = createJobGraph(0, false, true);
+        JobGraph jobGraph = createJobGraph(0, false, true, configuration);
         executeJob(jobGraph, configuration, 0);
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
index 27171bd78f1..2df7fd93fa2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
@@ -34,7 +34,7 @@ class HybridShuffleITCase extends BatchShuffleITCaseBase {
         Configuration configuration = getConfiguration();
         configuration.set(
                 ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
-        JobGraph jobGraph = createJobGraph(numRecordsToSend, false);
+        JobGraph jobGraph = createJobGraph(numRecordsToSend, false, configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
@@ -45,7 +45,7 @@ class HybridShuffleITCase extends BatchShuffleITCaseBase {
         configuration.set(
                 ExecutionOptions.BATCH_SHUFFLE_MODE,
                 BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
-        JobGraph jobGraph = createJobGraph(numRecordsToSend, false);
+        JobGraph jobGraph = createJobGraph(numRecordsToSend, false, configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
@@ -55,7 +55,7 @@ class HybridShuffleITCase extends BatchShuffleITCaseBase {
         Configuration configuration = getConfiguration();
         configuration.set(
                 ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
-        JobGraph jobGraph = createJobGraph(numRecordsToSend, true);
+        JobGraph jobGraph = createJobGraph(numRecordsToSend, true, configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
@@ -66,7 +66,7 @@ class HybridShuffleITCase extends BatchShuffleITCaseBase {
         configuration.set(
                 ExecutionOptions.BATCH_SHUFFLE_MODE,
                 BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
-        JobGraph jobGraph = createJobGraph(numRecordsToSend, true);
+        JobGraph jobGraph = createJobGraph(numRecordsToSend, true, configuration);
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 }