You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/08/10 11:22:56 UTC

[2/3] flink git commit: [FLINK-6494] [RM][Yarn][Mesos] Migrate ResourceManager/Yarn/Mesos configuration options

[FLINK-6494] [RM][Yarn][Mesos] Migrate ResourceManager/Yarn/Mesos configuration options

This closes #4075.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d63d704e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d63d704e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d63d704e

Branch: refs/heads/master
Commit: d63d704efb2bc28dd7a33ee9027f4d447acbd209
Parents: 6539180
Author: zjureel <zj...@gmail.com>
Authored: Thu Jun 8 11:38:56 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Thu Aug 10 11:36:30 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  89 ++++++++++++++--
 .../configuration/ResourceManagerOptions.java   |  39 +++++++
 .../flink/mesos/configuration/MesosOptions.java | 106 +++++++++++++++++++
 .../MesosApplicationMasterRunner.java           |  31 +++---
 .../MesosFlinkResourceManager.java              |  10 +-
 .../flink/mesos/util/MesosArtifactServer.java   |   5 +-
 .../MesosFlinkResourceManagerTest.java          |   6 +-
 .../ContaineredTaskManagerParameters.java       |  14 ++-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   9 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |   8 +-
 .../java/org/apache/flink/yarn/UtilsTest.java   |  19 ++--
 .../YARNSessionCapacitySchedulerITCase.java     |   6 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |  10 +-
 .../main/java/org/apache/flink/yarn/Utils.java  |  17 +--
 .../flink/yarn/YarnApplicationMasterRunner.java |  17 +--
 .../flink/yarn/YarnFlinkResourceManager.java    |  10 +-
 .../apache/flink/yarn/YarnResourceManager.java  |   3 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   3 +-
 .../yarn/configuration/YarnConfigOptions.java   |  65 ++++++++++++
 .../yarn/entrypoint/YarnEntrypointUtils.java    |  13 +--
 .../org/apache/flink/yarn/YarnJobManager.scala  |   5 +-
 .../flink/yarn/YarnClusterDescriptorTest.java   |   3 +-
 22 files changed, 375 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f817344..4c6c62a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -135,7 +135,9 @@ public final class ConfigConstants {
 	/**
 	 * The config parameter defining the network port to connect to
 	 * for communication with the resource manager.
+	 * @deprecated Use {@link ResourceManagerOptions#IPC_PORT} instead.
 	 */
+	@Deprecated
 	public static final String RESOURCE_MANAGER_IPC_PORT_KEY = "resourcemanager.rpc.port";
 
 	/**
@@ -349,12 +351,16 @@ public final class ConfigConstants {
 	/**
 	 * Percentage of heap space to remove from containers (YARN / Mesos), to compensate
 	 * for other JVM memory usage.
+	 * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_RATIO} instead.
 	 */
+	@Deprecated
 	public static final String CONTAINERIZED_HEAP_CUTOFF_RATIO = "containerized.heap-cutoff-ratio";
 
 	/**
 	 * Minimum amount of heap memory to remove in containers, as a safety margin.
+	 * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_MIN} instead.
 	 */
+	@Deprecated
 	public static final String CONTAINERIZED_HEAP_CUTOFF_MIN = "containerized.heap-cutoff-min";
 
 	/**
@@ -362,13 +368,17 @@ public final class ConfigConstants {
 	 * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
 	 * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
 	 * in the flink-conf.yaml.
+	 * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_MASTER_ENV_PREFIX} instead.
 	 */
+	@Deprecated
 	public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env.";
 
 	/**
 	 * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows
 	 * setting custom environment variables for the workers (TaskManagers)
+	 * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_TASK_MANAGER_ENV_PREFIX} instead.
 	 */
+	@Deprecated
 	public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env.";
 
 	
@@ -376,7 +386,9 @@ public final class ConfigConstants {
 
 	/**
 	 * The vcores exposed by YARN.
+	 * @deprecated in favor of {@code YarnConfigOptions#VCORES}.
 	 */
+	@Deprecated
 	public static final String YARN_VCORES = "yarn.containers.vcores";
 
 	/**
@@ -406,7 +418,9 @@ public final class ConfigConstants {
 	 * the YARN session / job on YARN.
 	 *
 	 * By default, we take the number of of initially requested containers.
+	 * @deprecated in favor of {@code YarnConfigOptions#MAX_FAILED_CONTAINERS}.
 	 */
+	@Deprecated
 	public static final String YARN_MAX_FAILED_CONTAINERS = "yarn.maximum-failed-containers";
 
 	/**
@@ -414,14 +428,18 @@ public final class ConfigConstants {
 	 * availability mode. This value is usually limited by YARN.
 	 *
 	 * By default, it's 1 in the standalone case and 2 in the high availability case.
+	 * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_ATTEMPTS}.
 	 */
+	@Deprecated
 	public static final String YARN_APPLICATION_ATTEMPTS = "yarn.application-attempts";
 
 	/**
 	 * The heartbeat interval between the Application Master and the YARN Resource Manager.
 	 *
 	 * The default value is 5 (seconds).
+	 * @deprecated in favor of {@code YarnConfigOptions#HEARTBEAT_DELAY_SECONDS}.
 	 */
+	@Deprecated
 	public static final String YARN_HEARTBEAT_DELAY_SECONDS = "yarn.heartbeat-delay";
 
 	/**
@@ -429,8 +447,10 @@ public final class ConfigConstants {
 	 * processing slots is written into a properties file, so that the Flink client is able
 	 * to pick those details up.
 	 * This configuration parameter allows changing the default location of that file (for example
-	 * for environments sharing a Flink installation between users)
+	 * for environments sharing a Flink installation between users).
+	 * @deprecated in favor of {@code YarnConfigOptions#PROPERTIES_FILE_LOCATION}.
 	 */
+	@Deprecated
 	public static final String YARN_PROPERTIES_FILE_LOCATION = "yarn.properties-file.location";
 
 	/**
@@ -474,12 +494,16 @@ public final class ConfigConstants {
 	 * or a list of ranges and or points: "50100-50200,50300-50400,51234"
 	 *
 	 * Setting the port to 0 will let the OS choose an available port.
+	 * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_MASTER_PORT}.
 	 */
+	@Deprecated
 	public static final String YARN_APPLICATION_MASTER_PORT = "yarn.application-master.port";
 
 	/**
 	 * A comma-separated list of strings to use as YARN application tags.
+	 * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_TAGS}.
 	 */
+	@Deprecated
 	public static final String YARN_APPLICATION_TAGS = "yarn.tags";
 
 
@@ -487,7 +511,9 @@ public final class ConfigConstants {
 
 	/**
 	 * The initial number of Mesos tasks to allocate.
+	 * @deprecated in favor of {@code MesosOptions#INITIAL_TASKS}.
 	 */
+	@Deprecated
 	public static final String MESOS_INITIAL_TASKS = "mesos.initial-tasks";
 
 	/**
@@ -495,7 +521,9 @@ public final class ConfigConstants {
 	 * the Mesos session / job on Mesos.
 	 *
 	 * By default, we take the number of of initially requested tasks.
+	 * @deprecated in favor of {@code MesosOptions#MAX_FAILED_TASKS}.
 	 */
+	@Deprecated
 	public static final String MESOS_MAX_FAILED_TASKS = "mesos.maximum-failed-tasks";
 
 	/**
@@ -510,36 +538,53 @@ public final class ConfigConstants {
 	 *     file:///path/to/file (where file contains one of the above)
 	 * }
 	 * </pre>
-	 *
+	 * @deprecated in favor of {@code MesosOptions#MASTER_URL}.
 	 */
+	@Deprecated
 	public static final String MESOS_MASTER_URL = "mesos.master";
 
 	/**
 	 * The failover timeout for the Mesos scheduler, after which running tasks are automatically shut down.
 	 *
 	 * The default value is 600 (seconds).
+	 * @deprecated in favor of {@code MesosOptions#FAILOVER_TIMEOUT_SECONDS}.
 	 */
+	@Deprecated
 	public static final String MESOS_FAILOVER_TIMEOUT_SECONDS = "mesos.failover-timeout";
 
 	/**
 	 * The config parameter defining the Mesos artifact server port to use.
 	 * Setting the port to 0 will let the OS choose an available port.
+	 * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_PORT}.
 	 */
+	@Deprecated
 	public static final String MESOS_ARTIFACT_SERVER_PORT_KEY = "mesos.resourcemanager.artifactserver.port";
 
+	/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_NAME}. */
+	@Deprecated
 	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = "mesos.resourcemanager.framework.name";
 
+	/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_ROLE}. */
+	@Deprecated
 	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "mesos.resourcemanager.framework.role";
 
+	/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_PRINCIPAL}. */
+	@Deprecated
 	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL = "mesos.resourcemanager.framework.principal";
 
