You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/05/06 17:47:56 UTC

[12/12] flink git commit: [hotfix] [config] Harmonize configuration keys for TaskManager network settings.

[hotfix] [config] Harmonize configuration keys for TaskManager network settings.

This preserves old config keys as deprecated keys where the key was already present
in an earlier release.

This also re-arranges config options to form logical sections in the file
and harmonizes the JavaDoc formatting style.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aed3b806
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aed3b806
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aed3b806

Branch: refs/heads/master
Commit: aed3b806461114e04e9d6c3c0f27bc75eefa8f47
Parents: 710c08b
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 4 21:40:26 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 19:41:53 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/TaskManagerOptions.java | 100 ++++++++------
 .../runtime/io/network/netty/NettyConfig.java   | 136 ++++++++-----------
 2 files changed, 116 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aed3b806/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index c5063d1..8480045 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -29,22 +29,41 @@ import static org.apache.flink.configuration.ConfigOptions.key;
 public class TaskManagerOptions {
 
 	// ------------------------------------------------------------------------
-	//  TaskManager Options
+	//  General TaskManager Options
 	// ------------------------------------------------------------------------
 
 	// @TODO Migrate 'taskmanager.*' config options from ConfigConstants
-	
-	/** Whether to kill the TaskManager when the task thread throws an OutOfMemoryError */
-	public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
-			key("taskmanager.jvm-exit-on-oom")
-			.defaultValue(false);
 
-	/** JVM heap size (in megabytes) for the TaskManagers */
+	/**
+	 * JVM heap size (in megabytes) for the TaskManagers
+	 */
 	public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY =
 			key("taskmanager.heap.mb")
 			.defaultValue(1024);
-		   
-	/** Size of memory buffers used by the network stack and the memory manager (in bytes). */
+
+	/**
+	 * Whether to kill the TaskManager when the task thread throws an OutOfMemoryError
+	 */
+	public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
+			key("taskmanager.jvm-exit-on-oom")
+			.defaultValue(false);
+
+	/**
+	 * Whether the quarantine monitor for task managers shall be started. The quarantine monitor
+	 * shuts down the actor system if it detects that it has quarantined another actor system
+	 * or if it has been quarantined by another actor system.
+	 */
+	public static final ConfigOption<Boolean> EXIT_ON_FATAL_AKKA_ERROR =
+			key("taskmanager.exit-on-fatal-akka-error")
+			.defaultValue(false);
+
+	// ------------------------------------------------------------------------
+	//  Managed Memory Options
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Size of memory buffers used by the network stack and the memory manager (in bytes).
+	 */
 	public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
 			key("taskmanager.memory.segment-size")
 			.defaultValue(32768);
@@ -73,7 +92,9 @@ public class TaskManagerOptions {
 			key("taskmanager.memory.off-heap")
 			.defaultValue(false);
 
-	/** Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting. */
+	/**
+	 * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
+	 */
 	public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
 			key("taskmanager.memory.preallocate")
 			.defaultValue(false);
@@ -94,53 +115,65 @@ public class TaskManagerOptions {
 			key("taskmanager.network.numberOfBuffers")
 			.defaultValue(2048);
 
-	/** Fraction of JVM memory to use for network buffers. */
+	/**
+	 * Fraction of JVM memory to use for network buffers.
+	 */
 	public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
 			key("taskmanager.network.memory.fraction")
 			.defaultValue(0.1f);
 
-	/** Minimum memory size for network buffers (in bytes) */
+	/**
+	 * Minimum memory size for network buffers (in bytes)
+	 */
 	public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MIN =
 			key("taskmanager.network.memory.min")
 			.defaultValue(64L << 20); // 64 MB
 
-	/** Maximum memory size for network buffers (in bytes) */
+	/**
+	 * Maximum memory size for network buffers (in bytes)
+	 */
 	public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MAX =
 			key("taskmanager.network.memory.max")
 			.defaultValue(1024L << 20); // 1 GB
 
-
-	/** Minimum backoff for partition requests of input channels. */
-	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
-			key("taskmanager.net.request-backoff.initial")
-			.defaultValue(100);
-
-	/** Maximum backoff for partition requests of input channels. */
-	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
-			key("taskmanager.net.request-backoff.max")
-			.defaultValue(10000);
-
-
 	/**
 	 * Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel).
 	 *
 	 * Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization
 	 */
 	public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
-		key("taskmanager.net.memory.buffers-per-channel")
+			key("taskmanager.network.memory.buffers-per-channel")
 			.defaultValue(2);
 
-	/** Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate). */
+	/**
+	 * Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate).
+	 */
 	public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
-		key("taskmanager.net.memory.extra-buffers-per-gate")
+			key("taskmanager.network.memory.floating-buffers-per-gate")
 			.defaultValue(8);
 
 	/**
+	 * Minimum backoff for partition requests of input channels.
+	 */
+	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
+			key("taskmanager.network.request-backoff.initial")
+			.defaultValue(100)
+			.withDeprecatedKeys("taskmanager.net.request-backoff.initial");
+
+	/**
+	 * Maximum backoff for partition requests of input channels.
+	 */
+	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
+			key("taskmanager.network.request-backoff.max")
+			.defaultValue(10000)
+			.withDeprecatedKeys("taskmanager.net.request-backoff.max");
+
+	/**
 	 * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
 	 * lengths.
 	 */
 	public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS =
-			key("taskmanager.net.detailed-metrics")
+			key("taskmanager.network.detailed-metrics")
 			.defaultValue(false);
 
 	// ------------------------------------------------------------------------
@@ -176,15 +209,6 @@ public class TaskManagerOptions {
 			key("task.checkpoint.alignment.max-size")
 			.defaultValue(-1L);
 
-	/**
-	 * Whether the quarantine monitor for task managers shall be started. The quarantine monitor
-	 * shuts down the actor system if it detects that it has quarantined another actor system
-	 * or if it has been quarantined by another actor system.
-	 */
-	public static final ConfigOption<Boolean> EXIT_ON_FATAL_AKKA_ERROR =
-		key("taskmanager.exit-on-fatal-akka-error")
-		.defaultValue(false);
-
 	// ------------------------------------------------------------------------
 
 	/** Not intended to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/aed3b806/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index b9a1b90..e716a82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.slf4j.Logger;
@@ -38,19 +40,40 @@ public class NettyConfig {
 
 	// - Config keys ----------------------------------------------------------
 
-	public static final String NUM_ARENAS = "taskmanager.net.num-arenas";
-
-	public static final String NUM_THREADS_SERVER = "taskmanager.net.server.numThreads";
-
-	public static final String NUM_THREADS_CLIENT = "taskmanager.net.client.numThreads";
-
-	public static final String CONNECT_BACKLOG = "taskmanager.net.server.backlog";
-
-	public static final String CLIENT_CONNECT_TIMEOUT_SECONDS = "taskmanager.net.client.connectTimeoutSec";
-
-	public static final String SEND_RECEIVE_BUFFER_SIZE = "taskmanager.net.sendReceiveBufferSize";
-
-	public static final String TRANSPORT_TYPE = "taskmanager.net.transport";
+	public static final ConfigOption<Integer> NUM_ARENAS = ConfigOptions
+			.key("taskmanager.network.netty.num-arenas")
+			.defaultValue(-1)
+			.withDeprecatedKeys("taskmanager.net.num-arenas");
+
+	public static final ConfigOption<Integer> NUM_THREADS_SERVER = ConfigOptions
+			.key("taskmanager.network.netty.server.numThreads")
+			.defaultValue(-1)
+			.withDeprecatedKeys("taskmanager.net.server.numThreads");
+
+	public static final ConfigOption<Integer> NUM_THREADS_CLIENT = ConfigOptions
+			.key("taskmanager.network.netty.client.numThreads")
+			.defaultValue(-1)
+			.withDeprecatedKeys("taskmanager.net.client.numThreads");
+
+	public static final ConfigOption<Integer> CONNECT_BACKLOG = ConfigOptions
+			.key("taskmanager.network.netty.server.backlog")
+			.defaultValue(0) // default: 0 => Netty's default
+			.withDeprecatedKeys("taskmanager.net.server.backlog");
+
+	public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions
+			.key("taskmanager.network.netty.client.connectTimeoutSec")
+			.defaultValue(120) // default: 120s = 2min
+			.withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec");
+
+	public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE = ConfigOptions
+			.key("taskmanager.network.netty.sendReceiveBufferSize")
+			.defaultValue(0) // default: 0 => Netty's default
+			.withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize");
+
+	public static final ConfigOption<String> TRANSPORT_TYPE = ConfigOptions
+			.key("taskmanager.network.netty.transport")
+			.defaultValue("nio")
+			.withDeprecatedKeys("taskmanager.net.transport");
 
 	// ------------------------------------------------------------------------
 
@@ -112,100 +135,49 @@ public class NettyConfig {
 	}
 
 	// ------------------------------------------------------------------------
-	// Setters
-	// ------------------------------------------------------------------------
-
-	public NettyConfig setServerConnectBacklog(int connectBacklog) {
-		checkArgument(connectBacklog >= 0);
-		config.setInteger(CONNECT_BACKLOG, connectBacklog);
-
-		return this;
-	}
-
-	public NettyConfig setServerNumThreads(int numThreads) {
-		checkArgument(numThreads >= 0);
-		config.setInteger(NUM_THREADS_SERVER, numThreads);
-
-		return this;
-	}
-
-	public NettyConfig setClientNumThreads(int numThreads) {
-		checkArgument(numThreads >= 0);
-		config.setInteger(NUM_THREADS_CLIENT, numThreads);
-
-		return this;
-	}
-
-	public NettyConfig setClientConnectTimeoutSeconds(int connectTimeoutSeconds) {
-		checkArgument(connectTimeoutSeconds >= 0);
-		config.setInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, connectTimeoutSeconds);
-
-		return this;
-	}
-
-	public NettyConfig setSendAndReceiveBufferSize(int bufferSize) {
-		checkArgument(bufferSize >= 0);
-		config.setInteger(SEND_RECEIVE_BUFFER_SIZE, bufferSize);
-
-		return this;
-	}
-
-	public NettyConfig setTransportType(String transport) {
-		if (transport.equals("nio") || transport.equals("epoll") || transport.equals("auto")) {
-			config.setString(TRANSPORT_TYPE, transport);
-		}
-		else {
-			throw new IllegalArgumentException("Unknown transport type.");
-		}
-
-		return this;
-	}
-
-	// ------------------------------------------------------------------------
 	// Getters
 	// ------------------------------------------------------------------------
 
 	public int getServerConnectBacklog() {
-		// default: 0 => Netty's default
-		return config.getInteger(CONNECT_BACKLOG, 0);
+		return config.getInteger(CONNECT_BACKLOG);
 	}
 
 	public int getNumberOfArenas() {
 		// default: number of slots
-		return config.getInteger(NUM_ARENAS, numberOfSlots);
+		final int configValue = config.getInteger(NUM_ARENAS);
+		return configValue == -1 ? numberOfSlots : configValue;
 	}
 
 	public int getServerNumThreads() {
 		// default: number of task slots
-		return config.getInteger(NUM_THREADS_SERVER, numberOfSlots);
+		final int configValue = config.getInteger(NUM_THREADS_SERVER);
+		return configValue == -1 ? numberOfSlots : configValue;
 	}
 
 	public int getClientNumThreads() {
 		// default: number of task slots
-		return config.getInteger(NUM_THREADS_CLIENT, numberOfSlots);
+		final int configValue = config.getInteger(NUM_THREADS_CLIENT);
+		return configValue == -1 ? numberOfSlots : configValue;
 	}
 
 	public int getClientConnectTimeoutSeconds() {
-		// default: 120s = 2min
-		return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, 120);
+		return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS);
 	}
 
 	public int getSendAndReceiveBufferSize() {
-		// default: 0 => Netty's default
-		return config.getInteger(SEND_RECEIVE_BUFFER_SIZE, 0);
+		return config.getInteger(SEND_RECEIVE_BUFFER_SIZE);
 	}
 
 	public TransportType getTransportType() {
-		String transport = config.getString(TRANSPORT_TYPE, "nio");
-
-		if (transport.equals("nio")) {
-			return TransportType.NIO;
-		}
-		else if (transport.equals("epoll")) {
-			return TransportType.EPOLL;
-		}
-		else {
-			return TransportType.AUTO;
+		String transport = config.getString(TRANSPORT_TYPE);
+
+		switch (transport) {
+			case "nio":
+				return TransportType.NIO;
+			case "epoll":
+				return TransportType.EPOLL;
+			default:
+				return TransportType.AUTO;
 		}
 	}