You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/24 10:11:57 UTC
[3/4] flink git commit: [FLINK-4253] [config] Clean up renaming of 'recovery.mode'
http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 5dd4188..fa3135a 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -263,7 +263,7 @@ class ForkableFlinkMiniCluster(
override def start(): Unit = {
val zookeeperURL = configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "")
- zookeeperCluster = if (recoveryMode == HighAvailabilityMode.ZOOKEEPER &&
+ zookeeperCluster = if (haMode == HighAvailabilityMode.ZOOKEEPER &&
zookeeperURL.equals("")) {
LOG.info("Starting ZooKeeper cluster.")
http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index 22bf62a..cc8ab80 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -148,7 +148,7 @@ public class ChaosMonkeyITCase extends TestLogger {
// -----------------------------------------------------------------------------------------
// Setup
- Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+ Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
ZooKeeper.getConnectString(), FileStateBackendBasePath.toURI().toString());
// Akka and restart timeouts
@@ -564,7 +564,7 @@ public class ChaosMonkeyITCase extends TestLogger {
fail(fsCheckpoints + " does not exist: " + Arrays.toString(FileStateBackendBasePath.listFiles()));
}
- File fsRecovery = new File(new URI(config.getString(ConfigConstants.ZOOKEEPER_HA_PATH, "")).getPath());
+ File fsRecovery = new File(new URI(config.getString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, "")).getPath());
LOG.info("Checking " + fsRecovery);
http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index f66e52c..49eaeb7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -160,7 +160,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper
+ Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(ZooKeeper
.getConnectString(), FileStateBackendBasePath.getAbsoluteFile().toURI().toString());
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism);
@@ -311,7 +311,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
final String zooKeeperQuorum = ZooKeeper.getConnectString();
final String fileStateBackendPath = FileStateBackendBasePath.getAbsoluteFile().toString();
- Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+ Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
zooKeeperQuorum,
fileStateBackendPath);
http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index e0e165d..bf39c4b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -125,7 +125,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
*/
@Test
public void testJobPersistencyWhenJobManagerShutdown() throws Exception {
- Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+ Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
// Configure the cluster
@@ -172,7 +172,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
*/
@Test
public void testSubmitJobToNonLeader() throws Exception {
- Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+ Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
// Configure the cluster
@@ -257,7 +257,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
*/
@Test
public void testClientNonDetachedListeningBehaviour() throws Exception {
- Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+ Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
// Test actor system
http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 0c52204..9b0d9de 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -149,7 +149,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
*/
public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception {
Configuration config = new Configuration();
- config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+ config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zkQuorum);
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
@@ -249,7 +249,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
coordinateTempDir = createTempDirectory();
// Job Managers
- Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+ Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
// Start first process
http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 7091339..9bd8cc3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -91,11 +91,11 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
int numJMs = 10;
int numTMs = 3;
- configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+ configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
- configuration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
+ configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
@@ -139,12 +139,12 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
Configuration configuration = new Configuration();
- configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+ configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
- configuration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
+ configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
// we "effectively" disable the automatic RecoverAllJobs message and send it manually to make
// sure that all TMs have registered to the JM prior to issuing the RecoverAllJobs message
http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 25dbe53..a293348 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -115,7 +115,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
- "@@" + ConfigConstants.ZOOKEEPER_HA_PATH + "=" + fsStateHandlePath + "/recovery");
+ "@@" + ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH + "=" + fsStateHandlePath + "/recovery");
flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
ClusterClient yarnCluster = null;