You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/23 16:09:55 UTC

[flink] branch master updated: [FLINK-12127][network] Consolidate network options in NetworkEnvironmentOptions

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f39736  [FLINK-12127][network] Consolidate network options in NetworkEnvironmentOptions
2f39736 is described below

commit 2f397367164c8f915304f6317bf0b9dfde615ea9
Author: zhijiang <wa...@aliyun.com>
AuthorDate: Fri May 24 00:09:40 2019 +0800

    [FLINK-12127][network] Consolidate network options in NetworkEnvironmentOptions
---
 .../network_environment_configuration.html         |  61 ++++++
 ...ation.html => network_netty_configuration.html} |   0
 .../generated/task_manager_configuration.html      |  50 -----
 docs/ops/config.md                                 |   8 +-
 .../flink/addons/hbase/HBaseConnectorITCase.java   |   4 +-
 .../flink/configuration/ConfigConstants.java       |  12 +-
 .../configuration/NetworkEnvironmentOptions.java   | 216 +++++++++++++++++++++
 .../flink/configuration/TaskManagerOptions.java    | 129 ------------
 ...TaskManagerHeapSizeCalculationJavaBashTest.java |  19 +-
 .../configuration/ConfigOptionsDocGenerator.java   |   1 -
 .../io/network/buffer/NetworkBufferPool.java       |  14 +-
 .../runtime/io/network/netty/NettyConfig.java      |  67 +------
 .../NetworkEnvironmentConfiguration.java           |  85 ++++----
 .../runtime/io/network/NetworkEnvironmentTest.java |   6 +-
 .../network/netty/NettyConnectionManagerTest.java  |   7 +-
 .../PartialConsumePipelinedResultTest.java         |   4 +-
 .../taskexecutor/NetworkBufferCalculationTest.java |  13 +-
 .../NetworkEnvironmentConfigurationTest.java       |  91 ++++-----
 .../taskexecutor/TaskExecutorSubmissionTest.java   |  11 +-
 .../runtime/taskexecutor/TaskExecutorTest.java     |   7 +-
 .../taskexecutor/TaskManagerRunnerStartupTest.java |   3 +-
 .../TaskManagerServicesConfigurationTest.java      |  18 +-
 .../TaskSubmissionTestEnvironment.java             |   8 +-
 .../TaskCancelAsyncProducerConsumerITCase.java     |   3 +-
 .../streaming/runtime/io/InputProcessorUtil.java   |   3 +-
 .../StreamNetworkThroughputBenchmarkTest.java      |   8 +-
 .../flink/test/cancelling/CancelingTestBase.java   |   3 +-
 .../EventTimeWindowCheckpointingITCase.java        |   3 +-
 .../manual/StreamingScalabilityAndLatency.java     |   3 +-
 .../SuccessAfterNetworkBuffersFailureITCase.java   |   3 +-
 ...tractTaskManagerProcessFailureRecoveryTest.java |   3 +-
 .../JobManagerHAProcessFailureRecoveryITCase.java  |   3 +-
 .../recovery/ProcessFailureCancelingITCase.java    |   3 +-
 .../flink/test/runtime/NettyEpollITCase.java       |   4 +-
 .../apache/flink/yarn/YarnConfigurationITCase.java |   6 +-
 35 files changed, 474 insertions(+), 405 deletions(-)

diff --git a/docs/_includes/generated/network_environment_configuration.html b/docs/_includes/generated/network_environment_configuration.html
new file mode 100644
index 0000000..7787ed0
--- /dev/null
+++ b/docs/_includes/generated/network_environment_configuration.html
@@ -0,0 +1,61 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>taskmanager.data.port</h5></td>
+            <td style="word-wrap: break-word;">0</td>
+            <td>The task manager’s port used for data exchange operations.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.data.ssl.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Enable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.detailed-metrics</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
+            <td style="word-wrap: break-word;">2</td>
+            <td>Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td>
+            <td style="word-wrap: break-word;">8</td>
+            <td>Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be increased in case of highe [...]
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.memory.fraction</h5></td>
+            <td style="word-wrap: break-word;">0.1</td>
+            <td>Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. Also note, that "taskmanager.network.memory.min"` and "taskmanager.network.memory.max" may override this fraction.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.memory.max</h5></td>
+            <td style="word-wrap: break-word;">"1gb"</td>
+            <td>Maximum memory size for network buffers.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.memory.min</h5></td>
+            <td style="word-wrap: break-word;">"64mb"</td>
+            <td>Minimum memory size for network buffers.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.request-backoff.initial</h5></td>
+            <td style="word-wrap: break-word;">100</td>
+            <td>Minimum backoff in milliseconds for partition requests of input channels.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.network.request-backoff.max</h5></td>
+            <td style="word-wrap: break-word;">10000</td>
+            <td>Maximum backoff in milliseconds for partition requests of input channels.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/docs/_includes/generated/netty_configuration.html b/docs/_includes/generated/network_netty_configuration.html
similarity index 100%
rename from docs/_includes/generated/netty_configuration.html
rename to docs/_includes/generated/network_netty_configuration.html
diff --git a/docs/_includes/generated/task_manager_configuration.html b/docs/_includes/generated/task_manager_configuration.html
index 395a613..16da6d5 100644
--- a/docs/_includes/generated/task_manager_configuration.html
+++ b/docs/_includes/generated/task_manager_configuration.html
@@ -28,16 +28,6 @@
             <td>The maximum number of bytes that a checkpoint alignment may buffer. If the checkpoint alignment buffers more than the configured amount of data, the checkpoint is aborted (skipped). A value of -1 indicates that there is no limit.</td>
         </tr>
         <tr>
-            <td><h5>taskmanager.data.port</h5></td>
-            <td style="word-wrap: break-word;">0</td>
-            <td>The task manager’s port used for data exchange operations.</td>
-        </tr>
-        <tr>
-            <td><h5>taskmanager.data.ssl.enabled</h5></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>Enable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true</td>
-        </tr>
-        <tr>
             <td><h5>taskmanager.debug.memory.log</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.</td>
@@ -74,46 +64,6 @@
 <ul><li>"name" - uses hostname as binding address</li><li>"ip" - uses host's ip address as binding address</li></ul></td>
         </tr>
         <tr>
-            <td><h5>taskmanager.network.detailed-metrics</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.</td>
-        </tr>
-        <tr>
-            <td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
-            <td style="word-wrap: break-word;">2</td>
-            <td>Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization.</td>
-        </tr>
-        <tr>
-            <td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td>
-            <td style="word-wrap: break-word;">8</td>
-            <td>Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be increased in case of highe [...]
-        </tr>
-        <tr>
-            <td><h5>taskmanager.network.memory.fraction</h5></td>
-            <td style="word-wrap: break-word;">0.1</td>
-            <td>Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. Also note, that "taskmanager.network.memory.min"` and "taskmanager.network.memory.max" may override this fraction.</td>
-        </tr>
-        <tr>
-            <td><h5>taskmanager.network.memory.max</h5></td>
-            <td style="word-wrap: break-word;">"1gb"</td>
-            <td>Maximum memory size for network buffers.</td>
-        </tr>
-        <tr>
-            <td><h5>taskmanager.network.memory.min</h5></td>
-            <td style="word-wrap: break-word;">"64mb"</td>
-            <td>Minimum memory size for network buffers.</td>
-        </tr>
-        <tr>
-            <td><h5>taskmanager.network.request-backoff.initial</h5></td>
-            <td style="word-wrap: break-word;">100</td>
-            <td>Minimum backoff in milliseconds for partition requests of input channels.</td>
-        </tr>
-        <tr>
-            <td><h5>taskmanager.network.request-backoff.max</h5></td>
-            <td style="word-wrap: break-word;">10000</td>
-            <td>Maximum backoff in milliseconds for partition requests of input channels.</td>
-        </tr>
-        <tr>
             <td><h5>taskmanager.numberOfTaskSlots</h5></td>
             <td style="word-wrap: break-word;">1</td>
             <td>The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e. [...]
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 3ae46ee..9fb660b 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -100,11 +100,15 @@ The default fraction for managed memory can be adjusted using the taskmanager.me
 
 {% include generated/security_configuration.html %}
 
