You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/24 10:11:55 UTC

[1/4] flink git commit: [FLINK-4253] [config] Rename 'recovery.mode' key to 'high-availability'

Repository: flink
Updated Branches:
  refs/heads/master 720645587 -> 58165d69f


http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 1c0290a..048fbee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -89,8 +89,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	@Test
 	public void testZooKeeperLeaderElectionRetrieval() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -134,8 +134,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	@Test
 	public void testZooKeeperReelection() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 
 		Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
@@ -217,8 +217,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	@Test
 	public void testZooKeeperReelectionWithReplacement() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 
 		int num = 3;
 		int numTries = 30;
@@ -295,9 +295,9 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 		final String leaderPath = "/leader";
 
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-		configuration.setString(ConfigConstants.ZOOKEEPER_LEADER_PATH, leaderPath);
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -379,8 +379,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	@Test
 	public void testExceptionForwarding() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -448,8 +448,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	@Test
 	public void testEphemeralZooKeeperNodes() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 
 		ZooKeeperLeaderElectionService leaderElectionService;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -466,7 +466,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 			listener = new TestingListener();
 
 			client = ZooKeeperUtils.startCuratorFramework(configuration);
-			final String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
+			final String leaderPath = configuration.getString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
 					ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
 			cache = new NodeCache(client, leaderPath);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index aae1840..5aace34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -23,6 +23,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
@@ -43,6 +44,7 @@ import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
@@ -82,8 +84,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
 		long sleepingTime = 1000;
 
-		config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-		config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
 
 		LeaderElectionService leaderElectionService = null;
 		LeaderElectionService faultyLeaderElectionService;
@@ -179,8 +181,8 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 	@Test
 	public void testTimeoutOfFindConnectingAddress() throws Exception {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-		config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+		config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
 
 		FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
 
@@ -190,6 +192,46 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 		assertEquals(InetAddress.getLocalHost(), result);
 	}
 
+	@Test
+	public void testConnectionToZookeeperOverridingOldConfig() throws Exception {
+		Configuration config = new Configuration();
+		// Both keys are set: the new key (HIGH_AVAILABILITY) takes precedence over the old one (RECOVERY_MODE)
+		config.setString(ConfigConstants.RECOVERY_MODE, "standalone");
+		config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+
+		FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+
+		LeaderRetrievalService leaderRetrievalService =
+			LeaderRetrievalUtils.createLeaderRetrievalService(config);
+		InetAddress result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
+
+		assertEquals(InetAddress.getLocalHost(), result);
+	}
+
+	@Test
+	public void testConnectionToStandAloneLeaderOverridingOldConfig() throws Exception {
+		Configuration config = new Configuration();
+		// Both keys are set: the new key (HIGH_AVAILABILITY) takes precedence over the old one (RECOVERY_MODE)
+		config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		config.setString(ConfigConstants.HIGH_AVAILABILITY, "none");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+
+		HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(config);
+		assertTrue(mode == HighAvailabilityMode.NONE);
+	}
+
+	@Test
+	public void testConnectionToZookeeperUsingOldConfig() throws Exception {
+		Configuration config = new Configuration();
+		// Only the old key (RECOVERY_MODE) is set, so it must still be honored as a fallback
+		config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
+
+		HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(config);
+		assertTrue(mode == HighAvailabilityMode.ZOOKEEPER);
+	}
+
 	class FindConnectingAddress implements Runnable {
 
 		private final Configuration config;

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
index fac6162..66d523f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
@@ -212,7 +212,7 @@ public class JobManagerProcess extends TestJvmProcess {
 		 * <code>--port PORT</code>.
 		 *
 		 * <p>Other arguments are parsed to a {@link Configuration} and passed to the
-		 * JobManager, for instance: <code>--recovery.mode ZOOKEEPER --recovery.zookeeper.quorum
+		 * JobManager, for instance: <code>--high-availability ZOOKEEPER --recovery.zookeeper.quorum
 		 * "xyz:123:456"</code>.
 		 */
 		public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
index 97e7cca..417dc88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
@@ -96,7 +96,7 @@ public class TaskManagerProcess extends TestJvmProcess {
 
 		/**
 		 * All arguments are parsed to a {@link Configuration} and passed to the Taskmanager,
-		 * for instance: <code>--recovery.mode ZOOKEEPER --recovery.zookeeper.quorum "xyz:123:456"</code>.
+		 * for instance: <code>--high-availability ZOOKEEPER --recovery.zookeeper.quorum "xyz:123:456"</code>.
 		 */
 		public static void main(String[] args) throws Exception {
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 2796337..c94842f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -31,12 +31,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class ZooKeeperTestUtils {
 
 	/**
-	 * Creates a configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+	 * Creates a configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
 	 *
 	 * @param zooKeeperQuorum   ZooKeeper quorum to connect to
 	 * @param fsStateHandlePath Base path for file system state backend (for checkpoints and
 	 *                          recovery)
-	 * @return A new configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+	 * @return A new configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
 	 */
 	public static Configuration createZooKeeperRecoveryModeConfig(
 			String zooKeeperQuorum, String fsStateHandlePath) {
@@ -45,13 +45,13 @@ public class ZooKeeperTestUtils {
 	}
 
 	/**
-	 * Sets all necessary configuration keys to operate in {@link RecoveryMode#ZOOKEEPER}.
+	 * Sets all necessary configuration keys to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
 	 *
 	 * @param config            Configuration to use
 	 * @param zooKeeperQuorum   ZooKeeper quorum to connect to
 	 * @param fsStateHandlePath Base path for file system state backend (for checkpoints and
 	 *                          recovery)
-	 * @return The modified configuration to operate in {@link RecoveryMode#ZOOKEEPER}.
+	 * @return The modified configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
 	 */
 	public static Configuration setZooKeeperRecoveryMode(
 			Configuration config,
@@ -66,8 +66,8 @@ public class ZooKeeperTestUtils {
 		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
 
 		// ZooKeeper recovery mode
-		config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
-		config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum);
+		config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum);
 
 		int connTimeout = 5000;
 		if (System.getenv().containsKey("CI")) {
@@ -75,20 +75,20 @@ public class ZooKeeperTestUtils {
 			connTimeout = 30000;
 		}
 
-		config.setInteger(ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout);
-		config.setInteger(ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT, connTimeout);
+		config.setInteger(ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT, connTimeout);
+		config.setInteger(ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT, connTimeout);
 
 		// File system state backend
 		config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
-		config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, fsStateHandlePath + "/recovery");
+		config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, fsStateHandlePath + "/recovery");
 
 		// Akka failure detection and execution retries
 		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
 		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "6 s");
 		config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 9);
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
-		config.setString(ConfigConstants.RECOVERY_JOB_DELAY, "10 s");
+		config.setString(ConfigConstants.HA_JOB_DELAY, "10 s");
 
 		return config;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
index 0d01f65..daed4a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
@@ -71,7 +71,7 @@ public class ZooKeeperUtilTest extends TestLogger {
 	}
 
 	private Configuration setQuorum(Configuration conf, String quorum) {
-		conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, quorum);
+		conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, quorum);
 		return conf;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
index 8fc80e0..467706f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java
@@ -58,7 +58,7 @@ public class ZooKeeperTestEnvironment {
 				zooKeeperServer = new TestingServer(true);
 				zooKeeperCluster = null;
 
-				conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+				conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
 						zooKeeperServer.getConnectString());
 			}
 			else {
@@ -67,7 +67,7 @@ public class ZooKeeperTestEnvironment {
 
 				zooKeeperCluster.start();
 
-				conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+				conf.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
 						zooKeeperCluster.getConnectString());
 			}
 
@@ -127,7 +127,7 @@ public class ZooKeeperTestEnvironment {
 	 */
 	public CuratorFramework createClient() {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, getConnectString());
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, getConnectString());
 		return ZooKeeperUtils.startCuratorFramework(config);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 02a0fec..7d2b86c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -423,7 +423,8 @@ object TestingUtils {
       prefix: String)
     : ActorGateway = {
 
-    configuration.setString(ConfigConstants.RECOVERY_MODE, ConfigConstants.DEFAULT_RECOVERY_MODE)
+    configuration.setString(ConfigConstants.HIGH_AVAILABILITY,
+      ConfigConstants.DEFAULT_HIGH_AVAILABILTY)
 
       val (actor, _) = JobManager.startJobManagerActors(
         configuration,
@@ -502,7 +503,8 @@ object TestingUtils {
       configuration: Configuration)
   : ActorGateway = {
 
-    configuration.setString(ConfigConstants.RECOVERY_MODE, ConfigConstants.DEFAULT_RECOVERY_MODE)
+    configuration.setString(ConfigConstants.HIGH_AVAILABILITY,
+      ConfigConstants.DEFAULT_HIGH_AVAILABILTY)
 
     val actor = FlinkResourceManager.startResourceManagerActors(
       configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index c51e666..7e5acee 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -120,7 +120,7 @@ public class TestBaseUtils extends TestLogger {
 
 		if (startZooKeeper) {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
-			config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+			config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 		}
 
 		return startCluster(config, singleActorSystem);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 42c0a6a..5dd4188 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -29,7 +29,7 @@ import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode}
+import org.apache.flink.runtime.jobmanager.{JobManager, HighAvailabilityMode}
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
@@ -261,14 +261,16 @@ class ForkableFlinkMiniCluster(
   }
 
   override def start(): Unit = {
-    val zookeeperURL = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "")
+    val zookeeperURL = configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "")
 
-    zookeeperCluster = if(recoveryMode == RecoveryMode.ZOOKEEPER && zookeeperURL.equals("")) {
+    zookeeperCluster = if (recoveryMode == HighAvailabilityMode.ZOOKEEPER &&
+      zookeeperURL.equals("")) {
       LOG.info("Starting ZooKeeper cluster.")
 
       val testingCluster = new TestingCluster(1)
 
-      configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingCluster.getConnectString)
+      configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
+        testingCluster.getConnectString)
 
       testingCluster.start()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index e97532c..22bf62a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -564,7 +564,7 @@ public class ChaosMonkeyITCase extends TestLogger {
 			fail(fsCheckpoints + " does not exist: " + Arrays.toString(FileStateBackendBasePath.listFiles()));
 		}
 
-		File fsRecovery = new File(new URI(config.getString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "")).getPath());
+		File fsRecovery = new File(new URI(config.getString(ConfigConstants.ZOOKEEPER_HA_PATH, "")).getPath());
 
 		LOG.info("Checking " + fsRecovery);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index eccf971..e0e165d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -482,7 +482,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 
 		// ZooKeeper
 		String currentJobsPath = config.getString(
-				ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
+				ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
 				ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
@@ -514,7 +514,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 
 		// ZooKeeper
 		String currentJobsPath = config.getString(
-			ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
+			ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
 			ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 88aeb09..0c52204 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -149,8 +149,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 	 */
 	public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
-		config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zkQuorum);
+		config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zkQuorum);
 
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"leader", 1, config);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 45ee839..7091339 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -91,11 +91,11 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 		int numJMs = 10;
 		int numTMs = 3;
 
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 		configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		configuration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
+		configuration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
 
 		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
 
@@ -139,12 +139,12 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 
 		Configuration configuration = new Configuration();
 
-		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
 		configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		configuration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
+		configuration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
 
 		// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
 		// sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 60ae2ef..48ad7f5 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -202,7 +202,7 @@ public class CliFrontendYarnAddressConfigurationTest {
 				CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
 
 		frontend.retrieveClient(options);
-		String zkNs = frontend.getConfiguration().getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, "error");
+		String zkNs = frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, "error");
 		Assert.assertTrue(zkNs.matches("application_\\d+_0042"));
 	}
 
@@ -216,7 +216,7 @@ public class CliFrontendYarnAddressConfigurationTest {
 				CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", overrideZkNamespace});
 
 		frontend.retrieveClient(options);
-		String zkNs = frontend.getConfiguration().getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, "error");
+		String zkNs = frontend.getConfiguration().getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, "error");
 		Assert.assertEquals(overrideZkNamespace, zkNs);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 9b52975..25dbe53 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -115,7 +115,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
 			"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
 			"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
-			"@@" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery");
+			"@@" + ConfigConstants.ZOOKEEPER_HA_PATH + "=" + fsStateHandlePath + "/recovery");
 		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
 
 		ClusterClient yarnCluster = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ba07af1..f4c2032 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -23,7 +23,7 @@ import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -553,13 +553,13 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// no user specified cli argument for namespace?
 		if (zkNamespace == null || zkNamespace.isEmpty()) {
 			// namespace defined in config? else use applicationId as default.
-			zkNamespace = flinkConfiguration.getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, String.valueOf(appId));
+			zkNamespace = flinkConfiguration.getString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, String.valueOf(appId));
 			setZookeeperNamespace(zkNamespace);
 		}
 
-		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+		flinkConfiguration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
 