+	/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_SECRET}. */
+	@Deprecated
 	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET = "mesos.resourcemanager.framework.secret";
 
+	/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_USER}. */
+	@Deprecated
 	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "mesos.resourcemanager.framework.user";
 
 	/**
 	 * Config parameter to override SSL support for the Artifact Server
+	 * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_SSL_ENABLED}.
 	 */
+	@Deprecated
 	public static final String MESOS_ARTIFACT_SERVER_SSL_ENABLED = "mesos.resourcemanager.artifactserver.ssl.enabled";
 
 	// ------------------------ Hadoop Configuration ------------------------
@@ -1218,7 +1263,9 @@ public final class ConfigConstants {
 
 	/**
 	 * The default network port of the resource manager.
+	 * @deprecated Use {@link ResourceManagerOptions#IPC_PORT} instead.
 	 */
+	@Deprecated
 	public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0;
 
 	/**
@@ -1378,13 +1425,17 @@ public final class ConfigConstants {
 	/**
 	 * Minimum amount of memory to subtract from the process memory to get the TaskManager
 	 * heap size. We came up with these values experimentally.
+	 * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_MIN} instead.
 	 */
+	@Deprecated
 	public static final int DEFAULT_YARN_HEAP_CUTOFF = 600;
 
 	/**
 	 * Relative amount of memory to subtract from Java process memory to get the TaskManager
-	 * heap size
+	 * heap size.
+	 * @deprecated Use {@link ResourceManagerOptions#CONTAINERIZED_HEAP_CUTOFF_RATIO} instead.
 	 */
+	@Deprecated
 	public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;
 
 	/**
@@ -1395,31 +1446,49 @@ public final class ConfigConstants {
 
 	/**
 	 * Default port for the application master is 0, which means
-	 * the operating system assigns an ephemeral port
+	 * the operating system assigns an ephemeral port.
+	 * @deprecated in favor of {@code YarnConfigOptions#APPLICATION_MASTER_PORT}.
 	 */
+	@Deprecated
 	public static final String DEFAULT_YARN_JOB_MANAGER_PORT = "0";
 
 	// ------ Mesos-Specific Configuration ------
 	// For more configuration entries please see {@code MesosTaskManagerParameters}.
 
-	/** The default failover timeout provided to Mesos (10 mins) */
+	/**
+	 * The default failover timeout provided to Mesos (10 mins).
+	 * @deprecated in favor of {@code MesosOptions#FAILOVER_TIMEOUT_SECONDS}.
+	 */
+	@Deprecated
 	public static final int DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS = 10 * 60;
 
 	/**
 	 * The default network port to listen on for the Mesos artifact server.
+	 * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_PORT}.
 	 */
+	@Deprecated
 	public static final int DEFAULT_MESOS_ARTIFACT_SERVER_PORT = 0;
 
 	/**
 	 * The default Mesos framework name for the ResourceManager to use.
+	 * @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_NAME}.
 	 */
+	@Deprecated
 	public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME = "Flink";
 
+	/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_ROLE}. */
+	@Deprecated
 	public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*";
 
