You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/13 08:17:15 UTC

[flink] branch master updated (a933d87 -> f04af47)

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from a933d87  [FLINK-10011] Release JobGraph from SubmittedJobGraphStore in Dispatcher
     new 230fd17  [hotfix][benchmarks] fix StreamNetworkThroughputBenchmark#setUp not forwarding localMode
     new fbe816e  [hotfix][benchmarks] add @Override
     new f04af47  [FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../io/benchmark/SerializingLongReceiver.java      |  1 +
 .../StreamNetworkBenchmarkEnvironment.java         | 46 ++++++++++++++++------
 .../StreamNetworkPointToPointBenchmark.java        |  9 ++++-
 .../StreamNetworkThroughputBenchmark.java          | 26 ++++++++++--
 4 files changed, 66 insertions(+), 16 deletions(-)


[flink] 03/03: [FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f04af47a56a337f4d7d4ca6b914b3c0f62b4908d
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Fri Sep 7 13:38:54 2018 +0200

    [FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances
    
    This closes #6670.
---
 .../StreamNetworkBenchmarkEnvironment.java         | 46 ++++++++++++++++------
 .../StreamNetworkPointToPointBenchmark.java        |  9 ++++-
 .../StreamNetworkThroughputBenchmark.java          | 24 ++++++++++-
 3 files changed, 64 insertions(+), 15 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 6b53488..bfaed43 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -68,11 +68,6 @@ import static org.apache.flink.util.MathUtils.checkedDownCast;
  */
 public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
-	private static final int BUFFER_SIZE =
-		checkedDownCast(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes());
-
-	private static final int NUM_SLOTS_AND_THREADS = 1;
-
 	private static final InetAddress LOCAL_ADDRESS;
 
 	static {
@@ -96,6 +91,21 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
 	protected ResultPartitionID[] partitionIds;
 
+	public void setUp(
+			int writers,
+			int channels,
+			boolean localMode,
+			int senderBufferPoolSize,
+			int receiverBufferPoolSize) throws Exception {
+		setUp(
+			writers,
+			channels,
+			localMode,
+			senderBufferPoolSize,
+			receiverBufferPoolSize,
+			new Configuration());
+	}
+
 	/**
 	 * Sets up the environment including buffer pools and netty threads.
 	 *
@@ -115,7 +125,8 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 			int channels,
 			boolean localMode,
 			int senderBufferPoolSize,
-			int receiverBufferPoolSize) throws Exception {
+			int receiverBufferPoolSize,
+			Configuration config) throws Exception {
 		this.localMode = localMode;
 		this.channels = channels;
 		this.partitionIds = new ResultPartitionID[writers];
@@ -128,13 +139,13 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
 		ioManager = new IOManagerAsync();
 
-		senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize);
+		senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize, config);
 		senderEnv.start();
 		if (localMode && senderBufferPoolSize == receiverBufferPoolSize) {
 			receiverEnv = senderEnv;
 		}
 		else {
-			receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize);
+			receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize, config);
 			receiverEnv.start();
 		}
 
@@ -179,12 +190,25 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 	}
 
 	private NetworkEnvironment createNettyNetworkEnvironment(
-			@SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception {
+			@SuppressWarnings("SameParameterValue") int bufferPoolSize, Configuration config) throws Exception {
+
+		int segmentSize =
+			checkedDownCast(
+				MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE))
+					.getBytes());
+
+		// we need this because many configs have been written with a "-1" entry
+		// similar to TaskManagerServicesConfiguration#fromConfiguration()
+		// -> please note that this directly influences the number of netty threads!
+		int slots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+		if (slots == -1) {
+			slots = 1;
+		}
 
-		final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
+		final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, segmentSize);
 
 		final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager(
-			new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration()));
+			new NettyConfig(LOCAL_ADDRESS, 0, segmentSize, slots, config));
 
 		return new NetworkEnvironment(
 			bufferPool,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 6b96c62..a8d18e4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.types.LongValue;
 
@@ -61,6 +62,10 @@ public class StreamNetworkPointToPointBenchmark {
 		recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
 	}
 
+	public void setUp(long flushTimeout) throws Exception {
+		setUp(flushTimeout, new Configuration());
+	}
+
 	/**
 	 * Initializes the throughput benchmark with the given parameters.
 	 *
@@ -68,9 +73,9 @@ public class StreamNetworkPointToPointBenchmark {
 	 * 		output flushing interval of the
 	 * 		{@link org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher thread
 	 */
-	public void setUp(long flushTimeout) throws Exception {
+	public void setUp(long flushTimeout, Configuration config) throws Exception {
 		environment = new StreamNetworkBenchmarkEnvironment<>();
-		environment.setUp(1, 1, false, -1, -1);
+		environment.setUp(1, 1, false, -1, -1, config);
 
 		receiver = environment.createReceiver();
 		recordWriter = environment.createRecordWriter(0, flushTimeout);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index c55dd43..28d7f35 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.LongValue;
 
 import java.util.concurrent.CompletableFuture;
@@ -63,6 +64,24 @@ public class StreamNetworkThroughputBenchmark {
 		setUp(recordWriters, channels, flushTimeout, localMode, -1, -1);
 	}
 
+	public void setUp(
+			int recordWriters,
+			int channels,
+			int flushTimeout,
+			boolean localMode,
+			int senderBufferPoolSize,
+			int receiverBufferPoolSize) throws Exception {
+		setUp(
+			recordWriters,
+			channels,
+			flushTimeout,
+			localMode,
+			senderBufferPoolSize,
+			receiverBufferPoolSize,
+			new Configuration()
+		);
+	}
+
 	/**
 	 * Initializes the throughput benchmark with the given parameters.
 	 *
@@ -78,9 +97,10 @@ public class StreamNetworkThroughputBenchmark {
 			int flushTimeout,
 			boolean localMode,
 			int senderBufferPoolSize,
-			int receiverBufferPoolSize) throws Exception {
+			int receiverBufferPoolSize,
+			Configuration config) throws Exception {
 		environment = new StreamNetworkBenchmarkEnvironment<>();
-		environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize);
+		environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize, config);
 		receiver = environment.createReceiver();
 		writerThreads = new LongRecordWriterThread[recordWriters];
 		for (int writer = 0; writer < recordWriters; writer++) {


[flink] 02/03: [hotfix][benchmarks] add @Override

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fbe816e9700818252aa9462f18d99de00f54b446
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Mon Sep 10 09:56:51 2018 +0200

    [hotfix][benchmarks] add @Override
---
 .../flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java    | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
index 580612c..a93e9d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
@@ -40,6 +40,7 @@ public class SerializingLongReceiver extends ReceiverThread {
 			});
 	}
 
+	@Override
 	protected void readRecords(long lastExpectedRecord) throws Exception {
 		LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord);
 		final LongValue value = new LongValue();


[flink] 01/03: [hotfix][benchmarks] fix StreamNetworkThroughputBenchmark#setUp not forwarding localMode

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 230fd17f5b271a9e5bdb84ce8a8d360cf407c244
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Mon Sep 10 11:05:28 2018 +0200

    [hotfix][benchmarks] fix StreamNetworkThroughputBenchmark#setUp not forwarding localMode
---
 .../runtime/io/benchmark/StreamNetworkThroughputBenchmark.java          | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index 1b0ef8a..c55dd43 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -60,7 +60,7 @@ public class StreamNetworkThroughputBenchmark {
 	}
 
 	public void setUp(int recordWriters, int channels, int flushTimeout, boolean localMode) throws Exception {
-		setUp(recordWriters, channels, flushTimeout, false, -1, -1);
+		setUp(recordWriters, channels, flushTimeout, localMode, -1, -1);
 	}
 
 	/**