-		if (RecoveryMode.isHighAvailabilityModeActivated(flinkConfiguration)) {
+		if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfiguration)) {
 			// activate re-execution of failed applications
 			appContext.setMaxAppAttempts(
 				flinkConfiguration.getInteger(

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 39b2510..87a2c98 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -430,7 +430,7 @@ public class YarnApplicationMasterRunner {
 		// override zookeeper namespace with user cli argument (if provided)
 		String cliZKNamespace = ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
 		if (cliZKNamespace != null && !cliZKNamespace.isEmpty()) {
-			configuration.setString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, cliZKNamespace);
+			configuration.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, cliZKNamespace);
 		}
 
 		// if a web monitor shall be started, set the port to random binding

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index bee6a7a..3c93e34 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -58,7 +58,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
-import static org.apache.flink.configuration.ConfigConstants.ZOOKEEPER_NAMESPACE_KEY;
+import static org.apache.flink.configuration.ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY;
 
 /**
  * Class handling the command line interface to the YARN session.
@@ -503,8 +503,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		if(null != applicationID) {
 			String zkNamespace = cmdLine.hasOption(ZOOKEEPER_NAMESPACE.getOpt()) ?
 					cmdLine.getOptionValue(ZOOKEEPER_NAMESPACE.getOpt())
-					: config.getString(ZOOKEEPER_NAMESPACE_KEY, applicationID);
-			config.setString(ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+					: config.getString(HA_ZOOKEEPER_NAMESPACE_KEY, applicationID);
+			config.setString(HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
 
 			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
 			yarnDescriptor.setFlinkConfiguration(config);


[3/4] flink git commit: [FLINK-4253] [config] Clean up renaming of 'recovery.mode'

Posted by uc...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 5dd4188..fa3135a 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -263,7 +263,7 @@ class ForkableFlinkMiniCluster(
   override def start(): Unit = {
     val zookeeperURL = configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "")
 
-    zookeeperCluster = if (recoveryMode == HighAvailabilityMode.ZOOKEEPER &&
+    zookeeperCluster = if (haMode == HighAvailabilityMode.ZOOKEEPER &&
       zookeeperURL.equals("")) {
       LOG.info("Starting ZooKeeper cluster.")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index 22bf62a..cc8ab80 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -148,7 +148,7 @@ public class ChaosMonkeyITCase extends TestLogger {
 		// -----------------------------------------------------------------------------------------
 
 		// Setup
-		Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
 				ZooKeeper.getConnectString(), FileStateBackendBasePath.toURI().toString());
 
 		// Akka and restart timeouts
@@ -564,7 +564,7 @@ public class ChaosMonkeyITCase extends TestLogger {
 			fail(fsCheckpoints + " does not exist: " + Arrays.toString(FileStateBackendBasePath.listFiles()));
 		}
 
-		File fsRecovery = new File(new URI(config.getString(ConfigConstants.ZOOKEEPER_HA_PATH, "")).getPath());
+		File fsRecovery = new File(new URI(config.getString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, "")).getPath());
 
 		LOG.info("Checking " + fsRecovery);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index f66e52c..49eaeb7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -160,7 +160,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
-		Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(ZooKeeper
 				.getConnectString(), FileStateBackendBasePath.getAbsoluteFile().toURI().toString());
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism);
 
@@ -311,7 +311,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 		final String zooKeeperQuorum = ZooKeeper.getConnectString();
 		final String fileStateBackendPath = FileStateBackendBasePath.getAbsoluteFile().toString();
 
-		Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
 				zooKeeperQuorum,
 				fileStateBackendPath);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index e0e165d..bf39c4b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -125,7 +125,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 	 */
 	@Test
 	public void testJobPersistencyWhenJobManagerShutdown() throws Exception {
-		Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
 				ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
 
 		// Configure the cluster
@@ -172,7 +172,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 	 */
 	@Test
 	public void testSubmitJobToNonLeader() throws Exception {
-		Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
 				ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
 
 		// Configure the cluster
@@ -257,7 +257,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 	 */
 	@Test
 	public void testClientNonDetachedListeningBehaviour() throws Exception {
-		Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
 				ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
 
 		// Test actor system

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 0c52204..9b0d9de 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -149,7 +149,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 	 */
 	public void testJobManagerFailure(String zkQuorum, final File coordinateDir) throws Exception {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+		config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
 		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zkQuorum);
 
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
@@ -249,7 +249,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 			coordinateTempDir = createTempDirectory();
 
 			// Job Managers
-			Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+			Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
 					ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
 
 			// Start first process

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 7091339..9bd8cc3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -91,11 +91,11 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 		int numJMs = 10;
 		int numTMs = 3;
 
-		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 		configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		configuration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
 
 		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
 
@@ -139,12 +139,12 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 
 		Configuration configuration = new Configuration();
 
-		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
 		configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		configuration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
+		configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
 
 		// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
 		// sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 25dbe53..a293348 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -115,7 +115,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
 			"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
 			"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
-			"@@" + ConfigConstants.ZOOKEEPER_HA_PATH + "=" + fsStateHandlePath + "/recovery");
+			"@@" + ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH + "=" + fsStateHandlePath + "/recovery");
 		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
 
 		ClusterClient yarnCluster = null;


[4/4] flink git commit: [FLINK-4253] [config] Clean up renaming of 'recovery.mode'

Posted by uc...@apache.org.
[FLINK-4253] [config] Clean up renaming of 'recovery.mode'

- Renamed config keys and default values to be consistent
- Updated default flink-conf.yaml
- Cleaned up code occurrences of recovery mode

This closes #2342.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58165d69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58165d69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58165d69

Branch: refs/heads/master
Commit: 58165d69fb4cd49018031f14bfaf1e17039b36f7
Parents: 01ffe34
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Aug 22 14:59:14 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Aug 24 12:09:31 2016 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  34 ++--
 docs/setup/jobmanager_high_availability.md      |  16 +-
 .../flink/configuration/ConfigConstants.java    | 157 +++++++++--------
 flink-dist/src/main/flink-bin/bin/config.sh     |  33 ++--
 .../src/main/flink-bin/bin/start-cluster.sh     |   2 +-
 .../src/main/flink-bin/bin/stop-cluster.sh      |   2 +-
 flink-dist/src/main/resources/flink-conf.yaml   |  16 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |   4 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  11 +-
 .../apache/flink/runtime/blob/BlobUtils.java    |   3 +-
 .../flink/runtime/blob/FileSystemBlobStore.java |  20 ++-
 .../StandaloneCheckpointIDCounter.java          |   3 +-
 .../jobmanager/HighAvailabilityMode.java        |  40 ++---
 .../runtime/util/LeaderRetrievalUtils.java      |   2 +-
 .../flink/runtime/util/ZooKeeperUtils.java      | 175 +++++++++----------
 .../flink/runtime/jobmanager/JobManager.scala   |  17 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  14 +-
 .../flink/runtime/blob/BlobRecoveryITCase.java  |   4 +-
 .../client/JobClientActorRecoveryITCase.java    |   3 +-
 .../BlobLibraryCacheRecoveryITCase.java         |   4 +-
 .../jobmanager/HighAvailabilityModeTest.java    |  71 ++++++++
 .../jobmanager/JobManagerHARecoveryTest.java    |   4 +-
 .../JobManagerLeaderElectionTest.java           |   4 +-
 .../ZooKeeperLeaderElectionTest.java            |  12 +-
 .../ZooKeeperLeaderRetrievalTest.java           |  46 +----
 .../runtime/testutils/JobManagerProcess.java    |   2 +-
 .../runtime/testutils/TaskManagerProcess.java   |   2 +-
 .../runtime/testutils/ZooKeeperTestUtils.java   |  10 +-
 .../runtime/testingUtils/TestingUtils.scala     |   8 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   2 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |   2 +-
 .../flink/test/recovery/ChaosMonkeyITCase.java  |   4 +-
 .../JobManagerHACheckpointRecoveryITCase.java   |   4 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |   6 +-
 ...agerHAProcessFailureBatchRecoveryITCase.java |   4 +-
 .../ZooKeeperLeaderElectionITCase.java          |   8 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |   2 +-
 37 files changed, 389 insertions(+), 362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index e6a335b..51475cc 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -134,7 +134,7 @@ will be used under the directory specified by jobmanager.web.tmpdir.
 
 - `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups.
 
-- `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this config was named as `recovery.zookeeper.storageDir`.
+- `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named `recovery.zookeeper.storageDir`.
 
 - `blob.storage.directory`: Directory for storing blobs (such as user jar's) on the TaskManagers.
 
@@ -283,31 +283,37 @@ of the JobManager, because the same ActorSystem is used. Its not possible to use
   For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.
 
 
-## High Availability Mode
+## High Availability (HA)
 
-- `high-availability`: (Default 'none') Defines the recovery mode used for the cluster execution. Currently, Flink supports the 'none' mode where only a single JobManager runs and no JobManager state is checkpointed. The high availability mode 'zookeeper' supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the 'zookeeper' mode, it is mandatory to also define the `recovery.zookeeper.quorum` configuration value.  Previously this config was named 'recovery.mode' and the default config was 'standalone'.
+- `high-availability`: Defines the high availability mode used for the cluster execution. Currently, Flink supports the following modes:
+  - `none` (default): No high availability. A single JobManager runs and no JobManager state is checkpointed.
+  - `zookeeper`: Supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the 'zookeeper' mode, it is mandatory to also define the `high-availability.zookeeper.quorum` configuration value.
 
-- `high-availability.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected.  Previously this config was name as `recovery.zookeeper.quorum`.
+Previously this key was named `recovery.mode` and the default value was `standalone`.
 
-- `high-availability.zookeeper.path.root`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create namespace directories.  Previously this config was name as `recovery.zookeeper.path.root`.
+### ZooKeeper-based HA Mode
 
-- `high-availability.zookeeper.path.namespace`: (Default '/default_ns' in standalone mode, or the <yarn-application-id> under Yarn) Defines the subdirectory under the root dir where the ZooKeeper recovery mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper.  Previously this config was named as `recovery.zookeeper.path.namespace`.
+- `high-availability.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connect to the ZooKeeper cluster when the 'zookeeper' HA mode is selected. Previously this key was named `recovery.zookeeper.quorum`.
 
-- `high-availability.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.  Previously this config was named as `recovery.zookeeper.path.latch`.
+- `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this key was named `recovery.zookeeper.path.root`.
 
-- `high-availability.zookeeper.path.leader`: (Default '/leader') Defines the znode of the leader which contains the URL to the leader and the current leader session ID  Previously this config was named as `recovery.zookeeper.path.leader`.
+- `high-availability.zookeeper.path.namespace`: (Default `/default_ns` in standalone cluster mode, or the `<yarn-application-id>` under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows isolating multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace`.
 
-- `high-availability.zookeeper.storageDir`: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA.  Previously this config was named as `recovery.zookeeper.storageDir`.
+- `high-availability.zookeeper.path.latch`: (Default `/leaderlatch`) Defines the znode of the leader latch which is used to elect the leader. Previously this key was named `recovery.zookeeper.path.latch`.
 
-- `high-availability.zookeeper.client.session-timeout`: (Default '60000') Defines the session timeout for the ZooKeeper session in ms.  Previously this config was named as `recovery.zookeeper.client.session-timeout`
+- `high-availability.zookeeper.path.leader`: (Default `/leader`) Defines the znode of the leader which contains the URL to the leader and the current leader session ID. Previously this key was named `recovery.zookeeper.path.leader`.
 
-- `high-availability.zookeeper.client.connection-timeout`: (Default '15000') Defines the connection timeout for ZooKeeper in ms.  Previously this config was named as `recovery.zookeeper.client.connection-timeout`.
+- `high-availability.zookeeper.storageDir`: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA. Previously this key was named `recovery.zookeeper.storageDir`.
 
-- `high-availability.zookeeper.client.retry-wait`: (Default '5000') Defines the pause between consecutive retries in ms.  Previously this config was named as `recovery.zookeeper.client.retry-wait`.
+- `high-availability.zookeeper.client.session-timeout`: (Default `60000`) Defines the session timeout for the ZooKeeper session in ms. Previously this key was named `recovery.zookeeper.client.session-timeout`.
 
-- `high-availability.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up.  Previously this config was named as `recovery.zookeeper.client.max-retry-attempts`.
+- `high-availability.zookeeper.client.connection-timeout`: (Default `15000`) Defines the connection timeout for ZooKeeper in ms. Previously this key was named `recovery.zookeeper.client.connection-timeout`.
 
-- `high-availability.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation.  Previously this config was named as `recovery.job.delay`.
+- `high-availability.zookeeper.client.retry-wait`: (Default `5000`) Defines the pause between consecutive retries in ms. Previously this key was named `recovery.zookeeper.client.retry-wait`.
+
+- `high-availability.zookeeper.client.max-retry-attempts`: (Default `3`) Defines the number of connection retries before the client gives up. Previously this key was named `recovery.zookeeper.client.max-retry-attempts`.
+
+- `high-availability.job.delay`: (Default `akka.ask.timeout`) Defines the delay before persisted jobs are recovered in case of a master recovery situation. Previously this key was named `recovery.job.delay`.
 
 ## Environment
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/docs/setup/jobmanager_high_availability.md
----------------------------------------------------------------------
diff --git a/docs/setup/jobmanager_high_availability.md b/docs/setup/jobmanager_high_availability.md
index dd6782d..9dcc7cc 100644
--- a/docs/setup/jobmanager_high_availability.md
+++ b/docs/setup/jobmanager_high_availability.md
@@ -44,7 +44,7 @@ As an example, consider the following setup with three JobManager instances:
 
 To enable JobManager High Availability you have to set the **high-availability mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters file** with all JobManagers hosts and their web UI ports.
 
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for  *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more information about ZooKeeper. Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) installation.
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more information about ZooKeeper. Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) installation.
 
 #### Masters File (masters)
 
@@ -67,7 +67,6 @@ In order to start an HA-cluster add the following configuration keys to `conf/fl
 - **high-availability mode** (required): The *high-availability mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode.
 
   <pre>high-availability: zookeeper</pre>
-- **Previously this config was named 'recovery.mode' and the default config was 'standalone'.
 
 - **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service.
 
@@ -83,14 +82,14 @@ In order to start an HA-cluster add the following configuration keys to `conf/fl
 
   <pre>high-availability.zookeeper.path.namespace: /default_ns # important: customize per cluster</pre>
 
-  **Important**: if you are running multiple Flink HA clusters, you have to manually configure separate namespaces for each cluster. By default, the Yarn cluster and the Yarn session automatically generate namespaces based on Yarn application id. A manual configuration overrides this behaviour in Yarn. Specifying a namespace with the -z CLI option, in turn, overrides manual configuration. 
+  **Important**: if you are running multiple Flink HA clusters, you have to manually configure separate namespaces for each cluster. By default, the Yarn cluster and the Yarn session automatically generate namespaces based on Yarn application id. A manual configuration overrides this behaviour in Yarn. Specifying a namespace with the -z CLI option, in turn, overrides manual configuration.
 
 - **State backend and storage directory** (required): JobManager meta data is persisted in the *state backend* and only a pointer to this state is stored in ZooKeeper. Currently, only the file system state backend is supported in HA mode.
 
     <pre>
+high-availability.zookeeper.storageDir: hdfs:///flink/recovery
 state.backend: filesystem
-state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
-high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>
+state.backend.fs.checkpointdir: hdfs:///flink/checkpoints</pre>
 
     The `storageDir` stores all meta data needed to recover a JobManager failure.
 
@@ -98,16 +97,17 @@ After configuring the masters and the ZooKeeper quorum, you can use the provided
 
 #### Example: Standalone Cluster with 2 JobManagers
 
-1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
+1. **Configure high availability mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
 
    <pre>
 high-availability: zookeeper
 high-availability.zookeeper.quorum: localhost:2181
 high-availability.zookeeper.path.root: /flink
 high-availability.zookeeper.path.namespace: /cluster_one # important: customize per cluster
+high-availability.zookeeper.storageDir: hdfs:///flink/recovery
 state.backend: filesystem
-state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
-high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>
+state.backend.fs.checkpointdir: hdfs:///flink/checkpoints</pre>
+
 
 2. **Configure masters** in `conf/masters`:
 
@@ -184,16 +184,16 @@ This means that the application can be restarted 10 times before YARN fails the
 
 #### Example: Highly Available YARN Session
 
-1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
+1. **Configure HA mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
 
    <pre>
 high-availability: zookeeper
 high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.storageDir: hdfs:///flink/recovery
 high-availability.zookeeper.path.root: /flink
 high-availability.zookeeper.path.namespace: /cluster_one # important: customize per cluster
 state.backend: filesystem
 state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
-high-availability.zookeeper.storageDir: hdfs:///flink/recovery
 yarn.application-attempts: 10</pre>
 
 3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine):

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 5cc1161..514c730 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -624,129 +624,134 @@ public final class ConfigConstants {
 	
 	public static final String FLINK_JVM_OPTIONS = "env.java.opts";
 
-	// --------------------------- Recovery -----------------------------------
+	// --------------------------- High Availability --------------------------
 
-	/** Defines recovery mode used for the cluster execution ("standalone", "zookeeper")
-	 *  Use {@link #HIGH_AVAILABILITY} instead
-	 * */
+	/** Defines high availability mode used for the cluster execution ("NONE", "ZOOKEEPER") */
+	@PublicEvolving
+	public static final String HA_MODE = "high-availability";
+
+	/** Ports used by the job manager if not in 'none' high availability mode */
+	@PublicEvolving
+	public static final String HA_JOB_MANAGER_PORT = "high-availability.jobmanager.port";
+
+	/** The time before the JobManager recovers persisted jobs */
+	@PublicEvolving
+	public static final String HA_JOB_DELAY = "high-availability.job.delay";
+
+	/** Deprecated in favour of {@link #HA_MODE}. */
 	@Deprecated
 	public static final String RECOVERY_MODE = "recovery.mode";
 
-	/** Defines recovery mode used for the cluster execution ("NONE", "ZOOKEEPER") */
-	public static final String HIGH_AVAILABILITY = "high-availability";
-
-	/** Ports used by the job manager if not in standalone recovery mode */
+	/** Deprecated in favour of {@link #HA_JOB_MANAGER_PORT}. */
 	@Deprecated
 	public static final String RECOVERY_JOB_MANAGER_PORT = "recovery.jobmanager.port";
 
-	/** Ports used by the job manager if not in 'none' recovery mode */
-	public static final String HA_JOB_MANAGER_PORT =
-		"high-availability.jobmanager.port";
-
-	/** The time before the JobManager recovers persisted jobs */
+	/** Deprecated in favour of {@link #HA_JOB_DELAY}. */
 	@Deprecated
 	public static final String RECOVERY_JOB_DELAY = "recovery.job.delay";
 
-	/** The time before the JobManager recovers persisted jobs */
-	public static final String HA_JOB_DELAY = "high-availability.job.delay";
-
 	// --------------------------- ZooKeeper ----------------------------------
 
 	/** ZooKeeper servers. */
-	@Deprecated
-	public static final String ZOOKEEPER_QUORUM_KEY = "recovery.zookeeper.quorum";
-
-	/** ZooKeeper servers. */
-	public static final String HA_ZOOKEEPER_QUORUM_KEY =
-		"high-availability.zookeeper.quorum";
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_QUORUM_KEY = "high-availability.zookeeper.quorum";
 
 	/**
 	 * File system state backend base path for recoverable state handles. Recovery state is written
 	 * to this path and the file state handles are persisted for recovery.
 	 */
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_STORAGE_PATH = "high-availability.zookeeper.storageDir";
+
+	/** ZooKeeper root path. */
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_DIR_KEY = "high-availability.zookeeper.path.root";
+
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_NAMESPACE_KEY = "high-availability.zookeeper.path.namespace";
+
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_LATCH_PATH = "high-availability.zookeeper.path.latch";
+
+	/** ZooKeeper root path (ZNode) for job graphs. */
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH = "high-availability.zookeeper.path.jobgraphs";
+
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader";
+
+	/** ZooKeeper root path (ZNode) for completed checkpoints. */
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH = "high-availability.zookeeper.path.checkpoints";
+
+	/** ZooKeeper root path (ZNode) for checkpoint counters. */
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "high-availability.zookeeper.path.checkpoint-counter";
+
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_SESSION_TIMEOUT = "high-availability.zookeeper.client.session-timeout";
+
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_CONNECTION_TIMEOUT = "high-availability.zookeeper.client.connection-timeout";
+
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_RETRY_WAIT = "high-availability.zookeeper.client.retry-wait";
+
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS = "high-availability.zookeeper.client.max-retry-attempts";
+
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_QUORUM_KEY}. */
 	@Deprecated
