You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/05/06 17:47:56 UTC
[12/12] flink git commit: [hotfix] [config] Harmonize configuration
keys for TaskManager network settings.
[hotfix] [config] Harmonize configuration keys for TaskManager network settings.
This preserves old config keys as deprecated keys where the key was already present
in an earlier release.
This also re-arranges config options to form logical sections in the file
and harmonizes the JavaDoc formatting style.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aed3b806
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aed3b806
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aed3b806
Branch: refs/heads/master
Commit: aed3b806461114e04e9d6c3c0f27bc75eefa8f47
Parents: 710c08b
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 4 21:40:26 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 19:41:53 2017 +0200
----------------------------------------------------------------------
.../flink/configuration/TaskManagerOptions.java | 100 ++++++++------
.../runtime/io/network/netty/NettyConfig.java | 136 ++++++++-----------
2 files changed, 116 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/aed3b806/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index c5063d1..8480045 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -29,22 +29,41 @@ import static org.apache.flink.configuration.ConfigOptions.key;
public class TaskManagerOptions {
// ------------------------------------------------------------------------
- // TaskManager Options
+ // General TaskManager Options
// ------------------------------------------------------------------------
// @TODO Migrate 'taskmanager.*' config options from ConfigConstants
-
- /** Whether to kill the TaskManager when the task thread throws an OutOfMemoryError */
- public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
- key("taskmanager.jvm-exit-on-oom")
- .defaultValue(false);
- /** JVM heap size (in megabytes) for the TaskManagers */
+ /**
+ * JVM heap size (in megabytes) for the TaskManagers
+ */
public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY =
key("taskmanager.heap.mb")
.defaultValue(1024);
-
- /** Size of memory buffers used by the network stack and the memory manager (in bytes). */
+
+ /**
+ * Whether to kill the TaskManager when the task thread throws an OutOfMemoryError
+ */
+ public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
+ key("taskmanager.jvm-exit-on-oom")
+ .defaultValue(false);
+
+ /**
+ * Whether the quarantine monitor for task managers shall be started. The quarantine monitor
+ * shuts down the actor system if it detects that it has quarantined another actor system
+ * or if it has been quarantined by another actor system.
+ */
+ public static final ConfigOption<Boolean> EXIT_ON_FATAL_AKKA_ERROR =
+ key("taskmanager.exit-on-fatal-akka-error")
+ .defaultValue(false);
+
+ // ------------------------------------------------------------------------
+ // Managed Memory Options
+ // ------------------------------------------------------------------------
+
+ /**
+ * Size of memory buffers used by the network stack and the memory manager (in bytes).
+ */
public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
.defaultValue(32768);
@@ -73,7 +92,9 @@ public class TaskManagerOptions {
key("taskmanager.memory.off-heap")
.defaultValue(false);
- /** Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting. */
+ /**
+ * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
+ */
public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
key("taskmanager.memory.preallocate")
.defaultValue(false);
@@ -94,53 +115,65 @@ public class TaskManagerOptions {
key("taskmanager.network.numberOfBuffers")
.defaultValue(2048);
- /** Fraction of JVM memory to use for network buffers. */
+ /**
+ * Fraction of JVM memory to use for network buffers.
+ */
public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
key("taskmanager.network.memory.fraction")
.defaultValue(0.1f);
- /** Minimum memory size for network buffers (in bytes) */
+ /**
+ * Minimum memory size for network buffers (in bytes)
+ */
public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MIN =
key("taskmanager.network.memory.min")
.defaultValue(64L << 20); // 64 MB
- /** Maximum memory size for network buffers (in bytes) */
+ /**
+ * Maximum memory size for network buffers (in bytes)
+ */
public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MAX =
key("taskmanager.network.memory.max")
.defaultValue(1024L << 20); // 1 GB
-
- /** Minimum backoff for partition requests of input channels. */
- public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
- key("taskmanager.net.request-backoff.initial")
- .defaultValue(100);
-
- /** Maximum backoff for partition requests of input channels. */
- public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
- key("taskmanager.net.request-backoff.max")
- .defaultValue(10000);
-
-
/**
* Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel).
*
* Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization
*/
public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
- key("taskmanager.net.memory.buffers-per-channel")
+ key("taskmanager.network.memory.buffers-per-channel")
.defaultValue(2);
- /** Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate). */
+ /**
+ * Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate).
+ */
public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
- key("taskmanager.net.memory.extra-buffers-per-gate")
+ key("taskmanager.network.memory.floating-buffers-per-gate")
.defaultValue(8);
/**
+ * Minimum backoff for partition requests of input channels.
+ */
+ public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
+ key("taskmanager.network.request-backoff.initial")
+ .defaultValue(100)
+ .withDeprecatedKeys("taskmanager.net.request-backoff.initial");
+
+ /**
+ * Maximum backoff for partition requests of input channels.
+ */
+ public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
+ key("taskmanager.network.request-backoff.max")
+ .defaultValue(10000)
+ .withDeprecatedKeys("taskmanager.net.request-backoff.max");
+
+ /**
* Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
* lengths.
*/
public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS =
- key("taskmanager.net.detailed-metrics")
+ key("taskmanager.network.detailed-metrics")
.defaultValue(false);
// ------------------------------------------------------------------------
@@ -176,15 +209,6 @@ public class TaskManagerOptions {
key("task.checkpoint.alignment.max-size")
.defaultValue(-1L);
- /**
- * Whether the quarantine monitor for task managers shall be started. The quarantine monitor
- * shuts down the actor system if it detects that it has quarantined another actor system
- * or if it has been quarantined by another actor system.
- */
- public static final ConfigOption<Boolean> EXIT_ON_FATAL_AKKA_ERROR =
- key("taskmanager.exit-on-fatal-akka-error")
- .defaultValue(false);
-
// ------------------------------------------------------------------------
/** Not intended to be instantiated */
http://git-wip-us.apache.org/repos/asf/flink/blob/aed3b806/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index b9a1b90..e716a82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.io.network.netty;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.net.SSLUtils;
import org.slf4j.Logger;
@@ -38,19 +40,40 @@ public class NettyConfig {
// - Config keys ----------------------------------------------------------
- public static final String NUM_ARENAS = "taskmanager.net.num-arenas";
-
- public static final String NUM_THREADS_SERVER = "taskmanager.net.server.numThreads";
-
- public static final String NUM_THREADS_CLIENT = "taskmanager.net.client.numThreads";
-
- public static final String CONNECT_BACKLOG = "taskmanager.net.server.backlog";
-
- public static final String CLIENT_CONNECT_TIMEOUT_SECONDS = "taskmanager.net.client.connectTimeoutSec";
-
- public static final String SEND_RECEIVE_BUFFER_SIZE = "taskmanager.net.sendReceiveBufferSize";
-
- public static final String TRANSPORT_TYPE = "taskmanager.net.transport";
+ public static final ConfigOption<Integer> NUM_ARENAS = ConfigOptions
+ .key("taskmanager.network.netty.num-arenas")
+ .defaultValue(-1)
+ .withDeprecatedKeys("taskmanager.net.num-arenas");
+
+ public static final ConfigOption<Integer> NUM_THREADS_SERVER = ConfigOptions
+ .key("taskmanager.network.netty.server.numThreads")
+ .defaultValue(-1)
+ .withDeprecatedKeys("taskmanager.net.server.numThreads");
+
+ public static final ConfigOption<Integer> NUM_THREADS_CLIENT = ConfigOptions
+ .key("taskmanager.network.netty.client.numThreads")
+ .defaultValue(-1)
+ .withDeprecatedKeys("taskmanager.net.client.numThreads");
+
+ public static final ConfigOption<Integer> CONNECT_BACKLOG = ConfigOptions
+ .key("taskmanager.network.netty.server.backlog")
+ .defaultValue(0) // default: 0 => Netty's default
+ .withDeprecatedKeys("taskmanager.net.server.backlog");
+
+ public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions
+ .key("taskmanager.network.netty.client.connectTimeoutSec")
+ .defaultValue(120) // default: 120s = 2min
+ .withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec");
+
+ public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE = ConfigOptions
+ .key("taskmanager.network.netty.sendReceiveBufferSize")
+ .defaultValue(0) // default: 0 => Netty's default
+ .withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize");
+
+ public static final ConfigOption<String> TRANSPORT_TYPE = ConfigOptions
+ .key("taskmanager.network.netty.transport")
+ .defaultValue("nio")
+ .withDeprecatedKeys("taskmanager.net.transport");
// ------------------------------------------------------------------------
@@ -112,100 +135,49 @@ public class NettyConfig {
}
// ------------------------------------------------------------------------
- // Setters
- // ------------------------------------------------------------------------
-
- public NettyConfig setServerConnectBacklog(int connectBacklog) {
- checkArgument(connectBacklog >= 0);
- config.setInteger(CONNECT_BACKLOG, connectBacklog);
-
- return this;
- }
-
- public NettyConfig setServerNumThreads(int numThreads) {
- checkArgument(numThreads >= 0);
- config.setInteger(NUM_THREADS_SERVER, numThreads);
-
- return this;
- }
-
- public NettyConfig setClientNumThreads(int numThreads) {
- checkArgument(numThreads >= 0);
- config.setInteger(NUM_THREADS_CLIENT, numThreads);
-
- return this;
- }
-
- public NettyConfig setClientConnectTimeoutSeconds(int connectTimeoutSeconds) {
- checkArgument(connectTimeoutSeconds >= 0);
- config.setInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, connectTimeoutSeconds);
-
- return this;
- }
-
- public NettyConfig setSendAndReceiveBufferSize(int bufferSize) {
- checkArgument(bufferSize >= 0);
- config.setInteger(SEND_RECEIVE_BUFFER_SIZE, bufferSize);
-
- return this;
- }
-
- public NettyConfig setTransportType(String transport) {
- if (transport.equals("nio") || transport.equals("epoll") || transport.equals("auto")) {
- config.setString(TRANSPORT_TYPE, transport);
- }
- else {
- throw new IllegalArgumentException("Unknown transport type.");
- }
-
- return this;
- }
-
- // ------------------------------------------------------------------------
// Getters
// ------------------------------------------------------------------------
public int getServerConnectBacklog() {
- // default: 0 => Netty's default
- return config.getInteger(CONNECT_BACKLOG, 0);
+ return config.getInteger(CONNECT_BACKLOG);
}
public int getNumberOfArenas() {
// default: number of slots
- return config.getInteger(NUM_ARENAS, numberOfSlots);
+ final int configValue = config.getInteger(NUM_ARENAS);
+ return configValue == -1 ? numberOfSlots : configValue;
}
public int getServerNumThreads() {
// default: number of task slots
- return config.getInteger(NUM_THREADS_SERVER, numberOfSlots);
+ final int configValue = config.getInteger(NUM_THREADS_SERVER);
+ return configValue == -1 ? numberOfSlots : configValue;
}
public int getClientNumThreads() {
// default: number of task slots
- return config.getInteger(NUM_THREADS_CLIENT, numberOfSlots);
+ final int configValue = config.getInteger(NUM_THREADS_CLIENT);
+ return configValue == -1 ? numberOfSlots : configValue;
}
public int getClientConnectTimeoutSeconds() {
- // default: 120s = 2min
- return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, 120);
+ return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS);
}
public int getSendAndReceiveBufferSize() {
- // default: 0 => Netty's default
- return config.getInteger(SEND_RECEIVE_BUFFER_SIZE, 0);
+ return config.getInteger(SEND_RECEIVE_BUFFER_SIZE);
}
public TransportType getTransportType() {
- String transport = config.getString(TRANSPORT_TYPE, "nio");
-
- if (transport.equals("nio")) {
- return TransportType.NIO;
- }
- else if (transport.equals("epoll")) {
- return TransportType.EPOLL;
- }
- else {
- return TransportType.AUTO;
+ String transport = config.getString(TRANSPORT_TYPE);
+
+ switch (transport) {
+ case "nio":
+ return TransportType.NIO;
+ case "epoll":
+ return TransportType.EPOLL;
+ default:
+ return TransportType.AUTO;
}
}