You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/10 12:23:43 UTC
[2/2] flink git commit: [FLINK-7099] Replace usages of deprecated JOB_MANAGER_IPC_PORT_KEY and JOB_MANAGER_IPC_ADDRESS_KEY
[FLINK-7099] Replace usages of deprecated JOB_MANAGER_IPC_PORT_KEY and JOB_MANAGER_IPC_ADDRESS_KEY
This closes #4278.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb48dc2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb48dc2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb48dc2f
Branch: refs/heads/master
Commit: fb48dc2fdeaa8f86c2a5ced3264beaf538a09d76
Parents: d50076f
Author: zjureel <zj...@gmail.com>
Authored: Fri Jul 7 14:24:31 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 10 12:41:37 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 5 +++--
.../org/apache/flink/client/RemoteExecutor.java | 6 +++---
.../deployment/StandaloneClusterDescriptor.java | 6 +++---
.../flink/client/CliFrontendTestUtils.java | 6 +++---
.../client/program/ClientConnectionTest.java | 6 +++---
.../apache/flink/client/program/ClientTest.java | 6 +++---
.../program/ExecutionPlanCreationTest.java | 6 +++---
...rRetrievalServiceHostnameResolutionTest.java | 10 ++++-----
.../api/avro/AvroExternalJarProgramITCase.java | 5 +++--
.../org/apache/flink/storm/api/FlinkClient.java | 10 ++++-----
.../apache/flink/storm/api/FlinkSubmitter.java | 7 +++----
.../MesosApplicationMasterRunner.java | 5 ++---
.../clusterframework/BootstrapTools.java | 8 +++----
.../minicluster/MiniClusterConfiguration.java | 5 +++--
.../flink/runtime/jobmanager/JobManager.scala | 22 +++++++++-----------
.../runtime/minicluster/FlinkMiniCluster.scala | 11 ++++------
.../minicluster/LocalFlinkMiniCluster.scala | 12 +++++------
.../flink/runtime/jobmanager/JobSubmitTest.java | 6 +++---
.../TaskManagerConfigurationTest.java | 13 ++++++------
.../TaskManagerProcessReapingTestBase.java | 4 ++--
.../taskmanager/TaskManagerStartupTest.java | 9 ++++----
.../flink/runtime/akka/AkkaSslITCase.scala | 6 +++---
.../runtime/testingUtils/TestingCluster.scala | 8 +++----
.../org/apache/flink/api/scala/FlinkShell.scala | 4 ++--
.../environment/RemoteStreamEnvironment.java | 6 +++---
...ctTaskManagerProcessFailureRecoveryTest.java | 4 ++--
.../flink/test/runtime/IPv6HostnamesITCase.java | 3 ++-
...CliFrontendYarnAddressConfigurationTest.java | 5 +++--
.../YARNSessionCapacitySchedulerITCase.java | 5 +++--
.../yarn/AbstractYarnClusterDescriptor.java | 8 +++----
30 files changed, 108 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 5739cdd..cbb7aaa 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -44,6 +44,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
@@ -1146,8 +1147,8 @@ public class CliFrontend {
* @param config The config to write to
*/
public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostString());
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
+ config.setString(JobManagerOptions.ADDRESS, address.getHostString());
+ config.setInteger(JobManagerOptions.PORT, address.getPort());
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 4a3cc74..1ae9b07 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -108,8 +108,8 @@ public class RemoteExecutor extends PlanExecutor {
this.jarFiles = jarFiles;
this.globalClasspaths = globalClasspaths;
- clientConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
- clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
+ clientConfiguration.setString(JobManagerOptions.ADDRESS, inet.getHostName());
+ clientConfiguration.setInteger(JobManagerOptions.PORT, inet.getPort());
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index 699de3b..0507c3f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -19,8 +19,8 @@
package org.apache.flink.client.deployment;
import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
/**
* A deployment descriptor for an existing cluster.
@@ -35,8 +35,8 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
@Override
public String getClusterDescription() {
- String host = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "");
- int port = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+ String host = config.getString(JobManagerOptions.ADDRESS, "");
+ int port = config.getInteger(JobManagerOptions.PORT, -1);
return "Standalone cluster at " + host + ":" + port;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index 2a20d8e..7c9c773 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -18,8 +18,8 @@
package org.apache.flink.client;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import java.io.File;
import java.io.FileNotFoundException;
@@ -75,8 +75,8 @@ public class CliFrontendTestUtils {
}
public static void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) {
- String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
- int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+ String jobManagerAddress = config.getString(JobManagerOptions.ADDRESS);
+ int jobManagerPort = config.getInteger(JobManagerOptions.PORT, -1);
assertEquals(expectedAddress, jobManagerAddress);
assertEquals(expectedPort, jobManagerPort);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 2b760bd..c2505ae 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -19,8 +19,8 @@
package org.apache.flink.client.program;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClientActorTest;
@@ -98,8 +98,8 @@ public class ClientConnectionTest extends TestLogger {
final Configuration config = new Configuration();
config.setString(AkkaOptions.ASK_TIMEOUT, ASK_STARTUP_TIMEOUT + " ms");
config.setString(AkkaOptions.LOOKUP_TIMEOUT, CONNECT_TIMEOUT + " ms");
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, unreachableEndpoint.getHostName());
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, unreachableEndpoint.getPort());
+ config.setString(JobManagerOptions.ADDRESS, unreachableEndpoint.getHostName());
+ config.setInteger(JobManagerOptions.PORT, unreachableEndpoint.getPort());
ClusterClient client = new StandaloneClusterClient(config);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 9349401..97794dd 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -31,8 +31,8 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.DetachedEnvironment.DetachedJobExecutionResult;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -96,8 +96,8 @@ public class ClientTest extends TestLogger {
final int freePort = NetUtils.getAvailablePort();
config = new Configuration();
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, freePort);
+ config.setString(JobManagerOptions.ADDRESS, "localhost");
+ config.setInteger(JobManagerOptions.PORT, freePort);
config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
index 9c5a878..8bf5f4a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -56,8 +56,8 @@ public class ExecutionPlanCreationTest {
Configuration config = new Configuration();
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, mockJmAddress.getHostName());
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, mockJmAddress.getPort());
+ config.setString(JobManagerOptions.ADDRESS, mockJmAddress.getHostName());
+ config.setInteger(JobManagerOptions.PORT, mockJmAddress.getPort());
Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, -1);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
index 4731d44..a62f9d8 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.client.program;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.util.StandaloneUtils;
import org.apache.flink.util.ConfigurationException;
@@ -54,8 +54,8 @@ public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
public void testUnresolvableHostname1() throws UnknownHostException, ConfigurationException {
Configuration config = new Configuration();
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
+ config.setString(JobManagerOptions.ADDRESS, nonExistingHostname);
+ config.setInteger(JobManagerOptions.PORT, 17234);
StandaloneUtils.createLeaderRetrievalService(
config,
@@ -72,8 +72,8 @@ public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
try {
Configuration config = new Configuration();
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
+ config.setString(JobManagerOptions.ADDRESS, nonExistingHostname);
+ config.setInteger(JobManagerOptions.PORT, 17234);
StandaloneUtils.createLeaderRetrievalService(
config,
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 7bcba04..6133778 100644
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.avro.testjar.AvroExternalJarProgram;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.util.TestEnvironment;
@@ -66,8 +67,8 @@ public class AvroExternalJarProgramITCase extends TestLogger {
Collections.singleton(new Path(jarFile)),
Collections.<URL>emptyList());
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());
+ config.setString(JobManagerOptions.ADDRESS, "localhost");
+ config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
program.invokeInteractiveModeForExecution();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 88a38e2..d53ca42 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -25,9 +25,9 @@ import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -202,8 +202,8 @@ public class FlinkClient {
jobGraph.addJar(new Path(uploadedJarUri));
final Configuration configuration = jobGraph.getJobConfiguration();
- configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
- configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+ configuration.setString(JobManagerOptions.ADDRESS, jobManagerHost);
+ configuration.setInteger(JobManagerOptions.PORT, jobManagerPort);
final ClusterClient client;
try {
@@ -242,8 +242,8 @@ public class FlinkClient {
}
final Configuration configuration = GlobalConfiguration.loadConfiguration();
- configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerHost);
- configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, this.jobManagerPort);
+ configuration.setString(JobManagerOptions.ADDRESS, this.jobManagerHost);
+ configuration.setInteger(JobManagerOptions.PORT, this.jobManagerPort);
final ClusterClient client;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
index 6135d4d..c36942e 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
@@ -19,9 +19,9 @@ package org.apache.flink.storm.api;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
@@ -91,12 +91,11 @@ public class FlinkSubmitter {
final Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
if (!stormConf.containsKey(Config.NIMBUS_HOST)) {
stormConf.put(Config.NIMBUS_HOST,
- flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));
+ flinkConfig.getString(JobManagerOptions.ADDRESS, "localhost"));
}
if (!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
stormConf.put(Config.NIMBUS_THRIFT_PORT,
- new Integer(flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- 6123)));
+ new Integer(flinkConfig.getInteger(JobManagerOptions.PORT)));
}
final String serConf = JSONValue.toJSONString(stormConf);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index fc75bd7..d4e2f0d 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -246,10 +246,9 @@ public class MesosApplicationMasterRunner {
taskManagerParameters.cpus());
// JM endpoint, which should be explicitly configured based on acquired net resources
- final int listeningPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+ final int listeningPort = config.getInteger(JobManagerOptions.PORT);
checkState(listeningPort >= 0 && listeningPort <= 65536, "Config parameter \"" +
- ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + "\" is invalid, it must be between 0 and 65536");
+ JobManagerOptions.PORT.key() + "\" is invalid, it must be between 0 and 65536");
// ----------------- (2) start the actor system -------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index b8e5351..a98e574 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -185,8 +185,8 @@ public class BootstrapTools {
// this ensures correct values are present in the web frontend
final Address address = AkkaUtils.getAddress(actorSystem);
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host().get());
- config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port().get().toString());
+ config.setString(JobManagerOptions.ADDRESS, address.host().get());
+ config.setInteger(JobManagerOptions.PORT, Integer.parseInt(address.port().get().toString()));
if (config.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) {
logger.info("Starting JobManager Web Frontend");
@@ -228,8 +228,8 @@ public class BootstrapTools {
Configuration cfg = baseConfig.clone();
- cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHostname);
- cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+ cfg.setString(JobManagerOptions.ADDRESS, jobManagerHostname);
+ cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
cfg.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, registrationTimeout.toString());
if (numSlots != -1){
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index b8d6bbb..aa9b0c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.minicluster;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
@@ -133,7 +134,7 @@ public class MiniClusterConfiguration {
public String getJobManagerBindAddress() {
return commonBindAddress != null ?
commonBindAddress :
- config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+ config.getString(JobManagerOptions.ADDRESS, "localhost");
}
public String getTaskManagerBindAddress() {
@@ -145,7 +146,7 @@ public class MiniClusterConfiguration {
public String getResourceManagerBindAddress() {
return commonBindAddress != null ?
commonBindAddress :
- config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); // TODO: Introduce proper configuration constant for the resource manager hostname
+ config.getString(JobManagerOptions.ADDRESS, "localhost"); // TODO: Introduce proper configuration constant for the resource manager hostname
}
public Time getRpcTimeout() {
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 51d4159..3128cfc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -152,7 +152,7 @@ class JobManager(
protected val jobManagerMetricGroup : Option[JobManagerMetricGroup] = metricsRegistry match {
case Some(registry) =>
- val host = flinkConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+ val host = flinkConfiguration.getString(JobManagerOptions.ADDRESS)
Option(new JobManagerMetricGroup(
registry, NetUtils.unresolvedHostToNormalizedString(host)))
case None =>
@@ -1956,7 +1956,7 @@ object JobManager {
// if it is not in there, the actor system will bind to the loopback interface's
// address and will not be reachable from anyone remote
if (externalHostName == null) {
- val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
+ val message = "Config parameter '" + JobManagerOptions.ADDRESS.key() +
"' is missing (hostname/address to bind JobManager to)."
LOG.error(message)
System.exit(STARTUP_FAILURE_RETURN_CODE)
@@ -1970,7 +1970,7 @@ object JobManager {
System.exit(STARTUP_FAILURE_RETURN_CODE)
}
else {
- val message = s"Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
+ val message = s"Config parameter '" + JobManagerOptions.ADDRESS.key() +
"' does not specify a valid port."
LOG.error(message)
System.exit(STARTUP_FAILURE_RETURN_CODE)
@@ -2181,8 +2181,8 @@ object JobManager {
val address = AkkaUtils.getAddress(jobManagerSystem)
- configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host.get)
- configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port.get)
+ configuration.setString(JobManagerOptions.ADDRESS, address.host.get)
+ configuration.setInteger(JobManagerOptions.PORT, address.port.get)
jobManagerSystem
}
@@ -2399,17 +2399,17 @@ object JobManager {
}
if (cliOptions.getHost() != null) {
- configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, cliOptions.getHost())
+ configuration.setString(JobManagerOptions.ADDRESS, cliOptions.getHost())
}
- val host = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+ val host = configuration.getString(JobManagerOptions.ADDRESS)
val portRange =
// high availability mode
if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
LOG.info("Starting JobManager with high-availability")
- configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
+ configuration.setInteger(JobManagerOptions.PORT, 0)
// The port range of allowed job manager ports or 0 for random
configuration.getValue(HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE)
@@ -2418,12 +2418,10 @@ object JobManager {
LOG.info("Starting JobManager without high-availability")
// In standalone mode, we don't allow port ranges
- val listeningPort = configuration.getInteger(
- ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+ val listeningPort = configuration.getInteger(JobManagerOptions.PORT)
if (listeningPort <= 0 || listeningPort >= 65536) {
- val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
+ val message = "Config parameter '" + JobManagerOptions.PORT.key() +
"' is invalid, it must be greater than 0 and less than 65536."
LOG.error(message)
System.exit(STARTUP_FAILURE_RETURN_CODE)
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 0e6f102..f0a96ca 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -72,7 +72,7 @@ abstract class FlinkMiniCluster(
// NOTE: THIS MUST BE getByName("localhost"), which is 127.0.0.1 and
// not getLocalHost(), which may be 127.0.1.1
val hostname = userConfiguration.getString(
- ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
+ JobManagerOptions.ADDRESS,
"localhost")
protected val originalConfiguration = generateConfiguration(userConfiguration)
@@ -129,14 +129,12 @@ abstract class FlinkMiniCluster(
}
def configuration: Configuration = {
- if (originalConfiguration.getInteger(
- ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) == 0) {
+ if (originalConfiguration.getInteger(JobManagerOptions.PORT) == 0) {
val leaderConfiguration = new Configuration(originalConfiguration)
val leaderPort = getLeaderRPCPort
- leaderConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, leaderPort)
+ leaderConfiguration.setInteger(JobManagerOptions.PORT, leaderPort)
leaderConfiguration
} else {
@@ -241,8 +239,7 @@ abstract class FlinkMiniCluster(
AkkaUtils.getAkkaConfig(originalConfiguration, None)
}
else {
- val port = originalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+ val port = originalConfiguration.getInteger(JobManagerOptions.PORT)
val resolvedPort = if(port != 0) port + index else port
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index a3e1c78..27a8ee1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
import akka.actor.{ActorRef, ActorSystem, Props}
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.io.FileOutputFormat
-import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions, TaskManagerOptions}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions, QueryableStateOptions, TaskManagerOptions}
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -125,12 +125,10 @@ class LocalFlinkMiniCluster(
val jobManagerName = getJobManagerName(index)
val archiveName = getArchiveName(index)
- val jobManagerPort = config.getInteger(
- ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+ val jobManagerPort = config.getInteger(JobManagerOptions.PORT)
if(jobManagerPort > 0) {
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
+ config.setInteger(JobManagerOptions.PORT, jobManagerPort + index)
}
val (instanceManager,
@@ -389,8 +387,8 @@ class LocalFlinkMiniCluster(
def getDefaultConfig: Configuration = {
val config: Configuration = new Configuration()
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname)
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
+ config.setString(JobManagerOptions.ADDRESS, hostname)
+ config.setInteger(JobManagerOptions.PORT, 0)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 37f503f..79b9c1c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.jobmanager;
import akka.actor.ActorSystem;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobClient;
@@ -78,8 +78,8 @@ public class JobSubmitTest {
int port = NetUtils.getAvailablePort();
- jmConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- jmConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+ jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+ jmConfig.setInteger(JobManagerOptions.PORT, port);
scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.apply(new Tuple2<String, Object>("localhost", port));
jobManagerSystem = AkkaUtils.createActorSystem(jmConfig, listeningAddress);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
index 69cadfb..96d1455 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.testutils.CommonTestUtils;
@@ -54,8 +55,8 @@ public class TaskManagerConfigurationTest {
Configuration config = new Configuration();
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, TEST_HOST_NAME);
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 7891);
+ config.setString(JobManagerOptions.ADDRESS, "localhost");
+ config.setInteger(JobManagerOptions.PORT, 7891);
HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
@@ -82,8 +83,8 @@ public class TaskManagerConfigurationTest {
// config with pre-configured hostname to speed up tests (no interface selection)
Configuration config = new Configuration();
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 7891);
+ config.setString(JobManagerOptions.ADDRESS, "localhost");
+ config.setInteger(JobManagerOptions.PORT, 7891);
HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
@@ -184,8 +185,8 @@ public class TaskManagerConfigurationTest {
// open a server port to allow the system to connect
Configuration config = new Configuration();
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname);
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, server.getLocalPort());
+ config.setString(JobManagerOptions.ADDRESS, hostname);
+ config.setInteger(JobManagerOptions.PORT, server.getLocalPort());
HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 130610c..08b71da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -240,8 +240,8 @@ public abstract class TaskManagerProcessReapingTestBase extends TestLogger {
int taskManagerPort = Integer.parseInt(args[1]);
Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+ cfg.setString(JobManagerOptions.ADDRESS, "localhost");
+ cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 256);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index 0e77ddd..79dd207 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -142,8 +143,8 @@ public class TaskManagerStartupTest {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, nonWritable.getAbsolutePath());
cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
- cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
+ cfg.setString(JobManagerOptions.ADDRESS, "localhost");
+ cfg.setInteger(JobManagerOptions.PORT, 21656);
try {
TaskManager.runTaskManager(
@@ -184,8 +185,8 @@ public class TaskManagerStartupTest {
public void testMemoryConfigWrong() {
try {
Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
+ cfg.setString(JobManagerOptions.ADDRESS, "localhost");
+ cfg.setInteger(JobManagerOptions.PORT, 21656);
cfg.setString(ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, "true");
// something invalid
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
index 4671981..72596cd 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.akka
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, SecurityOptions}
+import org.apache.flink.configuration._
import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
import org.junit.runner.RunWith
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -49,7 +49,7 @@ class AkkaSslITCase(_system: ActorSystem)
"start with akka ssl enabled" in {
val config = new Configuration()
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1")
+ config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
@@ -76,7 +76,7 @@ class AkkaSslITCase(_system: ActorSystem)
an[Exception] should be thrownBy {
val config = new Configuration()
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1")
+ config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "127.0.0.1")
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 7980cdf..9e0a6e1 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -26,7 +26,7 @@ import akka.pattern.Patterns._
import akka.pattern.ask
import akka.testkit.CallingThreadDispatcher
import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
@@ -211,11 +211,11 @@ class TestingCluster(
// restart the leading job manager with the same port
val port = getLeaderRPCPort
val oldPort = originalConfiguration.getInteger(
- ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+ JobManagerOptions.PORT,
0)
// we have to set the old port in the configuration file because this is used for startup
- originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port)
+ originalConfiguration.setInteger(JobManagerOptions.PORT, port)
clearLeader()
@@ -234,7 +234,7 @@ class TestingCluster(
}
// reset the original configuration
- originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, oldPort)
+ originalConfiguration.setInteger(JobManagerOptions.PORT, oldPort)
val newJobManagerActor = startJobManager(index, newJobManagerActorSystem)
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index bb26454..a5c5860 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -26,7 +26,7 @@ import org.apache.flink.client.cli.CliFrontendParser
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.client.CliFrontend
import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
-import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration}
+import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, JobManagerOptions}
import org.apache.flink.runtime.minicluster.{FlinkMiniCluster, LocalFlinkMiniCluster}
import scala.collection.mutable.ArrayBuffer
@@ -145,7 +145,7 @@ object FlinkShell {
config.executionMode match {
case ExecutionMode.LOCAL => // Local mode
val config = GlobalConfiguration.loadConfiguration()
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
+ config.setInteger(JobManagerOptions.PORT, 0)
val miniCluster = new StandaloneMiniCluster(config)
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 042248b..74b1c68 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -25,8 +25,8 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
@@ -196,8 +196,8 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
Configuration configuration = new Configuration();
configuration.addAll(this.clientConfiguration);
- configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
- configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+ configuration.setString(JobManagerOptions.ADDRESS, host);
+ configuration.setInteger(JobManagerOptions.PORT, port);
ClusterClient client;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 5c65a7f..6882b46 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -405,8 +405,8 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
int jobManagerPort = Integer.parseInt(args[0]);
Configuration cfg = new Configuration();
- cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+ cfg.setString(JobManagerOptions.ADDRESS, "localhost");
+ cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index 4c77ef0..7fb4c82 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -70,7 +71,7 @@ public class IPv6HostnamesITCase extends TestLogger {
log.info("Test will use IPv6 address " + addressString + " for connection tests");
Configuration conf = new Configuration();
- conf.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, addressString);
+ conf.setString(JobManagerOptions.ADDRESS, addressString);
conf.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, addressString);
conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index b3beab6..d59e5b4 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -428,8 +429,8 @@ public class CliFrontendYarnAddressConfigurationTest {
}
private static void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) {
- String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
- int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+ String jobManagerAddress = config.getString(JobManagerOptions.ADDRESS);
+ int jobManagerPort = config.getInteger(JobManagerOptions.PORT, -1);
assertEquals(expectedAddress, jobManagerAddress);
assertEquals(expectedPort, jobManagerPort);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 7d6c5d6..5caea29 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.test.testdata.WordCountData;
@@ -202,9 +203,9 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
LOG.info("Extracted hostname:port: {} {}", hostname, port);
Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname,
- parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY));
+ parsedConfig.get(JobManagerOptions.ADDRESS.key()));
Assert.assertEquals("unable to find port in " + jsonConfig, port,
- parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY));
+ parsedConfig.get(JobManagerOptions.PORT.key()));
// test logfile access
String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log");
http://git-wip-us.apache.org/repos/asf/flink/blob/fb48dc2f/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index db67e9a..c86565b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -416,8 +416,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
LOG.info("Found application JobManager host name '{}' and port '{}' from supplied application id '{}'",
appReport.getHost(), appReport.getRpcPort(), applicationID);
- flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, appReport.getHost());
- flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, appReport.getRpcPort());
+ flinkConfiguration.setString(JobManagerOptions.ADDRESS, appReport.getHost());
+ flinkConfiguration.setInteger(JobManagerOptions.PORT, appReport.getRpcPort());
return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, false);
} catch (Exception e) {
@@ -591,8 +591,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
int port = report.getRpcPort();
// Correctly initialize the Flink config
- flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
- flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+ flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
+ flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
// the Flink cluster is deployed in YARN. Represent cluster
return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, true);