+	/** @deprecated in favor of {@code MesosOptions#RESOURCEMANAGER_FRAMEWORK_USER}. */
+	@Deprecated
 	public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "";
 
-	/** Default value to override SSL support for the Artifact Server */
+	/**
+	 * Default value to override SSL support for the Artifact Server.
+	 * @deprecated in favor of {@code MesosOptions#ARTIFACT_SERVER_SSL_ENABLED}.
+	 */
+	@Deprecated
 	public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
 
 	// ------------------------ File System Behavior ------------------------
@@ -1659,8 +1728,16 @@ public final class ConfigConstants {
 
 	public static final int DEFAULT_LOCAL_NUMBER_JOB_MANAGER = 1;
 
+	/**
+	 * @deprecated Use {@link ResourceManagerOptions#LOCAL_NUMBER_RESOURCE_MANAGER} instead.
+	 */
+	@Deprecated
 	public static final String LOCAL_NUMBER_RESOURCE_MANAGER = "local.number-resourcemanager";
 
+	/**
+	 * @deprecated Use {@link ResourceManagerOptions#LOCAL_NUMBER_RESOURCE_MANAGER} instead.
+	 */
+	@Deprecated
 	public static final int DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER = 1;
 
 	public static final String LOCAL_START_WEBSERVER = "local.start-webserver";

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index 6a09f19..e2d96bb 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -33,6 +33,45 @@ public class ResourceManagerOptions {
 		.key("resourcemanager.job.timeout")
 		.defaultValue("5 minutes");
 
+	public static final ConfigOption<Integer> LOCAL_NUMBER_RESOURCE_MANAGER = ConfigOptions
+		.key("local.number-resourcemanager")
+		.defaultValue(1);
+
+	public static final ConfigOption<Integer> IPC_PORT = ConfigOptions
+		.key("resourcemanager.rpc.port")
+		.defaultValue(0);
+
+	/**
+	 * Percentage of heap space to remove from containers (YARN / Mesos), to compensate
+	 * for other JVM memory usage.
+	 */
+	public static final ConfigOption<Float> CONTAINERIZED_HEAP_CUTOFF_RATIO = ConfigOptions
+		.key("containerized.heap-cutoff-ratio")
+		.defaultValue(0.25f)
+		.withDeprecatedKeys("yarn.heap-cutoff-ratio");
+
+	/**
+	 * Minimum amount of heap memory to remove in containers, as a safety margin.
+	 */
+	public static final ConfigOption<Integer> CONTAINERIZED_HEAP_CUTOFF_MIN = ConfigOptions
+		.key("containerized.heap-cutoff-min")
+		.defaultValue(600)
+		.withDeprecatedKeys("yarn.heap-cutoff-min");
+
+	/**
+	 * Prefix for passing custom environment variables to Flink's master process.
+	 * For example for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
+	 * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
+	 * in the flink-conf.yaml.
+	 */
+	public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env.";
+
+	/**
+	 * Similar to the {@link #CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows
+	 * setting custom environment variables for the workers (TaskManagers).
+	 */
+	public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env.";
+	
 	// ---------------------------------------------------------------------------------------------
 
 	/** Not intended to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
new file mode 100644
index 0000000..8616cad
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.configuration;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to mesos settings.
+ */
+public class MesosOptions {
+
+	/**
+	 * The initial number of Mesos tasks to allocate.
+	 */
+	public static final ConfigOption<Integer> INITIAL_TASKS =
+		key("mesos.initial-tasks")
+			.defaultValue(0);
+
+	/**
+	 * The maximum number of failed Mesos tasks before entirely stopping
+	 * the Mesos session / job on Mesos.
+	 *
+	 * <p>By default, we take the number of initially requested tasks.
+	 */
+	public static final ConfigOption<Integer> MAX_FAILED_TASKS =
+		key("mesos.maximum-failed-tasks")
+			.defaultValue(-1);
+
+	/**
+	 * The Mesos master URL.
+	 *
+	 * <p>The value should be in one of the following forms:
+	 * <pre>
+	 * {@code
+	 *     host:port
+	 *     zk://host1:port1,host2:port2,.../path
+	 *     zk://username:password@host1:port1,host2:port2,.../path
+	 *     file:///path/to/file (where file contains one of the above)
+	 * }
+	 * </pre>
+	 */
+	public static final ConfigOption<String> MASTER_URL =
+		key("mesos.master")
+			.noDefaultValue();
+
+	/**
+	 * The failover timeout for the Mesos scheduler, after which running tasks are automatically shut down.
+	 */
+	public static final ConfigOption<Integer> FAILOVER_TIMEOUT_SECONDS =
+		key("mesos.failover-timeout")
+			.defaultValue(600);
+
+	/**
+	 * The config parameter defining the Mesos artifact server port to use.
+	 * Setting the port to 0 will let the OS choose an available port.
+	 */
+	public static final ConfigOption<Integer> ARTIFACT_SERVER_PORT =
+		key("mesos.resourcemanager.artifactserver.port")
+			.defaultValue(0);
+
+	public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_NAME =
+		key("mesos.resourcemanager.framework.name")
+			.defaultValue("Flink");
+
+	public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_ROLE =
+		key("mesos.resourcemanager.framework.role")
+			.defaultValue("*");
+
+	public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_PRINCIPAL =
+		key("mesos.resourcemanager.framework.principal")
+			.noDefaultValue();
+
+	public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_SECRET =
+		key("mesos.resourcemanager.framework.secret")
+			.noDefaultValue();
+
+	public static final ConfigOption<String> RESOURCEMANAGER_FRAMEWORK_USER =
+		key("mesos.resourcemanager.framework.user")
+			.defaultValue("");
+
+	/**
+	 * Config parameter to override SSL support for the Artifact Server.
+	 */
+	public static final ConfigOption<Boolean> ARTIFACT_SERVER_SSL_ENABLED =
+		key("mesos.resourcemanager.artifactserver.ssl.enabled")
+			.defaultValue(true);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index d4e2f0d..260b7f3 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
@@ -264,8 +264,7 @@ public class MesosApplicationMasterRunner {
 
 			// try to start the artifact server
 			LOG.debug("Starting Artifact Server");
-			final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
-				ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
+			final int artifactServerPort = config.getInteger(MesosOptions.ARTIFACT_SERVER_PORT);
 			final String artifactServerPrefix = UUID.randomUUID().toString();
 			artifactServer = new MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, config);
 
@@ -491,42 +490,38 @@ public class MesosApplicationMasterRunner {
 			.setHostname(hostname);
 		Protos.Credential.Builder credential = null;
 
-		if (!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) {
-			throw new IllegalConfigurationException(ConfigConstants.MESOS_MASTER_URL + " must be configured.");
+		if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
+			throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured.");
 		}
-		String masterUrl = flinkConfig.getString(ConfigConstants.MESOS_MASTER_URL, null);
+		String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL);
 
 		Duration failoverTimeout = FiniteDuration.apply(
 			flinkConfig.getInteger(
-				ConfigConstants.MESOS_FAILOVER_TIMEOUT_SECONDS,
-				ConfigConstants.DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS),
+				MesosOptions.FAILOVER_TIMEOUT_SECONDS),
 			TimeUnit.SECONDS);
 		frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());
 
 		frameworkInfo.setName(flinkConfig.getString(
-			ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_NAME,
-			ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_NAME));
+			MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
 
 		frameworkInfo.setRole(flinkConfig.getString(
-			ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE,
-			ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE));
+			MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));
 
 		frameworkInfo.setUser(flinkConfig.getString(
-			ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER,
-			ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER));
+			MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER));
 
