You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/25 07:32:23 UTC

[09/10] flink git commit: [FLINK-9033][config] Replace usages of deprecated TASK_MANAGER_NUM_TASK_SLOTS

[FLINK-9033][config] Replace usages of deprecated TASK_MANAGER_NUM_TASK_SLOTS

This closes #5731.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73e9f901
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73e9f901
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73e9f901

Branch: refs/heads/master
Commit: 73e9f9013391a4e5be18f1cfc1f9462a78e95ca7
Parents: 1a9675d
Author: zhouhai02 <zh...@meituan.com>
Authored: Wed Mar 21 00:01:54 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:45:17 2018 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/client/LocalExecutor.java  |  7 ++++---
 .../flink/client/deployment/ClusterSpecification.java     |  3 +--
 .../kinesis/manualtests/ManualExactlyOnceTest.java        |  2 +-
 .../ManualExactlyOnceWithStreamReshardingTest.java        |  2 +-
 .../org/apache/flink/storm/api/FlinkLocalCluster.java     |  3 +--
 .../flink/runtime/clusterframework/BootstrapTools.java    |  2 +-
 .../runtime/minicluster/MiniClusterConfiguration.java     |  3 ++-
 .../runtime/taskexecutor/TaskManagerConfiguration.java    |  2 +-
 .../taskexecutor/TaskManagerServicesConfiguration.java    |  4 ++--
 .../flink/runtime/checkpoint/CoordinatorShutdownTest.java |  5 +++--
 .../partition/PartialConsumePipelinedResultTest.java      |  2 +-
 .../flink/runtime/jobmanager/JobManagerCleanupITCase.java |  3 ++-
 .../runtime/jobmanager/JobManagerHARecoveryTest.java      |  4 ++--
 .../apache/flink/runtime/jobmanager/JobManagerTest.java   |  5 ++---
 .../leaderelection/LeaderChangeJobRecoveryTest.java       |  3 ++-
 .../leaderelection/LeaderChangeStateCleanupTest.java      |  3 ++-
 .../backpressure/BackPressureStatsTrackerImplITCase.java  |  4 ++--
 .../backpressure/StackTraceSampleCoordinatorITCase.java   |  4 ++--
 .../org/apache/flink/runtime/akka/AkkaSslITCase.scala     | 10 +++++-----
 .../apache/flink/runtime/jobmanager/RecoveryITCase.scala  |  4 ++--
 .../apache/flink/runtime/testingUtils/TestingUtils.scala  |  2 +-
 .../org/apache/flink/api/scala/ScalaShellITCase.scala     |  4 ++--
 .../org/apache/flink/test/util/MiniClusterResource.java   |  2 +-
 .../java/org/apache/flink/test/util/TestBaseUtils.java    |  2 +-
 .../checkpointing/utils/SavepointMigrationTestBase.java   |  3 ++-
 .../flink/test/operators/ExecutionEnvironmentITCase.java  |  4 ++--
 .../flink/test/operators/RemoteEnvironmentITCase.java     |  4 ++--
 .../AbstractTaskManagerProcessFailureRecoveryTest.java    |  2 +-
 .../recovery/JobManagerHACheckpointRecoveryITCase.java    |  3 ++-
 .../JobManagerHAProcessFailureBatchRecoveryITCase.java    |  3 +--
 .../test/recovery/TaskManagerFailureRecoveryITCase.java   |  2 +-
 .../leaderelection/ZooKeeperLeaderElectionITCase.java     |  3 ++-
 .../runtime/minicluster/LocalFlinkMiniClusterITCase.java  |  3 ++-
 .../scala/runtime/jobmanager/JobManagerFailsITCase.scala  |  2 +-
 .../runtime/taskmanager/TaskManagerFailsITCase.scala      |  5 ++---
 .../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java    |  3 +--
 36 files changed, 63 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 01c281f..f837c4f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