-### Network communication (via Netty)
+### Network Environment
+
+{% include generated/network_environment_configuration.html %}
+
+### Network Communication (via Netty)
 
 These parameters allow for advanced tuning. The default values are sufficient when running concurrent high-throughput jobs on a large cluster.
 
-{% include generated/netty_configuration.html %}
+{% include generated/network_netty_configuration.html %}
 
 ### Web Frontend
 
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index 30e0e5b..20ba45e 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
@@ -362,7 +362,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 		public static void setAsContext() {
 			Configuration config = new Configuration();
 			// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
-			config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
+			config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
 			final LocalEnvironment le = new LocalEnvironment(config);
 
 			initializeContextEnvironment(new ExecutionEnvironmentFactory() {
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 9dce4ba..56a264c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -204,7 +204,7 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_IPC_PORT_KEY = "taskmanager.rpc.port";
 
 	/**
-	 * @deprecated use {@link TaskManagerOptions#DATA_PORT} instead
+	 * @deprecated use {@link NetworkEnvironmentOptions#DATA_PORT} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_DATA_PORT_KEY = "taskmanager.data.port";
@@ -212,7 +212,7 @@ public final class ConfigConstants {
 	/**
 	 * Config parameter to override SSL support for taskmanager's data transport.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#DATA_SSL_ENABLED} instead
+	 * @deprecated use {@link NetworkEnvironmentOptions#DATA_SSL_ENABLED} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_DATA_SSL_ENABLED = "taskmanager.data.ssl.enabled";
@@ -270,7 +270,7 @@ public final class ConfigConstants {
 	 * The config parameter defining the number of buffers used in the network stack. This defines the
 	 * number of possible tasks and shuffles.
 	 *
-	 * @deprecated Use {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} instead
+	 * @deprecated Use {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS} instead
 	 */
 	@Deprecated
 	public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";
@@ -1392,7 +1392,7 @@ public final class ConfigConstants {
 	 * The default network port the task manager expects to receive transfer envelopes on. The {@code 0} means that
 	 * the TaskManager searches for a free port.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#DATA_PORT} instead
+	 * @deprecated use {@link NetworkEnvironmentOptions#DATA_PORT} instead
 	 */
 	@Deprecated
 	public static final int DEFAULT_TASK_MANAGER_DATA_PORT = 0;
@@ -1400,7 +1400,7 @@ public final class ConfigConstants {
 	/**
 	 * The default value to override ssl support for task manager's data transport.
 	 *
-	 * @deprecated use {@link TaskManagerOptions#DATA_SSL_ENABLED} instead
+	 * @deprecated use {@link NetworkEnvironmentOptions#DATA_SSL_ENABLED} instead
 	 */
 	@Deprecated
 	public static final boolean DEFAULT_TASK_MANAGER_DATA_SSL_ENABLED = true;
@@ -1424,7 +1424,7 @@ public final class ConfigConstants {
 	/**
 	 * Config key has been deprecated. Therefore, no default value required.
 	 *
-	 * @deprecated {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} provides the default value now
+	 * @deprecated {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS} provides the default value now
 	 */
 	@Deprecated
 	public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048;
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NetworkEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NetworkEnvironmentOptions.java
new file mode 100644
index 0000000..920cf5b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NetworkEnvironmentOptions.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.ConfigGroup;
+import org.apache.flink.annotation.docs.ConfigGroups;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to network stack.
+ */
+@PublicEvolving
+@ConfigGroups(groups = @ConfigGroup(name = "NetworkNetty", keyPrefix = "taskmanager.network.netty"))
+public class NetworkEnvironmentOptions {
+
+	// ------------------------------------------------------------------------
+	//  Network General Options
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The default network port the task manager expects to receive transfer envelopes on. The {@code 0} means that
+	 * the TaskManager searches for a free port.
+	 */
+	public static final ConfigOption<Integer> DATA_PORT =
+		key("taskmanager.data.port")
+			.defaultValue(0)
+			.withDescription("The task manager’s port used for data exchange operations.");
+
+	/**
+	 * Config parameter to override SSL support for taskmanager's data transport.
+	 */
+	public static final ConfigOption<Boolean> DATA_SSL_ENABLED =
+		key("taskmanager.data.ssl.enabled")
+			.defaultValue(true)
+			.withDescription("Enable SSL support for the taskmanager data transport. This is applicable only when the" +
+				" global flag for internal SSL (" + SecurityOptions.SSL_INTERNAL_ENABLED.key() + ") is set to true");
+
+	/**
+	 * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
+	 * lengths.
+	 */
+	public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS =
+		key("taskmanager.network.detailed-metrics")
+			.defaultValue(false)
+			.withDescription("Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.");
+
+	/**
+	 * Boolean flag to enable/disable network credit-based flow control.
+	 *
+	 * @deprecated Will be removed for Flink 1.6 when the old code will be dropped in favour of
+	 * credit-based flow control.
+	 */
+	@Deprecated
+	public static final ConfigOption<Boolean> NETWORK_CREDIT_MODEL =
+		key("taskmanager.network.credit-model")
+			.defaultValue(true)
+			.withDeprecatedKeys("taskmanager.network.credit-based-flow-control.enabled")
+			.withDescription("Boolean flag to enable/disable network credit-based flow control.");
+
+	/**
+	 * Number of buffers used in the network stack. This defines the number of possible tasks and
+	 * shuffles.
+	 *
+	 * @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
+	 * and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
+	 */
+	@Deprecated
+	public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
+		key("taskmanager.network.numberOfBuffers")
+			.defaultValue(2048);
+
+	/**
+	 * Fraction of JVM memory to use for network buffers.
+	 */
+	public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
+		key("taskmanager.network.memory.fraction")
+			.defaultValue(0.1f)
+			.withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" +
+				" data exchange channels a TaskManager can have at the same time and how well buffered the channels" +
+				" are. If a job is rejected or you get a warning that the system has not enough buffers available," +
+				" increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" +
+				"` and \"taskmanager.network.memory.max\" may override this fraction.");
+
+	/**
+	 * Minimum memory size for network buffers.
+	 */
+	public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MIN =
+		key("taskmanager.network.memory.min")
+			.defaultValue("64mb")
+			.withDescription("Minimum memory size for network buffers.");
+
+	/**
+	 * Maximum memory size for network buffers.
+	 */
+	public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MAX =
+		key("taskmanager.network.memory.max")
+			.defaultValue("1gb")
+			.withDescription("Maximum memory size for network buffers.");
+
+	/**
+	 * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
+	 *
+	 * <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
+	 */
+	public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
+		key("taskmanager.network.memory.buffers-per-channel")
+			.defaultValue(2)
+			.withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
+				"In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
+				" configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
+				" for parallel serialization.");
+
+	/**
+	 * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
+	 */
+	public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
+		key("taskmanager.network.memory.floating-buffers-per-gate")
+			.defaultValue(8)
+			.withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
+				" In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
+				" The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
+				" help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
+				" increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");
+
+	// ------------------------------------------------------------------------
+	//  Netty Options
+	// ------------------------------------------------------------------------
+
+	public static final ConfigOption<Integer> NUM_ARENAS =
+		key("taskmanager.network.netty.num-arenas")
+			.defaultValue(-1)
+			.withDeprecatedKeys("taskmanager.net.num-arenas")
+			.withDescription("The number of Netty arenas.");
+
+	public static final ConfigOption<Integer> NUM_THREADS_SERVER =
+		key("taskmanager.network.netty.server.numThreads")
+			.defaultValue(-1)
+			.withDeprecatedKeys("taskmanager.net.server.numThreads")
+			.withDescription("The number of Netty server threads.");
+
+	public static final ConfigOption<Integer> NUM_THREADS_CLIENT =
+		key("taskmanager.network.netty.client.numThreads")
+			.defaultValue(-1)
+			.withDeprecatedKeys("taskmanager.net.client.numThreads")
+			.withDescription("The number of Netty client threads.");
+
+	public static final ConfigOption<Integer> CONNECT_BACKLOG =
+		key("taskmanager.network.netty.server.backlog")
+			.defaultValue(0) // default: 0 => Netty's default
+			.withDeprecatedKeys("taskmanager.net.server.backlog")
+			.withDescription("The netty server connection backlog.");
+
+	public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS =
+		key("taskmanager.network.netty.client.connectTimeoutSec")
+			.defaultValue(120) // default: 120s = 2min
+			.withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec")
+			.withDescription("The Netty client connection timeout.");
+
+	public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE =
+		key("taskmanager.network.netty.sendReceiveBufferSize")
+			.defaultValue(0) // default: 0 => Netty's default
+			.withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize")
+			.withDescription("The Netty send and receive buffer size. This defaults to the system buffer size" +
+				" (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.");
+
+	public static final ConfigOption<String> TRANSPORT_TYPE =
+		key("taskmanager.network.netty.transport")
+			.defaultValue("nio")
+			.withDeprecatedKeys("taskmanager.net.transport")
+			.withDescription("The Netty transport type, either \"nio\" or \"epoll\"");
+
+	// ------------------------------------------------------------------------
+	//  Partition Request Options
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Minimum backoff for partition requests of input channels.
+	 */
+	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
+		key("taskmanager.network.request-backoff.initial")
+			.defaultValue(100)
+			.withDeprecatedKeys("taskmanager.net.request-backoff.initial")
+			.withDescription("Minimum backoff in milliseconds for partition requests of input channels.");
+
+	/**
+	 * Maximum backoff for partition requests of input channels.
+	 */
+	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
+		key("taskmanager.network.request-backoff.max")
+			.defaultValue(10000)
+			.withDeprecatedKeys("taskmanager.net.request-backoff.max")
+			.withDescription("Maximum backoff in milliseconds for partition requests of input channels.");
+
+	// ------------------------------------------------------------------------
+
+	/** Not intended to be instantiated. */
+	private NetworkEnvironmentOptions() {}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 8b51bed..351b9f4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -109,24 +109,6 @@ public class TaskManagerOptions {
 				" collisions when multiple TaskManagers are running on the same machine.");
 
 	/**
-	 * The default network port the task manager expects to receive transfer envelopes on. The {@code 0} means that
-	 * the TaskManager searches for a free port.
-	 */
-	public static final ConfigOption<Integer> DATA_PORT =
-		key("taskmanager.data.port")
-			.defaultValue(0)
-			.withDescription("The task manager’s port used for data exchange operations.");
-
-	/**
-	 * Config parameter to override SSL support for taskmanager's data transport.
-	 */
-	public static final ConfigOption<Boolean> DATA_SSL_ENABLED =
-		key("taskmanager.data.ssl.enabled")
-			.defaultValue(true)
-			.withDescription("Enable SSL support for the taskmanager data transport. This is applicable only when the" +
-				" global flag for internal SSL (" + SecurityOptions.SSL_INTERNAL_ENABLED.key() + ") is set to true");
-
-	/**
 	 * The initial registration backoff between two consecutive registration attempts. The backoff
 	 * is doubled for each new registration attempt until it reaches the maximum registration backoff.
 	 */
@@ -263,10 +245,6 @@ public class TaskManagerOptions {
 					" GC. For streaming setups is is highly recommended to set this value to false as the core state" +
 					" backends currently do not use the managed memory.", code(MEMORY_OFF_HEAP.key())).build());
 
-	// ------------------------------------------------------------------------
-	//  Network Options
-	// ------------------------------------------------------------------------
-
 	/**
 	 * The config parameter for automatically defining the TaskManager's binding address,
 	 * if {@link #HOST} configuration option is not set.
@@ -282,113 +260,6 @@ public class TaskManagerOptions {
 					text("\"ip\" - uses host's ip address as binding address"))
 				.build());
 
-
-	/**
-	 * Number of buffers used in the network stack. This defines the number of possible tasks and
-	 * shuffles.
-	 *
-	 * @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
-	 * and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
-	 */
-	@Deprecated
-	public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
-			key("taskmanager.network.numberOfBuffers")
-			.defaultValue(2048);
-
-	/**
-	 * Fraction of JVM memory to use for network buffers.
-	 */
-	public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
-			key("taskmanager.network.memory.fraction")
-			.defaultValue(0.1f)
-			.withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" +
-				" data exchange channels a TaskManager can have at the same time and how well buffered the channels" +
-				" are. If a job is rejected or you get a warning that the system has not enough buffers available," +
-				" increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" +
-				"` and \"taskmanager.network.memory.max\" may override this fraction.");
-
-	/**
-	 * Minimum memory size for network buffers.
-	 */
-	public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MIN =
-			key("taskmanager.network.memory.min")
-			.defaultValue("64mb")
-			.withDescription("Minimum memory size for network buffers.");
-
-	/**
-	 * Maximum memory size for network buffers.
-	 */
-	public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MAX =
-			key("taskmanager.network.memory.max")
-			.defaultValue("1gb")
-			.withDescription("Maximum memory size for network buffers.");
-
-	/**
-	 * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
-	 *
-	 * <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
-	 */
-	public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
-			key("taskmanager.network.memory.buffers-per-channel")
-			.defaultValue(2)
-			.withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
-				"In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
-				" configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
-				" for parallel serialization.");
-
-	/**
-	 * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
-	 */
-	public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
-			key("taskmanager.network.memory.floating-buffers-per-gate")
-			.defaultValue(8)
-			.withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
-				" In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
-				" The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
-				" help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
-				" increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");
-
-
-	/**
-	 * Minimum backoff for partition requests of input channels.
-	 */
-	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
-			key("taskmanager.network.request-backoff.initial")
-			.defaultValue(100)
-			.withDeprecatedKeys("taskmanager.net.request-backoff.initial")
-			.withDescription("Minimum backoff in milliseconds for partition requests of input channels.");
-
-	/**
-	 * Maximum backoff for partition requests of input channels.
-	 */
-	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
-			key("taskmanager.network.request-backoff.max")
-			.defaultValue(10000)
-			.withDeprecatedKeys("taskmanager.net.request-backoff.max")
-			.withDescription("Maximum backoff in milliseconds for partition requests of input channels.");
-
-	/**
-	 * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
-	 * lengths.
-	 */
-	public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS =
-			key("taskmanager.network.detailed-metrics")
-			.defaultValue(false)
-			.withDescription("Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.");
-
-	/**
-	 * Boolean flag to enable/disable network credit-based flow control.
-	 *
-	 * @deprecated Will be removed for Flink 1.6 when the old code will be dropped in favour of
-	 * credit-based flow control.
-	 */
-	@Deprecated
-	public static final ConfigOption<Boolean> NETWORK_CREDIT_MODEL =
-			key("taskmanager.network.credit-model")
-			.defaultValue(true)
-			.withDeprecatedKeys("taskmanager.network.credit-based-flow-control.enabled")
-			.withDescription("Boolean flag to enable/disable network credit-based flow control.");
-
 	// ------------------------------------------------------------------------
 	//  Task Options
 	// ------------------------------------------------------------------------
diff --git a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
index 4c5c39f..9d6b638 100644
--- a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
+++ b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.dist;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
@@ -158,9 +159,9 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 		config.setLong(KEY_TASKM_MEM_SIZE, javaMemMB);
 		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, useOffHeap);
 