-	public static final String ZOOKEEPER_RECOVERY_PATH = "recovery.zookeeper.storageDir";
+	public static final String ZOOKEEPER_QUORUM_KEY = "recovery.zookeeper.quorum";
 
-	/**
-	 * File system state backend base path for recoverable state handles. Recovery state is written
-	 * to this path and the file state handles are persisted for recovery.
-	 */
-	public static final String ZOOKEEPER_HA_PATH =
-		"high-availability.zookeeper.storageDir";
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_STORAGE_PATH}. */
+	@Deprecated
+	public static final String ZOOKEEPER_RECOVERY_PATH = "recovery.zookeeper.storageDir";
 
-	/** ZooKeeper root path. */
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_DIR_KEY}. */
 	@Deprecated
 	public static final String ZOOKEEPER_DIR_KEY = "recovery.zookeeper.path.root";
 
-	/** ZooKeeper root path. */
-	public static final String HA_ZOOKEEPER_DIR_KEY =
-		"high-availability.zookeeper.path.root";
-
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_NAMESPACE_KEY}. */
 	@Deprecated
 	public static final String ZOOKEEPER_NAMESPACE_KEY = "recovery.zookeeper.path.namespace";
 
-	public static final String HA_ZOOKEEPER_NAMESPACE_KEY =
-		"high-availability.zookeeper.path.namespace";
-
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_LATCH_PATH}. */
 	@Deprecated
 	public static final String ZOOKEEPER_LATCH_PATH = "recovery.zookeeper.path.latch";
 
-	public static final String HA_ZOOKEEPER_LATCH_PATH =
-		"high-availability.zookeeper.path.latch";
-
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_LEADER_PATH}. */
 	@Deprecated
 	public static final String ZOOKEEPER_LEADER_PATH = "recovery.zookeeper.path.leader";
 
-	public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader";
-
-	/** ZooKeeper root path (ZNode) for job graphs. */
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_JOBGRAPHS_PATH}. */
 	@Deprecated
 	public static final String ZOOKEEPER_JOBGRAPHS_PATH = "recovery.zookeeper.path.jobgraphs";
 
-	/** ZooKeeper root path (ZNode) for job graphs. */
-	public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH =
-		"high-availability.zookeeper.path.jobgraphs";
-
-	/** ZooKeeper root path (ZNode) for completed checkpoints. */
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_CHECKPOINTS_PATH}. */
 	@Deprecated
 	public static final String ZOOKEEPER_CHECKPOINTS_PATH = "recovery.zookeeper.path.checkpoints";
 
-	/** ZooKeeper root path (ZNode) for completed checkpoints. */
-	public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH =
-		"high-availability.zookeeper.path.checkpoints";
-
-	/** ZooKeeper root path (ZNode) for checkpoint counters. */
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}. */
 	@Deprecated
 	public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "recovery.zookeeper.path.checkpoint-counter";
 
-	/** ZooKeeper root path (ZNode) for checkpoint counters. */
-	public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
-		"high-availability.zookeeper.path.checkpoint-counter";
-
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_SESSION_TIMEOUT}. */
 	@Deprecated
 	public static final String ZOOKEEPER_SESSION_TIMEOUT = "recovery.zookeeper.client.session-timeout";
 
-	public static final String HA_ZOOKEEPER_SESSION_TIMEOUT =
-		"high-availability.zookeeper.client.session-timeout";
-
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_CONNECTION_TIMEOUT}. */
 	@Deprecated
 	public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "recovery.zookeeper.client.connection-timeout";
 
-	public static final String HA_ZOOKEEPER_CONNECTION_TIMEOUT =
-		"high-availability.zookeeper.client.connection-timeout";
-
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_RETRY_WAIT}. */
 	@Deprecated
 	public static final String ZOOKEEPER_RETRY_WAIT = "recovery.zookeeper.client.retry-wait";
 
-	public static final String HA_ZOOKEEPER_RETRY_WAIT =
-		"high-availability.zookeeper.client.retry-wait";
-
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS}. */
 	@Deprecated
 	public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "recovery.zookeeper.client.max-retry-attempts";
 
-	public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS =
-		"high-availability.zookeeper.client.max-retry-attempts";
-
 	// ---------------------------- Metrics -----------------------------------
 
 	/**
@@ -1090,16 +1095,24 @@ public final class ConfigConstants {
 
 	public static final String LOCAL_START_WEBSERVER = "local.start-webserver";
 
-	// --------------------------- Recovery ---------------------------------
+	// --------------------------- High Availability ---------------------------------
+
+	@PublicEvolving
+	public static String DEFAULT_HA_MODE = "none";
+
+	/** Deprecated in favour of {@link #DEFAULT_HA_MODE} */
 	@Deprecated
 	public static String DEFAULT_RECOVERY_MODE = "standalone";
 
-	public static String DEFAULT_HIGH_AVAILABILTY = "none";
-
 	/**
 	 * Default port used by the job manager if not in standalone recovery mode. If <code>0</code>
 	 * the OS picks a random port port.
 	 */
+	@PublicEvolving
+	public static final String DEFAULT_HA_JOB_MANAGER_PORT = "0";
+
+	/** Deprecated in favour of {@link #DEFAULT_HA_JOB_MANAGER_PORT} */
+	@Deprecated
 	public static final String DEFAULT_RECOVERY_JOB_MANAGER_PORT = "0";
 
 	// --------------------------- ZooKeeper ----------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 687a39c..f7e7d58 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -104,8 +104,6 @@ KEY_ENV_JAVA_OPTS="env.java.opts"
 KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
 KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
 KEY_ENV_SSH_OPTS="env.ssh.opts"
-#deprecated
-KEY_RECOVERY_MODE="recovery.mode"
 KEY_HIGH_AVAILABILITY="high-availability"
 KEY_ZK_HEAP_MB="zookeeper.heap.mb"
 
@@ -259,25 +257,22 @@ if [ -z "${ZK_HEAP}" ]; then
     ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
 fi
 
-# for backward compatability
-if [ -z "${OLD_RECOVERY_MODE}" ]; then
-    OLD_RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" "${YAML_CONF}")
-fi
-
-if [ -z "${RECOVERY_MODE}" ]; then
-     # Read the new config
-     RECOVERY_MODE=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
-     if [ -z "${RECOVERY_MODE}" ]; then
-        #no new config found. So old config should be used
-        if [ -z "${OLD_RECOVERY_MODE}" ]; then
-            # If old config is also not found, use the 'none' as the default config
-            RECOVERY_MODE="none"
-        elif [ ${OLD_RECOVERY_MODE} = "standalone" ]; then
-            # if oldconfig is 'standalone', rename to 'none'
-            RECOVERY_MODE="none"
+# High availability: a pre-set HIGH_AVAILABILITY variable wins, then the
+# 'high-availability' config key, then the deprecated 'recovery.mode' key.
+# Defaults to "none" when nothing is configured.
+if [ -z "${HIGH_AVAILABILITY}" ]; then
+     HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
+     if [ -z "${HIGH_AVAILABILITY}" ]; then
+        # Try deprecated value
+        DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
+        if [ -z "${DEPRECATED_HA}" ]; then
+            HIGH_AVAILABILITY="none"
+        elif [ "${DEPRECATED_HA}" == "standalone" ]; then
+            # Standalone is now 'none'
+            HIGH_AVAILABILITY="none"
         else
-            RECOVERY_MODE=${OLD_RECOVERY_MODE}
+            HIGH_AVAILABILITY=${DEPRECATED_HA}
         fi
      fi
 fi
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index 77bff1e..7611189 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -29,7 +29,7 @@ bin=`cd "$bin"; pwd`
 
 # Start the JobManager instance(s)
 shopt -s nocasematch
-if [[ $RECOVERY_MODE == "zookeeper" ]]; then
+if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
     # HA Mode
     readMasters
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
index c4d9086..bc86291 100755
--- a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
@@ -37,7 +37,7 @@ fi
 
 # Stop JobManager instance(s)
 shopt -s nocasematch
-if [[ $RECOVERY_MODE == "zookeeper" ]]; then
+if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
     # HA Mode
     readMasters
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index a2586ce..27fd84a 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -131,18 +131,14 @@ jobmanager.web.port: 8081
 
 
 #==============================================================================
-# Master High Availability (required configuration)
+# High Availability
 #==============================================================================
 
-# The list of ZooKepper quorum peers that coordinate the high-availability
+# The list of ZooKeeper quorum peers that coordinate the high-availability
 # setup. This must be a list of the form:
-# "host1:clientPort,host2[:clientPort],..." (default clientPort: 2181)
 #
-# high-availability: zookeeper
-#
-# recovery.zookeeper.quorum: localhost:2181,...
+# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
 #
-# Note: You need to set the state backend to 'filesystem' and the checkpoint
-# directory (see above) before configuring the storageDir.
-#
-# recovery.zookeeper.storageDir: hdfs:///recovery
+# high-availability: zookeeper
+# high-availability.zookeeper.quorum: localhost:2181
+# high-availability.zookeeper.storageDir: hdfs:///flink/ha/

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index d9edafe..54c5e76 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -146,7 +146,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		List<LeaderRetrievalService> leaderRetrievalServices = new ArrayList<>();
 
 		try (TestingServer zooKeeper = new TestingServer()) {
-			final Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+			final Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
 				zooKeeper.getConnectString(),
 				temporaryFolder.getRoot().getPath());
 
@@ -296,7 +296,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			final Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
 			config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
