You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/25 07:32:23 UTC
[09/10] flink git commit: [FLINK-9033][config] Replace usages of
deprecated TASK_MANAGER_NUM_TASK_SLOTS
[FLINK-9033][config] Replace usages of deprecated TASK_MANAGER_NUM_TASK_SLOTS
This closes #5731.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73e9f901
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73e9f901
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73e9f901
Branch: refs/heads/master
Commit: 73e9f9013391a4e5be18f1cfc1f9462a78e95ca7
Parents: 1a9675d
Author: zhouhai02 <zh...@meituan.com>
Authored: Wed Mar 21 00:01:54 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 24 13:45:17 2018 +0200
----------------------------------------------------------------------
.../main/java/org/apache/flink/client/LocalExecutor.java | 7 ++++---
.../flink/client/deployment/ClusterSpecification.java | 3 +--
.../kinesis/manualtests/ManualExactlyOnceTest.java | 2 +-
.../ManualExactlyOnceWithStreamReshardingTest.java | 2 +-
.../org/apache/flink/storm/api/FlinkLocalCluster.java | 3 +--
.../flink/runtime/clusterframework/BootstrapTools.java | 2 +-
.../runtime/minicluster/MiniClusterConfiguration.java | 3 ++-
.../runtime/taskexecutor/TaskManagerConfiguration.java | 2 +-
.../taskexecutor/TaskManagerServicesConfiguration.java | 4 ++--
.../flink/runtime/checkpoint/CoordinatorShutdownTest.java | 5 +++--
.../partition/PartialConsumePipelinedResultTest.java | 2 +-
.../flink/runtime/jobmanager/JobManagerCleanupITCase.java | 3 ++-
.../runtime/jobmanager/JobManagerHARecoveryTest.java | 4 ++--
.../apache/flink/runtime/jobmanager/JobManagerTest.java | 5 ++---
.../leaderelection/LeaderChangeJobRecoveryTest.java | 3 ++-
.../leaderelection/LeaderChangeStateCleanupTest.java | 3 ++-
.../backpressure/BackPressureStatsTrackerImplITCase.java | 4 ++--
.../backpressure/StackTraceSampleCoordinatorITCase.java | 4 ++--
.../org/apache/flink/runtime/akka/AkkaSslITCase.scala | 10 +++++-----
.../apache/flink/runtime/jobmanager/RecoveryITCase.scala | 4 ++--
.../apache/flink/runtime/testingUtils/TestingUtils.scala | 2 +-
.../org/apache/flink/api/scala/ScalaShellITCase.scala | 4 ++--
.../org/apache/flink/test/util/MiniClusterResource.java | 2 +-
.../java/org/apache/flink/test/util/TestBaseUtils.java | 2 +-
.../checkpointing/utils/SavepointMigrationTestBase.java | 3 ++-
.../flink/test/operators/ExecutionEnvironmentITCase.java | 4 ++--
.../flink/test/operators/RemoteEnvironmentITCase.java | 4 ++--
.../AbstractTaskManagerProcessFailureRecoveryTest.java | 2 +-
.../recovery/JobManagerHACheckpointRecoveryITCase.java | 3 ++-
.../JobManagerHAProcessFailureBatchRecoveryITCase.java | 3 +--
.../test/recovery/TaskManagerFailureRecoveryITCase.java | 2 +-
.../leaderelection/ZooKeeperLeaderElectionITCase.java | 3 ++-
.../runtime/minicluster/LocalFlinkMiniClusterITCase.java | 3 ++-
.../scala/runtime/jobmanager/JobManagerFailsITCase.scala | 2 +-
.../runtime/taskmanager/TaskManagerFailsITCase.scala | 5 ++---
.../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java | 3 +--
36 files changed, 63 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 01c281f..f837c4f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
@@ -139,7 +140,7 @@ public class LocalExecutor extends PlanExecutor {
.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
.setNumSlotsPerTaskManager(
configuration.getInteger(
- ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1))
+ TaskManagerOptions.NUM_TASK_SLOTS, 1))
.build();
final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
@@ -220,7 +221,7 @@ public class LocalExecutor extends PlanExecutor {
try {
// TODO: Set job's default parallelism to max number of slots
- final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
+ final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
@@ -265,7 +266,7 @@ public class LocalExecutor extends PlanExecutor {
private Configuration createConfiguration() {
Configuration newConfiguration = new Configuration();
- newConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
+ newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, getTaskManagerNumSlots());
newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, isDefaultOverwriteFiles());
newConfiguration.addAll(baseConfiguration);
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
index 8650cab..cf2ae4c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
@@ -18,7 +18,6 @@
package org.apache.flink.client.deployment;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -66,7 +65,7 @@ public final class ClusterSpecification {
}
public static ClusterSpecification fromConfiguration(Configuration configuration) {
- int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 963002f..40225fb 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -78,7 +78,7 @@ public class ManualExactlyOnceTest {
final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+ flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 93b9caf..34dcdc0 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -90,7 +90,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+ flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index bff8c80..6b0b503 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -18,7 +18,6 @@
package org.apache.flink.storm.api;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -90,7 +89,7 @@ public class FlinkLocalCluster {
configuration.addAll(jobGraph.getJobConfiguration());
configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
this.flink = new LocalFlinkMiniCluster(configuration, true);
this.flink.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 102274d1..7a8403a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -247,7 +247,7 @@ public class BootstrapTools {
cfg.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, registrationTimeout.toString());
if (numSlots != -1){
- cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+ cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
}
return cfg;
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 44a567b..0a0c692 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.util.Preconditions;
@@ -167,7 +168,7 @@ public class MiniClusterConfiguration {
public MiniClusterConfiguration build() {
final Configuration modifiedConfiguration = new Configuration(configuration);
- modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+ modifiedConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTaskManager);
modifiedConfiguration.setString(
RestOptions.ADDRESS,
modifiedConfiguration.getString(RestOptions.ADDRESS, "localhost"));
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 1bf42ee..e8a7ae8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -162,7 +162,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
// --------------------------------------------------------------------------------------------
public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
- int numberSlots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
if (numberSlots == -1) {
numberSlots = 1;
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index d029bc5..b80320c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -196,7 +196,7 @@ public class TaskManagerServicesConfiguration {
boolean localCommunication) throws Exception {
// we need this because many configs have been written with a "-1" entry
- int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
if (slots == -1) {
slots = 1;
}
@@ -290,7 +290,7 @@ public class TaskManagerServicesConfiguration {
checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
"Leave config parameter empty or use 0 to let the system choose a port automatically.");
- checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+ checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),
"Number of task slots must be at least one.");
final int pageSize = configuration.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 1d44444..8a6a9d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.execution.Environment;
@@ -59,7 +60,7 @@ public class CoordinatorShutdownTest extends TestLogger {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
cluster = new LocalFlinkMiniCluster(config, true);
cluster.start();
@@ -128,7 +129,7 @@ public class CoordinatorShutdownTest extends TestLogger {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
cluster = new LocalFlinkMiniCluster(config, true);
cluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index ced1a33..8080ca4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -55,7 +55,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
public static void setUp() throws Exception {
final Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
index 8806dec..33a4a28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -146,7 +147,7 @@ public class JobManagerCleanupITCase extends TestLogger {
try {
Configuration config = new Configuration();
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT());
config.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath());
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index bb2dbf7..d991983 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
@@ -174,7 +174,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
- flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
+ flinkConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slots);
flinkConfiguration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 417294c..79e6d20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -641,7 +640,7 @@ public class JobManagerTest extends TestLogger {
Configuration tmConfig = new Configuration();
tmConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
- tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+ tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(
tmConfig,
@@ -1300,7 +1299,7 @@ public class JobManagerTest extends TestLogger {
archiver = new AkkaActorGateway(master._2(), leaderId);
Configuration tmConfig = new Configuration();
- tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+ tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);
ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
tmConfig,
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 942fcf3..2ebaeba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.leaderelection;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -72,7 +73,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM);
configuration.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay");
configuration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 9999);
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index bbcbbf0..6880d9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.leaderelection;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -78,7 +79,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM);
highAvailabilityServices = new TestingManualHighAvailabilityServices();
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
index 994d02e..0e837a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.rest.handler.legacy.backpressure;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
@@ -138,7 +138,7 @@ public class BackPressureStatsTrackerImplITCase extends TestLogger {
config,
highAvailabilityServices);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
taskManager = TestingUtils.createTaskManager(
testActorSystem,
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
index 8fa302a..ccf5f60 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.rest.handler.legacy.backpressure;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
@@ -109,7 +109,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
config,
highAvailabilityServices);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
taskManager = TestingUtils.createTaskManager(
testActorSystem,
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
index 72596cd..ebdae31 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
@@ -51,7 +51,7 @@ class AkkaSslITCase(_system: ActorSystem)
val config = new Configuration()
config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
config.setBoolean(SecurityOptions.SSL_ENABLED, true)
@@ -78,7 +78,7 @@ class AkkaSslITCase(_system: ActorSystem)
val config = new Configuration()
config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
config.setBoolean(SecurityOptions.SSL_ENABLED, true)
@@ -101,7 +101,7 @@ class AkkaSslITCase(_system: ActorSystem)
"start with akka ssl disabled" in {
val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
config.setBoolean(SecurityOptions.SSL_ENABLED, false)
@@ -117,7 +117,7 @@ class AkkaSslITCase(_system: ActorSystem)
an[Exception] should be thrownBy {
val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
@@ -139,7 +139,7 @@ class AkkaSslITCase(_system: ActorSystem)
an[Exception] should be thrownBy {
val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 71d2ee9..a2dffc8 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -22,7 +22,7 @@ import akka.actor.{ActorSystem, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, TaskManagerOptions}
import org.apache.flink.runtime.akka.ListeningBehaviour
import org.apache.flink.runtime.io.network.partition.ResultPartitionType
import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobStatus, JobVertex}
@@ -59,7 +59,7 @@ class RecoveryITCase(_system: ActorSystem)
heartbeatTimeout: String)
: TestingCluster = {
val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
new TestingCluster(config)
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 2d8d02d..b89d2a6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -88,7 +88,7 @@ object TestingUtils {
def startTestingCluster(numSlots: Int, numTMs: Int = 1,
timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): TestingCluster = {
val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs)
config.setString(AkkaOptions.ASK_TIMEOUT, timeout)
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index cb6231a..2e07fb9 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -22,8 +22,8 @@ import java.io._
import akka.actor.ActorRef
import akka.pattern.Patterns
+import org.apache.flink.configuration.{Configuration, CoreOptions, TaskManagerOptions}
import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
-import org.apache.flink.configuration.{ConfigConstants, Configuration, CoreOptions, GlobalConfiguration}
import org.apache.flink.runtime.clusterframework.BootstrapTools
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.util.TestLogger
@@ -321,7 +321,7 @@ object ScalaShellITCase {
@BeforeClass
def beforeAll(): Unit = {
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism)
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
cluster = Option(new StandaloneMiniCluster(configuration))
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 8c21b37..844de01 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -198,7 +198,7 @@ public class MiniClusterResource extends ExternalResource {
private void startLegacyMiniCluster() throws Exception {
final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers());
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
final LocalFlinkMiniCluster flinkMiniCluster = TestBaseUtils.startCluster(
configuration,
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index dd255fd..7e9b12e 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -127,7 +127,7 @@ public class TestBaseUtils extends TestLogger {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, startWebserver);
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 8f2aaa1..cfa155b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -99,7 +100,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
final Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, DEFAULT_PARALLELISM);
UUID id = UUID.randomUUID();
final File checkpointDir = TEMP_FOLDER.newFolder("checkpoints_" + id).getAbsoluteFile();
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
index f7f993b..65a21d1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ExecutionEnvironmentITCase.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
@@ -50,7 +50,7 @@ public class ExecutionEnvironmentITCase extends TestLogger {
@Test
public void testLocalEnvironmentWithConfig() throws Exception {
Configuration conf = new Configuration();
- conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+ conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, PARALLELISM);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
index a3a551c..c2d6341 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java
@@ -26,9 +26,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -96,7 +96,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
resource = miniCluster;
} else {
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TM_SLOTS);
final StandaloneMiniCluster standaloneMiniCluster = new StandaloneMiniCluster(configuration);
hostname = standaloneMiniCluster.getHostname();
port = standaloneMiniCluster.getPort();
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 29516dc..6764f6f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -406,7 +406,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+ cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
TaskManager.selectNetworkInterfaceAndRunTaskManager(cfg,
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 16ea6d5..ce750d3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -365,7 +366,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
config.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, retainedCheckpoints);
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, temporaryFolder.newFolder().toString());
String tmpFolderString = temporaryFolder.newFolder().toString();
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index d217a2a..50404a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -270,7 +269,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
// Task manager configuration
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
config,
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 8371230..755b3d3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -77,7 +77,7 @@ public class TaskManagerFailureRecoveryITCase extends TestLogger {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "500 ms");
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index f348b8a..399fc10 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -157,7 +158,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
+ configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM);
// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
// sure that all TMs have registered to the JM prior to issuing the RecoverAllJobs message
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index 770b88c..b3d92df 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.runtime.minicluster;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -76,7 +77,7 @@ public class LocalFlinkMiniClusterITCase extends TestLogger {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
miniCluster = new LocalFlinkMiniCluster(config, true);
miniCluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 5fe7b1d..5c9b1fb 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -133,7 +133,7 @@ class JobManagerFailsITCase(_system: ActorSystem)
def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
config.setInteger(JobManagerOptions.PORT, 0)
config.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, "50 ms")
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index a065e5b..69ad2d7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -20,8 +20,7 @@ package org.apache.flink.api.scala.runtime.taskmanager
import akka.actor.{ActorSystem, Kill, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.configuration.ConfigConstants
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{ConfigConstants, Configuration, TaskManagerOptions}
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.client.JobExecutionException
import org.apache.flink.runtime.io.network.partition.ResultPartitionType
@@ -243,7 +242,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = {
val config = new Configuration()
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
new TestingCluster(config, singleActorSystem = false)
http://git-wip-us.apache.org/repos/asf/flink/blob/73e9f901/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index fe04662..7596d68 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -23,7 +23,6 @@ import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -500,7 +499,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
}
if (commandLine.hasOption(slots.getOpt())) {
- effectiveConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
+ effectiveConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));
}
if (isYarnPropertiesFileMode(commandLine)) {