-		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
-		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(netBufMemMin));
-		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(netBufMemMax));
+		config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
+		config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(netBufMemMin));
+		config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(netBufMemMax));
 
 		if (managedMemSizeMB == 0) {
 			config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
@@ -232,9 +233,9 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 
 		String[] command = {"src/test/bin/calcTMNetBufMem.sh",
 			totalJavaMemorySizeMB + "m",
-			String.valueOf(config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
-			config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN),
-			config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)};
+			String.valueOf(config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
+			config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN),
+			config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX)};
 
 		String scriptOutput = executeScript(command);
 
@@ -271,9 +272,9 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 		String[] command = {"src/test/bin/calcTMHeapSizeMB.sh",
 			totalJavaMemorySizeMB + "m",
 			String.valueOf(config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)),
-			String.valueOf(config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
-			config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN),
-			config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX),
+			String.valueOf(config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
+			config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN),
+			config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX),
 			config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE),
 			String.valueOf(config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION))};
 		String scriptOutput = executeScript(command);
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index ed8c237..0cd9022 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -57,7 +57,6 @@ public class ConfigOptionsDocGenerator {
 
 	static final OptionsClassLocation[] LOCATIONS = new OptionsClassLocation[]{
 		new OptionsClassLocation("flink-core", "org.apache.flink.configuration"),
-		new OptionsClassLocation("flink-runtime", "org.apache.flink.runtime.io.network.netty"),
 		new OptionsClassLocation("flink-yarn", "org.apache.flink.yarn.configuration"),
 		new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
 		new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 3d6c2da..2053100 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemorySegmentProvider;
@@ -149,9 +149,9 @@ public class NetworkBufferPool implements BufferPoolFactory, MemorySegmentProvid
 						totalNumberOfMemorySegments - numTotalRequiredBuffers,
 						totalNumberOfMemorySegments,
 						memorySegmentSize,
-						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
-						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
-						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
+						NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
+						NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
+						NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
 			}
 
 			this.numTotalRequiredBuffers += numberOfSegmentsToRequest;
@@ -284,9 +284,9 @@ public class NetworkBufferPool implements BufferPoolFactory, MemorySegmentProvid
 						totalNumberOfMemorySegments - numTotalRequiredBuffers,
 						totalNumberOfMemorySegments,
 						memorySegmentSize,
-						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
-						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
-						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
+						NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
+						NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
+						NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key()));
 			}
 
 			this.numTotalRequiredBuffers += numRequiredBuffers;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 2d23747..f0a96fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.runtime.net.SSLUtils;
 
 import org.slf4j.Logger;
