You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/10 12:23:43 UTC

[2/2] flink git commit: [FLINK-7099] Replace usages of deprecated JOB_MANAGER_IPC_PORT_KEY and JOB_MANAGER_IPC_ADDRESS_KEY

[FLINK-7099] Replace usages of deprecated JOB_MANAGER_IPC_PORT_KEY and JOB_MANAGER_IPC_ADDRESS_KEY

This closes #4278.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb48dc2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb48dc2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb48dc2f

Branch: refs/heads/master
Commit: fb48dc2fdeaa8f86c2a5ced3264beaf538a09d76
Parents: d50076f
Author: zjureel <zj...@gmail.com>
Authored: Fri Jul 7 14:24:31 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 10 12:41:37 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  5 +++--
 .../org/apache/flink/client/RemoteExecutor.java |  6 +++---
 .../deployment/StandaloneClusterDescriptor.java |  6 +++---
 .../flink/client/CliFrontendTestUtils.java      |  6 +++---
 .../client/program/ClientConnectionTest.java    |  6 +++---
 .../apache/flink/client/program/ClientTest.java |  6 +++---
 .../program/ExecutionPlanCreationTest.java      |  6 +++---
 ...rRetrievalServiceHostnameResolutionTest.java | 10 ++++-----
 .../api/avro/AvroExternalJarProgramITCase.java  |  5 +++--
 .../org/apache/flink/storm/api/FlinkClient.java | 10 ++++-----
 .../apache/flink/storm/api/FlinkSubmitter.java  |  7 +++----
 .../MesosApplicationMasterRunner.java           |  5 ++---
 .../clusterframework/BootstrapTools.java        |  8 +++----
 .../minicluster/MiniClusterConfiguration.java   |  5 +++--
 .../flink/runtime/jobmanager/JobManager.scala   | 22 +++++++++-----------
 .../runtime/minicluster/FlinkMiniCluster.scala  | 11 ++++------
 .../minicluster/LocalFlinkMiniCluster.scala     | 12 +++++------
 .../flink/runtime/jobmanager/JobSubmitTest.java |  6 +++---
 .../TaskManagerConfigurationTest.java           | 13 ++++++------
 .../TaskManagerProcessReapingTestBase.java      |  4 ++--
 .../taskmanager/TaskManagerStartupTest.java     |  9 ++++----
 .../flink/runtime/akka/AkkaSslITCase.scala      |  6 +++---
 .../runtime/testingUtils/TestingCluster.scala   |  8 +++----
 .../org/apache/flink/api/scala/FlinkShell.scala |  4 ++--
 .../environment/RemoteStreamEnvironment.java    |  6 +++---
 ...ctTaskManagerProcessFailureRecoveryTest.java |  4 ++--
 .../flink/test/runtime/IPv6HostnamesITCase.java |  3 ++-
 ...CliFrontendYarnAddressConfigurationTest.java |  5 +++--
 .../YARNSessionCapacitySchedulerITCase.java     |  5 +++--
 .../yarn/AbstractYarnClusterDescriptor.java     |  8 +++----
 30 files changed, 108 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 5739cdd..cbb7aaa 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -44,6 +44,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -1146,8 +1147,8 @@ public class CliFrontend {
 	 * @param config The config to write to
 	 */
 	public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
-		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostString());
-		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
+		config.setString(JobManagerOptions.ADDRESS, address.getHostString());
+		config.setInteger(JobManagerOptions.PORT, address.getPort());
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 4a3cc74..1ae9b07 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -108,8 +108,8 @@ public class RemoteExecutor extends PlanExecutor {
 		this.jarFiles = jarFiles;
 		this.globalClasspaths = globalClasspaths;
 
-		clientConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
-		clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
+		clientConfiguration.setString(JobManagerOptions.ADDRESS, inet.getHostName());
+		clientConfiguration.setInteger(JobManagerOptions.PORT, inet.getPort());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index 699de3b..0507c3f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -19,8 +19,8 @@
 package org.apache.flink.client.deployment;
 
 import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 
 /**
  * A deployment descriptor for an existing cluster.
@@ -35,8 +35,8 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
 
 	@Override
 	public String getClusterDescription() {
-		String host = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "");
-		int port = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+		String host = config.getString(JobManagerOptions.ADDRESS, "");
+		int port = config.getInteger(JobManagerOptions.PORT, -1);
 		return "Standalone cluster at " + host + ":" + port;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index 2a20d8e..7c9c773 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.client;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -75,8 +75,8 @@ public class CliFrontendTestUtils {
 	}
 
 	public static void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) {
-		String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-		int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+		String jobManagerAddress = config.getString(JobManagerOptions.ADDRESS);
+		int jobManagerPort = config.getInteger(JobManagerOptions.PORT, -1);
 
 		assertEquals(expectedAddress, jobManagerAddress);
 		assertEquals(expectedPort, jobManagerPort);

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 2b760bd..c2505ae 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.client.program;
 
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClientActorTest;
@@ -98,8 +98,8 @@ public class ClientConnectionTest extends TestLogger {
 		final Configuration config = new Configuration();
 		config.setString(AkkaOptions.ASK_TIMEOUT, ASK_STARTUP_TIMEOUT + " ms");
 		config.setString(AkkaOptions.LOOKUP_TIMEOUT, CONNECT_TIMEOUT + " ms");
-		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, unreachableEndpoint.getHostName());
-		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, unreachableEndpoint.getPort());
+		config.setString(JobManagerOptions.ADDRESS, unreachableEndpoint.getHostName());
+		config.setInteger(JobManagerOptions.PORT, unreachableEndpoint.getPort());
 
 		ClusterClient client = new StandaloneClusterClient(config);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 9349401..97794dd 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -31,8 +31,8 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.DetachedEnvironment.DetachedJobExecutionResult;
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -96,8 +96,8 @@ public class ClientTest extends TestLogger {
 
 		final int freePort = NetUtils.getAvailablePort();
 		config = new Configuration();
-		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, freePort);
+		config.setString(JobManagerOptions.ADDRESS, "localhost");
+		config.setInteger(JobManagerOptions.PORT, freePort);
 		config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
index 9c5a878..8bf5f4a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -56,8 +56,8 @@ public class ExecutionPlanCreationTest {
 
 			Configuration config = new Configuration();
 
-			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, mockJmAddress.getHostName());
-			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, mockJmAddress.getPort());
+			config.setString(JobManagerOptions.ADDRESS, mockJmAddress.getHostName());
+			config.setInteger(JobManagerOptions.PORT, mockJmAddress.getPort());
 
 			Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
 			OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, -1);

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
index 4731d44..a62f9d8 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.client.program;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.util.StandaloneUtils;
 import org.apache.flink.util.ConfigurationException;
@@ -54,8 +54,8 @@ public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
 	public void testUnresolvableHostname1() throws UnknownHostException, ConfigurationException {
 		Configuration config = new Configuration();
 
-		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
-		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
+		config.setString(JobManagerOptions.ADDRESS, nonExistingHostname);
+		config.setInteger(JobManagerOptions.PORT, 17234);
 
 		StandaloneUtils.createLeaderRetrievalService(
 			config,
@@ -72,8 +72,8 @@ public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
 		try {
 			Configuration config = new Configuration();
 
-			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
-			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
+			config.setString(JobManagerOptions.ADDRESS, nonExistingHostname);
+			config.setInteger(JobManagerOptions.PORT, 17234);
 
 			StandaloneUtils.createLeaderRetrievalService(
 				config,

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 7bcba04..6133778 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.avro.testjar.AvroExternalJarProgram;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.util.TestEnvironment;
@@ -66,8 +67,8 @@ public class AvroExternalJarProgramITCase extends TestLogger {
 				Collections.singleton(new Path(jarFile)),
 				Collections.<URL>emptyList());
 
-			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());
+			config.setString(JobManagerOptions.ADDRESS, "localhost");
+			config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
 
 			program.invokeInteractiveModeForExecution();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 88a38e2..d53ca42 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -25,9 +25,9 @@ import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
@@ -202,8 +202,8 @@ public class FlinkClient {
 		jobGraph.addJar(new Path(uploadedJarUri));
 
 		final Configuration configuration = jobGraph.getJobConfiguration();
-		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
-		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+		configuration.setString(JobManagerOptions.ADDRESS, jobManagerHost);
+		configuration.setInteger(JobManagerOptions.PORT, jobManagerPort);
 
 		final ClusterClient client;
 		try {
@@ -242,8 +242,8 @@ public class FlinkClient {
 		}
 
 		final Configuration configuration = GlobalConfiguration.loadConfiguration();
-		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerHost);
-		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, this.jobManagerPort);
+		configuration.setString(JobManagerOptions.ADDRESS, this.jobManagerHost);
+		configuration.setInteger(JobManagerOptions.PORT, this.jobManagerPort);
 
 		final ClusterClient client;
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
index 6135d4d..c36942e 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
@@ -19,9 +19,9 @@ package org.apache.flink.storm.api;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
 
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
@@ -91,12 +91,11 @@ public class FlinkSubmitter {
 		final Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
 		if (!stormConf.containsKey(Config.NIMBUS_HOST)) {
 			stormConf.put(Config.NIMBUS_HOST,
-					flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));
+					flinkConfig.getString(JobManagerOptions.ADDRESS, "localhost"));
 		}
 		if (!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
 			stormConf.put(Config.NIMBUS_THRIFT_PORT,
-					new Integer(flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-							6123)));
+					new Integer(flinkConfig.getInteger(JobManagerOptions.PORT)));
 		}
 
 		final String serConf = JSONValue.toJSONString(stormConf);

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index fc75bd7..d4e2f0d 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -246,10 +246,9 @@ public class MesosApplicationMasterRunner {
 				taskManagerParameters.cpus());
 
 			// JM endpoint, which should be explicitly configured based on acquired net resources
-			final int listeningPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-				ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+			final int listeningPort = config.getInteger(JobManagerOptions.PORT);
 			checkState(listeningPort >= 0 && listeningPort <= 65536, "Config parameter \"" +
-				ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + "\" is invalid, it must be between 0 and 65536");
+				JobManagerOptions.PORT.key() + "\" is invalid, it must be between 0 and 65536");
 
 			// ----------------- (2) start the actor system -------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index b8e5351..a98e574 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -185,8 +185,8 @@ public class BootstrapTools {
 
 		// this ensures correct values are present in the web frontend
 		final Address address = AkkaUtils.getAddress(actorSystem);
-		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host().get());
-		config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port().get().toString());
+		config.setString(JobManagerOptions.ADDRESS, address.host().get());
+		config.setInteger(JobManagerOptions.PORT, Integer.parseInt(address.port().get().toString()));
 
 		if (config.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) {
 			logger.info("Starting JobManager Web Frontend");
@@ -228,8 +228,8 @@ public class BootstrapTools {
 
 		Configuration cfg = baseConfig.clone();
 
-		cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHostname);
-		cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+		cfg.setString(JobManagerOptions.ADDRESS, jobManagerHostname);
+		cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
 		cfg.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, registrationTimeout.toString());
 		if (numSlots != -1){
 			cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index b8d6bbb..aa9b0c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.minicluster;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
@@ -133,7 +134,7 @@ public class MiniClusterConfiguration {
 	public String getJobManagerBindAddress() {
 		return commonBindAddress != null ?
 				commonBindAddress :
-				config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+				config.getString(JobManagerOptions.ADDRESS, "localhost");
 	}
 
 	public String getTaskManagerBindAddress() {
@@ -145,7 +146,7 @@ public class MiniClusterConfiguration {
 	public String getResourceManagerBindAddress() {
 		return commonBindAddress != null ?
 			commonBindAddress :
-			config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); // TODO: Introduce proper configuration constant for the resource manager hostname
+			config.getString(JobManagerOptions.ADDRESS, "localhost"); // TODO: Introduce proper configuration constant for the resource manager hostname
 	}
 
 	public Time getRpcTimeout() {

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 51d4159..3128cfc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -152,7 +152,7 @@ class JobManager(
 
   protected val jobManagerMetricGroup : Option[JobManagerMetricGroup] = metricsRegistry match {
     case Some(registry) =>
-      val host = flinkConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+      val host = flinkConfiguration.getString(JobManagerOptions.ADDRESS)
       Option(new JobManagerMetricGroup(
         registry, NetUtils.unresolvedHostToNormalizedString(host)))
     case None =>
@@ -1956,7 +1956,7 @@ object JobManager {
     // if it is not in there, the actor system will bind to the loopback interface's
     // address and will not be reachable from anyone remote
     if (externalHostName == null) {
-      val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
+      val message = "Config parameter '" + JobManagerOptions.ADDRESS.key() +
         "' is missing (hostname/address to bind JobManager to)."
       LOG.error(message)
       System.exit(STARTUP_FAILURE_RETURN_CODE)
@@ -1970,7 +1970,7 @@ object JobManager {
         System.exit(STARTUP_FAILURE_RETURN_CODE)
       }
       else {
-        val message = s"Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
+        val message = s"Config parameter '" + JobManagerOptions.ADDRESS.key() +
           "' does not specify a valid port."
         LOG.error(message)
         System.exit(STARTUP_FAILURE_RETURN_CODE)
@@ -2181,8 +2181,8 @@ object JobManager {
 
     val address = AkkaUtils.getAddress(jobManagerSystem)
 
-    configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host.get)
-    configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port.get)
+    configuration.setString(JobManagerOptions.ADDRESS, address.host.get)
+    configuration.setInteger(JobManagerOptions.PORT, address.port.get)
 
     jobManagerSystem
   }
@@ -2399,17 +2399,17 @@ object JobManager {
     }
 
     if (cliOptions.getHost() != null) {
-      configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, cliOptions.getHost())
+      configuration.setString(JobManagerOptions.ADDRESS, cliOptions.getHost())
     }
 
-    val host = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+    val host = configuration.getString(JobManagerOptions.ADDRESS)
 
     val portRange =
       // high availability mode
       if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
         LOG.info("Starting JobManager with high-availability")
 
-        configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
+        configuration.setInteger(JobManagerOptions.PORT, 0)
 
         // The port range of allowed job manager ports or 0 for random
         configuration.getValue(HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE)
@@ -2418,12 +2418,10 @@ object JobManager {
         LOG.info("Starting JobManager without high-availability")
 
         // In standalone mode, we don't allow port ranges
-        val listeningPort = configuration.getInteger(
-          ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+        val listeningPort = configuration.getInteger(JobManagerOptions.PORT)
 
         if (listeningPort <= 0 || listeningPort >= 65536) {
-          val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
+          val message = "Config parameter '" + JobManagerOptions.PORT.key() +
             "' is invalid, it must be greater than 0 and less than 65536."
           LOG.error(message)
           System.exit(STARTUP_FAILURE_RETURN_CODE)

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 0e6f102..f0a96ca 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -72,7 +72,7 @@ abstract class FlinkMiniCluster(
   // NOTE: THIS MUST BE getByName("localhost"), which is 127.0.0.1 and
   // not getLocalHost(), which may be 127.0.1.1
   val hostname = userConfiguration.getString(
-    ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
+    JobManagerOptions.ADDRESS,
     "localhost")
 
   protected val originalConfiguration = generateConfiguration(userConfiguration)
@@ -129,14 +129,12 @@ abstract class FlinkMiniCluster(
   }
 
   def configuration: Configuration = {
-    if (originalConfiguration.getInteger(
-      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) == 0) {
+    if (originalConfiguration.getInteger(JobManagerOptions.PORT) == 0) {
       val leaderConfiguration = new Configuration(originalConfiguration)
 
       val leaderPort = getLeaderRPCPort
 
-      leaderConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, leaderPort)
+      leaderConfiguration.setInteger(JobManagerOptions.PORT, leaderPort)
 
       leaderConfiguration
     } else {
@@ -241,8 +239,7 @@ abstract class FlinkMiniCluster(
       AkkaUtils.getAkkaConfig(originalConfiguration, None)
     }
     else {
-      val port = originalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-                                                  ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+      val port = originalConfiguration.getInteger(JobManagerOptions.PORT)
 
       val resolvedPort = if(port != 0) port + index else port
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index a3e1c78..27a8ee1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
 import akka.actor.{ActorRef, ActorSystem, Props}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
-import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions, TaskManagerOptions}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, QueryableStateOptions, TaskManagerOptions}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -125,12 +125,10 @@ class LocalFlinkMiniCluster(
     val jobManagerName = getJobManagerName(index)
     val archiveName = getArchiveName(index)
 
-    val jobManagerPort = config.getInteger(
-      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+    val jobManagerPort = config.getInteger(JobManagerOptions.PORT)
 
     if(jobManagerPort > 0) {
-      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
+      config.setInteger(JobManagerOptions.PORT, jobManagerPort + index)
     }
 
     val (instanceManager,
@@ -389,8 +387,8 @@ class LocalFlinkMiniCluster(
   def getDefaultConfig: Configuration = {
     val config: Configuration = new Configuration()
 
-    config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname)
-    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
+    config.setString(JobManagerOptions.ADDRESS, hostname)
+    config.setInteger(JobManagerOptions.PORT, 0)
 
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
       ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 37f503f..79b9c1c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorSystem;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobClient;
@@ -78,8 +78,8 @@ public class JobSubmitTest {
 
 		int port = NetUtils.getAvailablePort();
 
-		jmConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-		jmConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+		jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+		jmConfig.setInteger(JobManagerOptions.PORT, port);
 
 		scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.apply(new Tuple2<String, Object>("localhost", port));
 		jobManagerSystem = AkkaUtils.createActorSystem(jmConfig, listeningAddress);

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
index 69cadfb..96d1455 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.testutils.CommonTestUtils;
@@ -54,8 +55,8 @@ public class TaskManagerConfigurationTest {
 
 		Configuration config = new Configuration();
 		config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, TEST_HOST_NAME);
-		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 7891);
+		config.setString(JobManagerOptions.ADDRESS, "localhost");
+		config.setInteger(JobManagerOptions.PORT, 7891);
 
 		HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
 			config,
@@ -82,8 +83,8 @@ public class TaskManagerConfigurationTest {
 		// config with pre-configured hostname to speed up tests (no interface selection)
 		Configuration config = new Configuration();
 		config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
-		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 7891);
+		config.setString(JobManagerOptions.ADDRESS, "localhost");
+		config.setInteger(JobManagerOptions.PORT, 7891);
 
 		HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
 			config,
@@ -184,8 +185,8 @@ public class TaskManagerConfigurationTest {
 		// open a server port to allow the system to connect
 		Configuration config = new Configuration();
 
-		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname);
-		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, server.getLocalPort());
+		config.setString(JobManagerOptions.ADDRESS, hostname);
+		config.setInteger(JobManagerOptions.PORT, server.getLocalPort());
 
 		HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
 			config,

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 130610c..08b71da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -240,8 +240,8 @@ public abstract class TaskManagerProcessReapingTestBase extends TestLogger {
 			int taskManagerPort = Integer.parseInt(args[1]);
 
 			Configuration cfg = new Configuration();
-			cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-			cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+			cfg.setString(JobManagerOptions.ADDRESS, "localhost");
+			cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
 			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 			cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 256);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index 0e77ddd..79dd207 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -142,8 +143,8 @@ public class TaskManagerStartupTest {
 			Configuration cfg = new Configuration();
 			cfg.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, nonWritable.getAbsolutePath());
 			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
-			cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-			cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
+			cfg.setString(JobManagerOptions.ADDRESS, "localhost");
+			cfg.setInteger(JobManagerOptions.PORT, 21656);
 
 			try {
 				TaskManager.runTaskManager(
@@ -184,8 +185,8 @@ public class TaskManagerStartupTest {
 	public void testMemoryConfigWrong() {
 		try {
 			Configuration cfg = new Configuration();
-			cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-			cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
+			cfg.setString(JobManagerOptions.ADDRESS, "localhost");
+			cfg.setInteger(JobManagerOptions.PORT, 21656);
 			cfg.setString(ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, "true");
 
 			// something invalid

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
index 4671981..72596cd 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.akka
 
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, SecurityOptions}
+import org.apache.flink.configuration._
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -49,7 +49,7 @@ class AkkaSslITCase(_system: ActorSystem)
     "start with akka ssl enabled" in {
 
       val config = new Configuration()
-      config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1")
+      config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
       config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
       config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
@@ -76,7 +76,7 @@ class AkkaSslITCase(_system: ActorSystem)
       an[Exception] should be thrownBy {
 
         val config = new Configuration()
-        config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1")
+        config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
         config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
         config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 7980cdf..9e0a6e1 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -26,7 +26,7 @@ import akka.pattern.Patterns._
 import akka.pattern.ask
 import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
@@ -211,11 +211,11 @@ class TestingCluster(
           // restart the leading job manager with the same port
           val port = getLeaderRPCPort
           val oldPort = originalConfiguration.getInteger(
-            ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+            JobManagerOptions.PORT,
             0)
 
           // we have to set the old port in the configuration file because this is used for startup
-          originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port)
+          originalConfiguration.setInteger(JobManagerOptions.PORT, port)
 
           clearLeader()
 
@@ -234,7 +234,7 @@ class TestingCluster(
           }
 
           // reset the original configuration
-          originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, oldPort)
+          originalConfiguration.setInteger(JobManagerOptions.PORT, oldPort)
 
           val newJobManagerActor = startJobManager(index, newJobManagerActorSystem)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index bb26454..a5c5860 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -26,7 +26,7 @@ import org.apache.flink.client.cli.CliFrontendParser
 import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.client.CliFrontend
 import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
-import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration}
+import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, JobManagerOptions}
 import org.apache.flink.runtime.minicluster.{FlinkMiniCluster, LocalFlinkMiniCluster}
 
 import scala.collection.mutable.ArrayBuffer
@@ -145,7 +145,7 @@ object FlinkShell {
     config.executionMode match {
       case ExecutionMode.LOCAL => // Local mode
         val config = GlobalConfiguration.loadConfiguration()
-        config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
+        config.setInteger(JobManagerOptions.PORT, 0)
 
         val miniCluster = new StandaloneMiniCluster(config)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 042248b..74b1c68 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -25,8 +25,8 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
 import org.slf4j.Logger;
@@ -196,8 +196,8 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 		Configuration configuration = new Configuration();
 		configuration.addAll(this.clientConfiguration);
 
-		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
-		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+		configuration.setString(JobManagerOptions.ADDRESS, host);
+		configuration.setInteger(JobManagerOptions.PORT, port);
 
 		ClusterClient client;
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 5c65a7f..6882b46 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -405,8 +405,8 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 				int jobManagerPort = Integer.parseInt(args[0]);
 
 				Configuration cfg = new Configuration();
-				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+				cfg.setString(JobManagerOptions.ADDRESS, "localhost");
+				cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
 				cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 				cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index 4c77ef0..7fb4c82 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -70,7 +71,7 @@ public class IPv6HostnamesITCase extends TestLogger {
 			log.info("Test will use IPv6 address " + addressString + " for connection tests");
 			
 			Configuration conf = new Configuration();
-			conf.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, addressString);
+			conf.setString(JobManagerOptions.ADDRESS, addressString);
 			conf.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, addressString);
 			conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index b3beab6..d59e5b4 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 
@@ -428,8 +429,8 @@ public class CliFrontendYarnAddressConfigurationTest {
 	}
 
 	private static void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) {
-		String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-		int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+		String jobManagerAddress = config.getString(JobManagerOptions.ADDRESS);
+		int jobManagerPort = config.getInteger(JobManagerOptions.PORT, -1);
 
 		assertEquals(expectedAddress, jobManagerAddress);
 		assertEquals(expectedPort, jobManagerPort);

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 7d6c5d6..5caea29 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.test.testdata.WordCountData;
@@ -202,9 +203,9 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 			LOG.info("Extracted hostname:port: {} {}", hostname, port);
 
 			Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname,
-				parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY));
+				parsedConfig.get(JobManagerOptions.ADDRESS.key()));
 			Assert.assertEquals("unable to find port in " + jsonConfig, port,
-				parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY));
+				parsedConfig.get(JobManagerOptions.PORT.key()));
 
 			// test logfile access
 			String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log");

http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index db67e9a..c86565b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -416,8 +416,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			LOG.info("Found application JobManager host name '{}' and port '{}' from supplied application id '{}'",
 				appReport.getHost(), appReport.getRpcPort(), applicationID);
 
-			flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, appReport.getHost());
-			flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, appReport.getRpcPort());
+			flinkConfiguration.setString(JobManagerOptions.ADDRESS, appReport.getHost());
+			flinkConfiguration.setInteger(JobManagerOptions.PORT, appReport.getRpcPort());
 
 			return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, false);
 		} catch (Exception e) {
@@ -591,8 +591,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		int port = report.getRpcPort();
 
 		// Correctly initialize the Flink config
-		flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
-		flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+		flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
+		flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
 
 		// the Flink cluster is deployed in YARN. Represent cluster
 		return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, true);