-		if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
+		if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
 			frameworkInfo.setPrincipal(flinkConfig.getString(
-				ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null));
+				MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL));
 
 			credential = Protos.Credential.newBuilder();
 			credential.setPrincipal(frameworkInfo.getPrincipal());
 
 			// some environments use a side-channel to communicate the secret to Mesos,
 			// and thus don't set the 'secret' configuration setting
-			if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
+			if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET)) {
 				credential.setSecret(flinkConfig.getString(
-					ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
+					MesosOptions.RESOURCEMANAGER_FRAMEWORK_SECRET));
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index d6b5c9d..05d7e1f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -19,9 +19,9 @@
 package org.apache.flink.mesos.runtime.clusterframework;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
@@ -641,7 +641,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 				String msg = "Stopping Mesos session because the number of failed tasks ("
 					+ failedTasksSoFar + ") exceeded the maximum failed tasks ("
 					+ maxFailedTasks + "). This number is controlled by the '"
-					+ ConfigConstants.MESOS_MAX_FAILED_TASKS + "' configuration setting. "
+					+ MesosOptions.MAX_FAILED_TASKS.key() + "' configuration setting. "
 					+ "By default its the number of requested tasks.";
 
 				LOG.error(msg);
@@ -757,18 +757,18 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			Logger log) {
 
 		final int numInitialTaskManagers = flinkConfig.getInteger(
-			ConfigConstants.MESOS_INITIAL_TASKS, 0);
+			MesosOptions.INITIAL_TASKS);
 		if (numInitialTaskManagers >= 0) {
 			log.info("Mesos framework to allocate {} initial tasks",
 				numInitialTaskManagers);
 		}
 		else {
 			throw new IllegalConfigurationException("Invalid value for " +
-				ConfigConstants.MESOS_INITIAL_TASKS + ", which must be at least zero.");
+				MesosOptions.INITIAL_TASKS.key() + ", which must be at least zero.");
 		}
 
 		final int maxFailedTasks = flinkConfig.getInteger(
-			ConfigConstants.MESOS_MAX_FAILED_TASKS, numInitialTaskManagers);
+			MesosOptions.MAX_FAILED_TASKS.key(), numInitialTaskManagers);
 		if (maxFailedTasks >= 0) {
 			log.info("Mesos framework tolerates {} failed tasks before giving up",
 				maxFailedTasks);

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index 2627d25..3a6f77a 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.mesos.util;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.runtime.net.SSLUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
@@ -115,8 +115,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 
 		// Config to enable https access to the artifact server
 		boolean enableSSL = config.getBoolean(
-				ConfigConstants.MESOS_ARTIFACT_SERVER_SSL_ENABLED,
-				ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED) &&
+				MesosOptions.ARTIFACT_SERVER_SSL_ENABLED) &&
 				SSLUtils.getSSLEnabled(config);
 
 		if (enableSSL) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index af3f7ef..8bfb4d1 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.configuration.MesosOptions;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