@@ -139,7 +140,7 @@ public class LocalExecutor extends PlanExecutor {
 				.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
 				.setNumSlotsPerTaskManager(
 					configuration.getInteger(
-						ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1))
+						TaskManagerOptions.NUM_TASK_SLOTS, 1))
 				.build();
 
 			final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
@@ -220,7 +221,7 @@ public class LocalExecutor extends PlanExecutor {
 
 			try {
 				// TODO: Set job's default parallelism to max number of slots
-				final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
+				final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
 				final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 				plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
 
@@ -265,7 +266,7 @@ public class LocalExecutor extends PlanExecutor {
 
 	private Configuration createConfiguration() {
 		Configuration newConfiguration = new Configuration();
-		newConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
+		newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
 		newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());
 
 		newConfiguration.addAll(baseConfiguration);

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
index 8650cab..cf2ae4c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.client.deployment;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -66,7 +65,7 @@ public final class ClusterSpecification {
 	}
 
 	public static ClusterSpecification fromConfiguration(Configuration configuration) {
-		int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+		int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 
 		int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
 		int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 963002f..40225fb 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -78,7 +78,7 @@ public class ManualExactlyOnceTest {
 
 		final Configuration flinkConfig = new Configuration();
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+		flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
 		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 93b9caf..34dcdc0 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -90,7 +90,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
 
 		final Configuration flinkConfig = new Configuration();
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+		flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
 		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index bff8c80..6b0b503 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.storm.api;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -90,7 +89,7 @@ public class FlinkLocalCluster {
 			configuration.addAll(jobGraph.getJobConfiguration());
 
 			configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
-			configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
+			configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
 
 			this.flink = new LocalFlinkMiniCluster(configuration, true);
 			this.flink.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 102274d1..7a8403a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -247,7 +247,7 @@ public class BootstrapTools {
 
 		cfg.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, registrationTimeout.toString());
 		if (numSlots != -1){
-			cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+			cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
 		}
 
 		return cfg; 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 44a567b..0a0c692 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.util.Preconditions;
@@ -167,7 +168,7 @@ public class MiniClusterConfiguration {
 
 		public MiniClusterConfiguration build() {
 			final Configuration modifiedConfiguration = new Configuration(configuration);
-			modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+			modifiedConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTaskManager);
 			modifiedConfiguration.setString(
 				RestOptions.ADDRESS,
 				modifiedConfiguration.getString(RestOptions.ADDRESS, "localhost"));

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 1bf42ee..e8a7ae8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -162,7 +162,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 	// --------------------------------------------------------------------------------------------
 
 	public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
-		int numberSlots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+		int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 
 		if (numberSlots == -1) {
 			numberSlots = 1;

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index d029bc5..b80320c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -196,7 +196,7 @@ public class TaskManagerServicesConfiguration {
 			boolean localCommunication) throws Exception {
 
 		// we need this because many configs have been written with a "-1" entry
-		int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+		int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 		if (slots == -1) {
 			slots = 1;
 		}
@@ -290,7 +290,7 @@ public class TaskManagerServicesConfiguration {
 		checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
 			"Leave config parameter empty or use 0 to let the system choose a port automatically.");
 
-		checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+		checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),
 			"Number of task slots must be at least one.");
 
 		final int pageSize = configuration.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 1d44444..8a6a9d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.execution.Environment;
@@ -59,7 +60,7 @@ public class CoordinatorShutdownTest extends TestLogger {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 			cluster = new LocalFlinkMiniCluster(config, true);
 			cluster.start();
 			
@@ -128,7 +129,7 @@ public class CoordinatorShutdownTest extends TestLogger {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 			cluster = new LocalFlinkMiniCluster(config, true);
 			cluster.start();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index ced1a33..8080ca4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -55,7 +55,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
 	public static void setUp() throws Exception {
 		final Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
 		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
 		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
index 8806dec..33a4a28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -146,7 +147,7 @@ public class JobManagerCleanupITCase extends TestLogger {
 
 					try {
 						Configuration config = new Configuration();
-						config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+						config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 						config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 						config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT());
 						config.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index bb2dbf7..d991983 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -174,7 +174,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 
 		flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 		flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
-		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
+		flinkConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slots);
 		flinkConfiguration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 417294c..79e6d20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -641,7 +640,7 @@ public class JobManagerTest extends TestLogger {
 
 		Configuration tmConfig = new Configuration();
 		tmConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
-		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+		tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
 
 		ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(
 			tmConfig,
@@ -1300,7 +1299,7 @@ public class JobManagerTest extends TestLogger {
 			archiver = new AkkaActorGateway(master._2(), leaderId);
 
 			Configuration tmConfig = new Configuration();
-			tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+			tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);
 
 			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
 				tmConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 942fcf3..2ebaeba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.leaderelection;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -72,7 +73,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM);
 
 		configuration.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay");
 		configuration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 9999);

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index bbcbbf0..6880d9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.leaderelection;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -78,7 +79,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM);
 
 		highAvailabilityServices = new TestingManualHighAvailabilityServices();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
index 994d02e..0e837a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
@@ -138,7 +138,7 @@ public class BackPressureStatsTrackerImplITCase extends TestLogger {
 					config,
 					highAvailabilityServices);
 
-				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+				config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
 
 				taskManager = TestingUtils.createTaskManager(
 					testActorSystem,

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
index 8fa302a..ccf5f60 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
@@ -109,7 +109,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 					config,
 					highAvailabilityServices);
 
-				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+				config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
 
 				taskManager = TestingUtils.createTaskManager(
 					testActorSystem,

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
index 72596cd..ebdae31 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
@@ -51,7 +51,7 @@ class AkkaSslITCase(_system: ActorSystem)
       val config = new Configuration()
       config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
       config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
-      config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+      config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
 
       config.setBoolean(SecurityOptions.SSL_ENABLED, true)
@@ -78,7 +78,7 @@ class AkkaSslITCase(_system: ActorSystem)
         val config = new Configuration()
         config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
         config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
-        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
 
         config.setBoolean(SecurityOptions.SSL_ENABLED, true)
@@ -101,7 +101,7 @@ class AkkaSslITCase(_system: ActorSystem)
     "start with akka ssl disabled" in {
 
       val config = new Configuration()
-      config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+      config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
       config.setBoolean(SecurityOptions.SSL_ENABLED, false)
 
@@ -117,7 +117,7 @@ class AkkaSslITCase(_system: ActorSystem)
       an[Exception] should be thrownBy {
 
         val config = new Configuration()
-        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
         config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
 
@@ -139,7 +139,7 @@ class AkkaSslITCase(_system: ActorSystem)
       an[Exception] should be thrownBy {
 
         val config = new Configuration()
-        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
         config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 71d2ee9..a2dffc8 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -22,7 +22,7 @@ import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, TaskManagerOptions}
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType
 import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobStatus, JobVertex}
@@ -59,7 +59,7 @@ class RecoveryITCase(_system: ActorSystem)
       heartbeatTimeout: String)
     : TestingCluster = {
     val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
     config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
     new TestingCluster(config)

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 2d8d02d..b89d2a6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -88,7 +88,7 @@ object TestingUtils {
   def startTestingCluster(numSlots: Int, numTMs: Int = 1,
                           timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): TestingCluster = {
     val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs)
     config.setString(AkkaOptions.ASK_TIMEOUT, timeout)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index cb6231a..2e07fb9 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -22,8 +22,8 @@ import java.io._
 
 import akka.actor.ActorRef
 import akka.pattern.Patterns
+import org.apache.flink.configuration.{Configuration, CoreOptions, TaskManagerOptions}
 import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
-import org.apache.flink.configuration.{ConfigConstants, Configuration, CoreOptions, GlobalConfiguration}
 import org.apache.flink.runtime.clusterframework.BootstrapTools
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.util.TestLogger
@@ -321,7 +321,7 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-    configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism)
+    configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
     configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
 
     cluster = Option(new StandaloneMiniCluster(configuration))

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 8c21b37..844de01 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -198,7 +198,7 @@ public class MiniClusterResource extends ExternalResource {
 	private void startLegacyMiniCluster() throws Exception {
 		final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers());
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
 
 		final LocalFlinkMiniCluster flinkMiniCluster = TestBaseUtils.startCluster(
 			configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index dd255fd..7e9b12e 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -127,7 +127,7 @@ public class TestBaseUtils extends TestLogger {
 		Configuration config = new Configuration();
 
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
 
 		config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, startWebserver);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 8f2aaa1..cfa155b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -99,7 +100,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 		final Configuration config = new Configuration();
 
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
 
 		UUID id = UUID.randomUUID();
 		final File checkpointDir = TEMP_FOLDER.newFolder("checkpoints_" + id).getAbsoluteFile();

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
index f7f993b..65a21d1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -50,7 +50,7 @@ public class ExecutionEnvironmentITCase extends TestLogger {
 	@Test
 	public void testLocalEnvironmentWithConfig() throws Exception {
 		Configuration conf = new Configuration();
-		conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+		conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, PARALLELISM);
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
 		env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index a3a551c..c2d6341 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -26,9 +26,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -96,7 +96,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
 
 			resource = miniCluster;
 		} else {
-			configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
+			configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TM_SLOTS);
 			final StandaloneMiniCluster standaloneMiniCluster = new StandaloneMiniCluster(configuration);
 			hostname = standaloneMiniCluster.getHostname();
 			port = standaloneMiniCluster.getPort();

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 29516dc..6764f6f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -406,7 +406,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 				cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
 				cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 				cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+				cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 				cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
 
 				TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 16ea6d5..ce750d3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -365,7 +366,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 			config.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, retainedCheckpoints);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
 			config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, temporaryFolder.newFolder().toString());
 
 			String tmpFolderString = temporaryFolder.newFolder().toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index d217a2a..50404a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -270,7 +269,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 			// Task manager configuration
 			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 
 			highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
 				config,

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 8371230..755b3d3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -77,7 +77,7 @@ public class TaskManagerFailureRecoveryITCase extends TestLogger {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
 			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 
 			config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "500 ms");

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index f348b8a..399fc10 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -157,7 +158,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
+		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM);
 
 		// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
 		// sure that all TMs have registered to the JM prior to issuing the RecoverAllJobs message

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index 770b88c..b3d92df 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.runtime.minicluster;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -76,7 +77,7 @@ public class LocalFlinkMiniClusterITCase extends TestLogger {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
 			miniCluster = new LocalFlinkMiniCluster(config, true);
 
 			miniCluster.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 5fe7b1d..5c9b1fb 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -133,7 +133,7 @@ class JobManagerFailsITCase(_system: ActorSystem)
 
   def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
     val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
     config.setInteger(JobManagerOptions.PORT, 0)
     config.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, "50 ms")

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index a065e5b..69ad2d7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -20,8 +20,7 @@ package org.apache.flink.api.scala.runtime.taskmanager
 
 import akka.actor.{ActorSystem, Kill, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.configuration.ConfigConstants
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{ConfigConstants, Configuration, TaskManagerOptions}
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType
@@ -243,7 +242,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
 
   def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
     val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
 
     new TestingCluster(config, singleActorSystem = false)

http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index fe04662..7596d68 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -23,7 +23,6 @@ import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -500,7 +499,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		}
 
 		if (commandLine.hasOption(slots.getOpt())) {
-			effectiveConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
+			effectiveConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
 		}
 
 		if (isYarnPropertiesFileMode(commandLine)) {