You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/02/11 06:35:15 UTC
[flink-benchmarks] branch master updated: [FLINK-25959] Add micro-benchmarks for the sort-based blocking shuffle
This is an automated email from the ASF dual-hosted git repository.
yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
The following commit(s) were added to refs/heads/master by this push:
new ffcdbb4 [FLINK-25959] Add micro-benchmarks for the sort-based blocking shuffle
ffcdbb4 is described below
commit ffcdbb45c88e9453bd815229eebaec1e776722cb
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Thu Feb 10 15:28:04 2022 +0800
[FLINK-25959] Add micro-benchmarks for the sort-based blocking shuffle
---
.../benchmark/BlockingPartitionBenchmark.java | 45 ++++++++++++++++++----
.../BlockingPartitionRemoteChannelBenchmark.java | 38 +++++++++++++++---
2 files changed, 70 insertions(+), 13 deletions(-)
diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
index 3bf055c..bcef395 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
@@ -66,6 +66,16 @@ public class BlockingPartitionBenchmark extends BenchmarkBase {
executeBenchmark(context.env);
}
+ @Benchmark
+ public void compressedSortPartition(CompressedSortEnvironmentContext context) throws Exception {
+ executeBenchmark(context.env);
+ }
+
+ @Benchmark
+ public void uncompressedSortPartition(UncompressedSortEnvironmentContext context) throws Exception {
+ executeBenchmark(context.env);
+ }
+
private void executeBenchmark(StreamExecutionEnvironment env) throws Exception {
StreamGraph streamGraph =
StreamGraphUtils.buildGraphForBatchJob(env, RECORDS_PER_INVOCATION);
@@ -92,12 +102,17 @@ public class BlockingPartitionBenchmark extends BenchmarkBase {
}
protected Configuration createConfiguration(
- boolean compressionEnabled, String subpartitionType) {
+ boolean compressionEnabled, String subpartitionType, boolean isSortShuffle) {
Configuration configuration = super.createConfiguration();
- configuration.setInteger(
- NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
- Integer.MAX_VALUE);
+ if (isSortShuffle) {
+ configuration.setInteger(
+ NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
+ } else {
+ configuration.setInteger(
+ NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+ Integer.MAX_VALUE);
+ }
configuration.setBoolean(
NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED,
compressionEnabled);
@@ -114,7 +129,7 @@ public class BlockingPartitionBenchmark extends BenchmarkBase {
extends BlockingPartitionEnvironmentContext {
@Override
protected Configuration createConfiguration() {
- return createConfiguration(false, "file");
+ return createConfiguration(false, "file", false);
}
}
@@ -122,7 +137,7 @@ public class BlockingPartitionBenchmark extends BenchmarkBase {
extends BlockingPartitionEnvironmentContext {
@Override
protected Configuration createConfiguration() {
- return createConfiguration(true, "file");
+ return createConfiguration(true, "file", false);
}
}
@@ -130,7 +145,23 @@ public class BlockingPartitionBenchmark extends BenchmarkBase {
extends BlockingPartitionEnvironmentContext {
@Override
protected Configuration createConfiguration() {
- return createConfiguration(false, "mmap");
+ return createConfiguration(false, "mmap", false);
+ }
+ }
+
+ public static class CompressedSortEnvironmentContext
+ extends BlockingPartitionEnvironmentContext {
+ @Override
+ protected Configuration createConfiguration() {
+ return createConfiguration(true, "file", true);
+ }
+ }
+
+ public static class UncompressedSortEnvironmentContext
+ extends BlockingPartitionEnvironmentContext {
+ @Override
+ protected Configuration createConfiguration() {
+ return createConfiguration(false, "file", true);
}
}
}
diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
index 09fa02b..ee69eb7 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
@@ -49,7 +49,15 @@ public class BlockingPartitionRemoteChannelBenchmark extends RemoteBenchmarkBase
}
@Benchmark
- public void remoteFilePartition(BlockingPartitionEnvironmentContext context) throws Exception {
+ public void remoteFilePartition(RemoteFileEnvironmentContext context) throws Exception {
+ StreamGraph streamGraph =
+ StreamGraphUtils.buildGraphForBatchJob(context.env, RECORDS_PER_INVOCATION);
+ context.miniCluster.executeJobBlocking(
+ StreamingJobGraphGenerator.createJobGraph(streamGraph));
+ }
+
+ @Benchmark
+ public void remoteSortPartition(RemoteSortEnvironmentContext context) throws Exception {
StreamGraph streamGraph =
StreamGraphUtils.buildGraphForBatchJob(context.env, RECORDS_PER_INVOCATION);
context.miniCluster.executeJobBlocking(
@@ -67,13 +75,17 @@ public class BlockingPartitionRemoteChannelBenchmark extends RemoteBenchmarkBase
env.setBufferTimeout(-1);
}
- @Override
- protected Configuration createConfiguration() {
+ protected Configuration createConfiguration(boolean isSortShuffle) {
Configuration configuration = super.createConfiguration();
- configuration.setInteger(
- NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
- Integer.MAX_VALUE);
+ if (isSortShuffle) {
+ configuration.setInteger(
+ NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
+ } else {
+ configuration.setInteger(
+ NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+ Integer.MAX_VALUE);
+ }
configuration.setString(
NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, "file");
configuration.setString(
@@ -87,4 +99,18 @@ public class BlockingPartitionRemoteChannelBenchmark extends RemoteBenchmarkBase
return NUM_VERTICES;
}
}
+
+ public static class RemoteFileEnvironmentContext extends BlockingPartitionEnvironmentContext {
+ @Override
+ protected Configuration createConfiguration() {
+ return createConfiguration(false);
+ }
+ }
+
+ public static class RemoteSortEnvironmentContext extends BlockingPartitionEnvironmentContext {
+ @Override
+ protected Configuration createConfiguration() {
+ return createConfiguration(true);
+ }
+ }
}