You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/13 08:36:13 UTC

[flink] branch release-1.6 updated (4d72047 -> 933c27f)

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 4d72047  [FLINK-10011] Release JobGraph from SubmittedJobGraphStore in Dispatcher
     new e0b774f  [hotfix][benchmarks] fix StreamNetworkThroughputBenchmark#setUp not forwarding localMode
     new 9ba7dac  [hotfix][benchmarks] add @Override
     new 933c27f  [FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../io/benchmark/SerializingLongReceiver.java      |  1 +
 .../StreamNetworkBenchmarkEnvironment.java         | 46 ++++++++++++++++------
 .../StreamNetworkPointToPointBenchmark.java        |  9 ++++-
 .../StreamNetworkThroughputBenchmark.java          | 26 ++++++++++--
 4 files changed, 66 insertions(+), 16 deletions(-)


[flink] 02/03: [hotfix][benchmarks] add @Override

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9ba7daceae4c3fd300bceb5b58764c5803a005ef
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Mon Sep 10 09:56:51 2018 +0200

    [hotfix][benchmarks] add @Override
---
 .../flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java    | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
index 580612c..a93e9d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
@@ -40,6 +40,7 @@ public class SerializingLongReceiver extends ReceiverThread {
 			});
 	}
 
+	@Override
 	protected void readRecords(long lastExpectedRecord) throws Exception {
 		LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord);
 		final LongValue value = new LongValue();


[flink] 01/03: [hotfix][benchmarks] fix StreamNetworkThroughputBenchmark#setUp not forwarding localMode

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e0b774fc5c24dba356832985e812160cd6f6a603
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Mon Sep 10 11:05:28 2018 +0200

    [hotfix][benchmarks] fix StreamNetworkThroughputBenchmark#setUp not forwarding localMode
---
 .../runtime/io/benchmark/StreamNetworkThroughputBenchmark.java          | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index 1b0ef8a..c55dd43 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -60,7 +60,7 @@ public class StreamNetworkThroughputBenchmark {
 	}
 
 	public void setUp(int recordWriters, int channels, int flushTimeout, boolean localMode) throws Exception {
-		setUp(recordWriters, channels, flushTimeout, false, -1, -1);
+		setUp(recordWriters, channels, flushTimeout, localMode, -1, -1);
 	}
 
 	/**


[flink] 03/03: [FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 933c27f963c5ef53be07a31c8701737b83b28e13
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Fri Sep 7 13:38:54 2018 +0200

    [FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances
    
    This closes #6670.
---
 .../StreamNetworkBenchmarkEnvironment.java         | 46 ++++++++++++++++------
 .../StreamNetworkPointToPointBenchmark.java        |  9 ++++-
 .../StreamNetworkThroughputBenchmark.java          | 24 ++++++++++-
 3 files changed, 64 insertions(+), 15 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 6b53488..bfaed43 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -68,11 +68,6 @@ import static org.apache.flink.util.MathUtils.checkedDownCast;
  */
 public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
