You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/14 08:17:16 UTC
[flink] 01/02: [FLINK-28928][tests] Add IT test for hybrid shuffle mode.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 641436d926c7f86eeff0cbe3a81ed56c81fb00bd
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Sep 5 16:02:59 2022 +0800
[FLINK-28928][tests] Add IT test for hybrid shuffle mode.
---
.../flink/test/runtime/BatchShuffleITCaseBase.java | 191 +++++++++++++++++++++
.../flink/test/runtime/BlockingShuffleITCase.java | 154 +++--------------
.../flink/test/runtime/HybridShuffleITCase.java | 72 ++++++++
3 files changed, 286 insertions(+), 131 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java
new file mode 100644
index 00000000000..168dc70ba7a
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BatchShuffleITCaseBase.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for batch shuffle related IT tests.
+ *
+ * <p>All subclasses run the same job: a bounded parallel source in which every subtask emits
+ * {@code numRecordsToSend} copies of a fixed record, a rebalance followed by an identity map,
+ * and a shuffle into a sink that verifies and counts every record it receives. After the job
+ * has finished, {@link #executeJob} checks that the sinks received {@code numRecordsToSend *
+ * PARALLELISM} records in total.
+ */
+class BatchShuffleITCaseBase {
+    private static final String RECORD = "batch shuffle test";
+
+    private static final int NUM_TASK_MANAGERS = 2;
+
+    private static final int NUM_SLOTS_PER_TASK_MANAGER = 10;
+
+    private static final int PARALLELISM = NUM_SLOTS_PER_TASK_MANAGER;
+
+    /**
+     * Records received per sink subtask, indexed by subtask index. Each sink subtask resets its
+     * own slot in {@code open}, so records counted by a failed attempt are discarded.
+     */
+    private static final int[] NUM_RECEIVED_RECORDS = new int[PARALLELISM];
+
+    /** Directory configured as {@link CoreOptions#TMP_DIRS} for every test job. */
+    private static Path tmpDir;
+
+    /** Creates a uniquely named temporary directory shared by all tests of the class. */
+    @BeforeAll
+    static void setup(@TempDir Path path) throws Exception {
+        tmpDir = TempDirUtils.newFolder(path, UUID.randomUUID().toString()).toPath();
+    }
+
+    /**
+     * Creates the test job graph without deleting partition files.
+     *
+     * @param numRecordsToSend number of records emitted by each source subtask
+     * @param failExecution whether the first attempt of sink subtask 0 fails while opening
+     * @return the batch job graph
+     */
+    protected JobGraph createJobGraph(int numRecordsToSend, boolean failExecution) {
+        return createJobGraph(numRecordsToSend, failExecution, false);
+    }
+
+    /**
+     * Creates the test job graph.
+     *
+     * @param numRecordsToSend number of records emitted by each source subtask
+     * @param failExecution whether the first attempt of sink subtask 0 fails while opening
+     * @param deletePartitionFile whether the first attempt of sink subtask 0 deletes all files
+     *     below the temporary directory while opening
+     * @return the batch job graph
+     */
+    protected JobGraph createJobGraph(
+            int numRecordsToSend, boolean failExecution, boolean deletePartitionFile) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // Up to 10 immediate restarts so the injected failures can be recovered from.
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 0L));
+        // Use the same parallelism that checkAllDataReceived() expects.
+        env.setParallelism(PARALLELISM);
+
+        DataStream<String> source =
+                new DataStreamSource<>(
+                        env,
+                        BasicTypeInfo.STRING_TYPE_INFO,
+                        new StreamSource<>(new StringSource(numRecordsToSend)),
+                        true,
+                        "source",
+                        Boundedness.BOUNDED);
+        source.rebalance()
+                .map(value -> value)
+                .shuffle()
+                .addSink(new VerifySink(failExecution, deletePartitionFile));
+
+        StreamGraph streamGraph = env.getStreamGraph();
+        // The scheduler is selected based on the JobType; batch exchanges need a batch scheduler.
+        streamGraph.setJobType(JobType.BATCH);
+        return StreamingJobGraphGenerator.createJobGraph(streamGraph);
+    }
+
+    /**
+     * Returns the base configuration shared by all batch shuffle tests: batch runtime mode, the
+     * temporary directory as {@link CoreOptions#TMP_DIRS} and a maximum network request back-off
+     * of 100.
+     */
+    protected Configuration getConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(CoreOptions.TMP_DIRS, tmpDir.toString());
+        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        configuration.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 100);
+        return configuration;
+    }
+
+    /**
+     * Executes the job via {@link JobGraphRunningUtil} with {@code NUM_TASK_MANAGERS} task
+     * managers and verifies that all records reached the sinks.
+     *
+     * @param numRecordsToSend number of records each source subtask was configured to emit
+     */
+    protected void executeJob(JobGraph jobGraph, Configuration configuration, int numRecordsToSend)
+            throws Exception {
+        JobGraphRunningUtil.execute(
+                jobGraph, configuration, NUM_TASK_MANAGERS, NUM_SLOTS_PER_TASK_MANAGER);
+        checkAllDataReceived(numRecordsToSend);
+    }
+
+    /** Asserts that the sinks received {@code numRecordsToSend} records per source subtask. */
+    private void checkAllDataReceived(int numRecordsToSend) {
+        // Compute in long so large record counts cannot overflow int.
+        long expectedRecords = (long) numRecordsToSend * PARALLELISM;
+        assertThat(Arrays.stream(NUM_RECEIVED_RECORDS).asLongStream().sum())
+                .isEqualTo(expectedRecords);
+    }
+
+    /** Parallel source in which every subtask emits {@code numRecordsToSend} copies of RECORD. */
+    private static class StringSource implements ParallelSourceFunction<String> {
+        private final int numRecordsToSend;
+
+        private volatile boolean isRunning = true;
+
+        StringSource(int numRecordsToSend) {
+            this.numRecordsToSend = numRecordsToSend;
+        }
+
+        @Override
+        public void run(SourceContext<String> ctx) throws Exception {
+            // Count with a local variable instead of consuming the configured amount.
+            for (int numSent = 0; isRunning && numSent < numRecordsToSend; numSent++) {
+                ctx.collect(RECORD);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isRunning = false;
+        }
+    }
+
+    /**
+     * Sink that asserts every record equals RECORD and counts it per subtask. On its first
+     * attempt, subtask 0 optionally deletes all files below the temporary directory and/or fails
+     * while opening.
+     */
+    private static class VerifySink extends RichSinkFunction<String> {
+        private final boolean failExecution;
+
+        private final boolean deletePartitionFile;
+
+        VerifySink(boolean failExecution, boolean deletePartitionFile) {
+            this.failExecution = failExecution;
+            this.deletePartitionFile = deletePartitionFile;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            // Reset this subtask's counter so records counted by a failed attempt are dropped.
+            NUM_RECEIVED_RECORDS[getRuntimeContext().getIndexOfThisSubtask()] = 0;
+            // Faults are only injected by the first attempt of subtask 0.
+            if (getRuntimeContext().getAttemptNumber() > 0
+                    || getRuntimeContext().getIndexOfThisSubtask() != 0) {
+                return;
+            }
+
+            if (deletePartitionFile) {
+                // Lock on this base class: it must not depend on a concrete subclass.
+                synchronized (BatchShuffleITCaseBase.class) {
+                    deleteFiles(tmpDir.toFile());
+                }
+            }
+
+            if (failExecution) {
+                throw new RuntimeException("expected exception.");
+            }
+        }
+
+        @Override
+        public void invoke(String value, Context context) throws Exception {
+            NUM_RECEIVED_RECORDS[getRuntimeContext().getIndexOfThisSubtask()]++;
+            assertThat(value).isEqualTo(RECORD);
+        }
+
+        /** Recursively deletes all regular files below {@code root}, keeping the directories. */
+        private static void deleteFiles(File root) throws IOException {
+            File[] files = root.listFiles();
+            if (files == null || files.length == 0) {
+                return;
+            }
+
+            for (File file : files) {
+                if (!file.isDirectory()) {
+                    Files.deleteIfExists(file.toPath());
+                } else {
+                    deleteFiles(file);
+                }
+            }
+        }
+    }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index ac99299f866..f64d52b37d5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -18,75 +18,49 @@
package org.apache.flink.test.runtime;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobType;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-
-import static org.junit.Assert.assertEquals;
+import org.junit.jupiter.api.Test;
/** Tests for blocking shuffle. */
-public class BlockingShuffleITCase {
-
- private static final String RECORD = "hello, world!";
-
- private final int numTaskManagers = 2;
-
- private final int numSlotsPerTaskManager = 4;
-
- @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+class BlockingShuffleITCase extends BatchShuffleITCaseBase {
@Test
public void testBoundedBlockingShuffle() throws Exception {
- JobGraph jobGraph = createJobGraph(1000000, false);
+ final int numRecordsToSend = 1000000;
+ JobGraph jobGraph = createJobGraph(numRecordsToSend, false, false);
Configuration configuration = getConfiguration();
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
Integer.MAX_VALUE);
- JobGraphRunningUtil.execute(
- jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+ executeJob(jobGraph, configuration, numRecordsToSend);
}
@Test
public void testBoundedBlockingShuffleWithoutData() throws Exception {
- JobGraph jobGraph = createJobGraph(0, false);
+ JobGraph jobGraph = createJobGraph(0, false, false);
Configuration configuration = getConfiguration();
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
Integer.MAX_VALUE);
- JobGraphRunningUtil.execute(
- jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+ executeJob(jobGraph, configuration, 0);
}
@Test
public void testSortMergeBlockingShuffle() throws Exception {
+ final int numRecordsToSend = 1000000;
Configuration configuration = getConfiguration();
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
- JobGraph jobGraph = createJobGraph(1000000, false);
- JobGraphRunningUtil.execute(
- jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+ JobGraph jobGraph = createJobGraph(numRecordsToSend, false, false);
+ executeJob(jobGraph, configuration, numRecordsToSend);
}
@Test
@@ -95,9 +69,8 @@ public class BlockingShuffleITCase {
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
- JobGraph jobGraph = createJobGraph(0, false);
- JobGraphRunningUtil.execute(
- jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+ JobGraph jobGraph = createJobGraph(0, false, false);
+ executeJob(jobGraph, configuration, 0);
}
@Test
@@ -107,103 +80,23 @@ public class BlockingShuffleITCase {
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
Integer.MAX_VALUE);
- JobGraph jobGraph = createJobGraph(0, true);
- JobGraphRunningUtil.execute(
- jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+ JobGraph jobGraph = createJobGraph(0, false, true);
+ executeJob(jobGraph, configuration, 0);
}
@Test
public void testDeletePartitionFileOfSortMergeBlockingShuffle() throws Exception {
Configuration configuration = getConfiguration();
- JobGraph jobGraph = createJobGraph(0, true);
- JobGraphRunningUtil.execute(
- jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+ JobGraph jobGraph = createJobGraph(0, false, true);
+ executeJob(jobGraph, configuration, 0);
}
- private Configuration getConfiguration() {
- Configuration configuration = new Configuration();
- configuration.set(CoreOptions.TMP_DIRS, TEMP_FOLDER.getRoot().getAbsolutePath());
- configuration.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 100);
+ /** Base configuration with every exchange forced to {@code ALL_EXCHANGES_BLOCKING}. */
+ @Override
+ protected Configuration getConfiguration() {
+ Configuration configuration = super.getConfiguration();
+ configuration.set(
+ ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_BLOCKING);
return configuration;
}
-
- private JobGraph createJobGraph(int numRecordsToSend, boolean deletePartitionFile) {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
- env.setBufferTimeout(-1);
- env.setParallelism(numTaskManagers * numSlotsPerTaskManager);
- DataStream<String> source = env.addSource(new StringSource(numRecordsToSend));
- source.rebalance()
- .map((MapFunction<String, String>) value -> value)
- .broadcast()
- .addSink(new VerifySink(deletePartitionFile));
-
- StreamGraph streamGraph = env.getStreamGraph();
- streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
- // a scheduler supporting batch jobs is required for this job graph, because it contains
- // blocking data exchanges.
- // The scheduler is selected based on the JobType.
- streamGraph.setJobType(JobType.BATCH);
- return StreamingJobGraphGenerator.createJobGraph(streamGraph);
- }
-
- private static class StringSource implements ParallelSourceFunction<String> {
- private volatile boolean isRunning = true;
- private int numRecordsToSend;
-
- StringSource(int numRecordsToSend) {
- this.numRecordsToSend = numRecordsToSend;
- }
-
- @Override
- public void run(SourceContext<String> ctx) throws Exception {
- while (isRunning && numRecordsToSend-- > 0) {
- ctx.collect(RECORD);
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
- }
-
- private static class VerifySink extends RichSinkFunction<String> {
- private final boolean deletePartitionFile;
-
- VerifySink(boolean deletePartitionFile) {
- this.deletePartitionFile = deletePartitionFile;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- if (!deletePartitionFile || getRuntimeContext().getAttemptNumber() > 0) {
- return;
- }
-
- synchronized (BlockingShuffleITCase.class) {
- deleteFiles(TEMP_FOLDER.getRoot());
- }
- }
-
- @Override
- public void invoke(String value, Context context) throws Exception {
- assertEquals(RECORD, value);
- }
-
- private static void deleteFiles(File root) throws IOException {
- File[] files = root.listFiles();
- if (files == null || files.length == 0) {
- return;
- }
-
- for (File file : files) {
- if (!file.isDirectory()) {
- Files.deleteIfExists(file.toPath());
- } else {
- deleteFiles(file);
- }
- }
- }
- }
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
new file mode 100644
index 00000000000..27171bd78f1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.BatchShuffleMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for hybrid shuffle mode.
+ *
+ * <p>Each test runs the job from {@link BatchShuffleITCaseBase} with all exchanges set to one of
+ * the hybrid shuffle modes; the restart tests additionally fail the first attempt of a sink
+ * subtask.
+ */
+class HybridShuffleITCase extends BatchShuffleITCaseBase {
+
+    /** Records emitted per source subtask by the regular tests. */
+    private static final int NUM_RECORDS = 10000;
+
+    /** Records emitted per source subtask by the restart tests. */
+    private static final int NUM_RECORDS_RESTART = 10;
+
+    @Test
+    void testHybridFullExchanges() throws Exception {
+        runHybridJob(BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, NUM_RECORDS, false);
+    }
+
+    @Test
+    void testHybridSelectiveExchanges() throws Exception {
+        runHybridJob(BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE, NUM_RECORDS, false);
+    }
+
+    @Test
+    void testHybridFullExchangesRestart() throws Exception {
+        runHybridJob(BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, NUM_RECORDS_RESTART, true);
+    }
+
+    @Test
+    void testHybridSelectiveExchangesRestart() throws Exception {
+        runHybridJob(BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE, NUM_RECORDS_RESTART, true);
+    }
+
+    /**
+     * Runs the shared job with the given hybrid shuffle mode and checks that all records arrive.
+     *
+     * @param shuffleMode value for {@link ExecutionOptions#BATCH_SHUFFLE_MODE}
+     * @param numRecordsToSend records emitted per source subtask
+     * @param failExecution whether the first attempt of sink subtask 0 fails, forcing a restart
+     */
+    private void runHybridJob(
+            BatchShuffleMode shuffleMode, int numRecordsToSend, boolean failExecution)
+            throws Exception {
+        Configuration configuration = getConfiguration();
+        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
+        JobGraph jobGraph = createJobGraph(numRecordsToSend, failExecution);
+        executeJob(jobGraph, configuration, numRecordsToSend);
+    }
+}