You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/11/05 08:48:29 UTC
[flink-benchmarks] 02/02: [FLINK-24659] Drop RemoteBenchmarkBase.miniCluster field
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
commit 0f8d2e35f4915fbc7898672266f8af59009e06e1
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Nov 1 18:04:20 2021 +0100
[FLINK-24659] Drop RemoteBenchmarkBase.miniCluster field
Before this change, all subclasses of RemoteBenchmarkBase worked incorrectly: they
configured the environment for the miniCluster from FlinkEnvironmentContext, but in
reality they used the miniCluster from RemoteBenchmarkBase.
---
.../BlockingPartitionRemoteChannelBenchmark.java | 15 ++++-----
.../flink/benchmark/RemoteBenchmarkBase.java | 36 +++++++---------------
.../RemoteChannelThroughputBenchmark.java | 14 ++++-----
3 files changed, 26 insertions(+), 39 deletions(-)
diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
index 8752798..aa30de7 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
@@ -48,20 +48,16 @@ public class BlockingPartitionRemoteChannelBenchmark extends RemoteBenchmarkBase
new Runner(options).run();
}
- @Override
- public int getNumberOfVertexes() {
- return NUM_VERTICES;
- }
-
@Benchmark
public void remoteFilePartition(BlockingPartitionEnvironmentContext context) throws Exception {
StreamGraph streamGraph =
StreamGraphUtils.buildGraphForBatchJob(context.env, RECORDS_PER_INVOCATION);
- miniCluster.executeJobBlocking(StreamingJobGraphGenerator.createJobGraph(streamGraph));
+ context.miniCluster.executeJobBlocking(
+ StreamingJobGraphGenerator.createJobGraph(streamGraph));
}
/** Environment context for specific file based bounded blocking partition. */
- public static class BlockingPartitionEnvironmentContext extends FlinkEnvironmentContext {
+ public static class BlockingPartitionEnvironmentContext extends RemoteBenchmarkContext {
@Override
public void setUp() throws Exception {
@@ -82,5 +78,10 @@ public class BlockingPartitionRemoteChannelBenchmark extends RemoteBenchmarkBase
FileUtils.getCurrentWorkingDirectory().toAbsolutePath().toString());
return configuration;
}
+
+ @Override
+ protected int getNumberOfVertices() {
+ return NUM_VERTICES;
+ }
}
}
diff --git a/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java b/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java
index 9fb58c4..e6fef61 100644
--- a/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java
@@ -18,12 +18,6 @@
package org.apache.flink.benchmark;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.TearDown;
-
/** Benchmark base for setting up the cluster to perform remote network shuffle. */
public abstract class RemoteBenchmarkBase extends BenchmarkBase {
@@ -31,26 +25,18 @@ public abstract class RemoteBenchmarkBase extends BenchmarkBase {
protected static final int RECORDS_PER_SUBTASK = 10_000_000;
protected static final int RECORDS_PER_INVOCATION = RECORDS_PER_SUBTASK * PARALLELISM;
- protected MiniCluster miniCluster;
-
- @Setup
- public void setUp() throws Exception {
- MiniClusterConfiguration miniClusterConfiguration =
- new MiniClusterConfiguration.Builder()
- .setNumTaskManagers(getNumberOfVertexes() * PARALLELISM)
- .setNumSlotsPerTaskManager(1)
- .build();
- miniCluster = new MiniCluster(miniClusterConfiguration);
- miniCluster.start();
- }
+ public abstract static class RemoteBenchmarkContext extends FlinkEnvironmentContext {
+ @Override
+ protected int getNumberOfTaskManagers() {
+ return getNumberOfVertices() * PARALLELISM;
+ }
- @TearDown
- public void tearDown() throws Exception {
- if (miniCluster != null) {
- miniCluster.close();
+ @Override
+ protected int getNumberOfSlotsPerTaskManager() {
+ return 1;
}
- }
- /** @return the number of vertexes the respective job graph contains. */
- abstract int getNumberOfVertexes();
+ /** @return the number of vertices the respective job graph contains. */
+ abstract int getNumberOfVertices();
+ }
}
diff --git a/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java b/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
index 924e395..4895be8 100644
--- a/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/RemoteChannelThroughputBenchmark.java
@@ -57,11 +57,6 @@ public class RemoteChannelThroughputBenchmark extends RemoteBenchmarkBase {
new Runner(options).run();
}
- @Override
- public int getNumberOfVertexes() {
- return NUM_VERTICES;
- }
-
@Benchmark
public void remoteRebalance(RemoteChannelThroughputBenchmarkContext context) throws Exception {
StreamExecutionEnvironment env = context.env;
@@ -78,12 +73,12 @@ public class RemoteChannelThroughputBenchmark extends RemoteBenchmarkBase {
.addSink(new DiscardingSink<>())
.slotSharingGroup("sink");
- miniCluster.executeJobBlocking(
+ context.miniCluster.executeJobBlocking(
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()));
}
@State(Scope.Thread)
- public static class RemoteChannelThroughputBenchmarkContext extends FlinkEnvironmentContext {
+ public static class RemoteChannelThroughputBenchmarkContext extends RemoteBenchmarkContext {
@Param({ALIGNED, UNALIGNED, DEBLOAT})
public String mode = ALIGNED;
@@ -95,5 +90,10 @@ public class RemoteChannelThroughputBenchmark extends RemoteBenchmarkBase {
}
return configuration;
}
+
+ @Override
+ protected int getNumberOfVertices() {
+ return NUM_VERTICES;
+ }
}
}