-			config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+			config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
 			config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString());
 
 			actorSystem = AkkaUtils.createDefaultActorSystem();

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index d1b78a5..ff54b67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -66,7 +66,7 @@ public class BlobServer extends Thread implements BlobService {
 	/** Is the root directory for file storage */
 	private final File storageDir;
 
-	/** Blob store for recovery */
+	/** Blob store for HA */
 	private final BlobStore blobStore;
 
 	/** Set of currently running threads */
@@ -77,7 +77,7 @@ public class BlobServer extends Thread implements BlobService {
 
 	/**
 	 * Shutdown hook thread to ensure deletion of the storage directory (or <code>null</code> if
-	 * the configured recovery mode does not equal{@link HighAvailabilityMode#NONE})
+	 * the configured high availability mode does not equal {@link HighAvailabilityMode#NONE})
 	 */
 	private final Thread shutdownHook;
 
@@ -97,15 +97,12 @@ public class BlobServer extends Thread implements BlobService {
 		this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
 		LOG.info("Created BLOB server storage directory {}", storageDir);
 
-		// No recovery.
 		if (highAvailabilityMode == HighAvailabilityMode.NONE) {
 			this.blobStore = new VoidBlobStore();
-		}
-		// Recovery.
-		else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
+		} else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
 			this.blobStore = new FileSystemBlobStore(config);
 		} else {
-			throw new IllegalConfigurationException("Unexpected recovery mode '" + highAvailabilityMode + ".");
+			throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + ".");
 		}
 
 		// configure the maximum number of concurrent connections

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 6ba1944..e74fa6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -336,7 +336,8 @@ public class BlobUtils {
 	 *
 	 * <p>The returned path can be used with the state backend for recovery purposes.
 	 *
-	 * <p>This follows the same scheme as {@link #getStorageLocation(File, BlobKey)}.
+	 * <p>This follows the same scheme as {@link #getStorageLocation(File, BlobKey)}
+	 * and is used for HA.
 	 */
 	static String getRecoveryPath(String basePath, BlobKey blobKey) {
 		// format: $base/cache/blob_$key

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index f535c35..ee189d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.ConfigurationUtil;
 import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Blob store backed by {@link FileSystem}.
  *
- * <p>This is used in addition to the local blob storage
+ * <p>This is used in addition to the local blob storage for high availability.
  */
 class FileSystemBlobStore implements BlobStore {
 
@@ -51,18 +52,19 @@ class FileSystemBlobStore implements BlobStore {
 	private final String basePath;
 
 	FileSystemBlobStore(Configuration config) throws IOException {
-		String recoveryPath = config.getString(ConfigConstants.ZOOKEEPER_HA_PATH, null);
-		if(recoveryPath == null) {
-			recoveryPath = config.getString(ConfigConstants.ZOOKEEPER_HA_PATH, null);
-		}
+		String storagePath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				config,
+				ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH,
+				null,
+				ConfigConstants.ZOOKEEPER_RECOVERY_PATH);
 
-		if (recoveryPath == null) {
+		if (storagePath == null) {
 			throw new IllegalConfigurationException(String.format("Missing configuration for " +
-					"file system state backend recovery path. Please specify via " +
-					"'%s' key.", ConfigConstants.ZOOKEEPER_HA_PATH));
+					"ZooKeeper file system path. Please specify via " +
+					"'%s' key.", ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH));
 		}
 
-		this.basePath = recoveryPath + "/blob";
+		this.basePath = storagePath + "/blob";
 
 		FileSystem.get(new Path(basePath).toUri()).mkdirs(new Path(basePath));
 		LOG.info("Created blob directory {}.", basePath);

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
index c2f67f1..84cbe19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -25,8 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
 /**
  * {@link CheckpointIDCounter} instances for JobManagers running in {@link HighAvailabilityMode#NONE}.
  *
- * <p>Simple wrapper of an {@link AtomicLong}. This is sufficient, because job managers are not
- * recoverable in this recovery mode.
+ * <p>Simple wrapper around an {@link AtomicLong}.
  */
 public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
index 8e2efa8..087ad3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
@@ -20,18 +20,18 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.ConfigurationUtil;
 
 /**
- * Recovery mode for Flink's cluster execution. Currently supported modes are:
+ * High availability mode for Flink's cluster execution. Currently supported modes are:
  *
- * - Standalone: No recovery from JobManager failures
+ * - NONE: No high availability.
  * - ZooKeeper: JobManager high availability via ZooKeeper
  * ZooKeeper is used to select a leader among a group of JobManager. This JobManager
  * is responsible for the job execution. Upon failure of the leader a new leader is elected
  * which will take over the responsibilities of the old leader
  */
 public enum HighAvailabilityMode {
-	// STANDALONE mode renamed to NONE
 	NONE,
 	ZOOKEEPER;
 
@@ -39,30 +39,24 @@ public enum HighAvailabilityMode {
 	 * Return the configured {@link HighAvailabilityMode}.
 	 *
 	 * @param config The config to parse
-	 * @return Configured recovery mode or {@link ConfigConstants#DEFAULT_HIGH_AVAILABILTY} if not
+	 * @return Configured high availability mode or {@link ConfigConstants#DEFAULT_HA_MODE} if not
 	 * configured.
 	 */
 	public static HighAvailabilityMode fromConfig(Configuration config) {
-		// Not passing the default value here so that we could determine
-		// if there is an older config set
-		String recoveryMode = config.getString(
-			ConfigConstants.HIGH_AVAILABILITY, "");
-		if (recoveryMode.isEmpty()) {
-			// New config is not set.
-			// check the older one
-			// check for older 'recover.mode' config
-			recoveryMode = config.getString(
-				ConfigConstants.RECOVERY_MODE,
-				ConfigConstants.DEFAULT_RECOVERY_MODE);
-			if (recoveryMode.equalsIgnoreCase(ConfigConstants.DEFAULT_RECOVERY_MODE)) {
-				// There is no HA configured.
-				return HighAvailabilityMode.valueOf(ConfigConstants.DEFAULT_HIGH_AVAILABILTY.toUpperCase());
-			}
-		} else if (recoveryMode.equalsIgnoreCase(ConfigConstants.DEFAULT_HIGH_AVAILABILTY)) {
-			// The new config is found but with default value. So use this
-			return HighAvailabilityMode.valueOf(ConfigConstants.DEFAULT_HIGH_AVAILABILTY.toUpperCase());
+		String haMode = ConfigurationUtil.getStringWithDeprecatedKeys(
+				config,
+				ConfigConstants.HA_MODE,
+				null,
+				ConfigConstants.RECOVERY_MODE);
+
+		if (haMode == null) {
+			return HighAvailabilityMode.NONE;
+		} else if (haMode.equalsIgnoreCase(ConfigConstants.DEFAULT_RECOVERY_MODE)) {
+			// Map old default to new default
+			return HighAvailabilityMode.NONE;
+		} else {
+			return HighAvailabilityMode.valueOf(haMode.toUpperCase());
 		}
-		return HighAvailabilityMode.valueOf(recoveryMode.toUpperCase());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 7a656cf..b6d9306 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -282,7 +282,7 @@ public class LeaderRetrievalUtils {
 	}
 
 	/**
-	 * Gets the recovery mode as configured, based on the {@link ConfigConstants#HIGH_AVAILABILITY}
+	 * Gets the recovery mode as configured, based on the {@link ConfigConstants#HA_MODE}
 	 * config key.
 	 * 
 	 * @param config The configuration to read the recovery mode from.

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 5fd6f8c..bb48c81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
+import org.apache.flink.util.ConfigurationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,48 +57,57 @@ public class ZooKeeperUtils {
 	 * @return {@link CuratorFramework} instance
 	 */
 	public static CuratorFramework startCuratorFramework(Configuration configuration) {
-		String zkQuorum = configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "");
-		if (zkQuorum.isEmpty()) {
-			zkQuorum = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
-		}
+		String zkQuorum = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
+				null,
+				ConfigConstants.ZOOKEEPER_QUORUM_KEY);
+
 		if (zkQuorum == null || zkQuorum.equals("")) {
 			throw new RuntimeException("No valid ZooKeeper quorum has been specified. " +
 					"You can specify the quorum via the configuration key '" +
 					ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY + "'.");
 		}
 
-		int sessionTimeout = getConfiguredIntValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT,
-			ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT,
-			ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
-
-		int connectionTimeout = getConfiguredIntValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT,
-			ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
-			ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
-
-		int retryWait = getConfiguredIntValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_RETRY_WAIT,
-			ConfigConstants.ZOOKEEPER_RETRY_WAIT,
-			ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
-
-		int maxRetryAttempts = getConfiguredIntValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS,
-			ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS,
-			ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS);
-
-		String root = getConfiguredStringValue(configuration, ConfigConstants.HA_ZOOKEEPER_DIR_KEY,
-			ConfigConstants.ZOOKEEPER_DIR_KEY,
-			ConfigConstants.DEFAULT_ZOOKEEPER_DIR_KEY);
-
-		String namespace = getConfiguredStringValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY,
-			ConfigConstants.ZOOKEEPER_NAMESPACE_KEY,
-			ConfigConstants.DEFAULT_ZOOKEEPER_NAMESPACE_KEY);
+		int sessionTimeout = ConfigurationUtil.getIntegerWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT,
+				ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT,
+				ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT);
+
+		int connectionTimeout = ConfigurationUtil.getIntegerWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT,
+				ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT,
+				ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT);
+
+		int retryWait = ConfigurationUtil.getIntegerWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_RETRY_WAIT,
+				ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT,
+				ConfigConstants.ZOOKEEPER_RETRY_WAIT);
+
+		int maxRetryAttempts = ConfigurationUtil.getIntegerWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS,
+				ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS,
+				ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS);
+
+		String root = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_DIR_KEY,
+				ConfigConstants.DEFAULT_ZOOKEEPER_DIR_KEY,
+				ConfigConstants.ZOOKEEPER_DIR_KEY);
+
+		String namespace = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY,
+				ConfigConstants.DEFAULT_ZOOKEEPER_NAMESPACE_KEY,
+				ConfigConstants.ZOOKEEPER_NAMESPACE_KEY);
 
 		String rootWithNamespace = generateZookeeperPath(root, namespace);
 
-		LOG.info("Using '{}' as zookeeper namespace.", rootWithNamespace);
+		LOG.info("Using '{}' as Zookeeper namespace.", rootWithNamespace);
 
 		CuratorFramework cf = CuratorFrameworkFactory.builder()
 				.connectString(zkQuorum)
@@ -114,31 +124,6 @@ public class ZooKeeperUtils {
 		return cf;
 	}
 
-	private static int getConfiguredIntValue(Configuration configuration, String newConfigName, String oldConfigName, int defaultValue) {
-		int val = configuration.getInteger(newConfigName, -1);
-		if (val == -1) {
-			val = configuration.getInteger(
-				oldConfigName, -1);
-		}
-		// if still the val is not set use the default value
-		if (val == -1) {
-			return defaultValue;
-		}
-		return val;
-	}
-
-	private static String getConfiguredStringValue(Configuration configuration, String newConfigName, String oldConfigName, String defaultValue) {
-		String val = configuration.getString(newConfigName, "");
-		if (val.isEmpty()) {
-			val = configuration.getString(
-				oldConfigName, "");
-		}
-		// still no value found - use the default value
-		if (val.isEmpty()) {
-			return defaultValue;
-		}
-		return val;
-	}
 	/**
 	 * Returns whether {@link HighAvailabilityMode#ZOOKEEPER} is configured.
 	 */
@@ -153,10 +138,11 @@ public class ZooKeeperUtils {
 	public static String getZooKeeperEnsemble(Configuration flinkConf)
 			throws IllegalConfigurationException {
 
-		String zkQuorum = flinkConf.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "");
-		if (zkQuorum.isEmpty()) {
-			zkQuorum = flinkConf.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
-		}
+		String zkQuorum = ConfigurationUtil.getStringWithDeprecatedKeys(
+				flinkConf,
+				ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
+				"",
+				ConfigConstants.ZOOKEEPER_QUORUM_KEY);
 
 		if (zkQuorum == null || zkQuorum.equals("")) {
 			throw new IllegalConfigurationException("No ZooKeeper quorum specified in config.");
@@ -178,9 +164,11 @@ public class ZooKeeperUtils {
 	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
 			Configuration configuration) throws Exception {
 		CuratorFramework client = startCuratorFramework(configuration);
-		String leaderPath = getConfiguredStringValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, ConfigConstants.ZOOKEEPER_LEADER_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
+		String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
+				ConfigConstants.ZOOKEEPER_LEADER_PATH);
 
 		return new ZooKeeperLeaderRetrievalService(client, leaderPath);
 	}
@@ -213,12 +201,16 @@ public class ZooKeeperUtils {
 			CuratorFramework client,
 			Configuration configuration) throws Exception {
 
-		String latchPath = getConfiguredStringValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_LATCH_PATH, ConfigConstants.ZOOKEEPER_LATCH_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
-		String leaderPath = getConfiguredStringValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, ConfigConstants.ZOOKEEPER_LEADER_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
+		String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_LATCH_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH,
+				ConfigConstants.ZOOKEEPER_LATCH_PATH);
+		String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
+				ConfigConstants.ZOOKEEPER_LEADER_PATH);
 
 		return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
 	}
@@ -239,9 +231,11 @@ public class ZooKeeperUtils {
 		StateStorageHelper<SubmittedJobGraph> stateStorage = createFileSystemStateStorage(configuration, "submittedJobGraph");
 
 		// ZooKeeper submitted jobs root dir
-		String zooKeeperSubmittedJobsPath = getConfiguredStringValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH, ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+		String zooKeeperSubmittedJobsPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH,
+				ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH);
 
 		return new ZooKeeperSubmittedJobGraphStore(
 				client, zooKeeperSubmittedJobsPath, stateStorage);
@@ -266,10 +260,11 @@ public class ZooKeeperUtils {
 
 		checkNotNull(configuration, "Configuration");
 
-		String checkpointsPath = getConfiguredStringValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_CHECKPOINTS_PATH,
-			ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
+		String checkpointsPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_CHECKPOINTS_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH,
+				ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH);
 
 		StateStorageHelper<CompletedCheckpoint> stateStorage = createFileSystemStateStorage(
 			configuration,
@@ -298,10 +293,11 @@ public class ZooKeeperUtils {
 			Configuration configuration,
 			JobID jobId) throws Exception {
 
-		String checkpointIdCounterPath = getConfiguredStringValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
-			ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
+		String checkpointIdCounterPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+				ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
 
 		checkpointIdCounterPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
 
@@ -321,16 +317,15 @@ public class ZooKeeperUtils {
 			Configuration configuration,
 			String prefix) throws IOException {
 
-		String rootPath = configuration.getString(
-			ConfigConstants.ZOOKEEPER_HA_PATH, "");
-		if (rootPath.isEmpty()) {
-			rootPath = configuration.getString(
-				ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "");
-		}
+		String rootPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH,
+				"",
+				ConfigConstants.ZOOKEEPER_RECOVERY_PATH);
 
 		if (rootPath.equals("")) {
 			throw new IllegalConfigurationException("Missing recovery path. Specify via " +
-				"configuration key '" + ConfigConstants.ZOOKEEPER_HA_PATH + "'.");
+				"configuration key '" + ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH + "'.");
 		} else {
 			return new FileSystemStateStorageHelper<T>(rootPath, prefix);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 5962afc..d172a2b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -82,7 +82,7 @@ import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
-import org.apache.flink.util.{InstantiationUtil, NetUtils}
+import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
 
 import org.jboss.netty.channel.ChannelException
 
@@ -155,7 +155,7 @@ class JobManager(
   /** Either running or not yet archived jobs (session hasn't been ended). */
   protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
 
-  protected val recoveryMode = HighAvailabilityMode.fromConfig(flinkConfiguration)
+  protected val haMode = HighAvailabilityMode.fromConfig(flinkConfiguration)
 
   var leaderSessionID: Option[UUID] = None
 
@@ -317,7 +317,7 @@ class JobManager(
 
         // TODO (critical next step) This needs to be more flexible and robust (e.g. wait for task
         // managers etc.)
-        if (recoveryMode != HighAvailabilityMode.NONE) {
+        if (haMode != HighAvailabilityMode.NONE) {
           log.info(s"Delaying recovery of all jobs by $jobRecoveryTimeout.")
 
           context.system.scheduler.scheduleOnce(
@@ -2462,7 +2462,7 @@ object JobManager {
         // The port range of allowed job manager ports or 0 for random
         configuration.getString(
           ConfigConstants.RECOVERY_JOB_MANAGER_PORT,
-          ConfigConstants.DEFAULT_RECOVERY_JOB_MANAGER_PORT)
+          ConfigConstants.DEFAULT_HA_JOB_MANAGER_PORT)
       }
       else {
         LOG.info("Starting JobManager without high-availability")
@@ -2594,10 +2594,11 @@ object JobManager {
 
     val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
 
-    var jobRecoveryTimeoutStr = configuration.getString(ConfigConstants.HA_JOB_DELAY, "");
-    if (jobRecoveryTimeoutStr.isEmpty) {
-      jobRecoveryTimeoutStr = configuration.getString(ConfigConstants.RECOVERY_JOB_DELAY, "");
-    }
+    val jobRecoveryTimeoutStr = ConfigurationUtil.getStringWithDeprecatedKeys(
+      configuration,
+      ConfigConstants.HA_JOB_DELAY,
+      null,
+      ConfigConstants.RECOVERY_JOB_DELAY)
 
     val jobRecoveryTimeout = if (jobRecoveryTimeoutStr == null || jobRecoveryTimeoutStr.isEmpty) {
       timeout

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 0778aae..a547d25 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -84,7 +84,7 @@ abstract class FlinkMiniCluster(
 
   implicit val timeout = AkkaUtils.getTimeout(configuration)
 
-  val recoveryMode = HighAvailabilityMode.fromConfig(configuration)
+  val haMode = HighAvailabilityMode.fromConfig(configuration)
 
   val numJobManagers = getNumberOfJobManagers
 
@@ -122,7 +122,7 @@ abstract class FlinkMiniCluster(
   // --------------------------------------------------------------------------
 
   def getNumberOfJobManagers: Int = {
-    if(recoveryMode == HighAvailabilityMode.NONE) {
+    if(haMode == HighAvailabilityMode.NONE) {
       1
     } else {
       configuration.getInteger(
@@ -133,7 +133,7 @@ abstract class FlinkMiniCluster(
   }
 
   def getNumberOfResourceManagers: Int = {
-    if(recoveryMode == HighAvailabilityMode.NONE) {
+    if(haMode == HighAvailabilityMode.NONE) {
       1
     } else {
       configuration.getInteger(
@@ -372,7 +372,7 @@ abstract class FlinkMiniCluster(
     webMonitor foreach {
       _.stop()
     }
-    
+
     val tmFutures = taskManagerActors map {
       _.map(gracefulStop(_, timeout))
     } getOrElse(Seq())
@@ -417,7 +417,7 @@ abstract class FlinkMiniCluster(
       _ foreach(_.awaitTermination())
     }
   }
-  
+
   def running = isRunning
 
   // --------------------------------------------------------------------------
@@ -466,7 +466,7 @@ abstract class FlinkMiniCluster(
   : JobExecutionResult = {
     submitJobAndWait(jobGraph, printUpdates, timeout, createLeaderRetrievalService())
   }
-  
+
   @throws(classOf[JobExecutionException])
   def submitJobAndWait(
       jobGraph: JobGraph,
@@ -524,7 +524,7 @@ abstract class FlinkMiniCluster(
   protected def createLeaderRetrievalService(): LeaderRetrievalService = {
     (jobManagerActorSystems, jobManagerActors) match {
       case (Some(jmActorSystems), Some(jmActors)) =>
-        if (recoveryMode == HighAvailabilityMode.NONE) {
+        if (haMode == HighAvailabilityMode.NONE) {
           new StandaloneLeaderRetrievalService(
             AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0)))
         } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index bd4723f..8464d68 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -68,9 +68,9 @@ public class BlobRecoveryITCase {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+			config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
 			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-			config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, recoveryDir.getPath());
+			config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, recoveryDir.getPath());
 
 			for (int i = 0; i < server.length; i++) {
 				server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
index cc1994a..e947744 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.client;
 
 import akka.actor.PoisonPill;
 import org.apache.curator.test.TestingServer;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -82,7 +81,7 @@ public class JobClientActorRecoveryITCase extends TestLogger {
 	public void testJobClientRecovery() throws Exception {
 		File rootFolder = tempFolder.getRoot();
 
-		Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
 			zkServer.getConnectString(),
 			rootFolder.getPath());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index a3fe0d4..f6bed56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -63,9 +63,9 @@ public class BlobLibraryCacheRecoveryITCase {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+			config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
 			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-			config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, temporaryFolder.getRoot().getAbsolutePath());
+			config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath());
 
 			for (int i = 0; i < server.length; i++) {
 				server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
new file mode 100644
index 0000000..04c0e48
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class HighAvailabilityModeTest {
+
+	// Default HA mode
+	private final static HighAvailabilityMode DEFAULT_HA_MODE = HighAvailabilityMode.valueOf(
+			ConfigConstants.DEFAULT_HA_MODE.toUpperCase());
+
+	/**
+	 * Tests HA mode configuration.
+	 */
+	@Test
+	public void testFromConfig() throws Exception {
+		Configuration config = new Configuration();
+
+		// Check default
+		assertEquals(DEFAULT_HA_MODE, HighAvailabilityMode.fromConfig(config));
+
+		// Check not equals default
+		config.setString(ConfigConstants.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+		assertEquals(HighAvailabilityMode.ZOOKEEPER, HighAvailabilityMode.fromConfig(config));
+	}
+
+	/**
+	 * Tests HA mode configuration with deprecated config values.
+	 */
+	@Test
+	public void testDeprecatedFromConfig() throws Exception {
+		Configuration config = new Configuration();
+
+		// Check mapping of old default to new default
+		config.setString(ConfigConstants.RECOVERY_MODE, ConfigConstants.DEFAULT_RECOVERY_MODE);
+		assertEquals(DEFAULT_HA_MODE, HighAvailabilityMode.fromConfig(config));
+
+		// Check deprecated config
+		config.setString(ConfigConstants.RECOVERY_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+		assertEquals(HighAvailabilityMode.ZOOKEEPER, HighAvailabilityMode.fromConfig(config));
+
+		// Check precedence over deprecated config
+		config.setString(ConfigConstants.HA_MODE, HighAvailabilityMode.NONE.name().toLowerCase());
+		config.setString(ConfigConstants.RECOVERY_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+
+		assertEquals(HighAvailabilityMode.NONE, HighAvailabilityMode.fromConfig(config));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index d980517..0e1c7c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -123,8 +123,8 @@ public class JobManagerHARecoveryTest {
 		ActorRef jobManager = null;
 		ActorRef taskManager = null;
 
-		flinkConfiguration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
-		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, temporaryFolder.newFolder().toString());
+		flinkConfiguration.setString(ConfigConstants.HA_MODE, "zookeeper");
+		flinkConfiguration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, temporaryFolder.newFolder().toString());
 		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 5c696ce..ed4d530 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -101,7 +101,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 	@Test
 	public void testLeaderElection() throws Exception {
 		final Configuration configuration = ZooKeeperTestUtils
-			.createZooKeeperRecoveryModeConfig(
+			.createZooKeeperHAConfig(
 				testingServer.getConnectString(),
 				tempFolder.getRoot().getPath());
 
@@ -130,7 +130,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 	@Test
 	public void testLeaderReelection() throws Exception {
 		final Configuration configuration = ZooKeeperTestUtils
-			.createZooKeeperRecoveryModeConfig(
+			.createZooKeeperHAConfig(
 				testingServer.getConnectString(),
 				tempFolder.getRoot().getPath());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 048fbee..e20985b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -90,7 +90,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	public void testZooKeeperLeaderElectionRetrieval() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -135,7 +135,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	public void testZooKeeperReelection() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 
 		Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
@@ -218,7 +218,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	public void testZooKeeperReelectionWithReplacement() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 
 		int num = 3;
 		int numTries = 30;
@@ -296,7 +296,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
@@ -380,7 +380,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	public void testExceptionForwarding() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -449,7 +449,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	public void testEphemeralZooKeeperNodes() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 
 		ZooKeeperLeaderElectionService leaderElectionService;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 5aace34..0fe0644 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -23,7 +23,6 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
@@ -44,7 +43,6 @@ import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
@@ -84,7 +82,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
 		long sleepingTime = 1000;
 
-		config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		config.setString(ConfigConstants.HA_MODE, "zookeeper");
 		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
 
 		LeaderElectionService leaderElectionService = null;
@@ -181,7 +179,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 	@Test
 	public void testTimeoutOfFindConnectingAddress() throws Exception {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		config.setString(ConfigConstants.HA_MODE, "zookeeper");
 		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
 
 		FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
@@ -192,46 +190,6 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 		assertEquals(InetAddress.getLocalHost(), result);
 	}
 
-	@Test
-	public void testConnectionToZookeeperOverridingOldConfig() throws Exception {
-		Configuration config = new Configuration();
-		// The new config will be taken into effect
-		config.setString(ConfigConstants.RECOVERY_MODE, "standalone");
-		config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
-		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-
-		FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
-
-		LeaderRetrievalService leaderRetrievalService =
-			LeaderRetrievalUtils.createLeaderRetrievalService(config);
-		InetAddress result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
-
-		assertEquals(InetAddress.getLocalHost(), result);
-	}
-
-	@Test
-	public void testConnectionToStandAloneLeaderOverridingOldConfig() throws Exception {
-		Configuration config = new Configuration();
-		// The new config will be taken into effect
-		config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-		config.setString(ConfigConstants.HIGH_AVAILABILITY, "none");
-		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-
-		HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(config);
-		assertTrue(mode == HighAvailabilityMode.NONE);
-	}
-
-	@Test
-	public void testConnectionToZookeeperUsingOldConfig() throws Exception {
-		Configuration config = new Configuration();
-		// The new config will be taken into effect
-		config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-
-		HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(config);
-		assertTrue(mode == HighAvailabilityMode.ZOOKEEPER);
-	}
-
 	class FindConnectingAddress implements Runnable {
 
 		private final Configuration config;

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
index 66d523f..e8981a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
@@ -212,7 +212,7 @@ public class JobManagerProcess extends TestJvmProcess {
 		 * <code>--port PORT</code>.
 		 *
 		 * <p>Other arguments are parsed to a {@link Configuration} and passed to the
-		 * JobManager, for instance: <code>--high-availability ZOOKEEPER --recovery.zookeeper.quorum
+		 * JobManager, for instance: <code>--high-availability ZOOKEEPER --high-availability.zookeeper.quorum
 		 * "xyz:123:456"</code>.
 		 */
 		public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
index 417dc88..58bc50e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
@@ -96,7 +96,7 @@ public class TaskManagerProcess extends TestJvmProcess {
 
 		/**
 		 * All arguments are parsed to a {@link Configuration} and passed to the Taskmanager,
-		 * for instance: <code>--high-availability ZOOKEEPER --recovery.zookeeper.quorum "xyz:123:456"</code>.
+		 * for instance: <code>--high-availability ZOOKEEPER --high-availability.zookeeper.quorum "xyz:123:456"</code>.
 		 */
 		public static void main(String[] args) throws Exception {
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index c94842f..7dd7067 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -38,10 +38,10 @@ public class ZooKeeperTestUtils {
 	 *                          recovery)
 	 * @return A new configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
 	 */
-	public static Configuration createZooKeeperRecoveryModeConfig(
+	public static Configuration createZooKeeperHAConfig(
 			String zooKeeperQuorum, String fsStateHandlePath) {
 
-		return setZooKeeperRecoveryMode(new Configuration(), zooKeeperQuorum, fsStateHandlePath);
+		return configureZooKeeperHA(new Configuration(), zooKeeperQuorum, fsStateHandlePath);
 	}
 
 	/**
@@ -53,7 +53,7 @@ public class ZooKeeperTestUtils {
 	 *                          recovery)
 	 * @return The modified configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
 	 */
-	public static Configuration setZooKeeperRecoveryMode(
+	public static Configuration configureZooKeeperHA(
 			Configuration config,
 			String zooKeeperQuorum,
 			String fsStateHandlePath) {
@@ -66,7 +66,7 @@ public class ZooKeeperTestUtils {
 		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
 
 		// ZooKeeper recovery mode
-		config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+		config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
 		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum);
 
 		int connTimeout = 5000;
@@ -81,7 +81,7 @@ public class ZooKeeperTestUtils {
 		// File system state backend
 		config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
-		config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, fsStateHandlePath + "/recovery");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, fsStateHandlePath + "/recovery");
 
 		// Akka failure detection and execution retries
 		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 7d2b86c..5628f3c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -423,8 +423,8 @@ object TestingUtils {
       prefix: String)
     : ActorGateway = {
 
-    configuration.setString(ConfigConstants.HIGH_AVAILABILITY,
-      ConfigConstants.DEFAULT_HIGH_AVAILABILTY)
+    configuration.setString(ConfigConstants.HA_MODE,
+      ConfigConstants.DEFAULT_HA_MODE)
 
       val (actor, _) = JobManager.startJobManagerActors(
         configuration,
@@ -503,8 +503,8 @@ object TestingUtils {
       configuration: Configuration)
   : ActorGateway = {
 
-    configuration.setString(ConfigConstants.HIGH_AVAILABILITY,
-      ConfigConstants.DEFAULT_HIGH_AVAILABILTY)
+    configuration.setString(ConfigConstants.HA_MODE,
+      ConfigConstants.DEFAULT_HA_MODE)
 
     val actor = FlinkResourceManager.startResourceManagerActors(
       configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 7e5acee..4014b80 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -120,7 +120,7 @@ public class TestBaseUtils extends TestLogger {
 
 		if (startZooKeeper) {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
-			config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+			config.setString(ConfigConstants.HA_MODE, "zookeeper");
 		}
 
 		return startCluster(config, singleActorSystem);


[2/4] flink git commit: [FLINK-4253] [config] Rename 'recovery.mode' key to 'high-availability'

Posted by uc...@apache.org.
[FLINK-4253] [config] Rename 'recovery.mode' key to 'high-availability'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01ffe34c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01ffe34c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01ffe34c

Branch: refs/heads/master
Commit: 01ffe34c682c5f22928fbb476896600d0177d84e
Parents: 7206455
Author: Ramkrishna <ra...@intel.com>
Authored: Tue Aug 9 14:48:12 2016 +0530
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Aug 24 12:09:24 2016 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  26 ++--
 docs/setup/jobmanager_high_availability.md      |  39 +++---
 .../org/apache/flink/client/cli/DefaultCLI.java |   2 +-
 .../flink/configuration/ConfigConstants.java    |  83 ++++++++++++-
 .../apache/flink/util/ConfigurationUtil.java    | 101 ++++++++++++++++
 .../flink/util/ConfigurationUtilTest.java       | 115 ++++++++++++++++++
 flink-dist/src/main/flink-bin/bin/config.sh     |  22 +++-
 flink-dist/src/main/resources/flink-conf.yaml   |   2 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |   4 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  14 +--
 .../flink/runtime/blob/FileSystemBlobStore.java |   7 +-
 .../StandaloneCheckpointIDCounter.java          |   4 +-
 .../StandaloneCheckpointRecoveryFactory.java    |   4 +-
 .../StandaloneCompletedCheckpointStore.java     |   4 +-
 .../ZooKeeperCheckpointIDCounter.java           |   4 +-
 .../ZooKeeperCheckpointRecoveryFactory.java     |   4 +-
 .../ZooKeeperCompletedCheckpointStore.java      |   4 +-
 .../jobmanager/HighAvailabilityMode.java        |  86 +++++++++++++
 .../flink/runtime/jobmanager/RecoveryMode.java  |  72 -----------
 .../StandaloneSubmittedJobGraphStore.java       |   2 +-
 .../ZooKeeperSubmittedJobGraphStore.java        |   2 +-
 .../runtime/util/LeaderRetrievalUtils.java      |  36 ++----
 .../flink/runtime/util/ZooKeeperUtils.java      | 120 +++++++++++++------
 .../flink/runtime/jobmanager/JobManager.scala   |  19 +--
 .../runtime/minicluster/FlinkMiniCluster.scala  |  24 ++--
 .../flink/runtime/blob/BlobRecoveryITCase.java  |   8 +-
 .../BlobLibraryCacheRecoveryITCase.java         |   8 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   4 +-
 .../JobManagerLeaderElectionTest.java           |   4 +-
 .../ZooKeeperLeaderElectionTest.java            |  28 ++---
 .../ZooKeeperLeaderRetrievalTest.java           |  50 +++++++-
 .../runtime/testutils/JobManagerProcess.java    |   2 +-
 .../runtime/testutils/TaskManagerProcess.java   |   2 +-
 .../runtime/testutils/ZooKeeperTestUtils.java   |  22 ++--
 .../flink/runtime/util/ZooKeeperUtilTest.java   |   2 +-
 .../zookeeper/ZooKeeperTestEnvironment.java     |   6 +-
 .../runtime/testingUtils/TestingUtils.scala     |   6 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   2 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |  10 +-
 .../flink/test/recovery/ChaosMonkeyITCase.java  |   2 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |   4 +-
 ...agerHAProcessFailureBatchRecoveryITCase.java |   4 +-
 .../ZooKeeperLeaderElectionITCase.java          |   8 +-
 ...CliFrontendYarnAddressConfigurationTest.java |   4 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |   2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |   8 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   2 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   6 +-
 48 files changed, 702 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 6bc9655..e6a335b 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -134,7 +134,7 @@ will be used under the directory specified by jobmanager.web.tmpdir.
 
 - `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups.
 
-- `recovery.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments.
+- `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this config was named as `recovery.zookeeper.storageDir`.
 
 - `blob.storage.directory`: Directory for storing blobs (such as user jar's) on the TaskManagers.
 
@@ -285,29 +285,29 @@ of the JobManager, because the same ActorSystem is used. Its not possible to use
 
 ## High Availability Mode
 
-- `recovery.mode`: (Default 'standalone') Defines the recovery mode used for the cluster execution. Currently, Flink supports the 'standalone' mode where only a single JobManager runs and no JobManager state is checkpointed. The high availability mode 'zookeeper' supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the 'zookeeper' mode, it is mandatory to also define the `recovery.zookeeper.quorum` configuration value.
+- `high-availability`: (Default 'none') Defines the recovery mode used for the cluster execution. Currently, Flink supports the 'none' mode where only a single JobManager runs and no JobManager state is checkpointed. The high availability mode 'zookeeper' supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the 'zookeeper' mode, it is mandatory to also define the `high-availability.zookeeper.quorum` configuration value.  Previously this config was named 'recovery.mode' and the default config was 'standalone'.
 
-- `recovery.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected
+- `high-availability.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connect to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected.  Previously this config was named as `recovery.zookeeper.quorum`.
 
-- `recovery.zookeeper.path.root`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create namespace directories.
+- `high-availability.zookeeper.path.root`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create namespace directories.  Previously this config was named as `recovery.zookeeper.path.root`.
 
-- `recovery.zookeeper.path.namespace`: (Default '/default_ns' in standalone mode, or the <yarn-application-id> under Yarn) Defines the subdirectory under the root dir where the ZooKeeper recovery mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper.
+- `high-availability.zookeeper.path.namespace`: (Default '/default_ns' in standalone mode, or the <yarn-application-id> under Yarn) Defines the subdirectory under the root dir where the ZooKeeper recovery mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper.  Previously this config was named as `recovery.zookeeper.path.namespace`.
 
-- `recovery.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.
+- `high-availability.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.  Previously this config was named as `recovery.zookeeper.path.latch`.
 
-- `recovery.zookeeper.path.leader`: (Default '/leader') Defines the znode of the leader which contains the URL to the leader and the current leader session ID
+- `high-availability.zookeeper.path.leader`: (Default '/leader') Defines the znode of the leader which contains the URL to the leader and the current leader session ID.  Previously this config was named as `recovery.zookeeper.path.leader`.
 
-- `recovery.zookeeper.storageDir`: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA.
+- `high-availability.zookeeper.storageDir`: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA.  Previously this config was named as `recovery.zookeeper.storageDir`.
 
-- `recovery.zookeeper.client.session-timeout`: (Default '60000') Defines the session timeout for the ZooKeeper session in ms.
+- `high-availability.zookeeper.client.session-timeout`: (Default '60000') Defines the session timeout for the ZooKeeper session in ms.  Previously this config was named as `recovery.zookeeper.client.session-timeout`.
 
-- `recovery.zookeeper.client.connection-timeout`: (Default '15000') Defines the connection timeout for ZooKeeper in ms.
+- `high-availability.zookeeper.client.connection-timeout`: (Default '15000') Defines the connection timeout for ZooKeeper in ms.  Previously this config was named as `recovery.zookeeper.client.connection-timeout`.
 
-- `recovery.zookeeper.client.retry-wait`: (Default '5000') Defines the pause between consecutive retries in ms.
+- `high-availability.zookeeper.client.retry-wait`: (Default '5000') Defines the pause between consecutive retries in ms.  Previously this config was named as `recovery.zookeeper.client.retry-wait`.
 
-- `recovery.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up.
+- `high-availability.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up.  Previously this config was named as `recovery.zookeeper.client.max-retry-attempts`.
 
-- `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation.
+- `high-availability.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation.  Previously this config was named as `recovery.job.delay`.
 
 ## Environment
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/docs/setup/jobmanager_high_availability.md
----------------------------------------------------------------------
diff --git a/docs/setup/jobmanager_high_availability.md b/docs/setup/jobmanager_high_availability.md
index d4f329a..dd6782d 100644
--- a/docs/setup/jobmanager_high_availability.md
+++ b/docs/setup/jobmanager_high_availability.md
@@ -42,7 +42,7 @@ As an example, consider the following setup with three JobManager instances:
 
 ### Configuration
 
-To enable JobManager High Availability you have to set the **recovery mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters file** with all JobManagers hosts and their web UI ports.
+To enable JobManager High Availability you have to set the **high-availability mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters file** with all JobManagers hosts and their web UI ports.
 
 Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for  *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more information about ZooKeeper. Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) installation.
 
@@ -58,38 +58,39 @@ jobManagerAddress1:webUIPort1
 jobManagerAddressX:webUIPortX
   </pre>
 
-By default, the job manager will pick a *random port* for inter process communication. You can change this via the **`recovery.jobmanager.port`** key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`).
+By default, the job manager will pick a *random port* for inter process communication. You can change this via the **`high-availability.jobmanager.port`** key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`).
 
 #### Config File (flink-conf.yaml)
 
 In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`:
 
-- **Recovery mode** (required): The *recovery mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode.
+- **high-availability mode** (required): The *high-availability mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode.
 
-  <pre>recovery.mode: zookeeper</pre>
+  <pre>high-availability: zookeeper</pre>
+  Previously this config was named 'recovery.mode' and the default config was 'standalone'.
 
 - **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service.
 
-  <pre>recovery.zookeeper.quorum: address1:2181[,...],addressX:2181</pre>
+  <pre>high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181</pre>
 
   Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port.
 
 - **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all cluster namespace nodes are placed.
 
-  <pre>recovery.zookeeper.path.root: /flink
+  <pre>high-availability.zookeeper.path.root: /flink</pre>
 
 - **ZooKeeper namespace** (recommended): The *namespace ZooKeeper node*, under which all required coordination data for a cluster is placed.
 
-  <pre>recovery.zookeeper.path.namespace: /default_ns # important: customize per cluster</pre>
+  <pre>high-availability.zookeeper.path.namespace: /default_ns # important: customize per cluster</pre>
 
-  **Important**: if you are running multiple Flink HA clusters, you have to manually configure separate namespaces for each cluster. By default, the Yarn cluster and the Yarn session automatically generate namespaces based on Yarn application id. A manual configuration overrides this behaviour in Yarn. Specifying a namespace with the -z CLI option, in turn, overrides manual configuration.
+  **Important**: if you are running multiple Flink HA clusters, you have to manually configure separate namespaces for each cluster. By default, the Yarn cluster and the Yarn session automatically generate namespaces based on Yarn application id. A manual configuration overrides this behaviour in Yarn. Specifying a namespace with the -z CLI option, in turn, overrides manual configuration. 
 
 - **State backend and storage directory** (required): JobManager meta data is persisted in the *state backend* and only a pointer to this state is stored in ZooKeeper. Currently, only the file system state backend is supported in HA mode.
 
     <pre>
 state.backend: filesystem
 state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
-recovery.zookeeper.storageDir: hdfs:///flink/recovery</pre>
+high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>
 
     The `storageDir` stores all meta data needed to recover a JobManager failure.
 
@@ -100,13 +101,13 @@ After configuring the masters and the ZooKeeper quorum, you can use the provided
 1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
 
    <pre>
-recovery.mode: zookeeper
-recovery.zookeeper.quorum: localhost:2181
-recovery.zookeeper.path.root: /flink
-recovery.zookeeper.path.namespace: /cluster_one # important: customize per cluster
+high-availability: zookeeper
+high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.path.root: /flink
+high-availability.zookeeper.path.namespace: /cluster_one # important: customize per cluster
 state.backend: filesystem
 state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
-recovery.zookeeper.storageDir: hdfs:///flink/recovery</pre>
+high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>
 
 2. **Configure masters** in `conf/masters`:
 
@@ -186,13 +187,13 @@ This means that the application can be restarted 10 times before YARN fails the
 1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
 
    <pre>
-recovery.mode: zookeeper
-recovery.zookeeper.quorum: localhost:2181
-recovery.zookeeper.path.root: /flink
-recovery.zookeeper.path.namespace: /cluster_one # important: customize per cluster
+high-availability: zookeeper
+high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.path.root: /flink
+high-availability.zookeeper.path.namespace: /cluster_one # important: customize per cluster
 state.backend: filesystem
 state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
-recovery.zookeeper.storageDir: hdfs:///flink/recovery
+high-availability.zookeeper.storageDir: hdfs:///flink/recovery
 yarn.application-attempts: 10</pre>
 
 3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine):

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 5f83c3d..18fa323 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -64,7 +64,7 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
 
 		if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
 			String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
-			config.setString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+			config.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
 		}
 
 		StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 98a843d..5cc1161 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -626,52 +626,127 @@ public final class ConfigConstants {
 
 	// --------------------------- Recovery -----------------------------------
 
-	/** Defines recovery mode used for the cluster execution ("standalone", "zookeeper") */
+	/** Defines recovery mode used for the cluster execution ("standalone", "zookeeper").
+	 *  @deprecated Use {@link #HIGH_AVAILABILITY} instead.
+	 */
+	@Deprecated
 	public static final String RECOVERY_MODE = "recovery.mode";
 
+	/** Defines the high availability mode used for the cluster execution ("NONE", "ZOOKEEPER") */
+	public static final String HIGH_AVAILABILITY = "high-availability";
+
 	/** Ports used by the job manager if not in standalone recovery mode */
+	@Deprecated
 	public static final String RECOVERY_JOB_MANAGER_PORT = "recovery.jobmanager.port";
 
+	/** Ports used by the job manager if not in 'none' recovery mode */
+	public static final String HA_JOB_MANAGER_PORT =
+		"high-availability.jobmanager.port";
+
 	/** The time before the JobManager recovers persisted jobs */
+	@Deprecated
 	public static final String RECOVERY_JOB_DELAY = "recovery.job.delay";
 
+	/** The time before the JobManager recovers persisted jobs */
+	public static final String HA_JOB_DELAY = "high-availability.job.delay";
+
 	// --------------------------- ZooKeeper ----------------------------------
 
 	/** ZooKeeper servers. */
+	@Deprecated
 	public static final String ZOOKEEPER_QUORUM_KEY = "recovery.zookeeper.quorum";
 
+	/** ZooKeeper servers. */
+	public static final String HA_ZOOKEEPER_QUORUM_KEY =
+		"high-availability.zookeeper.quorum";
+
 	/**
 	 * File system state backend base path for recoverable state handles. Recovery state is written
 	 * to this path and the file state handles are persisted for recovery.
 	 */
+	@Deprecated
 	public static final String ZOOKEEPER_RECOVERY_PATH = "recovery.zookeeper.storageDir";
 
+	/**
+	 * File system state backend base path for recoverable state handles. Recovery state is written
+	 * to this path and the file state handles are persisted for recovery.
+	 */
+	public static final String ZOOKEEPER_HA_PATH =
+		"high-availability.zookeeper.storageDir";
+
 	/** ZooKeeper root path. */
+	@Deprecated
 	public static final String ZOOKEEPER_DIR_KEY = "recovery.zookeeper.path.root";
 
+	/** ZooKeeper root path. */
+	public static final String HA_ZOOKEEPER_DIR_KEY =
+		"high-availability.zookeeper.path.root";
+
+	@Deprecated
 	public static final String ZOOKEEPER_NAMESPACE_KEY = "recovery.zookeeper.path.namespace";
 
+	public static final String HA_ZOOKEEPER_NAMESPACE_KEY =
+		"high-availability.zookeeper.path.namespace";
+
+	@Deprecated
 	public static final String ZOOKEEPER_LATCH_PATH = "recovery.zookeeper.path.latch";
 
+	public static final String HA_ZOOKEEPER_LATCH_PATH =
+		"high-availability.zookeeper.path.latch";
+
+	@Deprecated
 	public static final String ZOOKEEPER_LEADER_PATH = "recovery.zookeeper.path.leader";
 
+	public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader";
+
 	/** ZooKeeper root path (ZNode) for job graphs. */
+	@Deprecated
 	public static final String ZOOKEEPER_JOBGRAPHS_PATH = "recovery.zookeeper.path.jobgraphs";
 
+	/** ZooKeeper root path (ZNode) for job graphs. */
+	public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH =
+		"high-availability.zookeeper.path.jobgraphs";
+
 	/** ZooKeeper root path (ZNode) for completed checkpoints. */
+	@Deprecated
 	public static final String ZOOKEEPER_CHECKPOINTS_PATH = "recovery.zookeeper.path.checkpoints";
 
+	/** ZooKeeper root path (ZNode) for completed checkpoints. */
+	public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH =
+		"high-availability.zookeeper.path.checkpoints";
+
 	/** ZooKeeper root path (ZNode) for checkpoint counters. */
+	@Deprecated
 	public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "recovery.zookeeper.path.checkpoint-counter";
 
+	/** ZooKeeper root path (ZNode) for checkpoint counters. */
+	public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
+		"high-availability.zookeeper.path.checkpoint-counter";
+
+	@Deprecated
 	public static final String ZOOKEEPER_SESSION_TIMEOUT = "recovery.zookeeper.client.session-timeout";
 
+	public static final String HA_ZOOKEEPER_SESSION_TIMEOUT =
+		"high-availability.zookeeper.client.session-timeout";
+
+	@Deprecated
 	public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "recovery.zookeeper.client.connection-timeout";
 
+	public static final String HA_ZOOKEEPER_CONNECTION_TIMEOUT =
+		"high-availability.zookeeper.client.connection-timeout";
+
+	@Deprecated
 	public static final String ZOOKEEPER_RETRY_WAIT = "recovery.zookeeper.client.retry-wait";
 
+	public static final String HA_ZOOKEEPER_RETRY_WAIT =
+		"high-availability.zookeeper.client.retry-wait";
+
+	@Deprecated
 	public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "recovery.zookeeper.client.max-retry-attempts";
 
+	public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS =
+		"high-availability.zookeeper.client.max-retry-attempts";
+
 	// ---------------------------- Metrics -----------------------------------
 
 	/**
@@ -1015,10 +1090,12 @@ public final class ConfigConstants {
 
 	public static final String LOCAL_START_WEBSERVER = "local.start-webserver";
 
-  	// --------------------------- Recovery ---------------------------------
-
+	// --------------------------- Recovery ---------------------------------
+	@Deprecated
 	public static String DEFAULT_RECOVERY_MODE = "standalone";
 
+	public static String DEFAULT_HIGH_AVAILABILTY = "none";
+
 	/**
 	 * Default port used by the job manager if not in standalone recovery mode. If <code>0</code>
 	 * the OS picks a random port port.

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-core/src/main/java/org/apache/flink/util/ConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ConfigurationUtil.java b/flink-core/src/main/java/org/apache/flink/util/ConfigurationUtil.java
new file mode 100644
index 0000000..44f098b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ConfigurationUtil.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility for accessing deprecated {@link Configuration} values.
+ */
+@Internal
+public class ConfigurationUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ConfigurationUtil.class);
+
+	/**
+	 * Returns the value associated with the given key as an Integer in the
+	 * given Configuration.
+	 *
+	 * <p>The regular key has precedence over any deprecated keys. The
+	 * precedence of deprecated keys depends on the argument order, first
+	 * deprecated keys have higher precedence than later ones.
+	 *
+	 * @param config Configuration to access
+	 * @param key Configuration key (highest precedence)
+	 * @param defaultValue Default value (if no key is set)
+	 * @param deprecatedKeys Optional deprecated keys in precedence order
+	 * @return Integer value associated with first found key or the default value
+	 */
+	public static int getIntegerWithDeprecatedKeys(
+			Configuration config,
+			String key,
+			int defaultValue,
+			String... deprecatedKeys) {
+
+		if (config.containsKey(key)) {
+			return config.getInteger(key, defaultValue);
+		} else {
+			// Check deprecated keys
+			for (String deprecatedKey : deprecatedKeys) {
+				if (config.containsKey(deprecatedKey)) {
+					LOG.warn("Configuration key '{}' has been deprecated. Please use '{}' instead.", deprecatedKey, key);
+					return config.getInteger(deprecatedKey, defaultValue);
+				}
+			}
+			return defaultValue;
+		}
+	}
+
+	/**
+	 * Returns the value associated with the given key as a String in the
+	 * given Configuration.
+	 *
+	 * <p>The regular key has precedence over any deprecated keys. The
+	 * precedence of deprecated keys depends on the argument order, first
+	 * deprecated keys have higher precedence than later ones.
+	 *
+	 * @param config Configuration to access
+	 * @param key Configuration key (highest precedence)
+	 * @param defaultValue Default value (if no key is set)
+	 * @param deprecatedKeys Optional deprecated keys in precedence order
+	 * @return String associated with first found key or the default value
+	 */
+	public static String getStringWithDeprecatedKeys(
+			Configuration config,
+			String key,
+			String defaultValue,
+			String... deprecatedKeys) {
+
+		if (config.containsKey(key)) {
+			return config.getString(key, defaultValue);
+		} else {
+			// Check deprecated keys
+			for (String deprecatedKey : deprecatedKeys) {
+				if (config.containsKey(deprecatedKey)) {
+					LOG.warn("Configuration key '{}' has been deprecated. Please use '{}' instead.", deprecatedKey, key);
+					return config.getString(deprecatedKey, defaultValue);
+				}
+			}
+			return defaultValue;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-core/src/test/java/org/apache/flink/util/ConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/ConfigurationUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/ConfigurationUtilTest.java
new file mode 100644
index 0000000..7ecbd3f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/ConfigurationUtilTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConfigurationUtilTest {
+
+	/**
+	 * Tests getInteger without any deprecated keys.
+	 */
+	@Test
+	public void testGetIntegerNoDeprecatedKeys() throws Exception {
+		Configuration config = new Configuration();
+		String key = "asdasd";
+		int value = 1223239;
+		int defaultValue = 272770;
+
+		assertEquals(defaultValue, ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue));
+
+		config.setInteger(key, value);
+		assertEquals(value, ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue));
+	}
+
+	/**
+	 * Tests getInteger with deprecated keys and checks precedence.
+	 */
+	@Test
+	public void testGetIntegerWithDeprecatedKeys() throws Exception {
+		Configuration config = new Configuration();
+		String key = "asdasd";
+		int value = 1223239;
+		int defaultValue = 272770;
+
+		String[] deprecatedKey = new String[] { "deprecated-0", "deprecated-1" };
+		int[] deprecatedValue = new int[] { 99192, 7727 };
+
+		assertEquals(defaultValue, ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+
+		// Set 2nd deprecated key (used because the 1st one is not set)
+		config.setInteger(deprecatedKey[1], deprecatedValue[1]);
+		assertEquals(deprecatedValue[1], ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+
+		// Set 1st deprecated key (takes precedence over the 2nd one)
+		config.setInteger(deprecatedKey[0], deprecatedValue[0]);
+		assertEquals(deprecatedValue[0], ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+
+		// Set current key (takes precedence over all deprecated keys)
+		config.setInteger(key, value);
+		assertEquals(value, ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+	}
+
+	/**
+	 * Tests getString without any deprecated keys.
+	 */
+	@Test
+	public void testGetStringNoDeprecatedKeys() throws Exception {
+		Configuration config = new Configuration();
+		String key = "asdasd";
+		String value = "1223239";
+		String defaultValue = "272770";
+
+		assertEquals(defaultValue, ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue));
+
+		config.setString(key, value);
+		assertEquals(value, ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue));
+	}
+
+	/**
+	 * Tests getString with deprecated keys and checks precedence.
+	 */
+	@Test
+	public void testGetStringWithDeprecatedKeys() throws Exception {
+		Configuration config = new Configuration();
+		String key = "asdasd";
+		String value = "1223239";
+		String defaultValue = "272770";
+
+		String[] deprecatedKey = new String[] { "deprecated-0", "deprecated-1" };
+		String[] deprecatedValue = new String[] { "99192", "7727" };
+
+		assertEquals(defaultValue, ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+
+		// Set 2nd deprecated key (used because the 1st one is not set)
+		config.setString(deprecatedKey[1], deprecatedValue[1]);
+		assertEquals(deprecatedValue[1], ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+
+		// Set 1st deprecated key (takes precedence over the 2nd one)
+		config.setString(deprecatedKey[0], deprecatedValue[0]);
+		assertEquals(deprecatedValue[0], ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+
+		// Set current key (takes precedence over all deprecated keys)
+		config.setString(key, value);
+		assertEquals(value, ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 9ffa713..687a39c 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -104,7 +104,9 @@ KEY_ENV_JAVA_OPTS="env.java.opts"
 KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
 KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
 KEY_ENV_SSH_OPTS="env.ssh.opts"
+#deprecated
 KEY_RECOVERY_MODE="recovery.mode"
+KEY_HIGH_AVAILABILITY="high-availability"
 KEY_ZK_HEAP_MB="zookeeper.heap.mb"
 
 ########################################################################################################################
@@ -257,8 +259,26 @@ if [ -z "${ZK_HEAP}" ]; then
     ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
 fi
 
+# For backward compatibility: read the deprecated 'recovery.mode' key
+if [ -z "${OLD_RECOVERY_MODE}" ]; then
+    OLD_RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" "${YAML_CONF}")
+fi
+
 if [ -z "${RECOVERY_MODE}" ]; then
-    RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" "${YAML_CONF}")
+    # Read the new 'high-availability' key first
+    RECOVERY_MODE=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
+    if [ -z "${RECOVERY_MODE}" ]; then
+        # New key is not set, so fall back to the deprecated key
+        if [ -z "${OLD_RECOVERY_MODE}" ]; then
+            # Deprecated key is not set either: use 'none' as the default
+            RECOVERY_MODE="none"
+        elif [ "${OLD_RECOVERY_MODE}" = "standalone" ]; then
+            # The old 'standalone' mode has been renamed to 'none'
+            RECOVERY_MODE="none"
+        else
+            RECOVERY_MODE=${OLD_RECOVERY_MODE}
+        fi
+    fi
 fi
 
 # Arguments for the JVM. Used for job and task manager JVMs.

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 8bd4747..a2586ce 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -138,7 +138,7 @@ jobmanager.web.port: 8081
 # setup. This must be a list of the form:
 # "host1:clientPort,host2[:clientPort],..." (default clientPort: 2181)
 #
-# recovery.mode: zookeeper
+# high-availability: zookeeper
 #
 # recovery.zookeeper.quorum: localhost:2181,...
 #

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 677ff54..d9edafe 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -296,8 +296,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			final Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
 			config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
-			config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
-			config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString());
+			config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+			config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString());
 
 			actorSystem = AkkaUtils.createDefaultActorSystem();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index b4b6812..d1b78a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,7 +77,7 @@ public class BlobServer extends Thread implements BlobService {
 
 	/**
 	 * Shutdown hook thread to ensure deletion of the storage directory (or <code>null</code> if
-	 * the configured recovery mode does not equal{@link RecoveryMode#STANDALONE})
+	 * the configured recovery mode does not equal {@link HighAvailabilityMode#NONE})
 	 */
 	private final Thread shutdownHook;
 
@@ -90,7 +90,7 @@ public class BlobServer extends Thread implements BlobService {
 	public BlobServer(Configuration config) throws IOException {
 		checkNotNull(config, "Configuration");
 
-		RecoveryMode recoveryMode = RecoveryMode.fromConfig(config);
+		HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);
 
 		// configure and create the storage directory
 		String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
@@ -98,14 +98,14 @@ public class BlobServer extends Thread implements BlobService {
 		LOG.info("Created BLOB server storage directory {}", storageDir);
 
 		// No recovery.
-		if (recoveryMode == RecoveryMode.STANDALONE) {
+		if (highAvailabilityMode == HighAvailabilityMode.NONE) {
 			this.blobStore = new VoidBlobStore();
 		}
 		// Recovery.
-		else if (recoveryMode == RecoveryMode.ZOOKEEPER) {
+		else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
 			this.blobStore = new FileSystemBlobStore(config);
 		} else {
-			throw new IllegalConfigurationException("Unexpected recovery mode '" + recoveryMode + ".");
+			throw new IllegalConfigurationException("Unexpected recovery mode '" + highAvailabilityMode + "'.");
 		}
 
 		// configure the maximum number of concurrent connections
@@ -128,7 +128,7 @@ public class BlobServer extends Thread implements BlobService {
 			backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
 		}
 
-		if (recoveryMode == RecoveryMode.STANDALONE) {
+		if (highAvailabilityMode == HighAvailabilityMode.NONE) {
 			// Add shutdown hook to delete storage directory
 			this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 5641d87..f535c35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -51,12 +51,15 @@ class FileSystemBlobStore implements BlobStore {
 	private final String basePath;
 
 	FileSystemBlobStore(Configuration config) throws IOException {
-		String recoveryPath = config.getString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, null);
+		String recoveryPath = config.getString(ConfigConstants.ZOOKEEPER_HA_PATH, null);
+		if (recoveryPath == null) { // new key not set: fall back to the deprecated key
+			recoveryPath = config.getString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, null);
+		}
 
 		if (recoveryPath == null) {
 			throw new IllegalConfigurationException(String.format("Missing configuration for " +
 					"file system state backend recovery path. Please specify via " +
-					"'%s' key.", ConfigConstants.ZOOKEEPER_RECOVERY_PATH));
+					"'%s' key.", ConfigConstants.ZOOKEEPER_HA_PATH));
 		}
 
 		this.basePath = recoveryPath + "/blob";

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
index 0a235bc..c2f67f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#STANDALONE}.
+ * {@link CheckpointIDCounter} instances for JobManagers running in {@link HighAvailabilityMode#NONE}.
  *
  * <p>Simple wrapper of an {@link AtomicLong}. This is sufficient, because job managers are not
  * recoverable in this recovery mode.

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index 05f9e77..aecb51e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 /**
- * {@link CheckpointCoordinator} components in {@link RecoveryMode#STANDALONE}.
+ * {@link CheckpointCoordinator} components in {@link HighAvailabilityMode#NONE}.
  */
 public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index bc111cd..a35ca77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -28,7 +28,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#STANDALONE}.
+ * {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#NONE}.
  */
 public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
index 12839c1..0bceb8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -23,14 +23,14 @@ import org.apache.curator.framework.recipes.shared.SharedCount;
 import org.apache.curator.framework.recipes.shared.VersionedValue;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ * {@link CheckpointIDCounter} instances for JobManagers running in {@link HighAvailabilityMode#ZOOKEEPER}.
  *
  * <p>Each counter creates a ZNode:
  * <pre>

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index dcd6260..55a0bbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -21,13 +21,13 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * {@link CheckpointCoordinator} components in {@link RecoveryMode#ZOOKEEPER}.
+ * {@link CheckpointCoordinator} components in {@link HighAvailabilityMode#ZOOKEEPER}.
  */
 public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 541629d..376ef70 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -24,7 +24,7 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
@@ -40,7 +40,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ * {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#ZOOKEEPER}.
  *
  * <p>Checkpoints are added under a ZNode per job:
  * <pre>

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
new file mode 100644
index 0000000..8e2efa8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * High availability mode for Flink's cluster execution. Currently supported modes are:
+ *
+ * - None: No high availability, i.e. no recovery from JobManager failures
+ * - ZooKeeper: JobManager high availability via ZooKeeper
+ * ZooKeeper is used to select a leader among a group of JobManager. This JobManager
+ * is responsible for the job execution. Upon failure of the leader a new leader is elected
+ * which will take over the responsibilities of the old leader
+ *
+ * <p>Use {@link #fromConfig(Configuration)} to read the mode from a configuration.
+ */
+public enum HighAvailabilityMode {
+	// STANDALONE mode renamed to NONE
+	NONE,
+	ZOOKEEPER;
+
+	/**
+	 * Return the configured {@link HighAvailabilityMode}.
+	 *
+	 * <p>The {@link ConfigConstants#HIGH_AVAILABILITY} key takes precedence. If it is not set,
+	 * the older {@link ConfigConstants#RECOVERY_MODE} key is consulted, and its default value
+	 * is mapped to {@link ConfigConstants#DEFAULT_HIGH_AVAILABILTY}.
+	 *
+	 * @param config The config to parse
+	 * @return Configured high availability mode or
+	 * {@link ConfigConstants#DEFAULT_HIGH_AVAILABILTY} if not configured.
+	 * @throws IllegalArgumentException if the configured value does not name a mode
+	 */
+	public static HighAvailabilityMode fromConfig(Configuration config) {
+		// Read the new key without its default so that an unset key can be told
+		// apart from an explicitly configured value.
+		String haMode = config.getString(ConfigConstants.HIGH_AVAILABILITY, "");
+		if (haMode.isEmpty()) {
+			// New key is not set: fall back to the older 'recovery.mode' key.
+			haMode = config.getString(
+				ConfigConstants.RECOVERY_MODE,
+				ConfigConstants.DEFAULT_RECOVERY_MODE);
+			if (haMode.equalsIgnoreCase(ConfigConstants.DEFAULT_RECOVERY_MODE)) {
+				// The old default means that no HA is configured.
+				haMode = ConfigConstants.DEFAULT_HIGH_AVAILABILTY;
+			}
+		}
+		return HighAvailabilityMode.valueOf(haMode.toUpperCase());
+	}
+
+	/**
+	 * Returns true if the configured mode supports high availability.
+	 *
+	 * @param configuration Configuration which contains the high availability mode
+	 * @return true if high availability is supported by the configured mode, otherwise false
+	 */
+	public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
+		HighAvailabilityMode mode = fromConfig(configuration);
+		switch (mode) {
+			case NONE:
+				return false;
+			case ZOOKEEPER:
+				return true;
+			default:
+				return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
deleted file mode 100644
index 077e34d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Recovery mode for Flink's cluster execution. Currently supported modes are:
- *
- * - Standalone: No recovery from JobManager failures
- * - ZooKeeper: JobManager high availability via ZooKeeper
- * ZooKeeper is used to select a leader among a group of JobManager. This JobManager
- * is responsible for the job execution. Upon failure of the leader a new leader is elected
- * which will take over the responsibilities of the old leader
- */
-public enum RecoveryMode {
-	STANDALONE,
-	ZOOKEEPER;
-
-	/**
-	 * Return the configured {@link RecoveryMode}.
-	 *
-	 * @param config The config to parse
-	 * @return Configured recovery mode or {@link ConfigConstants#DEFAULT_RECOVERY_MODE} if not
-	 * configured.
-	 */
-	public static RecoveryMode fromConfig(Configuration config) {
-		return RecoveryMode.valueOf(config.getString(
-				ConfigConstants.RECOVERY_MODE,
-				ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase());
-	}
-
-	/**
-	 * Returns true if the defined recovery mode supports high availability.
-	 *
-	 * @param configuration Configuration which contains the recovery mode
-	 * @return true if high availability is supported by the recovery mode, otherwise false
-	 */
-	public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
-		String recoveryMode = configuration.getString(
-			ConfigConstants.RECOVERY_MODE,
-			ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase();
-
-		RecoveryMode mode = RecoveryMode.valueOf(recoveryMode);
-
-		switch(mode) {
-			case STANDALONE:
-				return false;
-			case ZOOKEEPER:
-				return true;
-			default:
-				return false;
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index db36f92..3041cde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -26,7 +26,7 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#STANDALONE}.
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link HighAvailabilityMode#NONE}.
  *
  * <p>All operations are NoOps, because {@link JobGraph} instances cannot be recovered in this
  * recovery mode.

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 128db83..7f7c5fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -44,7 +44,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link HighAvailabilityMode#ZOOKEEPER}.
  *
  * <p>Each job graph creates ZNode:
  * <pre>

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 8f88daa..7a656cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -28,7 +28,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -61,15 +61,15 @@ public class LeaderRetrievalUtils {
 	public static LeaderRetrievalService createLeaderRetrievalService(Configuration configuration)
 		throws Exception {
 
-		RecoveryMode recoveryMode = getRecoveryMode(configuration);
+		HighAvailabilityMode highAvailabilityMode = getRecoveryMode(configuration);
 
-		switch (recoveryMode) {
-			case STANDALONE:
+		switch (highAvailabilityMode) {
+			case NONE:
 				return StandaloneUtils.createLeaderRetrievalService(configuration);
 			case ZOOKEEPER:
 				return ZooKeeperUtils.createLeaderRetrievalService(configuration);
 			default:
-				throw new Exception("Recovery mode " + recoveryMode + " is not supported.");
+				throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
 		}
 	}
 
@@ -86,16 +86,16 @@ public class LeaderRetrievalUtils {
 	public static LeaderRetrievalService createLeaderRetrievalService(
 				Configuration configuration, ActorRef standaloneRef) throws Exception {
 
-		RecoveryMode recoveryMode = getRecoveryMode(configuration);
+		HighAvailabilityMode highAvailabilityMode = getRecoveryMode(configuration);
 
-		switch (recoveryMode) {
-			case STANDALONE:
+		switch (highAvailabilityMode) {
+			case NONE:
 				String akkaUrl = standaloneRef.path().toSerializationFormat();
 				return new StandaloneLeaderRetrievalService(akkaUrl);
 			case ZOOKEEPER:
 				return ZooKeeperUtils.createLeaderRetrievalService(configuration);
 			default:
-				throw new Exception("Recovery mode " + recoveryMode + " is not supported.");
+				throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
 		}
 	}
 	
@@ -282,7 +282,7 @@ public class LeaderRetrievalUtils {
 	}
 
 	/**
-	 * Gets the recovery mode as configured, based on the {@link ConfigConstants#RECOVERY_MODE}
+	 * Gets the recovery mode as configured, based on the {@link ConfigConstants#HIGH_AVAILABILITY}
 	 * config key.
 	 * 
 	 * @param config The configuration to read the recovery mode from.
@@ -291,20 +291,8 @@ public class LeaderRetrievalUtils {
 	 * @throws IllegalConfigurationException Thrown, if the recovery mode does not correspond
 	 *                                       to a known value.
 	 */
-	public static RecoveryMode getRecoveryMode(Configuration config) {
-		String mode = config.getString(
-			ConfigConstants.RECOVERY_MODE,
-			ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase();
-		
-		switch (mode) {
-			case "STANDALONE":
-				return RecoveryMode.STANDALONE;
-			case "ZOOKEEPER":
-				return RecoveryMode.ZOOKEEPER;
-			default:
-				throw new IllegalConfigurationException(
-					"The value for '" + ConfigConstants.RECOVERY_MODE + "' is unknown: " + mode);
-		}
+	public static HighAvailabilityMode getRecoveryMode(Configuration config) {
+		return HighAvailabilityMode.fromConfig(config);
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 3986fed..5fd6f8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
@@ -56,35 +56,44 @@ public class ZooKeeperUtils {
 	 * @return {@link CuratorFramework} instance
 	 */
 	public static CuratorFramework startCuratorFramework(Configuration configuration) {
-		String zkQuorum = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
-
+		String zkQuorum = configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "");
+		if (zkQuorum.isEmpty()) {
+			zkQuorum = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
+		}
 		if (zkQuorum == null || zkQuorum.equals("")) {
 			throw new RuntimeException("No valid ZooKeeper quorum has been specified. " +
 					"You can specify the quorum via the configuration key '" +
-					ConfigConstants.ZOOKEEPER_QUORUM_KEY + "'.");
+					ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY + "'.");
 		}
 
-		int sessionTimeout = configuration.getInteger(
-				ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT,
-				ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
+		int sessionTimeout = getConfiguredIntValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT,
+			ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT,
+			ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
 
-		int connectionTimeout = configuration.getInteger(
-				ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
-				ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT);
+		// Connection timeout: new HA key first, then the older key, then the connection-timeout default.
+		int connectionTimeout = getConfiguredIntValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT, ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
+			ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT);
 
-		int retryWait = configuration.getInteger(
-				ConfigConstants.ZOOKEEPER_RETRY_WAIT,
-				ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
+		int retryWait = getConfiguredIntValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_RETRY_WAIT,
+			ConfigConstants.ZOOKEEPER_RETRY_WAIT,
+			ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
 
-		int maxRetryAttempts = configuration.getInteger(
-				ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS,
-				ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS);
+		int maxRetryAttempts = getConfiguredIntValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS,
+			ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS,
+			ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS);
 
-		String root = configuration.getString(ConfigConstants.ZOOKEEPER_DIR_KEY,
-				ConfigConstants.DEFAULT_ZOOKEEPER_DIR_KEY);
+		String root = getConfiguredStringValue(configuration, ConfigConstants.HA_ZOOKEEPER_DIR_KEY,
+			ConfigConstants.ZOOKEEPER_DIR_KEY,
+			ConfigConstants.DEFAULT_ZOOKEEPER_DIR_KEY);
 
-		String namespace = configuration.getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY,
-				ConfigConstants.DEFAULT_ZOOKEEPER_NAMESPACE_KEY);
+		String namespace = getConfiguredStringValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY,
+			ConfigConstants.ZOOKEEPER_NAMESPACE_KEY,
+			ConfigConstants.DEFAULT_ZOOKEEPER_NAMESPACE_KEY);
 
 		String rootWithNamespace = generateZookeeperPath(root, namespace);
 
@@ -105,11 +114,36 @@ public class ZooKeeperUtils {
 		return cf;
 	}
 
+	/** Reads an int from the new key, then the old key; -1 counts as unset and yields the default. */
+	private static int getConfiguredIntValue(
+			Configuration configuration, String newConfigName, String oldConfigName, int defaultValue) {
+		final int unset = -1;
+		int configured = configuration.getInteger(newConfigName, unset);
+		if (configured == unset) {
+			// nothing under the new key: consult the older key
+			configured = configuration.getInteger(oldConfigName, unset);
+		}
+		// neither key provided a value: use the supplied default
+		return configured == unset ? defaultValue : configured;
+	}
+
+	/** Reads a string from the new key, then the old key; empty counts as unset and yields the default. */
+	private static String getConfiguredStringValue(
+			Configuration configuration, String newConfigName, String oldConfigName, String defaultValue) {
+		final String unset = "";
+		String configured = configuration.getString(newConfigName, unset);
+		if (configured.isEmpty()) {
+			// nothing under the new key: consult the older key
+			configured = configuration.getString(oldConfigName, unset);
+		}
+		// neither key provided a value: use the supplied default
+		return configured.isEmpty() ? defaultValue : configured;
+	}
 	/**
-	 * Returns whether {@link RecoveryMode#ZOOKEEPER} is configured.
+	 * Returns whether {@link HighAvailabilityMode#ZOOKEEPER} is configured.
 	 */
 	public static boolean isZooKeeperRecoveryMode(Configuration flinkConf) {
-		return RecoveryMode.fromConfig(flinkConf).equals(RecoveryMode.ZOOKEEPER);
+		return HighAvailabilityMode.fromConfig(flinkConf).equals(HighAvailabilityMode.ZOOKEEPER);
 	}
 
 	/**
@@ -119,7 +153,10 @@ public class ZooKeeperUtils {
 	public static String getZooKeeperEnsemble(Configuration flinkConf)
 			throws IllegalConfigurationException {
 
-		String zkQuorum = flinkConf.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
+		String zkQuorum = flinkConf.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "");
+		if (zkQuorum.isEmpty()) {
+			zkQuorum = flinkConf.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
+		}
 
 		if (zkQuorum == null || zkQuorum.equals("")) {
 			throw new IllegalConfigurationException("No ZooKeeper quorum specified in config.");
@@ -141,8 +178,9 @@ public class ZooKeeperUtils {
 	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
 			Configuration configuration) throws Exception {
 		CuratorFramework client = startCuratorFramework(configuration);
-		String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
+		String leaderPath = getConfiguredStringValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, ConfigConstants.ZOOKEEPER_LEADER_PATH,
+			ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
 
 		return new ZooKeeperLeaderRetrievalService(client, leaderPath);
 	}
@@ -175,9 +213,11 @@ public class ZooKeeperUtils {
 			CuratorFramework client,
 			Configuration configuration) throws Exception {
 
-		String latchPath = configuration.getString(ConfigConstants.ZOOKEEPER_LATCH_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
-		String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
+		String latchPath = getConfiguredStringValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_LATCH_PATH, ConfigConstants.ZOOKEEPER_LATCH_PATH,
+			ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
+		String leaderPath = getConfiguredStringValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, ConfigConstants.ZOOKEEPER_LEADER_PATH,
 			ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
 
 		return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
@@ -199,9 +239,9 @@ public class ZooKeeperUtils {
 		StateStorageHelper<SubmittedJobGraph> stateStorage = createFileSystemStateStorage(configuration, "submittedJobGraph");
 
 		// ZooKeeper submitted jobs root dir
-		String zooKeeperSubmittedJobsPath = configuration.getString(
-				ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+		String zooKeeperSubmittedJobsPath = getConfiguredStringValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH, ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
+			ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		return new ZooKeeperSubmittedJobGraphStore(
 				client, zooKeeperSubmittedJobsPath, stateStorage);
@@ -226,7 +266,8 @@ public class ZooKeeperUtils {
 
 		checkNotNull(configuration, "Configuration");
 
-		String checkpointsPath = configuration.getString(
+		String checkpointsPath = getConfiguredStringValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_CHECKPOINTS_PATH,
 			ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
 			ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
 
@@ -257,9 +298,10 @@ public class ZooKeeperUtils {
 			Configuration configuration,
 			JobID jobId) throws Exception {
 
-		String checkpointIdCounterPath = configuration.getString(
-				ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
+		String checkpointIdCounterPath = getConfiguredStringValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+			ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+			ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
 
 		checkpointIdCounterPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
 
@@ -280,11 +322,15 @@ public class ZooKeeperUtils {
 			String prefix) throws IOException {
 
 		String rootPath = configuration.getString(
-			ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "");
+			ConfigConstants.ZOOKEEPER_HA_PATH, "");
+		if (rootPath.isEmpty()) {
+			rootPath = configuration.getString(
+				ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "");
+		}
 
 		if (rootPath.equals("")) {
 			throw new IllegalConfigurationException("Missing recovery path. Specify via " +
-				"configuration key '" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "'.");
+				"configuration key '" + ConfigConstants.ZOOKEEPER_HA_PATH + "'.");
 		} else {
 			return new FileSystemStateStorageHelper<T>(rootPath, prefix);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a82e89a..5962afc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -155,7 +155,7 @@ class JobManager(
   /** Either running or not yet archived jobs (session hasn't been ended). */
   protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
 
-  protected val recoveryMode = RecoveryMode.fromConfig(flinkConfiguration)
+  protected val recoveryMode = HighAvailabilityMode.fromConfig(flinkConfiguration)
 
   var leaderSessionID: Option[UUID] = None
 
@@ -317,7 +317,7 @@ class JobManager(
 
         // TODO (critical next step) This needs to be more flexible and robust (e.g. wait for task
         // managers etc.)
-        if (recoveryMode != RecoveryMode.STANDALONE) {
+        if (recoveryMode != HighAvailabilityMode.NONE) {
           log.info(s"Delaying recovery of all jobs by $jobRecoveryTimeout.")
 
           context.system.scheduler.scheduleOnce(
@@ -2026,7 +2026,7 @@ object JobManager {
 
     if (!listeningPortRange.hasNext) {
       if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
-        val message = "Config parameter '" + ConfigConstants.RECOVERY_JOB_MANAGER_PORT +
+        val message = "Config parameter '" + ConfigConstants.HA_JOB_MANAGER_PORT +
           "' does not specify a valid port range."
         LOG.error(message)
         System.exit(STARTUP_FAILURE_RETURN_CODE)
@@ -2568,8 +2568,8 @@ object JobManager {
 
     // Create recovery related components
     val (leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory) =
-      RecoveryMode.fromConfig(configuration) match {
-        case RecoveryMode.STANDALONE =>
+      HighAvailabilityMode.fromConfig(configuration) match {
+        case HighAvailabilityMode.NONE =>
           val leaderElectionService = leaderElectionServiceOption match {
             case Some(les) => les
             case None => new StandaloneLeaderElectionService()
@@ -2579,7 +2579,7 @@ object JobManager {
             new StandaloneSubmittedJobGraphStore(),
             new StandaloneCheckpointRecoveryFactory())
 
-        case RecoveryMode.ZOOKEEPER =>
+        case HighAvailabilityMode.ZOOKEEPER =>
           val client = ZooKeeperUtils.startCuratorFramework(configuration)
 
           val leaderElectionService = leaderElectionServiceOption match {
@@ -2594,7 +2594,10 @@ object JobManager {
 
     val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
 
-    val jobRecoveryTimeoutStr = configuration.getString(ConfigConstants.RECOVERY_JOB_DELAY, "");
+    var jobRecoveryTimeoutStr = configuration.getString(ConfigConstants.HA_JOB_DELAY, "");
+    if (jobRecoveryTimeoutStr.isEmpty) {
+      jobRecoveryTimeoutStr = configuration.getString(ConfigConstants.RECOVERY_JOB_DELAY, "");
+    }
 
     val jobRecoveryTimeout = if (jobRecoveryTimeoutStr == null || jobRecoveryTimeoutStr.isEmpty) {
       timeout
@@ -2604,7 +2607,7 @@ object JobManager {
       } catch {
         case n: NumberFormatException =>
           throw new Exception(
-            s"Invalid config value for ${ConfigConstants.RECOVERY_JOB_DELAY}: " +
+            s"Invalid config value for ${ConfigConstants.HA_JOB_DELAY}: " +
               s"$jobRecoveryTimeoutStr. Value must be a valid duration (such as '10 s' or '1 min')")
       }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index f6e9360..0778aae 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -24,22 +24,18 @@ import java.util.UUID
 import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
-
 import com.typesafe.config.Config
-
-import org.apache.flink.api.common.{JobID, JobExecutionResult, JobSubmissionResult}
+import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.{JobExecutionException, JobClient}
-import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
+import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
+import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
-import org.apache.flink.runtime.jobmanager.RecoveryMode
-import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalService, LeaderRetrievalListener,
-StandaloneLeaderRetrievalService}
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
+import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.util.ZooKeeperUtils
-import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor}
-
+import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -88,7 +84,7 @@ abstract class FlinkMiniCluster(
 
   implicit val timeout = AkkaUtils.getTimeout(configuration)
 
-  val recoveryMode = RecoveryMode.fromConfig(configuration)
+  val recoveryMode = HighAvailabilityMode.fromConfig(configuration)
 
   val numJobManagers = getNumberOfJobManagers
 
@@ -126,7 +122,7 @@ abstract class FlinkMiniCluster(
   // --------------------------------------------------------------------------
 
   def getNumberOfJobManagers: Int = {
-    if(recoveryMode == RecoveryMode.STANDALONE) {
+    if(recoveryMode == HighAvailabilityMode.NONE) {
       1
     } else {
       configuration.getInteger(
@@ -137,7 +133,7 @@ abstract class FlinkMiniCluster(
   }
 
   def getNumberOfResourceManagers: Int = {
-    if(recoveryMode == RecoveryMode.STANDALONE) {
+    if(recoveryMode == HighAvailabilityMode.NONE) {
       1
     } else {
       configuration.getInteger(
@@ -528,7 +524,7 @@ abstract class FlinkMiniCluster(
   protected def createLeaderRetrievalService(): LeaderRetrievalService = {
     (jobManagerActorSystems, jobManagerActors) match {
       case (Some(jmActorSystems), Some(jmActors)) =>
-        if (recoveryMode == RecoveryMode.STANDALONE) {
+        if (recoveryMode == HighAvailabilityMode.NONE) {
           new StandaloneLeaderRetrievalService(
             AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0)))
         } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 528a20c..bd4723f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -22,7 +22,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -55,7 +55,7 @@ public class BlobRecoveryITCase {
 	}
 
 	/**
-	 * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any
+	 * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
 	 * participating BlobServer.
 	 */
 	@Test
@@ -68,9 +68,9 @@ public class BlobRecoveryITCase {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+			config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
 			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-			config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, recoveryDir.getPath());
+			config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, recoveryDir.getPath());
 
 			for (int i = 0; i < server.length; i++) {
 				server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index f1021e6..a3fe0d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -48,7 +48,7 @@ public class BlobLibraryCacheRecoveryITCase {
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 	/**
-	 * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any
+	 * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
 	 * participating BlobLibraryCacheManager.
 	 */
 	@Test
@@ -63,9 +63,9 @@ public class BlobLibraryCacheRecoveryITCase {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+			config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
 			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-			config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, temporaryFolder.getRoot().getAbsolutePath());
+			config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, temporaryFolder.getRoot().getAbsolutePath());
 
 			for (int i = 0; i < server.length; i++) {
 				server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index f050e29..d980517 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -123,8 +123,8 @@ public class JobManagerHARecoveryTest {
 		ActorRef jobManager = null;
 		ActorRef taskManager = null;
 
-		flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, temporaryFolder.newFolder().toString());
+		flinkConfiguration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, temporaryFolder.newFolder().toString());
 		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index bdd019d..5c696ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.InstanceManager;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -171,7 +171,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 
 	private Props createJobManagerProps(Configuration configuration) throws Exception {
 		LeaderElectionService leaderElectionService;
-		if (RecoveryMode.fromConfig(configuration) == RecoveryMode.STANDALONE) {
+		if (HighAvailabilityMode.fromConfig(configuration) == HighAvailabilityMode.NONE) {
 			leaderElectionService = new StandaloneLeaderElectionService();
 		}
 		else {