@@ -38,53 +36,6 @@ public class NettyConfig {
 
 	private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);
 
-	// - Config keys ----------------------------------------------------------
-
-	public static final ConfigOption<Integer> NUM_ARENAS = ConfigOptions
-			.key("taskmanager.network.netty.num-arenas")
-			.defaultValue(-1)
-			.withDeprecatedKeys("taskmanager.net.num-arenas")
-			.withDescription("The number of Netty arenas.");
-
-	public static final ConfigOption<Integer> NUM_THREADS_SERVER = ConfigOptions
-			.key("taskmanager.network.netty.server.numThreads")
-			.defaultValue(-1)
-			.withDeprecatedKeys("taskmanager.net.server.numThreads")
-			.withDescription("The number of Netty server threads.");
-
-	public static final ConfigOption<Integer> NUM_THREADS_CLIENT = ConfigOptions
-			.key("taskmanager.network.netty.client.numThreads")
-			.defaultValue(-1)
-			.withDeprecatedKeys("taskmanager.net.client.numThreads")
-			.withDescription("The number of Netty client threads.");
-
-	public static final ConfigOption<Integer> CONNECT_BACKLOG = ConfigOptions
-			.key("taskmanager.network.netty.server.backlog")
-			.defaultValue(0) // default: 0 => Netty's default
-			.withDeprecatedKeys("taskmanager.net.server.backlog")
-			.withDescription("The netty server connection backlog.");
-
-	public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions
-			.key("taskmanager.network.netty.client.connectTimeoutSec")
-			.defaultValue(120) // default: 120s = 2min
-			.withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec")
-			.withDescription("The Netty client connection timeout.");
-
-	public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE = ConfigOptions
-			.key("taskmanager.network.netty.sendReceiveBufferSize")
-			.defaultValue(0) // default: 0 => Netty's default
-			.withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize")
-			.withDescription("The Netty send and receive buffer size. This defaults to the system buffer size" +
-				" (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.");
-
-	public static final ConfigOption<String> TRANSPORT_TYPE = ConfigOptions
-			.key("taskmanager.network.netty.transport")
-			.defaultValue("nio")
-			.withDeprecatedKeys("taskmanager.net.transport")
-			.withDescription("The Netty transport type, either \"nio\" or \"epoll\"");
-
-	// ------------------------------------------------------------------------
-
 	enum TransportType {
 		NIO, EPOLL, AUTO
 	}
