You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text rendering.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/02/11 06:35:15 UTC

[flink-benchmarks] branch master updated: [FLINK-25959] Add micro-benchmarks for the sort-based blocking shuffle

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/master by this push:
     new ffcdbb4  [FLINK-25959] Add micro-benchmarks for the sort-based blocking shuffle
ffcdbb4 is described below

commit ffcdbb45c88e9453bd815229eebaec1e776722cb
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Thu Feb 10 15:28:04 2022 +0800

    [FLINK-25959] Add micro-benchmarks for the sort-based blocking shuffle
---
 .../benchmark/BlockingPartitionBenchmark.java      | 45 ++++++++++++++++++----
 .../BlockingPartitionRemoteChannelBenchmark.java   | 38 +++++++++++++++---
 2 files changed, 70 insertions(+), 13 deletions(-)

diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
index 3bf055c..bcef395 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
@@ -66,6 +66,16 @@ public class BlockingPartitionBenchmark extends BenchmarkBase {
         executeBenchmark(context.env);
     }
 
+    @Benchmark
+    public void compressedSortPartition(CompressedSortEnvironmentContext context) throws Exception {
+        executeBenchmark(context.env);
+    }
+
+    @Benchmark
+    public void uncompressedSortPartition(UncompressedSortEnvironmentContext context) throws Exception {
+        executeBenchmark(context.env);
+    }
+
     private void executeBenchmark(StreamExecutionEnvironment env) throws Exception {
         StreamGraph streamGraph =
                 StreamGraphUtils.buildGraphForBatchJob(env, RECORDS_PER_INVOCATION);
@@ -92,12 +102,17 @@ public class BlockingPartitionBenchmark extends BenchmarkBase {
         }
 
         protected Configuration createConfiguration(
-                boolean compressionEnabled, String subpartitionType) {
+                boolean compressionEnabled, String subpartitionType, boolean isSortShuffle) {
             Configuration configuration = super.createConfiguration();
 
-            configuration.setInteger(
-                    NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
-                    Integer.MAX_VALUE);
+            if (isSortShuffle) {
+                configuration.setInteger(
+                        NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
+            } else {
+                configuration.setInteger(
+                        NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                        Integer.MAX_VALUE);
+            }
             configuration.setBoolean(
                     NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED,
                     compressionEnabled);
@@ -114,7 +129,7 @@ public class BlockingPartitionBenchmark extends BenchmarkBase {
             extends BlockingPartitionEnvironmentContext {
         @Override
         protected Configuration createConfiguration() {
-            return createConfiguration(false, "file");
+            return createConfiguration(false, "file", false);
         }
     }
 
@@ -122,7 +137,7 @@ public class BlockingPartitionBenchmark extends BenchmarkBase {
             extends BlockingPartitionEnvironmentContext {
         @Override
         protected Configuration createConfiguration() {
-            return createConfiguration(true, "file");
+            return createConfiguration(true, "file", false);
         }
     }
 
@@ -130,7 +145,23 @@ public class BlockingPartitionBenchmark extends BenchmarkBase {
             extends BlockingPartitionEnvironmentContext {
         @Override
         protected Configuration createConfiguration() {
-            return createConfiguration(false, "mmap");
+            return createConfiguration(false, "mmap", false);
+        }
+    }
+
+    public static class CompressedSortEnvironmentContext
+            extends BlockingPartitionEnvironmentContext {
+        @Override
+        protected Configuration createConfiguration() {
+            return createConfiguration(true, "file", true);
+        }
+    }
+
+    public static class UncompressedSortEnvironmentContext
+            extends BlockingPartitionEnvironmentContext {
+        @Override
+        protected Configuration createConfiguration() {
+            return createConfiguration(false, "file", true);
         }
     }
 }
diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
index 09fa02b..ee69eb7 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
@@ -49,7 +49,15 @@ public class BlockingPartitionRemoteChannelBenchmark extends RemoteBenchmarkBase
     }
 
     @Benchmark
-    public void remoteFilePartition(BlockingPartitionEnvironmentContext context) throws Exception {
+    public void remoteFilePartition(RemoteFileEnvironmentContext context) throws Exception {
+        StreamGraph streamGraph =
+                StreamGraphUtils.buildGraphForBatchJob(context.env, RECORDS_PER_INVOCATION);
+        context.miniCluster.executeJobBlocking(
+                StreamingJobGraphGenerator.createJobGraph(streamGraph));
+    }
+
+    @Benchmark
+    public void remoteSortPartition(RemoteSortEnvironmentContext context) throws Exception {
         StreamGraph streamGraph =
                 StreamGraphUtils.buildGraphForBatchJob(context.env, RECORDS_PER_INVOCATION);
         context.miniCluster.executeJobBlocking(
@@ -67,13 +75,17 @@ public class BlockingPartitionRemoteChannelBenchmark extends RemoteBenchmarkBase
             env.setBufferTimeout(-1);
         }
 
-        @Override
-        protected Configuration createConfiguration() {
+        protected Configuration createConfiguration(boolean isSortShuffle) {
             Configuration configuration = super.createConfiguration();
 
-            configuration.setInteger(
-                    NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
-                    Integer.MAX_VALUE);
+            if (isSortShuffle) {
+                configuration.setInteger(
+                        NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
+            } else {
+                configuration.setInteger(
+                        NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                        Integer.MAX_VALUE);
+            }
             configuration.setString(
                     NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, "file");
             configuration.setString(
@@ -87,4 +99,18 @@ public class BlockingPartitionRemoteChannelBenchmark extends RemoteBenchmarkBase
             return NUM_VERTICES;
         }
     }
+
+    public static class RemoteFileEnvironmentContext extends BlockingPartitionEnvironmentContext {
+        @Override
+        protected Configuration createConfiguration() {
+            return createConfiguration(false);
+        }
+    }
+
+    public static class RemoteSortEnvironmentContext extends BlockingPartitionEnvironmentContext {
+        @Override
+        protected Configuration createConfiguration() {
+            return createConfiguration(true);
+        }
+    }
 }