You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/13 09:03:52 UTC

[flink] branch release-1.5 updated (dd096eb -> 3b3cc12)

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a change to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from dd096eb  [FLINK-10011] Release JobGraph from SubmittedJobGraphStore in Dispatcher
     new 60f7853  [hotfix][benchmarks] fix StreamNetworkThroughputBenchmark#setUp not forwarding localMode
     new f1fc386  [hotfix][benchmarks] add @Override
     new 3b3cc12  [FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../io/benchmark/SerializingLongReceiver.java      |  1 +
 .../StreamNetworkBenchmarkEnvironment.java         | 42 ++++++++++++++++------
 .../StreamNetworkPointToPointBenchmark.java        |  9 +++--
 .../StreamNetworkThroughputBenchmark.java          | 26 ++++++++++++--
 4 files changed, 63 insertions(+), 15 deletions(-)


[flink] 03/03: [FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b3cc12459dded87fe96db3d9aa07cc049b7a446
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Fri Sep 7 13:38:54 2018 +0200

    [FLINK-10301][network] extend StreamNetworkBenchmarkEnvironment to allow custom Configuration instances
    
    This closes #6670.
---
 .../StreamNetworkBenchmarkEnvironment.java         | 42 ++++++++++++++++------
 .../StreamNetworkPointToPointBenchmark.java        |  9 +++--
 .../StreamNetworkThroughputBenchmark.java          | 24 +++++++++++--
 3 files changed, 61 insertions(+), 14 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 3e6dcf6..eabdb95 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -66,10 +66,6 @@ import static org.apache.flink.util.ExceptionUtils.suppressExceptions;
  */
 public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
-	private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue();
-
-	private static final int NUM_SLOTS_AND_THREADS = 1;
-
 	private static final InetAddress LOCAL_ADDRESS;
 
 	static {
@@ -93,6 +89,21 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
 	protected ResultPartitionID[] partitionIds;
 
+	public void setUp(
+			int writers,
+			int channels,
+			boolean localMode,
+			int senderBufferPoolSize,
+			int receiverBufferPoolSize) throws Exception {
+		setUp(
+			writers,
+			channels,
+			localMode,
+			senderBufferPoolSize,
+			receiverBufferPoolSize,
+			new Configuration());
+	}
+
 	/**
 	 * Sets up the environment including buffer pools and netty threads.
 	 *
@@ -112,7 +123,8 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 			int channels,
 			boolean localMode,
 			int senderBufferPoolSize,
-			int receiverBufferPoolSize) throws Exception {
+			int receiverBufferPoolSize,
+			Configuration config) throws Exception {
 		this.localMode = localMode;
 		this.channels = channels;
 		this.partitionIds = new ResultPartitionID[writers];
@@ -125,13 +137,13 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
 		ioManager = new IOManagerAsync();
 
-		senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize);
+		senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize, config);
 		senderEnv.start();
 		if (localMode && senderBufferPoolSize == receiverBufferPoolSize) {
 			receiverEnv = senderEnv;
 		}
 		else {
-			receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize);
+			receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize, config);
 			receiverEnv.start();
 		}
 
@@ -176,12 +188,22 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 	}
 
 	private NetworkEnvironment createNettyNetworkEnvironment(
-			@SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception {
+			@SuppressWarnings("SameParameterValue") int bufferPoolSize, Configuration config) throws Exception {
+
+		int segmentSize = config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
+
+		// we need this because many configs have been written with a "-1" entry
+		// similar to TaskManagerServicesConfiguration#fromConfiguration()
+		// -> please note that this directly influences the number of netty threads!
+		int slots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+		if (slots == -1) {
+			slots = 1;
+		}
 
-		final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
+		final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, segmentSize);
 
 		final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager(
-			new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration()));
+			new NettyConfig(LOCAL_ADDRESS, 0, segmentSize, slots, config));
 
 		return new NetworkEnvironment(
 			bufferPool,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 6b96c62..a8d18e4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.types.LongValue;
 
@@ -61,6 +62,10 @@ public class StreamNetworkPointToPointBenchmark {
 		recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
 	}
 
+	public void setUp(long flushTimeout) throws Exception {
+		setUp(flushTimeout, new Configuration());
+	}
+
 	/**
 	 * Initializes the throughput benchmark with the given parameters.
 	 *
@@ -68,9 +73,9 @@ public class StreamNetworkPointToPointBenchmark {
 	 * 		output flushing interval of the
 	 * 		{@link org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher thread
 	 */
-	public void setUp(long flushTimeout) throws Exception {
+	public void setUp(long flushTimeout, Configuration config) throws Exception {
 		environment = new StreamNetworkBenchmarkEnvironment<>();
-		environment.setUp(1, 1, false, -1, -1);
+		environment.setUp(1, 1, false, -1, -1, config);
 
 		receiver = environment.createReceiver();
 		recordWriter = environment.createRecordWriter(0, flushTimeout);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index c55dd43..28d7f35 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.LongValue;
 
 import java.util.concurrent.CompletableFuture;
@@ -63,6 +64,24 @@ public class StreamNetworkThroughputBenchmark {
 		setUp(recordWriters, channels, flushTimeout, localMode, -1, -1);
 	}
 
+	public void setUp(
+			int recordWriters,
+			int channels,
+			int flushTimeout,
+			boolean localMode,
+			int senderBufferPoolSize,
+			int receiverBufferPoolSize) throws Exception {
+		setUp(
+			recordWriters,
+			channels,
+			flushTimeout,
+			localMode,
+			senderBufferPoolSize,
+			receiverBufferPoolSize,
+			new Configuration()
+		);
+	}
+
 	/**
 	 * Initializes the throughput benchmark with the given parameters.
 	 *
@@ -78,9 +97,10 @@ public class StreamNetworkThroughputBenchmark {
 			int flushTimeout,
 			boolean localMode,
 			int senderBufferPoolSize,
-			int receiverBufferPoolSize) throws Exception {
+			int receiverBufferPoolSize,
+			Configuration config) throws Exception {
 		environment = new StreamNetworkBenchmarkEnvironment<>();
-		environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize);
+		environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize, config);
 		receiver = environment.createReceiver();
 		writerThreads = new LongRecordWriterThread[recordWriters];
 		for (int writer = 0; writer < recordWriters; writer++) {


[flink] 02/03: [hotfix][benchmarks] add @Override

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f1fc3869cbcf3596eb4604f1d2f347dd483e4ca8
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Mon Sep 10 09:56:51 2018 +0200

    [hotfix][benchmarks] add @Override
---
 .../flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java    | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
index 580612c..a93e9d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SerializingLongReceiver.java
@@ -40,6 +40,7 @@ public class SerializingLongReceiver extends ReceiverThread {
 			});
 	}
 
+	@Override
 	protected void readRecords(long lastExpectedRecord) throws Exception {
 		LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord);
 		final LongValue value = new LongValue();


[flink] 01/03: [hotfix][benchmarks] fix StreamNetworkThroughputBenchmark#setUp not forwarding localMode

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 60f785372dfabc3c88bbb18ef1fed1abe23db29c
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Mon Sep 10 11:05:28 2018 +0200

    [hotfix][benchmarks] fix StreamNetworkThroughputBenchmark#setUp not forwarding localMode
---
 .../runtime/io/benchmark/StreamNetworkThroughputBenchmark.java          | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index 1b0ef8a..c55dd43 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -60,7 +60,7 @@ public class StreamNetworkThroughputBenchmark {
 	}
 
 	public void setUp(int recordWriters, int channels, int flushTimeout, boolean localMode) throws Exception {
-		setUp(recordWriters, channels, flushTimeout, false, -1, -1);
+		setUp(recordWriters, channels, flushTimeout, localMode, -1, -1);
 	}
 
 	/**