@@ -147,37 +98,37 @@ public class NettyConfig {
 	// ------------------------------------------------------------------------
 
 	public int getServerConnectBacklog() {
-		return config.getInteger(CONNECT_BACKLOG);
+		return config.getInteger(NetworkEnvironmentOptions.CONNECT_BACKLOG);
 	}
 
 	public int getNumberOfArenas() {
 		// default: number of slots
-		final int configValue = config.getInteger(NUM_ARENAS);
+		final int configValue = config.getInteger(NetworkEnvironmentOptions.NUM_ARENAS);
 		return configValue == -1 ? numberOfSlots : configValue;
 	}
 
 	public int getServerNumThreads() {
 		// default: number of task slots
-		final int configValue = config.getInteger(NUM_THREADS_SERVER);
+		final int configValue = config.getInteger(NetworkEnvironmentOptions.NUM_THREADS_SERVER);
 		return configValue == -1 ? numberOfSlots : configValue;
 	}
 
 	public int getClientNumThreads() {
 		// default: number of task slots
-		final int configValue = config.getInteger(NUM_THREADS_CLIENT);
+		final int configValue = config.getInteger(NetworkEnvironmentOptions.NUM_THREADS_CLIENT);
 		return configValue == -1 ? numberOfSlots : configValue;
 	}
 
 	public int getClientConnectTimeoutSeconds() {
-		return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS);
+		return config.getInteger(NetworkEnvironmentOptions.CLIENT_CONNECT_TIMEOUT_SECONDS);
 	}
 
 	public int getSendAndReceiveBufferSize() {
-		return config.getInteger(SEND_RECEIVE_BUFFER_SIZE);
+		return config.getInteger(NetworkEnvironmentOptions.SEND_RECEIVE_BUFFER_SIZE);
 	}
 
 	public TransportType getTransportType() {
-		String transport = config.getString(TRANSPORT_TYPE);
+		String transport = config.getString(NetworkEnvironmentOptions.TRANSPORT_TYPE);
 
 		switch (transport) {
 			case "nio":
@@ -204,7 +155,7 @@ public class NettyConfig {
 	}
 
 	public boolean getSSLEnabled() {
-		return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED)
+		return config.getBoolean(NetworkEnvironmentOptions.DATA_SSL_ENABLED)
 			&& SSLUtils.isInternalSSLEnabled(config);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
index 449ad8f..ecbc58e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
@@ -150,15 +151,15 @@ public class NetworkEnvironmentConfiguration {
 
 		final NettyConfig nettyConfig = createNettyConfig(configuration, localTaskManagerCommunication, taskManagerAddress, dataport);
 
-		int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
-		int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+		int initialRequestBackoff = configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+		int maxRequestBackoff = configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX);
 
-		int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
-		int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+		int buffersPerChannel = configuration.getInteger(NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
+		int extraBuffersPerGate = configuration.getInteger(NetworkEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
 
-		boolean isCreditBased = nettyConfig != null && configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+		boolean isCreditBased = nettyConfig != null && configuration.getBoolean(NetworkEnvironmentOptions.NETWORK_CREDIT_MODEL);
 
-		boolean isNetworkDetailedMetrics = configuration.getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS);
+		boolean isNetworkDetailedMetrics = configuration.getBoolean(NetworkEnvironmentOptions.NETWORK_DETAILED_METRICS);
 
 		return new NetworkEnvironmentConfiguration(
 			numberOfNetworkBuffers,
@@ -184,10 +185,10 @@ public class NetworkEnvironmentConfiguration {
 	 * <ul>
 	 *  <li>{@link TaskManagerOptions#MANAGED_MEMORY_SIZE},</li>
 	 *  <li>{@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},</li>
-	 *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
-	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
-	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
-	 *  <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
+	 *  <li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
+	 * 	<li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
+	 * 	<li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
+	 *  <li>{@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
 	 * </ul>.
 	 *
 	 * @param config configuration object
@@ -222,7 +223,7 @@ public class NetworkEnvironmentConfiguration {
 		// jvmHeapNoNet = jvmHeap - networkBufBytes
 		//              = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction)
 		// jvmHeap = jvmHeapNoNet / (1.0 - networkBufFraction)
-		float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
+		float networkBufFraction = config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
 		long networkBufSize = (long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction);
 		return calculateNewNetworkBufferMemory(config, networkBufSize, maxJvmHeapMemory);
 	}
@@ -233,10 +234,10 @@ public class NetworkEnvironmentConfiguration {
 	 *
 	 * <p>The following configuration parameters are involved:
 	 * <ul>
-	 *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
-	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
-	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
-	 *  <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
+	 *  <li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
+	 * 	<li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
+	 * 	<li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
+	 *  <li>{@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
 	 * </ul>.
 	 *
 	 * @param totalJavaMemorySize overall available memory to use (in bytes)
@@ -250,18 +251,18 @@ public class NetworkEnvironmentConfiguration {
 
 		final long networkBufBytes;
 		if (hasNewNetworkConfig(config)) {
-			float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
+			float networkBufFraction = config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
 			long networkBufSize = (long) (totalJavaMemorySize * networkBufFraction);
 			networkBufBytes = calculateNewNetworkBufferMemory(config, networkBufSize, totalJavaMemorySize);
 		} else {
 			// use old (deprecated) network buffers parameter
-			int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+			int numNetworkBuffers = config.getInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS);
 			networkBufBytes = (long) numNetworkBuffers * (long) segmentSize;
 
 			checkOldNetworkConfig(numNetworkBuffers);
 
 			ConfigurationParserUtils.checkConfigParameter(networkBufBytes < totalJavaMemorySize,
-				networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
+				networkBufBytes, NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS.key(),
 				"Network buffer memory size too large: " + networkBufBytes + " >= " +
 					totalJavaMemorySize + " (total JVM memory size)");
 		}
@@ -275,9 +276,9 @@ public class NetworkEnvironmentConfiguration {
 	 *
 	 * <p>The following configuration parameters are involved:
 	 * <ul>
-	 *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
-	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
-	 * 	<li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}</li>
+	 *  <li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
+	 * 	<li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
+	 * 	<li>{@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}</li>
 	 * </ul>.
 	 *
 	 * @param config configuration object
@@ -287,9 +288,9 @@ public class NetworkEnvironmentConfiguration {
 	 * @return memory to use for network buffers (in bytes)
 	 */
 	private static long calculateNewNetworkBufferMemory(Configuration config, long networkBufSize, long maxJvmHeapMemory) {
-		float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
-		long networkBufMin = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
-		long networkBufMax = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
+		float networkBufFraction = config.getFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
+		long networkBufMin = MemorySize.parse(config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
+		long networkBufMax = MemorySize.parse(config.getString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
 
 		int pageSize = getPageSize(config);
 
@@ -299,9 +300,9 @@ public class NetworkEnvironmentConfiguration {
 
 		ConfigurationParserUtils.checkConfigParameter(networkBufBytes < maxJvmHeapMemory,
 			"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
-			"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
-				TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
-				TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
+			"(" + NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
+				NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
+				NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
 			"Network buffer memory size too large: " + networkBufBytes + " >= " +
 				maxJvmHeapMemory + " (maximum JVM memory size)");
 
@@ -318,7 +319,7 @@ public class NetworkEnvironmentConfiguration {
 	@SuppressWarnings("deprecation")
 	private static void checkOldNetworkConfig(final int numNetworkBuffers) {
 		ConfigurationParserUtils.checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
-			TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
+			NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS.key(),
 			"Must have at least one network buffer");
 	}
 
@@ -339,23 +340,23 @@ public class NetworkEnvironmentConfiguration {
 		final long networkBufMax) throws IllegalConfigurationException {
 
 		ConfigurationParserUtils.checkConfigParameter(networkBufFraction > 0.0f && networkBufFraction < 1.0f, networkBufFraction,
-			TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
+			NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
 			"Network buffer memory fraction of the free memory must be between 0.0 and 1.0");
 
 		ConfigurationParserUtils.checkConfigParameter(networkBufMin >= pageSize, networkBufMin,
-			TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
+			NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
 			"Minimum memory for network buffers must allow at least one network " +
 				"buffer with respect to the memory segment size");
 
 		ConfigurationParserUtils.checkConfigParameter(networkBufMax >= pageSize, networkBufMax,
-			TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
+			NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
 			"Maximum memory for network buffers must allow at least one network " +
 				"buffer with respect to the memory segment size");
 
 		ConfigurationParserUtils.checkConfigParameter(networkBufMax >= networkBufMin, networkBufMax,
-			TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
+			NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
 			"Maximum memory for network buffers must not be smaller than minimum memory (" +
-				TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ": " + networkBufMin + ")");
+				NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ": " + networkBufMin + ")");
 	}
 
 	/**
@@ -368,10 +369,10 @@ public class NetworkEnvironmentConfiguration {
 	@SuppressWarnings("deprecation")
 	@VisibleForTesting
 	public static boolean hasNewNetworkConfig(final Configuration config) {
-		return config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION) ||
-			config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN) ||
-			config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX) ||
-			!config.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+		return config.contains(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION) ||
+			config.contains(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN) ||
+			config.contains(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX) ||
+			!config.contains(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS);
 	}
 
 	/**
@@ -381,8 +382,8 @@ public class NetworkEnvironmentConfiguration {
 	 * @return the data port
 	 */
 	private static int getDataport(Configuration configuration) {
-		final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);
-		ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
+		final int dataport = configuration.getInteger(NetworkEnvironmentOptions.DATA_PORT);
+		ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, NetworkEnvironmentOptions.DATA_PORT.key(),
 			"Leave config parameter empty or use 0 to let the system choose a port automatically.");
 
 		return dataport;
@@ -400,13 +401,13 @@ public class NetworkEnvironmentConfiguration {
 		final int numberOfNetworkBuffers;
 		if (!hasNewNetworkConfig(configuration)) {
 			// fallback: number of network buffers
-			numberOfNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+			numberOfNetworkBuffers = configuration.getInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS);
 
 			checkOldNetworkConfig(numberOfNetworkBuffers);
 		} else {
-			if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+			if (configuration.contains(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS)) {
 				LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
-					TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+					NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS.key());
 			}
 
 			final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index d6e1d48..5e41ec0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network;
 
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
@@ -141,7 +141,7 @@ public class NetworkEnvironmentTest {
 			bufferCount = 20;
 		} else {
 			// incoming: 2 exclusive buffers per channel
-			bufferCount = 10 + 10 * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
+			bufferCount = 10 + 10 * NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
 		}
 
 		testRegisterTaskWithLimitedBuffers(bufferCount);
@@ -160,7 +160,7 @@ public class NetworkEnvironmentTest {
 			bufferCount = 19;
 		} else {
 			// incoming: 2 exclusive buffers per channel
-			bufferCount = 10 + 10 * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1;
+			bufferCount = 10 + 10 * NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1;
 		}
 
 		expectedException.expect(IOException.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index f83e411..094426d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.util.NetUtils;
@@ -110,9 +111,9 @@ public class NettyConnectionManagerTest {
 
 		// Expected number of threads
 		Configuration flinkConfig = new Configuration();
-		flinkConfig.setInteger(NettyConfig.NUM_ARENAS, numberOfArenas);
-		flinkConfig.setInteger(NettyConfig.NUM_THREADS_CLIENT, 3);
-		flinkConfig.setInteger(NettyConfig.NUM_THREADS_SERVER, 4);
+		flinkConfig.setInteger(NetworkEnvironmentOptions.NUM_ARENAS, numberOfArenas);
+		flinkConfig.setInteger(NetworkEnvironmentOptions.NUM_THREADS_CLIENT, 3);
+		flinkConfig.setInteger(NetworkEnvironmentOptions.NUM_THREADS_SERVER, 4);
 
 		NettyConfig config = new NettyConfig(
 				InetAddress.getLocalHost(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index f32b839..7cb1abd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -62,7 +62,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
 	private static Configuration getFlinkConfiguration() {
 		final Configuration config = new Configuration();
 		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
 
 		return config;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
index ae498a1..e6736fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.util.TestLogger;
 
@@ -70,9 +71,9 @@ public class NetworkBufferCalculationTest extends TestLogger {
 	 *
 	 * @param managedMemory         see {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}
 	 * @param managedMemoryFraction see {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION}
-	 * @param networkBufFraction	see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
-	 * @param networkBufMin			see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}
-	 * @param networkBufMax			see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}
+	 * @param networkBufFraction	see {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
+	 * @param networkBufMin			see {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN}
+	 * @param networkBufMax			see {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}
 	 * @param memoryType			on-heap or off-heap
 	 *
 	 * @return configuration object
@@ -89,9 +90,9 @@ public class NetworkBufferCalculationTest extends TestLogger {
 
 		configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), managedMemory);
 		configuration.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), managedMemoryFraction);
-		configuration.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(), networkBufFraction);
-		configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(), networkBufMin);
-		configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(), networkBufMax);
+		configuration.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(), networkBufFraction);
+		configuration.setLong(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key(), networkBufMin);
+		configuration.setLong(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key(), networkBufMax);
 		configuration.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP.key(), memoryType == MemoryType.OFF_HEAP);
 
 		return configuration;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkEnvironmentConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkEnvironmentConfigurationTest.java
index a2c623e..47f4734 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkEnvironmentConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkEnvironmentConfigurationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.util.TestLogger;
@@ -40,13 +41,13 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
 
 	/**
 	 * Test for {@link NetworkEnvironmentConfiguration#calculateNetworkBufferMemory(long, Configuration)} using old
-	 * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+	 * configurations via {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS}.
 	 */
 	@SuppressWarnings("deprecation")
 	@Test
 	public void calculateNetworkBufOld() {
 		Configuration config = new Configuration();
-		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 1);
 
 		// note: actual network buffer memory size is independent of the totalJavaMemorySize
 		assertEquals(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(),
@@ -56,24 +57,24 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
 
 		// test integer overflow in the memory size
 		int numBuffers = (int) ((2L << 32) / MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes()); // 2^33
-		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, numBuffers);
 		assertEquals(2L << 32, NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(2L << 33, config));
 	}
 
 	/**
 	 * Test for {@link NetworkEnvironmentConfiguration#calculateNetworkBufferMemory(long, Configuration)} using new
-	 * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
-	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
-	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+	 * configurations via {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+	 * {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN} and
+	 * {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}.
 	 */
 	@Test
 	public void calculateNetworkBufNew() throws Exception {
 		Configuration config = new Configuration();
 
 		// (1) defaults
-		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
-		final Long defaultMin = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes();
-		final Long defaultMax = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue()).getBytes();
+		final Float defaultFrac = NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+		final Long defaultMin = MemorySize.parse(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes();
+		final Long defaultMax = MemorySize.parse(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue()).getBytes();
 		assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), defaultMin, defaultMax),
 			NetworkEnvironmentConfiguration.calculateNetworkBufferMemory((64L << 20 + 1), config));
 		assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), defaultMin, defaultMax),
@@ -90,8 +91,8 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
 	 */
 	private static void calculateNetworkBufNew(final Configuration config) {
 		// (2) fixed size memory
-		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20)); // 1MB
-		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(1L << 20)); // 1MB
+		config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20)); // 1MB
+		config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(1L << 20)); // 1MB
 
 
 		// note: actual network buffer memory size is independent of the totalJavaMemorySize
