You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/13 08:17:18 UTC

[flink] 03/03: [FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f04af47a56a337f4d7d4ca6b914b3c0f62b4908d
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Fri Sep 7 13:38:54 2018 +0200

    [FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances
    
    This closes #6670.
---
 .../StreamNetworkBenchmarkEnvironment.java         | 46 ++++++++++++++++------
 .../StreamNetworkPointToPointBenchmark.java        |  9 ++++-
 .../StreamNetworkThroughputBenchmark.java          | 24 ++++++++++-
 3 files changed, 64 insertions(+), 15 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 6b53488..bfaed43 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -68,11 +68,6 @@ import static org.apache.flink.util.MathUtils.checkedDownCast;
  */
 public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
-	private static final int BUFFER_SIZE =
-		checkedDownCast(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes());
-
-	private static final int NUM_SLOTS_AND_THREADS = 1;
-
 	private static final InetAddress LOCAL_ADDRESS;
 
 	static {
@@ -96,6 +91,21 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
 	protected ResultPartitionID[] partitionIds;
 
+	public void setUp(
+			int writers,
+			int channels,
+			boolean localMode,
+			int senderBufferPoolSize,
+			int receiverBufferPoolSize) throws Exception {
+		setUp(
+			writers,
+			channels,
+			localMode,
+			senderBufferPoolSize,
+			receiverBufferPoolSize,
+			new Configuration());
+	}
+
 	/**
 	 * Sets up the environment including buffer pools and netty threads.
 	 *
@@ -115,7 +125,8 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 			int channels,
 			boolean localMode,
 			int senderBufferPoolSize,
-			int receiverBufferPoolSize) throws Exception {
+			int receiverBufferPoolSize,
+			Configuration config) throws Exception {
 		this.localMode = localMode;
 		this.channels = channels;
 		this.partitionIds = new ResultPartitionID[writers];
@@ -128,13 +139,13 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
 		ioManager = new IOManagerAsync();
 
-		senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize);
+		senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize, config);
 		senderEnv.start();
 		if (localMode && senderBufferPoolSize == receiverBufferPoolSize) {
 			receiverEnv = senderEnv;
 		}
 		else {
-			receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize);
+			receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize, config);
 			receiverEnv.start();
 		}
 
@@ -179,12 +190,25 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 	}
 
 	private NetworkEnvironment createNettyNetworkEnvironment(
-			@SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception {
+			@SuppressWarnings("SameParameterValue") int bufferPoolSize, Configuration config) throws Exception {
+
+		int segmentSize =
+			checkedDownCast(
+				MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE))
+					.getBytes());
+
+		// we need this because many configs have been written with a "-1" entry
+		// similar to TaskManagerServicesConfiguration#fromConfiguration()
+		// -> please note that this directly influences the number of netty threads!
+		int slots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+		if (slots == -1) {
+			slots = 1;
+		}
 
-		final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
+		final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, segmentSize);
 
 		final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager(
-			new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration()));
+			new NettyConfig(LOCAL_ADDRESS, 0, segmentSize, slots, config));
 
 		return new NetworkEnvironment(
 			bufferPool,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 6b96c62..a8d18e4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.types.LongValue;
 
@@ -61,6 +62,10 @@ public class StreamNetworkPointToPointBenchmark {
 		recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
 	}
 
+	public void setUp(long flushTimeout) throws Exception {
+		setUp(flushTimeout, new Configuration());
+	}
+
 	/**
 	 * Initializes the throughput benchmark with the given parameters.
 	 *
@@ -68,9 +73,9 @@ public class StreamNetworkPointToPointBenchmark {
 	 * 		output flushing interval of the
 	 * 		{@link org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher thread
 	 */
-	public void setUp(long flushTimeout) throws Exception {
+	public void setUp(long flushTimeout, Configuration config) throws Exception {
 		environment = new StreamNetworkBenchmarkEnvironment<>();
-		environment.setUp(1, 1, false, -1, -1);
+		environment.setUp(1, 1, false, -1, -1, config);
 
 		receiver = environment.createReceiver();
 		recordWriter = environment.createRecordWriter(0, flushTimeout);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index c55dd43..28d7f35 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.LongValue;
 
 import java.util.concurrent.CompletableFuture;
@@ -63,6 +64,24 @@ public class StreamNetworkThroughputBenchmark {
 		setUp(recordWriters, channels, flushTimeout, localMode, -1, -1);
 	}
 
+	public void setUp(
+			int recordWriters,
+			int channels,
+			int flushTimeout,
+			boolean localMode,
+			int senderBufferPoolSize,
+			int receiverBufferPoolSize) throws Exception {
+		setUp(
+			recordWriters,
+			channels,
+			flushTimeout,
+			localMode,
+			senderBufferPoolSize,
+			receiverBufferPoolSize,
+			new Configuration()
+		);
+	}
+
 	/**
 	 * Initializes the throughput benchmark with the given parameters.
 	 *
@@ -78,9 +97,10 @@ public class StreamNetworkThroughputBenchmark {
 			int flushTimeout,
 			boolean localMode,
 			int senderBufferPoolSize,
-			int receiverBufferPoolSize) throws Exception {
+			int receiverBufferPoolSize,
+			Configuration config) throws Exception {
 		environment = new StreamNetworkBenchmarkEnvironment<>();
-		environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize);
+		environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize, config);
 		receiver = environment.createReceiver();
 		writerThreads = new LongRecordWriterThread[recordWriters];
 		for (int writer = 0; writer < recordWriters; writer++) {