@@ -105,8 +105,8 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 		private static final long serialVersionUID = -952579203067648838L;
 
 		{
-			setInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, -1);
-			setInteger(ConfigConstants.MESOS_INITIAL_TASKS, 0);
+			setInteger(MesosOptions.MAX_FAILED_TASKS, -1);
+			setInteger(MesosOptions.INITIAL_TASKS, 0);
 	}};
 
 	@BeforeClass

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index 9d679cf..7e9891f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.clusterframework;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 
 import java.util.HashMap;
@@ -115,22 +115,20 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 		// (1) compute how much memory we subtract from the total memory, to get the Java memory
 
 		final float memoryCutoffRatio = config.getFloat(
-			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
-			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
+			ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
 
 		final int minCutoff = config.getInteger(
-			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
-			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
+			ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
 
 		if (memoryCutoffRatio >= 1 || memoryCutoffRatio <= 0) {
 			throw new IllegalArgumentException("The configuration value '"
-				+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO + "' must be between 0 and 1. Value given="
+				+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key() + "' must be between 0 and 1. Value given="
 				+ memoryCutoffRatio);
 		}
 
 		if (minCutoff >= containerMemoryMB) {
 			throw new IllegalArgumentException("The configuration value '"
-				+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN + "'='" + minCutoff
+				+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key() + "'='" + minCutoff
 				+ "' is larger than the total container memory " + containerMemoryMB);
 		}
 
@@ -147,7 +145,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 
 		// (3) obtain the additional environment variables from the configuration
 		final HashMap<String, String> envVars = new HashMap<>();
-		final String prefix = ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
+		final String prefix = ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
 		
 		for (String key : config.keySet()) {
 			if (key.startsWith(prefix) && key.length() > prefix.length()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index c5c87ac..6f13b9f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -28,7 +28,7 @@ import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions, TaskManagerOptions}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions, ResourceManagerOptions, TaskManagerOptions}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
@@ -168,8 +168,7 @@ abstract class FlinkMiniCluster(
 
   def getNumberOfResourceManagers: Int = {
     originalConfiguration.getInteger(
-      ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER,
-      ConfigConstants.DEFAULT_LOCAL_NUMBER_RESOURCE_MANAGER
+      ResourceManagerOptions.LOCAL_NUMBER_RESOURCE_MANAGER
     )
   }
 
@@ -226,8 +225,8 @@ abstract class FlinkMiniCluster(
     if (useSingleActorSystem) {
       AkkaUtils.getAkkaConfig(originalConfiguration, None)
     } else {
-      val port = originalConfiguration.getInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
-                                                  ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+      val port = originalConfiguration.getInteger(
+        ResourceManagerOptions.IPC_PORT)
 
       val resolvedPort = if(port != 0) port + index else port
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 27a8ee1..0ae00a9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
 import akka.actor.{ActorRef, ActorSystem, Props}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
-import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, QueryableStateOptions, TaskManagerOptions}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, QueryableStateOptions, ResourceManagerOptions, TaskManagerOptions}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -183,11 +183,11 @@ class LocalFlinkMiniCluster(
     val resourceManagerName = getResourceManagerName(index)
 
     val resourceManagerPort = config.getInteger(
-      ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+      ResourceManagerOptions.IPC_PORT)
 
     if(resourceManagerPort > 0) {
-      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index)
+      config.setInteger(ResourceManagerOptions.IPC_PORT,
+        resourceManagerPort + index)
     }
 
     val resourceManagerProps = getResourceManagerProps(

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 82a656c..275bcc9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
 
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Level;
@@ -62,8 +63,8 @@ public class UtilsTest {
 	@Test
 	public void testHeapCutoff() {
 		Configuration conf = new Configuration();
-		conf.setDouble(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15);
-		conf.setInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, 384);
+		conf.setFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15F);
+		conf.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 384);
 
 		Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf));
 		Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf));
