You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/11/05 08:48:29 UTC

[flink-benchmarks] 02/02: [FLINK-24659] Drop RemoteBenchmarkBase.miniCluster field

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git

commit 0f8d2e35f4915fbc7898672266f8af59009e06e1
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Nov 1 18:04:20 2021 +0100

    [FLINK-24659] Drop RemoteBenchmarkBase.miniCluster field
    
    Before this change, all subclasses of RemoteBenchmarkBase worked incorrectly: they
    configured the environment for the miniCluster from FlinkEnvironmentContext, but in
    reality they executed jobs on the separate miniCluster owned by RemoteBenchmarkBase.
---
 .../BlockingPartitionRemoteChannelBenchmark.java   | 15 ++++-----
 .../flink/benchmark/RemoteBenchmarkBase.java       | 36 +++++++---------------
 .../RemoteChannelThroughputBenchmark.java          | 14 ++++-----
 3 files changed, 26 insertions(+), 39 deletions(-)

diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
index 8752798..aa30de7 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
@@ -48,20 +48,16 @@ public class BlockingPartitionRemoteChannelBenchmark extends RemoteBenchmarkBase
         new Runner(options).run();
     }
 
-    @Override
-    public int getNumberOfVertexes() {
-        return NUM_VERTICES;
-    }
-
     @Benchmark
     public void remoteFilePartition(BlockingPartitionEnvironmentContext context) throws Exception {
         StreamGraph streamGraph =
                 StreamGraphUtils.buildGraphForBatchJob(context.env, RECORDS_PER_INVOCATION);
-        miniCluster.executeJobBlocking(StreamingJobGraphGenerator.createJobGraph(streamGraph));
+        context.miniCluster.executeJobBlocking(
+                StreamingJobGraphGenerator.createJobGraph(streamGraph));
     }
 
     /** Environment context for specific file based bounded blocking partition. */
-    public static class BlockingPartitionEnvironmentContext extends FlinkEnvironmentContext {
+    public static class BlockingPartitionEnvironmentContext extends RemoteBenchmarkContext {
 
         @Override
         public void setUp() throws Exception {
@@ -82,5 +78,10 @@ public class BlockingPartitionRemoteChannelBenchmark extends RemoteBenchmarkBase
                     FileUtils.getCurrentWorkingDirectory().toAbsolutePath().toString());
             return configuration;
         }
+
+        @Override
+        protected int getNumberOfVertices() {
+            return NUM_VERTICES;
+        }
     }
 }
diff --git a/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java b/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java
index 9fb58c4..e6fef61 100644
--- a/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.benchmark;
 
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.TearDown;
-
 /** Benchmark base for setting up the cluster to perform remote network shuffle. */
 public abstract class RemoteBenchmarkBase extends BenchmarkBase {
 
@@ -31,26 +25,18 @@ public abstract class RemoteBenchmarkBase extends BenchmarkBase {
     protected static final int RECORDS_PER_SUBTASK = 10_000_000;
     protected static final int RECORDS_PER_INVOCATION = RECORDS_PER_SUBTASK * PARALLELISM;
 
-    protected MiniCluster miniCluster;
-
-    @Setup
-    public void setUp() throws Exception {
-        MiniClusterConfiguration miniClusterConfiguration =
-                new MiniClusterConfiguration.Builder()
-                        .setNumTaskManagers(getNumberOfVertexes() * PARALLELISM)
-                        .setNumSlotsPerTaskManager(1)
-                        .build();
-        miniCluster = new MiniCluster(miniClusterConfiguration);
-        miniCluster.start();
-    }
+    public abstract static class RemoteBenchmarkContext extends FlinkEnvironmentContext {
+        @Override
+        protected int getNumberOfTaskManagers() {
+            return getNumberOfVertices() * PARALLELISM;
+        }
 
-    @TearDown
-    public void tearDown() throws Exception {
-        if (miniCluster != null) {
-            miniCluster.close();
+        @Override
+        protected int getNumberOfSlotsPerTaskManager() {
+            return 1;
         }
-    }
 
-    /** @return the number of vertexes the respective job graph contains. */
-    abstract int getNumberOfVertexes();
+        /** @return the number of vertices the respective job graph contains. */
+        abstract int getNumberOfVertices();
+    }
 }
diff --git a/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java b/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
index 924e395..4895be8 100644
--- a/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
@@ -57,11 +57,6 @@ public class RemoteChannelThroughputBenchmark extends RemoteBenchmarkBase {
         new Runner(options).run();
     }
 
-    @Override
-    public int getNumberOfVertexes() {
-        return NUM_VERTICES;
-    }
-
     @Benchmark
     public void remoteRebalance(RemoteChannelThroughputBenchmarkContext context) throws Exception {
         StreamExecutionEnvironment env = context.env;
@@ -78,12 +73,12 @@ public class RemoteChannelThroughputBenchmark extends RemoteBenchmarkBase {
                 .addSink(new DiscardingSink<>())
                 .slotSharingGroup("sink");
 
-        miniCluster.executeJobBlocking(
+        context.miniCluster.executeJobBlocking(
                 StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()));
     }
 
     @State(Scope.Thread)
-    public static class RemoteChannelThroughputBenchmarkContext extends FlinkEnvironmentContext {
+    public static class RemoteChannelThroughputBenchmarkContext extends RemoteBenchmarkContext {
         @Param({ALIGNED, UNALIGNED, DEBLOAT})
         public String mode = ALIGNED;
 
@@ -95,5 +90,10 @@ public class RemoteChannelThroughputBenchmark extends RemoteBenchmarkBase {
             }
             return configuration;
         }
+
+        @Override
+        protected int getNumberOfVertices() {
+            return NUM_VERTICES;
+        }
     }
 }