-	private static final int BUFFER_SIZE =
-		checkedDownCast(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes());
-
-	private static final int NUM_SLOTS_AND_THREADS = 1;
-
 	private static final InetAddress LOCAL_ADDRESS;
 
 	static {
@@ -96,6 +91,21 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
 	protected ResultPartitionID[] partitionIds;
 
+	public void setUp(
+			int writers,
+			int channels,
+			boolean localMode,
+			int senderBufferPoolSize,
+			int receiverBufferPoolSize) throws Exception {
+		setUp(
+			writers,
+			channels,
+			localMode,
+			senderBufferPoolSize,
+			receiverBufferPoolSize,
+			new Configuration());
+	}
+
 	/**
 	 * Sets up the environment including buffer pools and netty threads.
 	 *
@@ -115,7 +125,8 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 			int channels,
 			boolean localMode,
 			int senderBufferPoolSize,
-			int receiverBufferPoolSize) throws Exception {
+			int receiverBufferPoolSize,
+			Configuration config) throws Exception {
 		this.localMode = localMode;
 		this.channels = channels;
 		this.partitionIds = new ResultPartitionID[writers];
@@ -128,13 +139,13 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
 		ioManager = new IOManagerAsync();
 
-		senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize);
+		senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize, config);
 		senderEnv.start();
 		if (localMode && senderBufferPoolSize == receiverBufferPoolSize) {
 			receiverEnv = senderEnv;
 		}
 		else {
-			receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize);
+			receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize, config);
 			receiverEnv.start();
 		}
 
@@ -179,12 +190,25 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 	}
 
 	private NetworkEnvironment createNettyNetworkEnvironment(
-			@SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception {
+			@SuppressWarnings("SameParameterValue") int bufferPoolSize, Configuration config) throws Exception {
+
+		int segmentSize =
+			checkedDownCast(
+				MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE))
+					.getBytes());
+
+		// we need this because many configs have been written with a "-1" entry
+		// similar to TaskManagerServicesConfiguration#fromConfiguration()
+		// -> please note that this directly influences the number of netty threads!
+		int slots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+		if (slots == -1) {
+			slots = 1;
+		}
 
-		final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
+		final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, segmentSize);
 
 		final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager(
-			new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration()));
+			new NettyConfig(LOCAL_ADDRESS, 0, segmentSize, slots, config));
 
 		return new NetworkEnvironment(
 			bufferPool,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 6b96c62..a8d18e4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.types.LongValue;
 
@@ -61,6 +62,10 @@ public class StreamNetworkPointToPointBenchmark {
 		recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
 	}
 
+	public void setUp(long flushTimeout) throws Exception {
+		setUp(flushTimeout, new Configuration());
+	}
+
 	/**
 	 * Initializes the throughput benchmark with the given parameters.
 	 *
@@ -68,9 +73,9 @@ public class StreamNetworkPointToPointBenchmark {
 	 * 		output flushing interval of the
 	 * 		{@link org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher thread
 	 */
-	public void setUp(long flushTimeout) throws Exception {
+	public void setUp(long flushTimeout, Configuration config) throws Exception {
 		environment = new StreamNetworkBenchmarkEnvironment<>();
-		environment.setUp(1, 1, false, -1, -1);
+		environment.setUp(1, 1, false, -1, -1, config);
 
 		receiver = environment.createReceiver();
 		recordWriter = environment.createRecordWriter(0, flushTimeout);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index c55dd43..28d7f35 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.LongValue;
 
 import java.util.concurrent.CompletableFuture;
@@ -63,6 +64,24 @@ public class StreamNetworkThroughputBenchmark {
 		setUp(recordWriters, channels, flushTimeout, localMode, -1, -1);
 	}
 
+	public void setUp(
+			int recordWriters,
+			int channels,
+			int flushTimeout,
+			boolean localMode,
+			int senderBufferPoolSize,
+			int receiverBufferPoolSize) throws Exception {
+		setUp(
+			recordWriters,
+			channels,
+			flushTimeout,
+			localMode,
+			senderBufferPoolSize,
+			receiverBufferPoolSize,
+			new Configuration()
+		);
+	}
+
 	/**
 	 * Initializes the throughput benchmark with the given parameters.
 	 *
@@ -78,9 +97,10 @@ public class StreamNetworkThroughputBenchmark {
 			int flushTimeout,
 			boolean localMode,
 			int senderBufferPoolSize,
-			int receiverBufferPoolSize) throws Exception {
+			int receiverBufferPoolSize,
+			Configuration config) throws Exception {
 		environment = new StreamNetworkBenchmarkEnvironment<>();
-		environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize);
+		environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize, config);
 		receiver = environment.createReceiver();
 		writerThreads = new LongRecordWriterThread[recordWriters];
 		for (int writer = 0; writer < recordWriters; writer++) {