@@ -103,14 +104,14 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
 		Random ran = new Random();
 		for (int i = 0; i < 1_000; ++i){
 			float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
-			config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
+			config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
 
 			long min = Math.max(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(), ran.nextLong());
-			config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(min));
+			config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(min));
 
 
 			long max = Math.max(min, ran.nextLong());
-			config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(max));
+			config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(max));
 
 			long javaMem = Math.max(max + 1, ran.nextLong());
 
@@ -141,16 +142,16 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
 	@Test
 	public void calculateNetworkBufMixed() throws Exception {
 		Configuration config = new Configuration();
-		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 1);
 
-		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
-		final Long defaultMin = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes();
-		final Long defaultMax = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue()).getBytes();
+		final Float defaultFrac = NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
+		final Long defaultMin = MemorySize.parse(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes();
+		final Long defaultMax = MemorySize.parse(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue()).getBytes();
 
 
 		// old + 1 new parameter = new:
 		Configuration config1 = config.clone();
-		config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		config1.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
 		assertEquals(enforceBounds((long) (0.1f * (10L << 20)), defaultMin, defaultMax),
 			NetworkEnvironmentConfiguration.calculateNetworkBufferMemory((64L << 20 + 1), config1));
 		assertEquals(enforceBounds((long) (0.1f * (10L << 30)), defaultMin, defaultMax),
@@ -158,15 +159,15 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
 
 		config1 = config.clone();
 		long newMin = MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(); // smallest value possible
-		config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(newMin));
+		config1.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(newMin));
 		assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), newMin, defaultMax),
 			NetworkEnvironmentConfiguration.calculateNetworkBufferMemory((10L << 20), config1));
 		assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), newMin, defaultMax),
 			NetworkEnvironmentConfiguration.calculateNetworkBufferMemory((10L << 30), config1));
 
 		config1 = config.clone();
