You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/08/10 11:22:56 UTC
[2/3] flink git commit: [FLINK-6494] [RM][Yarn][Mesos] Migrate
ResourceManager/Yarn/Mesos configuration options
[FLINK-6494] [RM][Yarn][Mesos] Migrate ResourceManager/Yarn/Mesos configuration options
This closes #4075.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d63d704e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d63d704e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d63d704e
Branch: refs/heads/master
Commit: d63d704efb2bc28dd7a33ee9027f4d447acbd209
Parents: 6539180
Author: zjureel <zj...@gmail.com>
Authored: Thu Jun 8 11:38:56 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Thu Aug 10 11:36:30 2017 +0200
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 89 ++++++++++++++--
.../configuration/ResourceManagerOptions.java | 39 +++++++
.../flink/mesos/configuration/MesosOptions.java | 106 +++++++++++++++++++
.../MesosApplicationMasterRunner.java | 31 +++---
.../MesosFlinkResourceManager.java | 10 +-
.../flink/mesos/util/MesosArtifactServer.java | 5 +-
.../MesosFlinkResourceManagerTest.java | 6 +-
.../ContaineredTaskManagerParameters.java | 14 ++-
.../runtime/minicluster/FlinkMiniCluster.scala | 9 +-
.../minicluster/LocalFlinkMiniCluster.scala | 8 +-
.../java/org/apache/flink/yarn/UtilsTest.java | 19 ++--
.../YARNSessionCapacitySchedulerITCase.java | 6 +-
.../yarn/AbstractYarnClusterDescriptor.java | 10 +-
.../main/java/org/apache/flink/yarn/Utils.java | 17 +--
.../flink/yarn/YarnApplicationMasterRunner.java | 17 +--
.../flink/yarn/YarnFlinkResourceManager.java | 10 +-
.../apache/flink/yarn/YarnResourceManager.java | 3 +-
.../flink/yarn/cli/FlinkYarnSessionCli.java | 3 +-
.../yarn/configuration/YarnConfigOptions.java | 65 ++++++++++++
.../yarn/entrypoint/YarnEntrypointUtils.java | 13 +--
.../org/apache/flink/yarn/YarnJobManager.scala | 5 +-
.../flink/yarn/YarnClusterDescriptorTest.java | 3 +-
22 files changed, 375 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f817344..4c6c62a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -135,7 +135,9 @@ public final class ConfigConstants {
/**
* The config parameter defining the network port to connect to
* for communication with the resource manager.
+ * @deprecated Use {@link ResourceManagerOptions#IPC_PORT} instead.
*/
+ @Deprecated
public static final String RESOURCE_MANAGER_IPC_PORT_KEY = "resourcemanager.rpc.port";
/**
@@ -349,12 +351,16 @@ public final class ConfigConstants {
/**
* Percentage of heap space to remove from containers (YARN / Mesos), to compensate
* for other JVM memory usage.
+ * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_RATIO} instead.
*/
+ @Deprecated
public static final String CONTAINERIZED_HEAP_CUTOFF_RATIO = "containerized.heap-cutoff-ratio";
/**
* Minimum amount of heap memory to remove in containers, as a safety margin.
+ * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_MIN} instead.
*/
+ @Deprecated
public static final String CONTAINERIZED_HEAP_CUTOFF_MIN = "containerized.heap-cutoff-min";
/**
@@ -362,13 +368,17 @@ public final class ConfigConstants {
* For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
* containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
* in the flink-conf.yaml.
+ * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_MASTER_ENV_PREFIX} instead.
*/
+ @Deprecated
public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env.";
/**
* Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows
* setting custom environment variables for the workers (TaskManagers)
+ * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_TASK_MANAGER_ENV_PREFIX} instead.
*/
+ @Deprecated
public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env.";
@@ -376,7 +386,9 @@ public final class ConfigConstants {
/**
* The vcores exposed by YARN.
+ * @deprecated in favor of {@code YarnConfigOptions#VCORES}.
*/
+ @Deprecated
public static final String YARN_VCORES = "yarn.containers.vcores";
/**
@@ -406,7 +418,9 @@ public final class ConfigConstants {
* the YARN session / job on YARN.
*
* By default, we take the number of of initially requested containers.
+ * @deprecated in favor of {@code YarnConfigOptions#MAX_FAILED_CONTAINERS}.
*/
+ @Deprecated
public static final String YARN_MAX_FAILED_CONTAINERS = "yarn.maximum-failed-containers";
/**
@@ -414,14 +428,18 @@ public final class ConfigConstants {
* availability mode. This value is usually limited by YARN.
*
* By default, it's 1 in the standalone case and 2 in the high availability case.
+ * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_ATTEMPTS}.
*/
+ @Deprecated
public static final String YARN_APPLICATION_ATTEMPTS = "yarn.application-attempts";
/**
* The heartbeat interval between the Application Master and the YARN Resource Manager.
*
* The default value is 5 (seconds).
+ * @deprecated in favor of {@code YarnConfigOptions#HEARTBEAT_DELAY_SECONDS}.
*/
+ @Deprecated
public static final String YARN_HEARTBEAT_DELAY_SECONDS = "yarn.heartbeat-delay";
/**
@@ -429,8 +447,10 @@ public final class ConfigConstants {
* processing slots is written into a properties file, so that the Flink client is able
* to pick those details up.
* This configuration parameter allows changing the default location of that file (for example
- * for environments sharing a Flink installation between users)
+ * for environments sharing a Flink installation between users).
+ * @deprecated in favor of {@code YarnConfigOptions#PROPERTIES_FILE_LOCATION}.
*/
+ @Deprecated
public static final String YARN_PROPERTIES_FILE_LOCATION = "yarn.properties-file.location";
/**
@@ -474,12 +494,16 @@ public final class ConfigConstants {
* or a list of ranges and or points: "50100-50200,50300-50400,51234"
*
* Setting the port to 0 will let the OS choose an available port.
+ * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_MASTER_PORT}.
*/
+ @Deprecated
public static final String YARN_APPLICATION_MASTER_PORT = "yarn.application-master.port";
/**
* A comma-separated list of strings to use as YARN application tags.
+ * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_TAGS}.
*/
+ @Deprecated
public static final String YARN_APPLICATION_TAGS = "yarn.tags";
@@ -487,7 +511,9 @@ public final class ConfigConstants {
/**
* The initial number of Mesos tasks to allocate.
+ * @deprecated in favor of {@code MesosOptions#INITIAL_TASKS}.
*/
+ @Deprecated
public static final String MESOS_INITIAL_TASKS = "mesos.initial-tasks";
/**
@@ -495,7 +521,9 @@ public final class ConfigConstants {
* the Mesos session / job on Mesos.
*
* By default, we take the number of of initially requested tasks.
+ * @deprecated in favor of {@code MesosOptions#MAX_FAILED_TASKS}.
*/
+ @Deprecated
public static final String MESOS_MAX_FAILED_TASKS = "mesos.maximum-failed-tasks";
/**
@@ -510,36 +538,53 @@ public final class ConfigConstants {
* file:///path/to/file (where file contains one of the above)
* }
* </pre>
- *
+ * @deprecated in favor of {@code MesosOptions#MASTER_URL}.
*/
+ @Deprecated
public static final String MESOS_MASTER_URL = "mesos.master";
/**
* The failover timeout for the Mesos scheduler, after which running tasks are automatically shut down.
*
* The default value is 600 (seconds).
+ * @deprecated in favor of {@code MesosOptions#FAILOVER_TIMEOUT_SECONDS}.
*/
+ @Deprecated
public static final String MESOS_FAILOVER_TIMEOUT_SECONDS = "mesos.failover-timeout";
/**
* The config parameter defining the Mesos artifact server port to use.
* Setting the port to 0 will let the OS choose an available port.
+ * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_PORT}.
*/
+ @Deprecated
public static final String MESOS_ARTIFACT_SERVER_PORT_KEY = "mesos.resourcemanager.artifactserver.port";
+ /** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_NAME}. */
+ @Deprecated
public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = "mesos.resourcemanager.framework.name";
+ /** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_ROLE}. */
+ @Deprecated
public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "mesos.resourcemanager.framework.role";
+ /** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_PRINCIPAL}. */
+ @Deprecated
public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL = "mesos.resourcemanager.framework.principal";
+ /** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_SECRET}. */
+ @Deprecated
public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET = "mesos.resourcemanager.framework.secret";
+ /** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_USER}. */
+ @Deprecated
public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "mesos.resourcemanager.framework.user";
/**
* Config parameter to override SSL support for the Artifact Server
+ * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_SSL_ENABLED}.
*/
+ @Deprecated
public static final String MESOS_ARTIFACT_SERVER_SSL_ENABLED = "mesos.resourcemanager.artifactserver.ssl.enabled";
// ------------------------ Hadoop Configuration ------------------------
@@ -1218,7 +1263,9 @@ public final class ConfigConstants {
/**
* The default network port of the resource manager.
+ * @deprecated Use {@link ResourceManagerOptions#IPC_PORT} instead.
*/
+ @Deprecated
public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0;
/**
@@ -1378,13 +1425,17 @@ public final class ConfigConstants {
/**
* Minimum amount of memory to subtract from the process memory to get the TaskManager
* heap size. We came up with these values experimentally.
+ * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_MIN} instead.
*/
+ @Deprecated
public static final int DEFAULT_YARN_HEAP_CUTOFF = 600;
/**
* Relative amount of memory to subtract from Java process memory to get the TaskManager
- * heap size
+ * heap size.
+ * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_RATIO} instead.
*/
+ @Deprecated
public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;
/**
@@ -1395,31 +1446,49 @@ public final class ConfigConstants {
/**
* Default port for the application master is 0, which means
- * the operating system assigns an ephemeral port
+ * the operating system assigns an ephemeral port.
+ * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_MASTER_PORT}.
*/
+ @Deprecated
public static final String DEFAULT_YARN_JOB_MANAGER_PORT = "0";
// ------ Mesos-Specific Configuration ------
// For more configuration entries please see {@code MesosTaskManagerParameters}.
- /** The default failover timeout provided to Mesos (10 mins) */
+ /**
+ * The default failover timeout provided to Mesos (10 mins).
+ * @deprecated in favor of {@code MesosOptions#FAILOVER_TIMEOUT_SECONDS}.
+ */
+ @Deprecated
public static final int DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS = 10 * 60;
/**
* The default network port to listen on for the Mesos artifact server.
+ * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_PORT}.
*/
+ @Deprecated
public static final int DEFAULT_MESOS_ARTIFACT_SERVER_PORT = 0;
/**
* The default Mesos framework name for the ResourceManager to use.
+ * @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_NAME}.
*/
+ @Deprecated
public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = "Flink";
+ /** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_ROLE}. */
+ @Deprecated
public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*";
+ /** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_USER}. */
+ @Deprecated
public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "";
- /** Default value to override SSL support for the Artifact Server */
+ /**
+ * Default value to override SSL support for the Artifact Server.
+ * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_SSL_ENABLED}.
+ */
+ @Deprecated
public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
// ------------------------ File System Behavior ------------------------
@@ -1659,8 +1728,16 @@ public final class ConfigConstants {
public static final int DEFAULT_LOCAL_NUMBER_JOB_MANAGER = 1;
+ /**
+ * @deprecated Use {@link ResourceManagerOptions#LOCAL_NUMBER_RESOURCE_MANAGER} instead.
+ */
+ @Deprecated
public static final String LOCAL_NUMBER_RESOURCE_MANAGER = "local.number-resourcemanager";
+ /**
+ * @deprecated Use {@link ResourceManagerOptions#LOCAL_NUMBER_RESOURCE_MANAGER} instead.
+ */
+ @Deprecated
public static final int DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER = 1;
public static final String LOCAL_START_WEBSERVER = "local.start-webserver";
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index 6a09f19..e2d96bb 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -33,6 +33,45 @@ public class ResourceManagerOptions {
.key("resourcemanager.job.timeout")
.defaultValue("5 minutes");
+ public static final ConfigOption<Integer> LOCAL_NUMBER_RESOURCE_MANAGER = ConfigOptions
+ .key("local.number-resourcemanager")
+ .defaultValue(1);
+
+ public static final ConfigOption<Integer> IPC_PORT = ConfigOptions
+ .key("resourcemanager.rpc.port")
+ .defaultValue(0);
+
+ /**
+ * Percentage of heap space to remove from containers (YARN / Mesos), to compensate
+ * for other JVM memory usage.
+ */
+ public static final ConfigOption<Float> CONTAINERIZED_HEAP_CUTOFF_RATIO = ConfigOptions
+ .key("containerized.heap-cutoff-ratio")
+ .defaultValue(0.25f)
+ .withDeprecatedKeys("yarn.heap-cutoff-ratio");
+
+ /**
+ * Minimum amount of heap memory to remove in containers, as a safety margin.
+ */
+ public static final ConfigOption<Integer> CONTAINERIZED_HEAP_CUTOFF_MIN = ConfigOptions
+ .key("containerized.heap-cutoff-min")
+ .defaultValue(600)
+ .withDeprecatedKeys("yarn.heap-cutoff-min");
+
+ /**
+ * Prefix for passing custom environment variables to Flink's master process.
+ * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
+ * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
+ * in the flink-conf.yaml.
+ */
+ public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env.";
+
+ /**
+ * Similar to the {@link #CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows
+ * setting custom environment variables for the workers (TaskManagers).
+ */
+ public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env.";
+
// ---------------------------------------------------------------------------------------------
/** Not intended to be instantiated */
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
new file mode 100644
index 0000000..8616cad
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.configuration;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Mesos settings.
+ */
+public class MesosOptions {
+
+ /**
+ * The initial number of Mesos tasks to allocate.
+ */
+ public static final ConfigOption<Integer> INITIAL_TASKS =
+ key("mesos.initial-tasks")
+ .defaultValue(0);
+
+ /**
+ * The maximum number of failed Mesos tasks before entirely stopping
+ * the Mesos session / job on Mesos.
+ *
+ * <p>By default, we take the number of initially requested tasks.
+ */
+ public static final ConfigOption<Integer> MAX_FAILED_TASKS =
+ key("mesos.maximum-failed-tasks")
+ .defaultValue(-1);
+
+ /**
+ * The Mesos master URL.
+ *
+ * <p>The value should be in one of the following forms:
+ * <pre>
+ * {@code
+ * host:port
+ * zk://host1:port1,host2:port2,.../path
+ * zk://username:password@host1:port1,host2:port2,.../path
+ * file:///path/to/file (where file contains one of the above)
+ * }
+ * </pre>
+ */
+ public static final ConfigOption<String> MASTER_URL =
+ key("mesos.master")
+ .noDefaultValue();
+
+ /**
+ * The failover timeout for the Mesos scheduler, after which running tasks are automatically shut down.
+ */
+ public static final ConfigOption<Integer> FAILOVER_TIMEOUT_SECONDS =
+ key("mesos.failover-timeout")
+ .defaultValue(600);
+
+ /**
+ * The config parameter defining the Mesos artifact server port to use.
+ * Setting the port to 0 will let the OS choose an available port.
+ */
+ public static final ConfigOption<Integer> ARTIFACT_SERVER_PORT =
+ key("mesos.resourcemanager.artifactserver.port")
+ .defaultValue(0);
+
+ public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_NAME =
+ key("mesos.resourcemanager.framework.name")
+ .defaultValue("Flink");
+
+ public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_ROLE =
+ key("mesos.resourcemanager.framework.role")
+ .defaultValue("*");
+
+ public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_PRINCIPAL =
+ key("mesos.resourcemanager.framework.principal")
+ .noDefaultValue();
+
+ public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_SECRET =
+ key("mesos.resourcemanager.framework.secret")
+ .noDefaultValue();
+
+ public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_USER =
+ key("mesos.resourcemanager.framework.user")
+ .defaultValue("");
+
+ /**
+ * Config parameter to override SSL support for the Artifact Server.
+ */
+ public static final ConfigOption<Boolean> ARTIFACT_SERVER_SSL_ENABLED =
+ key("mesos.resourcemanager.artifactserver.ssl.enabled")
+ .defaultValue(true);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index d4e2f0d..260b7f3 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -18,12 +18,12 @@
package org.apache.flink.mesos.runtime.clusterframework;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
@@ -264,8 +264,7 @@ public class MesosApplicationMasterRunner {
// try to start the artifact server
LOG.debug("Starting Artifact Server");
- final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
- ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
+ final int artifactServerPort = config.getInteger(MesosOptions.ARTIFACT_SERVER_PORT);
final String artifactServerPrefix = UUID.randomUUID().toString();
artifactServer = new MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, config);
@@ -491,42 +490,38 @@ public class MesosApplicationMasterRunner {
.setHostname(hostname);
Protos.Credential.Builder credential = null;
- if (!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) {
- throw new IllegalConfigurationException(ConfigConstants.MESOS_MASTER_URL + " must be configured.");
+ if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
+ throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured.");
}
- String masterUrl = flinkConfig.getString(ConfigConstants.MESOS_MASTER_URL, null);
+ String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL);
Duration failoverTimeout = FiniteDuration.apply(
flinkConfig.getInteger(
- ConfigConstants.MESOS_FAILOVER_TIMEOUT_SECONDS,
- ConfigConstants.DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS),
+ MesosOptions.FAILOVER_TIMEOUT_SECONDS),
TimeUnit.SECONDS);
frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());
frameworkInfo.setName(flinkConfig.getString(
- ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_NAME,
- ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME));
+ MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
frameworkInfo.setRole(flinkConfig.getString(
- ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE,
- ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE));
+ MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));
frameworkInfo.setUser(flinkConfig.getString(
- ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER,
- ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER));
+ MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER));
- if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
+ if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
frameworkInfo.setPrincipal(flinkConfig.getString(
- ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null));
+ MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL));
credential = Protos.Credential.newBuilder();
credential.setPrincipal(frameworkInfo.getPrincipal());
// some environments use a side-channel to communicate the secret to Mesos,
// and thus don't set the 'secret' configuration setting
- if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
+ if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) {
credential.setSecret(flinkConfig.getString(
- ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
+ MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index d6b5c9d..05d7e1f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -19,9 +19,9 @@
package org.apache.flink.mesos.runtime.clusterframework;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
@@ -641,7 +641,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
String msg = "Stopping Mesos session because the number of failed tasks ("
+ failedTasksSoFar + ") exceeded the maximum failed tasks ("
+ maxFailedTasks + "). This number is controlled by the '"
- + ConfigConstants.MESOS_MAX_FAILED_TASKS + "' configuration setting. "
+ + MesosOptions.MAX_FAILED_TASKS.key() + "' configuration setting. "
+ "By default its the number of requested tasks.";
LOG.error(msg);
@@ -757,18 +757,18 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
Logger log) {
final int numInitialTaskManagers = flinkConfig.getInteger(
- ConfigConstants.MESOS_INITIAL_TASKS, 0);
+ MesosOptions.INITIAL_TASKS);
if (numInitialTaskManagers >= 0) {
log.info("Mesos framework to allocate {} initial tasks",
numInitialTaskManagers);
}
else {
throw new IllegalConfigurationException("Invalid value for " +
- ConfigConstants.MESOS_INITIAL_TASKS + ", which must be at least zero.");
+ MesosOptions.INITIAL_TASKS.key() + ", which must be at least zero.");
}
final int maxFailedTasks = flinkConfig.getInteger(
- ConfigConstants.MESOS_MAX_FAILED_TASKS, numInitialTaskManagers);
+ MesosOptions.MAX_FAILED_TASKS.key(), numInitialTaskManagers);
if (maxFailedTasks >= 0) {
log.info("Mesos framework tolerates {} failed tasks before giving up",
maxFailedTasks);
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 2627d25..3a6f77a 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -18,12 +18,12 @@
package org.apache.flink.mesos.util;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
@@ -115,8 +115,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
// Config to enable https access to the artifact server
boolean enableSSL = config.getBoolean(
- ConfigConstants.MESOS_ARTIFACT_SERVER_SSL_ENABLED,
- ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED) &&
+ MesosOptions.ARTIFACT_SERVER_SSL_ENABLED) &&
SSLUtils.getSSLEnabled(config);
if (enableSSL) {
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index af3f7ef..8bfb4d1 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.mesos.runtime.clusterframework;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
@@ -105,8 +105,8 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
private static final long serialVersionUID = -952579203067648838L;
{
- setInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, -1);
- setInteger(ConfigConstants.MESOS_INITIAL_TASKS, 0);
+ setInteger(MesosOptions.MAX_FAILED_TASKS, -1);
+ setInteger(MesosOptions.INITIAL_TASKS, 0);
}};
@BeforeClass
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index 9d679cf..7e9891f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.clusterframework;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import java.util.HashMap;
@@ -115,22 +115,20 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
// (1) compute how much memory we subtract from the total memory, to get the Java memory
final float memoryCutoffRatio = config.getFloat(
- ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
- ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
final int minCutoff = config.getInteger(
- ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
- ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) {
throw new IllegalArgumentException("The configuration value '"
- + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given="
+ + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be between 0 and 1. Value given="
+ memoryCutoffRatio);
}
if (minCutoff >= containerMemoryMB) {
throw new IllegalArgumentException("The configuration value '"
- + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN + "'='" + minCutoff
+ + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "'='" + minCutoff
+ "' is larger than the total container memory " + containerMemoryMB);
}
@@ -147,7 +145,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
// (3) obtain the additional environment variables from the configuration
final HashMap<String, String> envVars = new HashMap<>();
- final String prefix = ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
+ final String prefix = ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
for (String key : config.keySet()) {
if (key.startsWith(prefix) && key.length() > prefix.length()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index c5c87ac..6f13b9f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -28,7 +28,7 @@ import akka.actor.{ActorRef, ActorSystem}
import com.typesafe.config.Config
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions, TaskManagerOptions}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions, ResourceManagerOptions, TaskManagerOptions}
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
@@ -168,8 +168,7 @@ abstract class FlinkMiniCluster(
def getNumberOfResourceManagers: Int = {
originalConfiguration.getInteger(
- ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER,
- ConfigConstants.DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER
+ ResourceManagerOptions.LOCAL_NUMBER_RESOURCE_MANAGER
)
}
@@ -226,8 +225,8 @@ abstract class FlinkMiniCluster(
if (useSingleActorSystem) {
AkkaUtils.getAkkaConfig(originalConfiguration, None)
} else {
- val port = originalConfiguration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+ val port = originalConfiguration.getInteger(
+ ResourceManagerOptions.IPC_PORT)
val resolvedPort = if(port != 0) port + index else port
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 27a8ee1..0ae00a9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
import akka.actor.{ActorRef, ActorSystem, Props}
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.io.FileOutputFormat
-import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, QueryableStateOptions, TaskManagerOptions}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, QueryableStateOptions, ResourceManagerOptions, TaskManagerOptions}
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -183,11 +183,11 @@ class LocalFlinkMiniCluster(
val resourceManagerName = getResourceManagerName(index)
val resourceManagerPort = config.getInteger(
- ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+ ResourceManagerOptions.IPC_PORT)
if(resourceManagerPort > 0) {
- config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
+ config.setInteger(ResourceManagerOptions.IPC_PORT,
+ resourceManagerPort + index)
}
val resourceManagerProps = getResourceManagerProps(
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 82a656c..275bcc9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
@@ -62,8 +63,8 @@ public class UtilsTest {
@Test
public void testHeapCutoff() {
Configuration conf = new Configuration();
- conf.setDouble(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15);
- conf.setInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, 384);
+ conf.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15F);
+ conf.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 384);
Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf));
Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf));
@@ -71,14 +72,14 @@ public class UtilsTest {
// test different configuration
Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));
- conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, "1000");
- conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.1");
+ conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key(), "1000");
+ conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.1");
Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));
- conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.5");
+ conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.5");
Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));
- conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1");
+ conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
// test also deprecated keys
@@ -93,21 +94,21 @@ public class UtilsTest {
@Test(expected = IllegalArgumentException.class)
public void illegalArgument() {
Configuration conf = new Configuration();
- conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1.1");
+ conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1.1");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}
@Test(expected = IllegalArgumentException.class)
public void illegalArgumentNegative() {
Configuration conf = new Configuration();
- conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "-0.01");
+ conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "-0.01");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}
@Test(expected = IllegalArgumentException.class)
public void tooMuchCutoff() {
Configuration conf = new Configuration();
- conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "6000");
+ conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "6000");
Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 2e88836..d85aa97 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.yarn;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.client.JobClient;
@@ -26,6 +25,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -145,7 +145,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
"-nm", "customName",
"-Dfancy-configuration-value=veryFancy",
"-Dyarn.maximum-failed-containers=3",
- "-D" + ConfigConstants.YARN_VCORES + "=2"},
+ "-D" + YarnConfigOptions.VCORES.key() + "=2"},
"Number of connected TaskManagers changed to 1. Slots available: 3",
RunTypes.YARN_SESSION);
@@ -186,7 +186,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value"));
Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers"));
- Assert.assertEquals("2", parsedConfig.get(ConfigConstants.YARN_VCORES));
+ Assert.assertEquals("2", parsedConfig.get(YarnConfigOptions.VCORES.key()));
// -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface
// first, get the hostname/port
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index dfc8b6a..55dc47f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -253,7 +253,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// The number of cores can be configured in the config.
// If not configured, it is set to the number of task slots
int numYarnVcores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
- int configuredVcores = flinkConfiguration.getInteger(ConfigConstants.YARN_VCORES, clusterSpecification.getSlotsPerTaskManager());
+ int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
// don't configure more than the maximum configured number of vcores
if (configuredVcores > numYarnVcores) {
throw new IllegalConfigurationException(
@@ -261,7 +261,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
" but Yarn only has %d virtual cores available. Please note that the number" +
" of virtual cores is set to the number of task slots by default unless configured" +
" in the Flink config with '%s.'",
- configuredVcores, numYarnVcores, ConfigConstants.YARN_VCORES));
+ configuredVcores, numYarnVcores, YarnConfigOptions.VCORES.key()));
}
// check if required Hadoop environment variables are set. If not, warn user
@@ -677,7 +677,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// activate re-execution of failed applications
appContext.setMaxAppAttempts(
flinkConfiguration.getInteger(
- ConfigConstants.YARN_APPLICATION_ATTEMPTS,
+ YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
activateHighAvailabilitySupport(appContext);
@@ -685,7 +685,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// set number of application retries to 1 in the default case
appContext.setMaxAppAttempts(
flinkConfiguration.getInteger(
- ConfigConstants.YARN_APPLICATION_ATTEMPTS,
+ YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
1));
}
@@ -1135,7 +1135,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
IllegalAccessException {
final ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
- final String tagsString = flinkConfiguration.getString(ConfigConstants.YARN_APPLICATION_TAGS, "");
+ final String tagsString = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);
final Set<String> applicationTags = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 662617f..98d27ab 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -18,7 +18,7 @@
package org.apache.flink.yarn;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.security.SecurityUtils;
@@ -82,24 +82,17 @@ public final class Utils {
*/
public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) {
- BootstrapTools.substituteDeprecatedConfigKey(conf,
- ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
- BootstrapTools.substituteDeprecatedConfigKey(conf,
- ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
-
- float memoryCutoffRatio = conf.getFloat(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
- ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
- int minCutoff = conf.getInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
- ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
+ float memoryCutoffRatio = conf.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
+ int minCutoff = conf.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) {
throw new IllegalArgumentException("The configuration value '"
- + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO
+ + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key()
+ "' must be between 0 and 1. Value given=" + memoryCutoffRatio);
}
if (minCutoff > memory) {
throw new IllegalArgumentException("The configuration value '"
- + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN
+ + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key()
+ "' is higher (" + minCutoff + ") than the requested amount of memory " + memory);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index dccbb71..e951df4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -43,6 +44,7 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@@ -299,8 +301,7 @@ public class YarnApplicationMasterRunner {
// try to start the actor system, JobManager and JobManager actor system
// using the port range definition from the config.
final String amPortRange = config.getString(
- ConfigConstants.YARN_APPLICATION_MASTER_PORT,
- ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
+ YarnConfigOptions.APPLICATION_MASTER_PORT);
actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, amPortRange, LOG);
@@ -518,21 +519,13 @@ public class YarnApplicationMasterRunner {
// corresponding generic config keys instead. that way, later code needs not
// deal with deprecated config keys
- BootstrapTools.substituteDeprecatedConfigKey(configuration,
- ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
- ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
-
- BootstrapTools.substituteDeprecatedConfigKey(configuration,
- ConfigConstants.YARN_HEAP_CUTOFF_MIN,
- ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
-
BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
- ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
+ ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX);
BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
- ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
+ ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
return configuration;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 66e44a6..4d8142f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -18,7 +18,6 @@
package org.apache.flink.yarn;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -28,6 +27,7 @@ import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.messages.ContainersAllocated;
import org.apache.flink.yarn.messages.ContainersComplete;
@@ -337,7 +337,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
// Resource requirements for worker containers
int taskManagerSlots = taskManagerParameters.numSlots();
- int vcores = config.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1));
+ int vcores = config.getInteger(YarnConfigOptions.VCORES, Math.max(taskManagerSlots, 1));
Resource capability = Resource.newInstance(containerMemorySizeMB, vcores);
resourceManagerClient.addContainerRequest(
@@ -550,7 +550,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
String msg = "Stopping YARN session because the number of failed containers ("
+ failedContainersSoFar + ") exceeded the maximum failed containers ("
+ maxFailedContainers + "). This number is controlled by the '"
- + ConfigConstants.YARN_MAX_FAILED_CONTAINERS + "' configuration setting. "
+ + YarnConfigOptions.MAX_FAILED_CONTAINERS.key() + "' configuration setting. "
+ "By default its the number of requested containers.";
LOG.error(msg);
@@ -710,7 +710,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
Logger log) {
final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
- ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;
+ YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
final long yarnExpiryIntervalMS = yarnConfig.getLong(
YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
@@ -723,7 +723,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
}
final int maxFailedContainers = flinkConfig.getInteger(
- ConfigConstants.YARN_MAX_FAILED_CONTAINERS, numInitialTaskManagers);
+ YarnConfigOptions.MAX_FAILED_CONTAINERS.key(), numInitialTaskManagers);
if (maxFailedContainers >= 0) {
log.info("YARN application tolerates {} failed TaskManager containers before giving up",
maxFailedContainers);
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 8327b6a..fb1a1c3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
@@ -134,7 +135,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
this.yarnConfig = new YarnConfiguration();
this.env = env;
final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
- ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;
+ YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
final long yarnExpiryIntervalMS = yarnConfig.getLong(
YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 5d8abac..f2968b1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -37,6 +37,7 @@ import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterClient;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptorV2;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -773,7 +774,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
String currentUser = System.getProperty("user.name");
String propertiesFileLocation =
- conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
+ conf.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 28ef2ab..3773352 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -52,6 +52,71 @@ public class YarnConfigOptions {
key("yarn.per-job-cluster.include-user-jar")
.defaultValue("ORDER");
+ /**
+ * The vcores exposed by YARN.
+ */
+ public static final ConfigOption<Integer> VCORES =
+ key("yarn.containers.vcores")
+ .defaultValue(-1);
+
+ /**
+ * The maximum number of failed YARN containers before entirely stopping
+ * the YARN session / job on YARN.
+ * By default, we take the number of initially requested containers.
+ *
+ * <p>Note: This option returns a String since Integer options must have a static default value.
+ */
+ public static final ConfigOption<String> MAX_FAILED_CONTAINERS =
+ key("yarn.maximum-failed-containers")
+ .noDefaultValue();
+
+ /**
+ * Set the number of retries for failed YARN ApplicationMasters/JobManagers in high
+ * availability mode. This value is usually limited by YARN.
+ * By default, it's 1 in the standalone case and 2 in the high availability case.
+ *
+ * <p>Note: This option returns a String since Integer options must have a static default value.
+ */
+ public static final ConfigOption<String> APPLICATION_ATTEMPTS =
+ key("yarn.application-attempts")
+ .noDefaultValue();
+
+ /**
+ * The heartbeat interval between the Application Master and the YARN Resource Manager.
+ */
+ public static final ConfigOption<Integer> HEARTBEAT_DELAY_SECONDS =
+ key("yarn.heartbeat-delay")
+ .defaultValue(5);
+
+ /**
+ * When a Flink job is submitted to YARN, the JobManager's host and the number of available
+ * processing slots are written into a properties file, so that the Flink client is able
+ * to pick those details up.
+ * This configuration parameter allows changing the default location of that file (for example
+ * for environments sharing a Flink installation between users)
+ */
+ public static final ConfigOption<String> PROPERTIES_FILE_LOCATION =
+ key("yarn.properties-file.location")
+ .noDefaultValue();
+
+ /**
+ * The config parameter defining the Akka actor system port for the ApplicationMaster and
+ * JobManager.
+ * The port can either be a port, such as "9123",
+ * a range of ports: "50100-50200"
+ * or a list of ranges and/or points: "50100-50200,50300-50400,51234".
+ * Setting the port to 0 will let the OS choose an available port.
+ */
+ public static final ConfigOption<String> APPLICATION_MASTER_PORT =
+ key("yarn.application-master.port")
+ .defaultValue("0");
+
+ /**
+ * A comma-separated list of strings to use as YARN application tags.
+ */
+ public static final ConfigOption<String> APPLICATION_TAGS =
+ key("yarn.tags")
+ .defaultValue("");
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index 9ead775..e8fccac 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.security.SecurityContext;
@@ -112,21 +113,13 @@ public class YarnEntrypointUtils {
// corresponding generic config keys instead. that way, later code needs not
// deal with deprecated config keys
- BootstrapTools.substituteDeprecatedConfigKey(configuration,
- ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
- ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
-
- BootstrapTools.substituteDeprecatedConfigKey(configuration,
- ConfigConstants.YARN_HEAP_CUTOFF_MIN,
- ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
-
BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
- ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
+ ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX);
BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
- ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
+ ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
final String keytabPath;
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index d78b390..a2d1668 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -22,7 +22,7 @@ import java.io.IOException
import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit}
import akka.actor.ActorRef
-import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
+import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.clusterframework.ContaineredJobManager
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.yarn.configuration.YarnConfigOptions
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -88,7 +89,7 @@ class YarnJobManager(
val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds
val YARN_HEARTBEAT_DELAY: FiniteDuration =
FiniteDuration(
- flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5),
+ flinkConfiguration.getInteger(YarnConfigOptions.HEARTBEAT_DELAY_SECONDS),
TimeUnit.SECONDS)
val yarnFilesPath: Option[String] = Option(System.getenv().get(YarnConfigKeys.FLINK_YARN_FILES))
http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index bcb8559..19d1af5 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -87,7 +88,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
public void testConfigOverwrite() {
Configuration configuration = new Configuration();
// overwrite vcores in config
- configuration.setInteger(ConfigConstants.YARN_VCORES, Integer.MAX_VALUE);
+ configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
configuration,