@@ -71,14 +72,14 @@ public class UtilsTest {
 		// test different configuration
 		Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));
 
-		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, "1000");
-		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.1");
+		conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key(), "1000");
+		conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.1");
 		Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));
 
-		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "0.5");
+		conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "0.5");
 		Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));
 
-		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1");
+		conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 
 		// test also deprecated keys
@@ -93,21 +94,21 @@ public class UtilsTest {
 	@Test(expected = IllegalArgumentException.class)
 	public void illegalArgument() {
 		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "1.1");
+		conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "1.1");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void illegalArgumentNegative() {
 		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "-0.01");
+		conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "-0.01");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void tooMuchCutoff() {
 		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, "6000");
+		conf.setString(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key(), "6000");
 		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 2e88836..d85aa97 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.client.JobClient;
@@ -26,6 +25,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -145,7 +145,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 				"-nm", "customName",
 				"-Dfancy-configuration-value=veryFancy",
 				"-Dyarn.maximum-failed-containers=3",
-				"-D" + ConfigConstants.YARN_VCORES + "=2"},
+				"-D" + YarnConfigOptions.VCORES.key() + "=2"},
 			"Number of connected TaskManagers changed to 1. Slots available: 3",
 			RunTypes.YARN_SESSION);
 
@@ -186,7 +186,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 
 			Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value"));
 			Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers"));
-			Assert.assertEquals("2", parsedConfig.get(ConfigConstants.YARN_VCORES));
+			Assert.assertEquals("2", parsedConfig.get(YarnConfigOptions.VCORES.key()));
 
 			// -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface
 			// first, get the hostname/port

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index dfc8b6a..55dc47f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -253,7 +253,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// The number of cores can be configured in the config.
 		// If not configured, it is set to the number of task slots
 		int numYarnVcores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
-		int configuredVcores = flinkConfiguration.getInteger(ConfigConstants.YARN_VCORES, clusterSpecification.getSlotsPerTaskManager());
+		int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
 		// don't configure more than the maximum configured number of vcores
 		if (configuredVcores > numYarnVcores) {
 			throw new IllegalConfigurationException(
@@ -261,7 +261,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 						" but Yarn only has %d virtual cores available. Please note that the number" +
 						" of virtual cores is set to the number of task slots by default unless configured" +
 						" in the Flink config with '%s.'",
-					configuredVcores, numYarnVcores, ConfigConstants.YARN_VCORES));
+					configuredVcores, numYarnVcores, YarnConfigOptions.VCORES.key()));
 		}
 
 		// check if required Hadoop environment variables are set. If not, warn user