-		long newMax = Math.max(64L << 20 + 1, MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes());
-		config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(newMax));
+		long newMax = Math.max(64L << 20 + 1, MemorySize.parse(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes());
+		config1.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(newMax));
 		assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), defaultMin, newMax),
 			NetworkEnvironmentConfiguration.calculateNetworkBufferMemory((64L << 20 + 1), config1));
 		assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), defaultMin, newMax),
@@ -197,19 +198,19 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
 	@Test
 	public void calculateHeapSizeMB() throws Exception {
 		Configuration config = new Configuration();
-		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
-		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(64L << 20)); // 64MB
-		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(1L << 30)); // 1GB
+		config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(64L << 20)); // 64MB
+		config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(1L << 30)); // 1GB
 
 		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
 		assertEquals(900, TaskManagerServices.calculateHeapSizeMB(1000, config));
 
 		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
-		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.2f);
+		config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.2f);
 		assertEquals(800, TaskManagerServices.calculateHeapSizeMB(1000, config));
 
 		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
-		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
 		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "10m"); // 10MB
 		assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
 
@@ -221,13 +222,13 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
 	/**
 	 * Verifies that {@link NetworkEnvironmentConfiguration#hasNewNetworkConfig(Configuration)}
 	 * returns the correct result for old configurations via
-	 * {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
+	 * {@link NetworkEnvironmentOptions#NETWORK_NUM_BUFFERS}.
 	 */
 	@SuppressWarnings("deprecation")
 	@Test
 	public void hasNewNetworkBufConfOld() throws Exception {
 		Configuration config = new Configuration();
-		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 1);
 
 		assertFalse(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
 	}
@@ -235,9 +236,9 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
 	/**
 	 * Verifies that {@link NetworkEnvironmentConfiguration#hasNewNetworkConfig(Configuration)}
 	 * returns the correct result for new configurations via
-	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
-	 * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and {@link
-	 * TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
+	 * {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+	 * {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MIN} and {@link
+	 * NetworkEnvironmentOptions#NETWORK_BUFFERS_MEMORY_MAX}.
 	 */
 	@Test
 	public void hasNewNetworkBufConfNew() throws Exception {
@@ -245,29 +246,29 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
 		assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
 
 		// fully defined:
-		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
-		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
-		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "2048");
+		config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
+		config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "2048");
 
 		assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
 
 		// partly defined:
 		config = new Configuration();
-		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
 		assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
-		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
+		config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
 		assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
 
 		config = new Configuration();
-		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
+		config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
 		assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
-		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		config.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
 		assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
 
 		config = new Configuration();
-		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
+		config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
 		assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
-		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
+		config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
 		assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
 	}
 
@@ -281,20 +282,20 @@ public class NetworkEnvironmentConfigurationTest extends TestLogger {
 		Configuration config = new Configuration();
 		assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
 
-		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 1);
 		assertFalse(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config));
 
 		// old + 1 new parameter = new:
 		Configuration config1 = config.clone();
-		config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
+		config1.setFloat(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
 		assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config1));
 
 		config1 = config.clone();
-		config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
+		config1.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
 		assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config1));
 
 		config1 = config.clone();
