You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/13 08:17:18 UTC
[flink] 03/03: [FLINK-10301][network] extend
StreamNetworkBenchmarkEnvironment to allow custom Configuration instances
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f04af47a56a337f4d7d4ca6b914b3c0f62b4908d
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Fri Sep 7 13:38:54 2018 +0200
[FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances
This closes #6670.
---
.../StreamNetworkBenchmarkEnvironment.java | 46 ++++++++++++++++------
.../StreamNetworkPointToPointBenchmark.java | 9 ++++-
.../StreamNetworkThroughputBenchmark.java | 24 ++++++++++-
3 files changed, 64 insertions(+), 15 deletions(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 6b53488..bfaed43 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -68,11 +68,6 @@ import static org.apache.flink.util.MathUtils.checkedDownCast;
*/
public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
- private static final int BUFFER_SIZE =
- checkedDownCast(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes());
-
- private static final int NUM_SLOTS_AND_THREADS = 1;
-
private static final InetAddress LOCAL_ADDRESS;
static {
@@ -96,6 +91,21 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
protected ResultPartitionID[] partitionIds;
+ public void setUp(
+ int writers,
+ int channels,
+ boolean localMode,
+ int senderBufferPoolSize,
+ int receiverBufferPoolSize) throws Exception {
+ setUp(
+ writers,
+ channels,
+ localMode,
+ senderBufferPoolSize,
+ receiverBufferPoolSize,
+ new Configuration());
+ }
+
/**
* Sets up the environment including buffer pools and netty threads.
*
@@ -115,7 +125,8 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
int channels,
boolean localMode,
int senderBufferPoolSize,
- int receiverBufferPoolSize) throws Exception {
+ int receiverBufferPoolSize,
+ Configuration config) throws Exception {
this.localMode = localMode;
this.channels = channels;
this.partitionIds = new ResultPartitionID[writers];
@@ -128,13 +139,13 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
ioManager = new IOManagerAsync();
- senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize);
+ senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize, config);
senderEnv.start();
if (localMode && senderBufferPoolSize == receiverBufferPoolSize) {
receiverEnv = senderEnv;
}
else {
- receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize);
+ receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize, config);
receiverEnv.start();
}
@@ -179,12 +190,25 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
}
private NetworkEnvironment createNettyNetworkEnvironment(
- @SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception {
+ @SuppressWarnings("SameParameterValue") int bufferPoolSize, Configuration config) throws Exception {
+
+ int segmentSize =
+ checkedDownCast(
+ MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE))
+ .getBytes());
+
+ // we need this because many configs have been written with a "-1" entry
+ // similar to TaskManagerServicesConfiguration#fromConfiguration()
+ // -> please note that this directly influences the number of netty threads!
+ int slots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+ if (slots == -1) {
+ slots = 1;
+ }
- final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
+ final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, segmentSize);
final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager(
- new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration()));
+ new NettyConfig(LOCAL_ADDRESS, 0, segmentSize, slots, config));
return new NetworkEnvironment(
bufferPool,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 6b96c62..a8d18e4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.io.benchmark;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.types.LongValue;
@@ -61,6 +62,10 @@ public class StreamNetworkPointToPointBenchmark {
recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
}
+ public void setUp(long flushTimeout) throws Exception {
+ setUp(flushTimeout, new Configuration());
+ }
+
/**
* Initializes the throughput benchmark with the given parameters.
*
@@ -68,9 +73,9 @@ public class StreamNetworkPointToPointBenchmark {
* output flushing interval of the
* {@link org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher thread
*/
- public void setUp(long flushTimeout) throws Exception {
+ public void setUp(long flushTimeout, Configuration config) throws Exception {
environment = new StreamNetworkBenchmarkEnvironment<>();
- environment.setUp(1, 1, false, -1, -1);
+ environment.setUp(1, 1, false, -1, -1, config);
receiver = environment.createReceiver();
recordWriter = environment.createRecordWriter(0, flushTimeout);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index c55dd43..28d7f35 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.io.benchmark;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.LongValue;
import java.util.concurrent.CompletableFuture;
@@ -63,6 +64,24 @@ public class StreamNetworkThroughputBenchmark {
setUp(recordWriters, channels, flushTimeout, localMode, -1, -1);
}
+ public void setUp(
+ int recordWriters,
+ int channels,
+ int flushTimeout,
+ boolean localMode,
+ int senderBufferPoolSize,
+ int receiverBufferPoolSize) throws Exception {
+ setUp(
+ recordWriters,
+ channels,
+ flushTimeout,
+ localMode,
+ senderBufferPoolSize,
+ receiverBufferPoolSize,
+ new Configuration()
+ );
+ }
+
/**
* Initializes the throughput benchmark with the given parameters.
*
@@ -78,9 +97,10 @@ public class StreamNetworkThroughputBenchmark {
int flushTimeout,
boolean localMode,
int senderBufferPoolSize,
- int receiverBufferPoolSize) throws Exception {
+ int receiverBufferPoolSize,
+ Configuration config) throws Exception {
environment = new StreamNetworkBenchmarkEnvironment<>();
- environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize);
+ environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize, config);
receiver = environment.createReceiver();
writerThreads = new LongRecordWriterThread[recordWriters];
for (int writer = 0; writer < recordWriters; writer++) {