@@ -677,7 +677,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			// activate re-execution of failed applications
 			appContext.setMaxAppAttempts(
 				flinkConfiguration.getInteger(
-					ConfigConstants.YARN_APPLICATION_ATTEMPTS,
+					YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
 					YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
 
 			activateHighAvailabilitySupport(appContext);
@@ -685,7 +685,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			// set number of application retries to 1 in the default case
 			appContext.setMaxAppAttempts(
 				flinkConfiguration.getInteger(
-					ConfigConstants.YARN_APPLICATION_ATTEMPTS,
+					YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
 					1));
 		}
 
@@ -1135,7 +1135,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		IllegalAccessException {
 
 		final ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
-		final String tagsString = flinkConfiguration.getString(ConfigConstants.YARN_APPLICATION_TAGS, "");
+		final String tagsString = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);
 
 		final Set<String> applicationTags = new HashSet<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 662617f..98d27ab 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -82,24 +82,17 @@ public final class Utils {
 	 */
 	public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) {
 
-		BootstrapTools.substituteDeprecatedConfigKey(conf,
-			ConfigConstants.YARN_HEAP_CUTOFF_RATIO, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
-		BootstrapTools.substituteDeprecatedConfigKey(conf,
-			ConfigConstants.YARN_HEAP_CUTOFF_MIN, ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
-
-		float memoryCutoffRatio = conf.getFloat(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
-			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
-		int minCutoff = conf.getInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
-			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
+		float memoryCutoffRatio = conf.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
+		int minCutoff = conf.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
 
 		if (memoryCutoffRatio > 1 || memoryCutoffRatio < 0) {
 			throw new IllegalArgumentException("The configuration value '"
-				+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO
+				+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO.key()
 				+ "' must be between 0 and 1. Value given=" + memoryCutoffRatio);
 		}
 		if (minCutoff > memory) {
 			throw new IllegalArgumentException("The configuration value '"
-				+ ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN
+				+ ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.key()
 				+ "' is higher (" + minCutoff + ") than the requested amount of memory " + memory);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index dccbb71..e951df4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -43,6 +44,7 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -299,8 +301,7 @@ public class YarnApplicationMasterRunner {
 			// try to start the actor system, JobManager and JobManager actor system
 			// using the port range definition from the config.
 			final String amPortRange = config.getString(
-					ConfigConstants.YARN_APPLICATION_MASTER_PORT,
-					ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
+					YarnConfigOptions.APPLICATION_MASTER_PORT);
 
 			actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, amPortRange, LOG);
 
@@ -518,21 +519,13 @@ public class YarnApplicationMasterRunner {
 		// corresponding generic config keys instead. that way, later code needs not
 		// deal with deprecated config keys
 
-		BootstrapTools.substituteDeprecatedConfigKey(configuration,
-			ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
-			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
-
-		BootstrapTools.substituteDeprecatedConfigKey(configuration,
-			ConfigConstants.YARN_HEAP_CUTOFF_MIN,
-			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
-
 		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
 			ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
-			ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
+			ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX);
 
 		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
 			ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
-			ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
+			ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
 
 		return configuration;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 66e44a6..4d8142f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -28,6 +27,7 @@ import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.messages.ContainersAllocated;
 import org.apache.flink.yarn.messages.ContainersComplete;
 
@@ -337,7 +337,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 
 			// Resource requirements for worker containers
 			int taskManagerSlots = taskManagerParameters.numSlots();
-			int vcores = config.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1));
+			int vcores = config.getInteger(YarnConfigOptions.VCORES, Math.max(taskManagerSlots, 1));
 			Resource capability = Resource.newInstance(containerMemorySizeMB, vcores);
 
 			resourceManagerClient.addContainerRequest(
@@ -550,7 +550,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 					String msg = "Stopping YARN session because the number of failed containers ("
 						+ failedContainersSoFar + ") exceeded the maximum failed containers ("
 						+ maxFailedContainers + "). This number is controlled by the '"
-						+ ConfigConstants.YARN_MAX_FAILED_CONTAINERS + "' configuration setting. "
+						+ YarnConfigOptions.MAX_FAILED_CONTAINERS.key() + "' configuration setting. "
 						+ "By default its the number of requested containers.";
 
 					LOG.error(msg);
@@ -710,7 +710,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 			Logger log) {
 
 		final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
-			ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;
+			YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
 
 		final long yarnExpiryIntervalMS = yarnConfig.getLong(
 			YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
@@ -723,7 +723,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 		}
 
 		final int maxFailedContainers = flinkConfig.getInteger(
-			ConfigConstants.YARN_MAX_FAILED_CONTAINERS, numInitialTaskManagers);
+			YarnConfigOptions.MAX_FAILED_CONTAINERS.key(), numInitialTaskManagers);
 		if (maxFailedContainers >= 0) {
 			log.info("YARN application tolerates {} failed TaskManager containers before giving up",
 				maxFailedContainers);

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 8327b6a..fb1a1c3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -134,7 +135,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 		this.yarnConfig = new YarnConfiguration();
 		this.env = env;
 		final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
-				ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;
+				YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
 
 		final long yarnExpiryIntervalMS = yarnConfig.getLong(
 				YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 5d8abac..f2968b1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -37,6 +37,7 @@ import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptorV2;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -773,7 +774,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
 		String currentUser = System.getProperty("user.name");
 		String propertiesFileLocation =
-			conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
+			conf.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
 
 		return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 28ef2ab..3773352 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -52,6 +52,71 @@ public class YarnConfigOptions {
 		key("yarn.per-job-cluster.include-user-jar")
 			.defaultValue("ORDER");
 
+	/**
+	 * The number of virtual cores (vcores) per YARN container.
+	 */
+	public static final ConfigOption<Integer> VCORES =
+		key("yarn.containers.vcores")
+		.defaultValue(-1);
+
+	/**
+	 * The maximum number of failed YARN containers before entirely stopping
+	 * the YARN session / job on YARN.
+	 * By default, we take the number of initially requested containers.
+	 *
+	 * <p>Note: This option returns a String since Integer options must have a static default value.
+	 */
+	public static final ConfigOption<String> MAX_FAILED_CONTAINERS =
+		key("yarn.maximum-failed-containers")
+		.noDefaultValue();
+
+	/**
+	 * Set the number of retries for failed YARN ApplicationMasters/JobManagers in high
+	 * availability mode. This value is usually limited by YARN.
+	 * By default, it's 1 in the standalone case and 2 in the high availability case.
+	 *
+	 * <p>Note: This option returns a String since Integer options must have a static default value.
+	 */
+	public static final ConfigOption<String> APPLICATION_ATTEMPTS =
+		key("yarn.application-attempts")
+		.noDefaultValue();
+
+	/**
+	 * The heartbeat interval between the Application Master and the YARN Resource Manager.
+	 */
+	public static final ConfigOption<Integer> HEARTBEAT_DELAY_SECONDS =
+		key("yarn.heartbeat-delay")
+		.defaultValue(5);
+
+	/**
+	 * When a Flink job is submitted to YARN, the JobManager's host and the number of available
+	 * processing slots is written into a properties file, so that the Flink client is able
+	 * to pick those details up.
+	 * This configuration parameter allows changing the default location of that file (for example
+	 * for environments sharing a Flink installation between users).
+	 */
+	public static final ConfigOption<String> PROPERTIES_FILE_LOCATION =
+		key("yarn.properties-file.location")
+		.noDefaultValue();
+
+	/**
+	 * The config parameter defining the Akka actor system port for the ApplicationMaster and
+	 * JobManager.
+	 * The port can either be a port, such as "9123",
+	 * a range of ports: "50100-50200"
+	 * or a list of ranges and/or points: "50100-50200,50300-50400,51234".
+	 * Setting the port to 0 will let the OS choose an available port.
+	 */
+	public static final ConfigOption<String> APPLICATION_MASTER_PORT =
+		key("yarn.application-master.port")
+		.defaultValue("0");
+
+	/**
+	 * A comma-separated list of strings to use as YARN application tags.
+	 */
+	public static final ConfigOption<String> APPLICATION_TAGS =
+		key("yarn.tags")
+		.defaultValue("");
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index 9ead775..e8fccac 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.security.SecurityContext;
@@ -112,21 +113,13 @@ public class YarnEntrypointUtils {
 		// corresponding generic config keys instead. that way, later code needs not
 		// deal with deprecated config keys
 
-		BootstrapTools.substituteDeprecatedConfigKey(configuration,
-			ConfigConstants.YARN_HEAP_CUTOFF_RATIO,
-			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO);
-
-		BootstrapTools.substituteDeprecatedConfigKey(configuration,
-			ConfigConstants.YARN_HEAP_CUTOFF_MIN,
-			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN);
-
 		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
 			ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
-			ConfigConstants.CONTAINERIZED_MASTER_ENV_PREFIX);
+			ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX);
 
 		BootstrapTools.substituteDeprecatedConfigPrefix(configuration,
 			ConfigConstants.YARN_TASK_MANAGER_ENV_PREFIX,
-			ConfigConstants.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
+			ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX);
 
 		final String keytabPath;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index d78b390..a2d1668 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -22,7 +22,7 @@ import java.io.IOException
 import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit}
 
 import akka.actor.ActorRef
-import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
+import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.ContaineredJobManager
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
 import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.yarn.configuration.YarnConfigOptions
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -88,7 +89,7 @@ class YarnJobManager(
   val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds
   val YARN_HEARTBEAT_DELAY: FiniteDuration =
     FiniteDuration(
-      flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5),
+      flinkConfiguration.getInteger(YarnConfigOptions.HEARTBEAT_DELAY_SECONDS),
       TimeUnit.SECONDS)
 
   val yarnFilesPath: Option[String] = Option(System.getenv().get(YarnConfigKeys.FLINK_YARN_FILES))

http://git-wip-us.apache.org/repos/asf/flink/blob/d63d704e/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index bcb8559..19d1af5 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -87,7 +88,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	public void testConfigOverwrite() {
 		Configuration configuration = new Configuration();
 		// overwrite vcores in config
-		configuration.setInteger(ConfigConstants.YARN_VCORES, Integer.MAX_VALUE);
+		configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
 
 		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			configuration,