-		config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
+		config1.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
 		assertTrue(NetworkEnvironmentConfiguration.hasNewNetworkConfig(config1));
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
index 12c1f8e..711256e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -413,9 +414,9 @@ public class TaskExecutorSubmissionTest extends TestLogger {
 
 		final int dataPort = NetUtils.getAvailablePort();
 		Configuration config = new Configuration();
-		config.setInteger(TaskManagerOptions.DATA_PORT, dataPort);
-		config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
-		config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+		config.setInteger(NetworkEnvironmentOptions.DATA_PORT, dataPort);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
 
 		// Remote location (on the same TM though) for the partition
 		final ResultPartitionLocation loc = ResultPartitionLocation
@@ -526,8 +527,8 @@ public class TaskExecutorSubmissionTest extends TestLogger {
 				Collections.singletonList(inputGateDeploymentDescriptor));
 
 		Configuration config = new Configuration();
-		config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
-		config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
 
 		final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
 		final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index bf57e2a..58f234c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -1627,9 +1628,9 @@ public class TaskExecutorTest extends TestLogger {
 	public void testLogNotFoundHandling() throws Throwable {
 		final int dataPort = NetUtils.getAvailablePort();
 		Configuration config = new Configuration();
-		config.setInteger(TaskManagerOptions.DATA_PORT, dataPort);
-		config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
-		config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+		config.setInteger(NetworkEnvironmentOptions.DATA_PORT, dataPort);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
 		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
 
 		try (TaskSubmissionTestEnvironment env =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
index a14dfd9..f48949d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerStartupTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -162,7 +163,7 @@ public class TaskManagerRunnerStartupTest extends TestLogger {
 
 		try {
 			final Configuration cfg = new Configuration();
-			cfg.setInteger(TaskManagerOptions.DATA_PORT, blocker.getLocalPort());
+			cfg.setInteger(NetworkEnvironmentOptions.DATA_PORT, blocker.getLocalPort());
 
 			startTaskManager(
 				cfg,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
index 3ac1734..d528612 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -38,20 +38,20 @@ public class TaskManagerServicesConfigurationTest extends TestLogger {
 	/**
 	 * Verifies that {@link TaskManagerServicesConfiguration#fromConfiguration(Configuration, long, InetAddress, boolean)}
 	 * returns the correct result for new configurations via
-	 * {@link TaskManagerOptions#NETWORK_REQUEST_BACKOFF_INITIAL},
-	 * {@link TaskManagerOptions#NETWORK_REQUEST_BACKOFF_MAX},
-	 * {@link TaskManagerOptions#NETWORK_BUFFERS_PER_CHANNEL} and
-	 * {@link TaskManagerOptions#NETWORK_EXTRA_BUFFERS_PER_GATE}
+	 * {@link NetworkEnvironmentOptions#NETWORK_REQUEST_BACKOFF_INITIAL},
+	 * {@link NetworkEnvironmentOptions#NETWORK_REQUEST_BACKOFF_MAX},
+	 * {@link NetworkEnvironmentOptions#NETWORK_BUFFERS_PER_CHANNEL} and
+	 * {@link NetworkEnvironmentOptions#NETWORK_EXTRA_BUFFERS_PER_GATE}
 	 */
 	@Test
 	public void testNetworkRequestBackoffAndBuffers() throws Exception {
 
 		// set some non-default values
 		final Configuration config = new Configuration();
-		config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
-		config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
-		config.setInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL, 10);
-		config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, 10);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
 
 		TaskManagerServicesConfiguration tmConfig =
 			TaskManagerServicesConfiguration.fromConfiguration(config, MEM_SIZE_PARAM, InetAddress.getLoopbackAddress(), true);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 0f29880..d0d142f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -240,14 +240,14 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 			networkEnvironment = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
 		} else {
 			final InetSocketAddress socketAddress = new InetSocketAddress(
-				InetAddress.getByName(testingRpcService.getAddress()), configuration.getInteger(TaskManagerOptions.DATA_PORT));
+				InetAddress.getByName(testingRpcService.getAddress()), configuration.getInteger(NetworkEnvironmentOptions.DATA_PORT));
 
 			final NettyConfig nettyConfig = new NettyConfig(socketAddress.getAddress(), socketAddress.getPort(),
 				NetworkEnvironmentConfiguration.getPageSize(configuration), ConfigurationParserUtils.getSlot(configuration), configuration);
 
 			networkEnvironment =  new NetworkEnvironmentBuilder()
-				.setPartitionRequestInitialBackoff(configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL))
-				.setPartitionRequestMaxBackoff(configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX))
+				.setPartitionRequestInitialBackoff(configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL))
+				.setPartitionRequestMaxBackoff(configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX))
 				.setNettyConfig(localCommunication ? null : nettyConfig)
 				.build();
 			networkEnvironment.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index 85573af..4c9c772 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.Environment;
@@ -73,7 +74,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 	private static Configuration getFlinkConfiguration() {
 		Configuration config = new Configuration();
 		config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
-		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 9);
 		return config;
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index d1c5b72..b420781 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -51,7 +52,7 @@ public class InputProcessorUtil {
 					+ " must be positive or -1 (infinite)");
 			}
 
-			if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)) {
+			if (taskManagerConfig.getBoolean(NetworkEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
 				barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
 			} else {
 				barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
index 97675db..a15aca4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -82,7 +82,7 @@ public class StreamNetworkThroughputBenchmarkTest {
 		expectedException.expect(IOException.class);
 		expectedException.expectMessage("Insufficient number of network buffers");
 
-		env.setUp(writers, channels, 100, false, writers * channels - 1, writers * channels * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
+		env.setUp(writers, channels, 100, false, writers * channels - 1, writers * channels * NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
 	}
 
 	@Test
@@ -94,7 +94,7 @@ public class StreamNetworkThroughputBenchmarkTest {
 		expectedException.expect(IOException.class);
 		expectedException.expectMessage("Insufficient number of network buffers");
 
-		env.setUp(writers, channels, 100, false, writers * channels, writers * channels * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1);
+		env.setUp(writers, channels, 100, false, writers * channels, writers * channels * NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1);
 	}
 
 	@Test
@@ -104,7 +104,7 @@ public class StreamNetworkThroughputBenchmarkTest {
 		int channels = 2;
 
 		env.setUp(writers, channels, 100, false, writers * channels, writers * channels *
-			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
+			NetworkEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
 		env.executeBenchmark(10_000);
 		env.tearDown();
 	}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 142ca43..08a5e1a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -79,7 +80,7 @@ public abstract class CancelingTestBase extends TestLogger {
 		config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
 		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
 		config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
-		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 2048);
 
 		return config;
 	}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 1c04d27..99c5a75 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
@@ -210,7 +211,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 		Configuration config = new Configuration();
 		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
 		// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
-		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
+		config.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
 		config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");
 
 		if (zkServer != null) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index 73e4ae9..95aab44 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.manual;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.CheckpointingMode;
@@ -50,7 +51,7 @@ public class StreamingScalabilityAndLatency {
 		try {
 			Configuration config = new Configuration();
 			config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
-			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 20000);
+			config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 20000);
 
 			config.setInteger("taskmanager.net.server.numThreads", 1);
 			config.setInteger("taskmanager.net.client.numThreads", 1);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index d53c052..642f601 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.examples.java.clustering.KMeans;
 import org.apache.flink.examples.java.clustering.util.KMeansData;
@@ -62,7 +63,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();
 		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
-		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 800);
 		return config;
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 26840e7..a8f6ffc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -102,7 +103,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath());
 		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
-		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 100);
 
 		try (final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(config)) {
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
index aae2f0f..2ff3f59 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -250,7 +251,7 @@ public class JobManagerHAProcessFailureRecoveryITCase extends TestLogger {
 			zooKeeper.getConnectString(), zookeeperStoragePath.getPath());
 		// Task manager configuration
 		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
-		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 100);
 		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 
 		final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index bd7b912..e4dd1f1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
@@ -114,7 +115,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath());
 		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
-		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+		config.setInteger(NetworkEnvironmentOptions.NETWORK_NUM_BUFFERS, 100);
 		config.setInteger(RestOptions.PORT, 0);
 
 		final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
index 5be9363..05fd914 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.runtime;
 
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -31,7 +32,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.flink.runtime.io.network.netty.NettyConfig.TRANSPORT_TYPE;
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
 
 /**
@@ -73,7 +73,7 @@ public class NettyEpollITCase extends TestLogger {
 	private MiniClusterWithClientResource trySetUpCluster() throws Exception {
 		try {
 			Configuration config = new Configuration();
-			config.setString(TRANSPORT_TYPE, "epoll");
+			config.setString(NetworkEnvironmentOptions.TRANSPORT_TYPE, "epoll");
 			MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
 				new MiniClusterResourceConfiguration.Builder()
 					.setConfiguration(config)
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 6da6055..acca620 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -25,8 +25,8 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NetworkEnvironmentOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rest.RestClient;
@@ -87,8 +87,8 @@ public class YarnConfigurationITCase extends YarnTestBase {
 
 		// disable heap cutoff min
 		configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);
-		configuration.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20));
-		configuration.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(4L << 20));
+		configuration.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20));
+		configuration.setString(NetworkEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(4L << 20));
 
 		final YarnConfiguration yarnConfiguration = getYarnConfiguration();
 		final YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(