You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/24 10:11:56 UTC

[2/4] flink git commit: [FLINK-4253] [config] Rename 'recovery.mode' key to 'high-availability'

[FLINK-4253] [config] Rename 'recovery.mode' key to 'high-availability'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01ffe34c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01ffe34c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01ffe34c

Branch: refs/heads/master
Commit: 01ffe34c682c5f22928fbb476896600d0177d84e
Parents: 7206455
Author: Ramkrishna <ra...@intel.com>
Authored: Tue Aug 9 14:48:12 2016 +0530
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Aug 24 12:09:24 2016 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  26 ++--
 docs/setup/jobmanager_high_availability.md      |  39 +++---
 .../org/apache/flink/client/cli/DefaultCLI.java |   2 +-
 .../flink/configuration/ConfigConstants.java    |  83 ++++++++++++-
 .../apache/flink/util/ConfigurationUtil.java    | 101 ++++++++++++++++
 .../flink/util/ConfigurationUtilTest.java       | 115 ++++++++++++++++++
 flink-dist/src/main/flink-bin/bin/config.sh     |  22 +++-
 flink-dist/src/main/resources/flink-conf.yaml   |   2 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |   4 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  14 +--
 .../flink/runtime/blob/FileSystemBlobStore.java |   7 +-
 .../StandaloneCheckpointIDCounter.java          |   4 +-
 .../StandaloneCheckpointRecoveryFactory.java    |   4 +-
 .../StandaloneCompletedCheckpointStore.java     |   4 +-
 .../ZooKeeperCheckpointIDCounter.java           |   4 +-
 .../ZooKeeperCheckpointRecoveryFactory.java     |   4 +-
 .../ZooKeeperCompletedCheckpointStore.java      |   4 +-
 .../jobmanager/HighAvailabilityMode.java        |  86 +++++++++++++
 .../flink/runtime/jobmanager/RecoveryMode.java  |  72 -----------
 .../StandaloneSubmittedJobGraphStore.java       |   2 +-
 .../ZooKeeperSubmittedJobGraphStore.java        |   2 +-
 .../runtime/util/LeaderRetrievalUtils.java      |  36 ++----
 .../flink/runtime/util/ZooKeeperUtils.java      | 120 +++++++++++++------
 .../flink/runtime/jobmanager/JobManager.scala   |  19 +--
 .../runtime/minicluster/FlinkMiniCluster.scala  |  24 ++--
 .../flink/runtime/blob/BlobRecoveryITCase.java  |   8 +-
 .../BlobLibraryCacheRecoveryITCase.java         |   8 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   4 +-
 .../JobManagerLeaderElectionTest.java           |   4 +-
 .../ZooKeeperLeaderElectionTest.java            |  28 ++---
 .../ZooKeeperLeaderRetrievalTest.java           |  50 +++++++-
 .../runtime/testutils/JobManagerProcess.java    |   2 +-
 .../runtime/testutils/TaskManagerProcess.java   |   2 +-
 .../runtime/testutils/ZooKeeperTestUtils.java   |  22 ++--
 .../flink/runtime/util/ZooKeeperUtilTest.java   |   2 +-
 .../zookeeper/ZooKeeperTestEnvironment.java     |   6 +-
 .../runtime/testingUtils/TestingUtils.scala     |   6 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   2 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |  10 +-
 .../flink/test/recovery/ChaosMonkeyITCase.java  |   2 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |   4 +-
 ...agerHAProcessFailureBatchRecoveryITCase.java |   4 +-
 .../ZooKeeperLeaderElectionITCase.java          |   8 +-
 ...CliFrontendYarnAddressConfigurationTest.java |   4 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |   2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |   8 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   2 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   6 +-
 48 files changed, 702 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 6bc9655..e6a335b 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -134,7 +134,7 @@ will be used under the directory specified by jobmanager.web.tmpdir.
 
 - `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups.
 
-- `recovery.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments.
+- `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this config was named as `recovery.zookeeper.storageDir`.
 
 - `blob.storage.directory`: Directory for storing blobs (such as user jar's) on the TaskManagers.
 
@@ -285,29 +285,29 @@ of the JobManager, because the same ActorSystem is used. Its not possible to use
 
 ## High Availability Mode
 
-- `recovery.mode`: (Default 'standalone') Defines the recovery mode used for the cluster execution. Currently, Flink supports the 'standalone' mode where only a single JobManager runs and no JobManager state is checkpointed. The high availability mode 'zookeeper' supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the 'zookeeper' mode, it is mandatory to also define the `recovery.zookeeper.quorum` configuration value.
+- `high-availability`: (Default 'none') Defines the high-availability mode used for the cluster execution. Currently, Flink supports the 'none' mode where only a single JobManager runs and no JobManager state is checkpointed. The high availability mode 'zookeeper' supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the 'zookeeper' mode, it is mandatory to also define the `high-availability.zookeeper.quorum` configuration value. Previously this config was named `recovery.mode` and the default value was 'standalone'.
 
-- `recovery.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected
+- `high-availability.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connect to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected. Previously this config was named `recovery.zookeeper.quorum`.
 
-- `recovery.zookeeper.path.root`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create namespace directories.
+- `high-availability.zookeeper.path.root`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create namespace directories. Previously this config was named `recovery.zookeeper.path.root`.
 
-- `recovery.zookeeper.path.namespace`: (Default '/default_ns' in standalone mode, or the <yarn-application-id> under Yarn) Defines the subdirectory under the root dir where the ZooKeeper recovery mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper.
+- `high-availability.zookeeper.path.namespace`: (Default '/default_ns' in standalone mode, or the <yarn-application-id> under Yarn) Defines the subdirectory under the root dir where the ZooKeeper recovery mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper.  Previously this config was named as `recovery.zookeeper.path.namespace`.
 
-- `recovery.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.
+- `high-availability.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.  Previously this config was named as `recovery.zookeeper.path.latch`.
 
-- `recovery.zookeeper.path.leader`: (Default '/leader') Defines the znode of the leader which contains the URL to the leader and the current leader session ID
+- `high-availability.zookeeper.path.leader`: (Default '/leader') Defines the znode of the leader which contains the URL to the leader and the current leader session ID. Previously this config was named as `recovery.zookeeper.path.leader`.
 
-- `recovery.zookeeper.storageDir`: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA.
+- `high-availability.zookeeper.storageDir`: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA.  Previously this config was named as `recovery.zookeeper.storageDir`.
 
-- `recovery.zookeeper.client.session-timeout`: (Default '60000') Defines the session timeout for the ZooKeeper session in ms.
+- `high-availability.zookeeper.client.session-timeout`: (Default '60000') Defines the session timeout for the ZooKeeper session in ms. Previously this config was named as `recovery.zookeeper.client.session-timeout`.
 
-- `recovery.zookeeper.client.connection-timeout`: (Default '15000') Defines the connection timeout for ZooKeeper in ms.
+- `high-availability.zookeeper.client.connection-timeout`: (Default '15000') Defines the connection timeout for ZooKeeper in ms.  Previously this config was named as `recovery.zookeeper.client.connection-timeout`.
 
-- `recovery.zookeeper.client.retry-wait`: (Default '5000') Defines the pause between consecutive retries in ms.
+- `high-availability.zookeeper.client.retry-wait`: (Default '5000') Defines the pause between consecutive retries in ms.  Previously this config was named as `recovery.zookeeper.client.retry-wait`.
 
-- `recovery.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up.
+- `high-availability.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up.  Previously this config was named as `recovery.zookeeper.client.max-retry-attempts`.
 
-- `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation.
+- `high-availability.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation.  Previously this config was named as `recovery.job.delay`.
 
 ## Environment
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/docs/setup/jobmanager_high_availability.md
----------------------------------------------------------------------
diff --git a/docs/setup/jobmanager_high_availability.md b/docs/setup/jobmanager_high_availability.md
index d4f329a..dd6782d 100644
--- a/docs/setup/jobmanager_high_availability.md
+++ b/docs/setup/jobmanager_high_availability.md
@@ -42,7 +42,7 @@ As an example, consider the following setup with three JobManager instances:
 
 ### Configuration
 
-To enable JobManager High Availability you have to set the **recovery mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters file** with all JobManagers hosts and their web UI ports.
+To enable JobManager High Availability you have to set the **high-availability mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters file** with all JobManagers hosts and their web UI ports.
 
 Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for  *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more information about ZooKeeper. Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) installation.
 
@@ -58,38 +58,39 @@ jobManagerAddress1:webUIPort1
 jobManagerAddressX:webUIPortX
   </pre>
 
-By default, the job manager will pick a *random port* for inter process communication. You can change this via the **`recovery.jobmanager.port`** key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`).
+By default, the job manager will pick a *random port* for inter process communication. You can change this via the **`high-availability.jobmanager.port`** key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`).
 
 #### Config File (flink-conf.yaml)
 
 In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`:
 
-- **Recovery mode** (required): The *recovery mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode.
+- **high-availability mode** (required): The *high-availability mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode.
 
-  <pre>recovery.mode: zookeeper</pre>
+  <pre>high-availability: zookeeper</pre>
+- Previously this config was named 'recovery.mode' and the default config was 'standalone'.
 
 - **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service.
 
-  <pre>recovery.zookeeper.quorum: address1:2181[,...],addressX:2181</pre>
+  <pre>high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181</pre>
 
   Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port.
 
 - **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all cluster namespace nodes are placed.
 
-  <pre>recovery.zookeeper.path.root: /flink
+  <pre>high-availability.zookeeper.path.root: /flink</pre>
 
 - **ZooKeeper namespace** (recommended): The *namespace ZooKeeper node*, under which all required coordination data for a cluster is placed.
 
-  <pre>recovery.zookeeper.path.namespace: /default_ns # important: customize per cluster</pre>
+  <pre>high-availability.zookeeper.path.namespace: /default_ns # important: customize per cluster</pre>
 
-  **Important**: if you are running multiple Flink HA clusters, you have to manually configure separate namespaces for each cluster. By default, the Yarn cluster and the Yarn session automatically generate namespaces based on Yarn application id. A manual configuration overrides this behaviour in Yarn. Specifying a namespace with the -z CLI option, in turn, overrides manual configuration.
+  **Important**: if you are running multiple Flink HA clusters, you have to manually configure separate namespaces for each cluster. By default, the Yarn cluster and the Yarn session automatically generate namespaces based on Yarn application id. A manual configuration overrides this behaviour in Yarn. Specifying a namespace with the -z CLI option, in turn, overrides manual configuration. 
 
 - **State backend and storage directory** (required): JobManager meta data is persisted in the *state backend* and only a pointer to this state is stored in ZooKeeper. Currently, only the file system state backend is supported in HA mode.
 
     <pre>
 state.backend: filesystem
 state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
-recovery.zookeeper.storageDir: hdfs:///flink/recovery</pre>
+high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>
 
     The `storageDir` stores all meta data needed to recover a JobManager failure.
 
@@ -100,13 +101,13 @@ After configuring the masters and the ZooKeeper quorum, you can use the provided
 1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
 
    <pre>
-recovery.mode: zookeeper
-recovery.zookeeper.quorum: localhost:2181
-recovery.zookeeper.path.root: /flink
-recovery.zookeeper.path.namespace: /cluster_one # important: customize per cluster
+high-availability: zookeeper
+high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.path.root: /flink
+high-availability.zookeeper.path.namespace: /cluster_one # important: customize per cluster
 state.backend: filesystem
 state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
-recovery.zookeeper.storageDir: hdfs:///flink/recovery</pre>
+high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>
 
 2. **Configure masters** in `conf/masters`:
 
@@ -186,13 +187,13 @@ This means that the application can be restarted 10 times before YARN fails the
 1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
 
    <pre>
-recovery.mode: zookeeper
-recovery.zookeeper.quorum: localhost:2181
-recovery.zookeeper.path.root: /flink
-recovery.zookeeper.path.namespace: /cluster_one # important: customize per cluster
+high-availability: zookeeper
+high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.path.root: /flink
+high-availability.zookeeper.path.namespace: /cluster_one # important: customize per cluster
 state.backend: filesystem
 state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
-recovery.zookeeper.storageDir: hdfs:///flink/recovery
+high-availability.zookeeper.storageDir: hdfs:///flink/recovery
 yarn.application-attempts: 10</pre>
 
 3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine):

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 5f83c3d..18fa323 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -64,7 +64,7 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
 
 		if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
 			String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
-			config.setString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+			config.setString(ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
 		}
 
 		StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 98a843d..5cc1161 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -626,52 +626,127 @@ public final class ConfigConstants {
 
 	// --------------------------- Recovery -----------------------------------
 
-	/** Defines recovery mode used for the cluster execution ("standalone", "zookeeper") */
+	/** Defines recovery mode used for the cluster execution ("standalone", "zookeeper")
+	 *  Use {@link #HIGH_AVAILABILITY} instead
+	 * */
+	@Deprecated
 	public static final String RECOVERY_MODE = "recovery.mode";
 
+	/** Defines high-availability mode used for the cluster execution ("NONE", "ZOOKEEPER") */
+	public static final String HIGH_AVAILABILITY = "high-availability";
+
 	/** Ports used by the job manager if not in standalone recovery mode */
+	@Deprecated
 	public static final String RECOVERY_JOB_MANAGER_PORT = "recovery.jobmanager.port";
 
+	/** Ports used by the job manager if not in 'none' recovery mode */
+	public static final String HA_JOB_MANAGER_PORT =
+		"high-availability.jobmanager.port";
+
 	/** The time before the JobManager recovers persisted jobs */
+	@Deprecated
 	public static final String RECOVERY_JOB_DELAY = "recovery.job.delay";
 
+	/** The time before the JobManager recovers persisted jobs */
+	public static final String HA_JOB_DELAY = "high-availability.job.delay";
+
 	// --------------------------- ZooKeeper ----------------------------------
 
 	/** ZooKeeper servers. */
+	@Deprecated
 	public static final String ZOOKEEPER_QUORUM_KEY = "recovery.zookeeper.quorum";
 
+	/** ZooKeeper servers. */
+	public static final String HA_ZOOKEEPER_QUORUM_KEY =
+		"high-availability.zookeeper.quorum";
+
 	/**
 	 * File system state backend base path for recoverable state handles. Recovery state is written
 	 * to this path and the file state handles are persisted for recovery.
 	 */
+	@Deprecated
 	public static final String ZOOKEEPER_RECOVERY_PATH = "recovery.zookeeper.storageDir";
 
+	/**
+	 * File system state backend base path for recoverable state handles. Recovery state is written
+	 * to this path and the file state handles are persisted for recovery.
+	 */
+	public static final String ZOOKEEPER_HA_PATH =
+		"high-availability.zookeeper.storageDir";
+
 	/** ZooKeeper root path. */
+	@Deprecated
 	public static final String ZOOKEEPER_DIR_KEY = "recovery.zookeeper.path.root";
 
+	/** ZooKeeper root path. */
+	public static final String HA_ZOOKEEPER_DIR_KEY =
+		"high-availability.zookeeper.path.root";
+
+	@Deprecated
 	public static final String ZOOKEEPER_NAMESPACE_KEY = "recovery.zookeeper.path.namespace";
 
+	public static final String HA_ZOOKEEPER_NAMESPACE_KEY =
+		"high-availability.zookeeper.path.namespace";
+
+	@Deprecated
 	public static final String ZOOKEEPER_LATCH_PATH = "recovery.zookeeper.path.latch";
 
+	public static final String HA_ZOOKEEPER_LATCH_PATH =
+		"high-availability.zookeeper.path.latch";
+
+	@Deprecated
 	public static final String ZOOKEEPER_LEADER_PATH = "recovery.zookeeper.path.leader";
 
+	public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader";
+
 	/** ZooKeeper root path (ZNode) for job graphs. */
+	@Deprecated
 	public static final String ZOOKEEPER_JOBGRAPHS_PATH = "recovery.zookeeper.path.jobgraphs";
 
+	/** ZooKeeper root path (ZNode) for job graphs. */
+	public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH =
+		"high-availability.zookeeper.path.jobgraphs";
+
 	/** ZooKeeper root path (ZNode) for completed checkpoints. */
+	@Deprecated
 	public static final String ZOOKEEPER_CHECKPOINTS_PATH = "recovery.zookeeper.path.checkpoints";
 
+	/** ZooKeeper root path (ZNode) for completed checkpoints. */
+	public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH =
+		"high-availability.zookeeper.path.checkpoints";
+
 	/** ZooKeeper root path (ZNode) for checkpoint counters. */
+	@Deprecated
 	public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "recovery.zookeeper.path.checkpoint-counter";
 
+	/** ZooKeeper root path (ZNode) for checkpoint counters. */
+	public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
+		"high-availability.zookeeper.path.checkpoint-counter";
+
+	@Deprecated
 	public static final String ZOOKEEPER_SESSION_TIMEOUT = "recovery.zookeeper.client.session-timeout";
 
+	public static final String HA_ZOOKEEPER_SESSION_TIMEOUT =
+		"high-availability.zookeeper.client.session-timeout";
+
+	@Deprecated
 	public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "recovery.zookeeper.client.connection-timeout";
 
+	public static final String HA_ZOOKEEPER_CONNECTION_TIMEOUT =
+		"high-availability.zookeeper.client.connection-timeout";
+
+	@Deprecated
 	public static final String ZOOKEEPER_RETRY_WAIT = "recovery.zookeeper.client.retry-wait";
 
+	public static final String HA_ZOOKEEPER_RETRY_WAIT =
+		"high-availability.zookeeper.client.retry-wait";
+
+	@Deprecated
 	public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "recovery.zookeeper.client.max-retry-attempts";
 
+	public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS =
+		"high-availability.zookeeper.client.max-retry-attempts";
+
 	// ---------------------------- Metrics -----------------------------------
 
 	/**
@@ -1015,10 +1090,12 @@ public final class ConfigConstants {
 
 	public static final String LOCAL_START_WEBSERVER = "local.start-webserver";
 
-  	// --------------------------- Recovery ---------------------------------
-
+	// --------------------------- Recovery ---------------------------------
+	@Deprecated
 	public static String DEFAULT_RECOVERY_MODE = "standalone";
 
+	public static String DEFAULT_HIGH_AVAILABILTY = "none";
+
 	/**
 	 * Default port used by the job manager if not in standalone recovery mode. If <code>0</code>
 	 * the OS picks a random port port.

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-core/src/main/java/org/apache/flink/util/ConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ConfigurationUtil.java b/flink-core/src/main/java/org/apache/flink/util/ConfigurationUtil.java
new file mode 100644
index 0000000..44f098b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ConfigurationUtil.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility for accessing deprecated {@link Configuration} values.
+ */
+@Internal
+public class ConfigurationUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ConfigurationUtil.class);
+
+	/**
+	 * Returns the value associated with the given key as an Integer in the
+	 * given Configuration.
+	 *
+	 * <p>The regular key has precedence over any deprecated keys. The
+	 * precedence of deprecated keys depends on the argument order, first
+	 * deprecated keys have higher precedence than later ones.
+	 *
+	 * @param config Configuration to access
+	 * @param key Configuration key (highest precedence)
+	 * @param defaultValue Default value (if no key is set)
+	 * @param deprecatedKeys Optional deprecated keys in precedence order
+	 * @return Integer value associated with first found key or the default value
+	 */
+	public static int getIntegerWithDeprecatedKeys(
+			Configuration config,
+			String key,
+			int defaultValue,
+			String... deprecatedKeys) {
+
+		if (config.containsKey(key)) {
+			return config.getInteger(key, defaultValue);
+		} else {
+			// Check deprecated keys
+			for (String deprecatedKey : deprecatedKeys) {
+				if (config.containsKey(deprecatedKey)) {
+					LOG.warn("Configuration key '{}' has been deprecated. Please use '{}' instead.", deprecatedKey, key);
+					return config.getInteger(deprecatedKey, defaultValue);
+				}
+			}
+			return defaultValue;
+		}
+	}
+
+	/**
+	 * Returns the value associated with the given key as a String in the
+	 * given Configuration.
+	 *
+	 * <p>The regular key has precedence over any deprecated keys. The
+	 * precedence of deprecated keys depends on the argument order, first
+	 * deprecated keys have higher precedence than later ones.
+	 *
+	 * @param config Configuration to access
+	 * @param key Configuration key (highest precedence)
+	 * @param defaultValue Default value (if no key is set)
+	 * @param deprecatedKeys Optional deprecated keys in precedence order
+	 * @return String associated with first found key or the default value
+	 */
+	public static String getStringWithDeprecatedKeys(
+			Configuration config,
+			String key,
+			String defaultValue,
+			String... deprecatedKeys) {
+
+		if (config.containsKey(key)) {
+			return config.getString(key, defaultValue);
+		} else {
+			// Check deprecated keys
+			for (String deprecatedKey : deprecatedKeys) {
+				if (config.containsKey(deprecatedKey)) {
+					LOG.warn("Configuration key {} has been deprecated. Please use {} instead.", deprecatedKey, key);
+					return config.getString(deprecatedKey, defaultValue);
+				}
+			}
+			return defaultValue;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-core/src/test/java/org/apache/flink/util/ConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/ConfigurationUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/ConfigurationUtilTest.java
new file mode 100644
index 0000000..7ecbd3f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/ConfigurationUtilTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConfigurationUtilTest {
+
+	/**
+	 * Tests getInteger without any deprecated keys.
+	 */
+	@Test
+	public void testGetIntegerNoDeprecatedKeys() throws Exception {
+		Configuration config = new Configuration();
+		String key = "asdasd";
+		int value = 1223239;
+		int defaultValue = 272770;
+
+		assertEquals(defaultValue, ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue));
+
+		config.setInteger(key, value);
+		assertEquals(value, ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue));
+	}
+
+	/**
+	 * Tests getInteger with all deprecated keys passed on every call and checks precedence.
+	 */
+	@Test
+	public void testGetIntegerWithDeprecatedKeys() throws Exception {
+		Configuration config = new Configuration();
+		String key = "asdasd";
+		int value = 1223239;
+		int defaultValue = 272770;
+
+		String[] deprecatedKey = new String[] { "deprecated-0", "deprecated-1" };
+		int[] deprecatedValue = new int[] { 99192, 7727 };
+
+		assertEquals(defaultValue, ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+
+		// Only the 2nd deprecated key is set: it is found after the 1st one misses
+		config.setInteger(deprecatedKey[1], deprecatedValue[1]);
+		assertEquals(deprecatedValue[1], ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+
+		// Both deprecated keys are set: the 1st one has precedence over the 2nd
+		config.setInteger(deprecatedKey[0], deprecatedValue[0]);
+		assertEquals(deprecatedValue[0], ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+
+		// Current key is set: it has precedence over all deprecated keys
+		config.setInteger(key, value);
+		assertEquals(value, ConfigurationUtil.getIntegerWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+	}
+
+	/**
+	 * Tests getString without any deprecated keys.
+	 */
+	@Test
+	public void testGetStringNoDeprecatedKeys() throws Exception {
+		Configuration config = new Configuration();
+		String key = "asdasd";
+		String value = "1223239";
+		String defaultValue = "272770";
+
+		assertEquals(defaultValue, ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue));
+
+		config.setString(key, value);
+		assertEquals(value, ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue));
+	}
+
+	/**
+	 * Tests getString with all deprecated keys passed on every call and checks precedence.
+	 */
+	@Test
+	public void testGetStringWithDeprecatedKeys() throws Exception {
+		Configuration config = new Configuration();
+		String key = "asdasd";
+		String value = "1223239";
+		String defaultValue = "272770";
+
+		String[] deprecatedKey = new String[] { "deprecated-0", "deprecated-1" };
+		String[] deprecatedValue = new String[] { "99192", "7727" };
+
+		assertEquals(defaultValue, ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+
+		// Only the 2nd deprecated key is set: it is found after the 1st one misses
+		config.setString(deprecatedKey[1], deprecatedValue[1]);
+		assertEquals(deprecatedValue[1], ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+
+		// Both deprecated keys are set: the 1st one has precedence over the 2nd
+		config.setString(deprecatedKey[0], deprecatedValue[0]);
+		assertEquals(deprecatedValue[0], ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+
+		// Current key is set: it has precedence over all deprecated keys
+		config.setString(key, value);
+		assertEquals(value, ConfigurationUtil.getStringWithDeprecatedKeys(config, key, defaultValue, deprecatedKey));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 9ffa713..687a39c 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -104,7 +104,9 @@ KEY_ENV_JAVA_OPTS="env.java.opts"
 KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
 KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
 KEY_ENV_SSH_OPTS="env.ssh.opts"
+#deprecated
 KEY_RECOVERY_MODE="recovery.mode"
+KEY_HIGH_AVAILABILITY="high-availability"
 KEY_ZK_HEAP_MB="zookeeper.heap.mb"
 
 ########################################################################################################################
@@ -257,8 +259,26 @@ if [ -z "${ZK_HEAP}" ]; then
     ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
 fi
 
+# for backward compatibility: read the deprecated 'recovery.mode' key
+if [ -z "${OLD_RECOVERY_MODE}" ]; then
+    OLD_RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" "${YAML_CONF}")
+fi
+
 if [ -z "${RECOVERY_MODE}" ]; then
-    RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" "${YAML_CONF}")
+     # Read the new config
+     RECOVERY_MODE=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
+     if [ -z "${RECOVERY_MODE}" ]; then
+        # No new config found, so the old config should be used
+        if [ -z "${OLD_RECOVERY_MODE}" ]; then
+            # If old config is also not found, use the 'none' as the default config
+            RECOVERY_MODE="none"
+        elif [ "${OLD_RECOVERY_MODE}" = "standalone" ]; then
+            # The old 'standalone' value is called 'none' under the new key
+            RECOVERY_MODE="none"
+        else
+            RECOVERY_MODE=${OLD_RECOVERY_MODE}
+        fi
+     fi
 fi
 
 # Arguments for the JVM. Used for job and task manager JVMs.

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 8bd4747..a2586ce 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -138,7 +138,7 @@ jobmanager.web.port: 8081
 # setup. This must be a list of the form:
 # "host1:clientPort,host2[:clientPort],..." (default clientPort: 2181)
 #
-# recovery.mode: zookeeper
+# high-availability: zookeeper
 #
 # recovery.zookeeper.quorum: localhost:2181,...
 #

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 677ff54..d9edafe 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -296,8 +296,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			final Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
 			config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
-			config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
-			config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString());
+			config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+			config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString());
 
 			actorSystem = AkkaUtils.createDefaultActorSystem();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index b4b6812..d1b78a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,7 +77,7 @@ public class BlobServer extends Thread implements BlobService {
 
 	/**
 	 * Shutdown hook thread to ensure deletion of the storage directory (or <code>null</code> if
-	 * the configured recovery mode does not equal{@link RecoveryMode#STANDALONE})
+	 * the configured recovery mode does not equal {@link HighAvailabilityMode#NONE})
 	 */
 	private final Thread shutdownHook;
 
@@ -90,7 +90,7 @@ public class BlobServer extends Thread implements BlobService {
 	public BlobServer(Configuration config) throws IOException {
 		checkNotNull(config, "Configuration");
 
-		RecoveryMode recoveryMode = RecoveryMode.fromConfig(config);
+		HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);
 
 		// configure and create the storage directory
 		String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
@@ -98,14 +98,14 @@ public class BlobServer extends Thread implements BlobService {
 		LOG.info("Created BLOB server storage directory {}", storageDir);
 
 		// No recovery.
-		if (recoveryMode == RecoveryMode.STANDALONE) {
+		if (highAvailabilityMode == HighAvailabilityMode.NONE) {
 			this.blobStore = new VoidBlobStore();
 		}
 		// Recovery.
-		else if (recoveryMode == RecoveryMode.ZOOKEEPER) {
+		else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
 			this.blobStore = new FileSystemBlobStore(config);
 		} else {
-			throw new IllegalConfigurationException("Unexpected recovery mode '" + recoveryMode + ".");
+			throw new IllegalConfigurationException("Unexpected recovery mode '" + highAvailabilityMode + ".");
 		}
 
 		// configure the maximum number of concurrent connections
@@ -128,7 +128,7 @@ public class BlobServer extends Thread implements BlobService {
 			backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
 		}
 
-		if (recoveryMode == RecoveryMode.STANDALONE) {
+		if (highAvailabilityMode == HighAvailabilityMode.NONE) {
 			// Add shutdown hook to delete storage directory
 			this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 5641d87..f535c35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -51,12 +51,15 @@ class FileSystemBlobStore implements BlobStore {
 	private final String basePath;
 
 	FileSystemBlobStore(Configuration config) throws IOException {
-		String recoveryPath = config.getString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, null);
+		String recoveryPath = config.getString(ConfigConstants.ZOOKEEPER_HA_PATH, null);
+		if (recoveryPath == null) { // new key not set: fall back to the deprecated key
+			recoveryPath = config.getString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, null);
+		}
 
 		if (recoveryPath == null) {
 			throw new IllegalConfigurationException(String.format("Missing configuration for " +
 					"file system state backend recovery path. Please specify via " +
-					"'%s' key.", ConfigConstants.ZOOKEEPER_RECOVERY_PATH));
+					"'%s' key.", ConfigConstants.ZOOKEEPER_HA_PATH));
 		}
 
 		this.basePath = recoveryPath + "/blob";

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
index 0a235bc..c2f67f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#STANDALONE}.
+ * {@link CheckpointIDCounter} instances for JobManagers running in {@link HighAvailabilityMode#NONE}.
  *
  * <p>Simple wrapper of an {@link AtomicLong}. This is sufficient, because job managers are not
  * recoverable in this recovery mode.

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index 05f9e77..aecb51e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 /**
- * {@link CheckpointCoordinator} components in {@link RecoveryMode#STANDALONE}.
+ * {@link CheckpointCoordinator} components in {@link HighAvailabilityMode#NONE}.
  */
 public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index bc111cd..a35ca77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -28,7 +28,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#STANDALONE}.
+ * {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#NONE}.
  */
 public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
index 12839c1..0bceb8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -23,14 +23,14 @@ import org.apache.curator.framework.recipes.shared.SharedCount;
 import org.apache.curator.framework.recipes.shared.VersionedValue;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ * {@link CheckpointIDCounter} instances for JobManagers running in {@link HighAvailabilityMode#ZOOKEEPER}.
  *
  * <p>Each counter creates a ZNode:
  * <pre>

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index dcd6260..55a0bbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -21,13 +21,13 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * {@link CheckpointCoordinator} components in {@link RecoveryMode#ZOOKEEPER}.
+ * {@link CheckpointCoordinator} components in {@link HighAvailabilityMode#ZOOKEEPER}.
  */
 public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 541629d..376ef70 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -24,7 +24,7 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
@@ -40,7 +40,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ * {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#ZOOKEEPER}.
  *
  * <p>Checkpoints are added under a ZNode per job:
  * <pre>

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
new file mode 100644
index 0000000..8e2efa8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * High availability mode for Flink's cluster execution. Currently supported modes are:
+ *
+ * <ul>
+ *   <li>{@link #NONE}: No high availability, i.e. no recovery from JobManager failures.</li>
+ *   <li>{@link #ZOOKEEPER}: JobManager high availability via ZooKeeper. ZooKeeper is used to
+ *   select a leader among a group of JobManagers. This JobManager is responsible for the job
+ *   execution. Upon failure of the leader a new leader is elected which will take over the
+ *   responsibilities of the old leader.</li>
+ * </ul>
+ */
+public enum HighAvailabilityMode {
+	/** No high availability (formerly the STANDALONE recovery mode). */
+	NONE,
+	/** High availability backed by ZooKeeper. */
+	ZOOKEEPER;
+
+	/**
+	 * Resolves the {@link HighAvailabilityMode} from the given configuration.
+	 *
+	 * <p>{@link ConfigConstants#HIGH_AVAILABILITY} takes precedence. If it is unset or empty, the
+	 * older {@link ConfigConstants#RECOVERY_MODE} key is consulted instead, and its default value
+	 * is translated to {@link ConfigConstants#DEFAULT_HIGH_AVAILABILTY}.
+	 *
+	 * @param config The config to parse
+	 * @return Configured mode or {@link ConfigConstants#DEFAULT_HIGH_AVAILABILTY} if not configured
+	 */
+	public static HighAvailabilityMode fromConfig(Configuration config) {
+		final String defaultMode = ConfigConstants.DEFAULT_HIGH_AVAILABILTY;
+
+		// An empty default lets us detect that the new key is not set
+		final String configured = config.getString(ConfigConstants.HIGH_AVAILABILITY, "");
+
+		final String effective;
+		if (!configured.isEmpty()) {
+			// New key is set; a case-insensitive match of the default maps to the default itself
+			effective = configured.equalsIgnoreCase(defaultMode) ? defaultMode : configured;
+		} else {
+			// Fall back to the older key, mapping its default value to the new default
+			final String legacy = config.getString(
+				ConfigConstants.RECOVERY_MODE,
+				ConfigConstants.DEFAULT_RECOVERY_MODE);
+			effective = legacy.equalsIgnoreCase(ConfigConstants.DEFAULT_RECOVERY_MODE) ? defaultMode : legacy;
+		}
+		return HighAvailabilityMode.valueOf(effective.toUpperCase());
+	}
+
+	/**
+	 * Tells whether the configured mode provides high availability.
+	 *
+	 * <p>The mode is resolved via {@link #fromConfig(Configuration)}.
+	 *
+	 * @param configuration Configuration which contains the high availability mode
+	 * @return true if the configured mode is {@link #ZOOKEEPER}, otherwise false
+	 */
+	public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
+		final HighAvailabilityMode mode = fromConfig(configuration);
+
+		// ZOOKEEPER is currently the only mode that provides high availability;
+		// NONE (and any other mode) does not.
+		return mode == HighAvailabilityMode.ZOOKEEPER;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
deleted file mode 100644
index 077e34d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Recovery mode for Flink's cluster execution. Currently supported modes are:
- *
- * - Standalone: No recovery from JobManager failures
- * - ZooKeeper: JobManager high availability via ZooKeeper
- * ZooKeeper is used to select a leader among a group of JobManager. This JobManager
- * is responsible for the job execution. Upon failure of the leader a new leader is elected
- * which will take over the responsibilities of the old leader
- */
-public enum RecoveryMode {
-	STANDALONE,
-	ZOOKEEPER;
-
-	/**
-	 * Return the configured {@link RecoveryMode}.
-	 *
-	 * @param config The config to parse
-	 * @return Configured recovery mode or {@link ConfigConstants#DEFAULT_RECOVERY_MODE} if not
-	 * configured.
-	 */
-	public static RecoveryMode fromConfig(Configuration config) {
-		return RecoveryMode.valueOf(config.getString(
-				ConfigConstants.RECOVERY_MODE,
-				ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase());
-	}
-
-	/**
-	 * Returns true if the defined recovery mode supports high availability.
-	 *
-	 * @param configuration Configuration which contains the recovery mode
-	 * @return true if high availability is supported by the recovery mode, otherwise false
-	 */
-	public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
-		String recoveryMode = configuration.getString(
-			ConfigConstants.RECOVERY_MODE,
-			ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase();
-
-		RecoveryMode mode = RecoveryMode.valueOf(recoveryMode);
-
-		switch(mode) {
-			case STANDALONE:
-				return false;
-			case ZOOKEEPER:
-				return true;
-			default:
-				return false;
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index db36f92..3041cde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -26,7 +26,7 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#STANDALONE}.
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link HighAvailabilityMode#NONE}.
  *
  * <p>All operations are NoOps, because {@link JobGraph} instances cannot be recovered in this
  * recovery mode.

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 128db83..7f7c5fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -44,7 +44,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link HighAvailabilityMode#ZOOKEEPER}.
  *
  * <p>Each job graph creates ZNode:
  * <pre>

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 8f88daa..7a656cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -28,7 +28,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -61,15 +61,15 @@ public class LeaderRetrievalUtils {
 	public static LeaderRetrievalService createLeaderRetrievalService(Configuration configuration)
 		throws Exception {
 
-		RecoveryMode recoveryMode = getRecoveryMode(configuration);
+		HighAvailabilityMode highAvailabilityMode = getRecoveryMode(configuration);
 
-		switch (recoveryMode) {
-			case STANDALONE:
+		switch (highAvailabilityMode) {
+			case NONE:
 				return StandaloneUtils.createLeaderRetrievalService(configuration);
 			case ZOOKEEPER:
 				return ZooKeeperUtils.createLeaderRetrievalService(configuration);
 			default:
-				throw new Exception("Recovery mode " + recoveryMode + " is not supported.");
+				throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
 		}
 	}
 
@@ -86,16 +86,16 @@ public class LeaderRetrievalUtils {
 	public static LeaderRetrievalService createLeaderRetrievalService(
 				Configuration configuration, ActorRef standaloneRef) throws Exception {
 
-		RecoveryMode recoveryMode = getRecoveryMode(configuration);
+		HighAvailabilityMode highAvailabilityMode = getRecoveryMode(configuration);
 
-		switch (recoveryMode) {
-			case STANDALONE:
+		switch (highAvailabilityMode) {
+			case NONE:
 				String akkaUrl = standaloneRef.path().toSerializationFormat();
 				return new StandaloneLeaderRetrievalService(akkaUrl);
 			case ZOOKEEPER:
 				return ZooKeeperUtils.createLeaderRetrievalService(configuration);
 			default:
-				throw new Exception("Recovery mode " + recoveryMode + " is not supported.");
+				throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
 		}
 	}
 	
@@ -282,7 +282,7 @@ public class LeaderRetrievalUtils {
 	}
 
 	/**
-	 * Gets the recovery mode as configured, based on the {@link ConfigConstants#RECOVERY_MODE}
+	 * Gets the recovery mode as configured, based on the {@link ConfigConstants#HIGH_AVAILABILITY}
 	 * config key.
 	 * 
 	 * @param config The configuration to read the recovery mode from.
@@ -291,20 +291,8 @@ public class LeaderRetrievalUtils {
 	 * @throws IllegalConfigurationException Thrown, if the recovery mode does not correspond
 	 *                                       to a known value.
 	 */
-	public static RecoveryMode getRecoveryMode(Configuration config) {
-		String mode = config.getString(
-			ConfigConstants.RECOVERY_MODE,
-			ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase();
-		
-		switch (mode) {
-			case "STANDALONE":
-				return RecoveryMode.STANDALONE;
-			case "ZOOKEEPER":
-				return RecoveryMode.ZOOKEEPER;
-			default:
-				throw new IllegalConfigurationException(
-					"The value for '" + ConfigConstants.RECOVERY_MODE + "' is unknown: " + mode);
-		}
+	public static HighAvailabilityMode getRecoveryMode(Configuration config) {
+		return HighAvailabilityMode.fromConfig(config);
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 3986fed..5fd6f8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
@@ -56,35 +56,44 @@ public class ZooKeeperUtils {
 	 * @return {@link CuratorFramework} instance
 	 */
 	public static CuratorFramework startCuratorFramework(Configuration configuration) {
-		String zkQuorum = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
-
+		String zkQuorum = configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "");
+		if (zkQuorum.isEmpty()) {
+			zkQuorum = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
+		}
 		if (zkQuorum == null || zkQuorum.equals("")) {
 			throw new RuntimeException("No valid ZooKeeper quorum has been specified. " +
 					"You can specify the quorum via the configuration key '" +
-					ConfigConstants.ZOOKEEPER_QUORUM_KEY + "'.");
+					ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY + "'.");
 		}
 
-		int sessionTimeout = configuration.getInteger(
-				ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT,
-				ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
+		int sessionTimeout = getConfiguredIntValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT,
+			ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT,
+			ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
 
-		int connectionTimeout = configuration.getInteger(
-				ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
-				ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT);
+		int connectionTimeout = getConfiguredIntValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT,
+			ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
+			ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT); // connection (not session) default
 
-		int retryWait = configuration.getInteger(
-				ConfigConstants.ZOOKEEPER_RETRY_WAIT,
-				ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
+		int retryWait = getConfiguredIntValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_RETRY_WAIT,
+			ConfigConstants.ZOOKEEPER_RETRY_WAIT,
+			ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
 
-		int maxRetryAttempts = configuration.getInteger(
-				ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS,
-				ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS);
+		int maxRetryAttempts = getConfiguredIntValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS,
+			ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS,
+			ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS);
 
-		String root = configuration.getString(ConfigConstants.ZOOKEEPER_DIR_KEY,
-				ConfigConstants.DEFAULT_ZOOKEEPER_DIR_KEY);
+		String root = getConfiguredStringValue(configuration, ConfigConstants.HA_ZOOKEEPER_DIR_KEY,
+			ConfigConstants.ZOOKEEPER_DIR_KEY,
+			ConfigConstants.DEFAULT_ZOOKEEPER_DIR_KEY);
 
-		String namespace = configuration.getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY,
-				ConfigConstants.DEFAULT_ZOOKEEPER_NAMESPACE_KEY);
+		String namespace = getConfiguredStringValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY,
+			ConfigConstants.ZOOKEEPER_NAMESPACE_KEY,
+			ConfigConstants.DEFAULT_ZOOKEEPER_NAMESPACE_KEY);
 
 		String rootWithNamespace = generateZookeeperPath(root, namespace);
 
@@ -105,11 +114,36 @@ public class ZooKeeperUtils {
 		return cf;
 	}
 
+	private static int getConfiguredIntValue(Configuration configuration, String newConfigName, String oldConfigName, int defaultValue) {
+		// -1 doubles as the "key not configured" marker for both lookups below
+		final int notConfigured = -1;
+		// the new key is consulted first, the old key only when the new one yields the marker
+		int configured = configuration.getInteger(newConfigName, notConfigured);
+		if (configured == notConfigured) {
+			configured = configuration.getInteger(oldConfigName, notConfigured);
+		}
+		// neither key yielded a value: hand back the supplied default
+		return configured == notConfigured
+			? defaultValue : configured;
+	}
+
+	private static String getConfiguredStringValue(Configuration configuration, String newConfigName, String oldConfigName, String defaultValue) {
+		// an empty string is treated as "key not configured" for both lookups
+		final String fromNewKey = configuration.getString(newConfigName, "");
+		if (!fromNewKey.isEmpty()) {
+			return fromNewKey;
+		}
+		final String fromOldKey = configuration.getString(oldConfigName, "");
+		if (!fromOldKey.isEmpty()) {
+			return fromOldKey;
+		}
+		return defaultValue;
+	}
 	/**
-	 * Returns whether {@link RecoveryMode#ZOOKEEPER} is configured.
+	 * Returns whether {@link HighAvailabilityMode#ZOOKEEPER} is configured.
 	 */
 	public static boolean isZooKeeperRecoveryMode(Configuration flinkConf) {
-		return RecoveryMode.fromConfig(flinkConf).equals(RecoveryMode.ZOOKEEPER);
+		return HighAvailabilityMode.fromConfig(flinkConf).equals(HighAvailabilityMode.ZOOKEEPER);
 	}
 
 	/**
@@ -119,7 +153,10 @@ public class ZooKeeperUtils {
 	public static String getZooKeeperEnsemble(Configuration flinkConf)
 			throws IllegalConfigurationException {
 
-		String zkQuorum = flinkConf.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
+		String zkQuorum = flinkConf.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "");
+		if (zkQuorum.isEmpty()) {
+			zkQuorum = flinkConf.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
+		}
 
 		if (zkQuorum == null || zkQuorum.equals("")) {
 			throw new IllegalConfigurationException("No ZooKeeper quorum specified in config.");
@@ -141,8 +178,9 @@ public class ZooKeeperUtils {
 	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
 			Configuration configuration) throws Exception {
 		CuratorFramework client = startCuratorFramework(configuration);
-		String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
+		String leaderPath = getConfiguredStringValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, ConfigConstants.ZOOKEEPER_LEADER_PATH,
+			ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
 
 		return new ZooKeeperLeaderRetrievalService(client, leaderPath);
 	}
@@ -175,9 +213,11 @@ public class ZooKeeperUtils {
 			CuratorFramework client,
 			Configuration configuration) throws Exception {
 
-		String latchPath = configuration.getString(ConfigConstants.ZOOKEEPER_LATCH_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
-		String leaderPath = configuration.getString(ConfigConstants.ZOOKEEPER_LEADER_PATH,
+		String latchPath = getConfiguredStringValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_LATCH_PATH, ConfigConstants.ZOOKEEPER_LATCH_PATH,
+			ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
+		String leaderPath = getConfiguredStringValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, ConfigConstants.ZOOKEEPER_LEADER_PATH,
 			ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
 
 		return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
@@ -199,9 +239,9 @@ public class ZooKeeperUtils {
 		StateStorageHelper<SubmittedJobGraph> stateStorage = createFileSystemStateStorage(configuration, "submittedJobGraph");
 
 		// ZooKeeper submitted jobs root dir
-		String zooKeeperSubmittedJobsPath = configuration.getString(
-				ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+		String zooKeeperSubmittedJobsPath = getConfiguredStringValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH, ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
+			ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		return new ZooKeeperSubmittedJobGraphStore(
 				client, zooKeeperSubmittedJobsPath, stateStorage);
@@ -226,7 +266,8 @@ public class ZooKeeperUtils {
 
 		checkNotNull(configuration, "Configuration");
 
-		String checkpointsPath = configuration.getString(
+		String checkpointsPath = getConfiguredStringValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_CHECKPOINTS_PATH,
 			ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
 			ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
 
@@ -257,9 +298,10 @@ public class ZooKeeperUtils {
 			Configuration configuration,
 			JobID jobId) throws Exception {
 
-		String checkpointIdCounterPath = configuration.getString(
-				ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
+		String checkpointIdCounterPath = getConfiguredStringValue(configuration,
+			ConfigConstants.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+			ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+			ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
 
 		checkpointIdCounterPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
 
@@ -280,11 +322,15 @@ public class ZooKeeperUtils {
 			String prefix) throws IOException {
 
 		String rootPath = configuration.getString(
-			ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "");
+			ConfigConstants.ZOOKEEPER_HA_PATH, "");
+		if (rootPath.isEmpty()) {
+			rootPath = configuration.getString(
+				ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "");
+		}
 
 		if (rootPath.equals("")) {
 			throw new IllegalConfigurationException("Missing recovery path. Specify via " +
-				"configuration key '" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "'.");
+				"configuration key '" + ConfigConstants.ZOOKEEPER_HA_PATH + "'.");
 		} else {
 			return new FileSystemStateStorageHelper<T>(rootPath, prefix);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a82e89a..5962afc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -155,7 +155,7 @@ class JobManager(
   /** Either running or not yet archived jobs (session hasn't been ended). */
   protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
 
-  protected val recoveryMode = RecoveryMode.fromConfig(flinkConfiguration)
+  protected val recoveryMode = HighAvailabilityMode.fromConfig(flinkConfiguration)
 
   var leaderSessionID: Option[UUID] = None
 
@@ -317,7 +317,7 @@ class JobManager(
 
         // TODO (critical next step) This needs to be more flexible and robust (e.g. wait for task
         // managers etc.)
-        if (recoveryMode != RecoveryMode.STANDALONE) {
+        if (recoveryMode != HighAvailabilityMode.NONE) {
           log.info(s"Delaying recovery of all jobs by $jobRecoveryTimeout.")
 
           context.system.scheduler.scheduleOnce(
@@ -2026,7 +2026,7 @@ object JobManager {
 
     if (!listeningPortRange.hasNext) {
       if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
-        val message = "Config parameter '" + ConfigConstants.RECOVERY_JOB_MANAGER_PORT +
+        val message = "Config parameter '" + ConfigConstants.HA_JOB_MANAGER_PORT +
           "' does not specify a valid port range."
         LOG.error(message)
         System.exit(STARTUP_FAILURE_RETURN_CODE)
@@ -2568,8 +2568,8 @@ object JobManager {
 
     // Create recovery related components
     val (leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory) =
-      RecoveryMode.fromConfig(configuration) match {
-        case RecoveryMode.STANDALONE =>
+      HighAvailabilityMode.fromConfig(configuration) match {
+        case HighAvailabilityMode.NONE =>
           val leaderElectionService = leaderElectionServiceOption match {
             case Some(les) => les
             case None => new StandaloneLeaderElectionService()
@@ -2579,7 +2579,7 @@ object JobManager {
             new StandaloneSubmittedJobGraphStore(),
             new StandaloneCheckpointRecoveryFactory())
 
-        case RecoveryMode.ZOOKEEPER =>
+        case HighAvailabilityMode.ZOOKEEPER =>
           val client = ZooKeeperUtils.startCuratorFramework(configuration)
 
           val leaderElectionService = leaderElectionServiceOption match {
@@ -2594,7 +2594,10 @@ object JobManager {
 
     val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
 
-    val jobRecoveryTimeoutStr = configuration.getString(ConfigConstants.RECOVERY_JOB_DELAY, "");
+    var jobRecoveryTimeoutStr = configuration.getString(ConfigConstants.HA_JOB_DELAY, "");
+    if (jobRecoveryTimeoutStr.isEmpty) {
+      jobRecoveryTimeoutStr = configuration.getString(ConfigConstants.RECOVERY_JOB_DELAY, "");
+    }
 
     val jobRecoveryTimeout = if (jobRecoveryTimeoutStr == null || jobRecoveryTimeoutStr.isEmpty) {
       timeout
@@ -2604,7 +2607,7 @@ object JobManager {
       } catch {
         case n: NumberFormatException =>
           throw new Exception(
-            s"Invalid config value for ${ConfigConstants.RECOVERY_JOB_DELAY}: " +
+            s"Invalid config value for ${ConfigConstants.HA_JOB_DELAY}: " +
               s"$jobRecoveryTimeoutStr. Value must be a valid duration (such as '10 s' or '1 min')")
       }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index f6e9360..0778aae 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -24,22 +24,18 @@ import java.util.UUID
 import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
-
 import com.typesafe.config.Config
-
-import org.apache.flink.api.common.{JobID, JobExecutionResult, JobSubmissionResult}
+import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.{JobExecutionException, JobClient}
-import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
+import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
+import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
-import org.apache.flink.runtime.jobmanager.RecoveryMode
-import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalService, LeaderRetrievalListener,
-StandaloneLeaderRetrievalService}
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
+import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.util.ZooKeeperUtils
-import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor}
-
+import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -88,7 +84,7 @@ abstract class FlinkMiniCluster(
 
   implicit val timeout = AkkaUtils.getTimeout(configuration)
 
-  val recoveryMode = RecoveryMode.fromConfig(configuration)
+  val recoveryMode = HighAvailabilityMode.fromConfig(configuration)
 
   val numJobManagers = getNumberOfJobManagers
 
@@ -126,7 +122,7 @@ abstract class FlinkMiniCluster(
   // --------------------------------------------------------------------------
 
   def getNumberOfJobManagers: Int = {
-    if(recoveryMode == RecoveryMode.STANDALONE) {
+    if(recoveryMode == HighAvailabilityMode.NONE) {
       1
     } else {
       configuration.getInteger(
@@ -137,7 +133,7 @@ abstract class FlinkMiniCluster(
   }
 
   def getNumberOfResourceManagers: Int = {
-    if(recoveryMode == RecoveryMode.STANDALONE) {
+    if(recoveryMode == HighAvailabilityMode.NONE) {
       1
     } else {
       configuration.getInteger(
@@ -528,7 +524,7 @@ abstract class FlinkMiniCluster(
   protected def createLeaderRetrievalService(): LeaderRetrievalService = {
     (jobManagerActorSystems, jobManagerActors) match {
       case (Some(jmActorSystems), Some(jmActors)) =>
-        if (recoveryMode == RecoveryMode.STANDALONE) {
+        if (recoveryMode == HighAvailabilityMode.NONE) {
           new StandaloneLeaderRetrievalService(
             AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0)))
         } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 528a20c..bd4723f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -22,7 +22,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -55,7 +55,7 @@ public class BlobRecoveryITCase {
 	}
 
 	/**
-	 * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any
+	 * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
 	 * participating BlobServer.
 	 */
 	@Test
@@ -68,9 +68,9 @@ public class BlobRecoveryITCase {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+			config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
 			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-			config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, recoveryDir.getPath());
+			config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, recoveryDir.getPath());
 
 			for (int i = 0; i < server.length; i++) {
 				server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index f1021e6..a3fe0d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -48,7 +48,7 @@ public class BlobLibraryCacheRecoveryITCase {
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 	/**
-	 * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are recoverable from any
+	 * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
 	 * participating BlobLibraryCacheManager.
 	 */
 	@Test
@@ -63,9 +63,9 @@ public class BlobLibraryCacheRecoveryITCase {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
+			config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
 			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-			config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, temporaryFolder.getRoot().getAbsolutePath());
+			config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, temporaryFolder.getRoot().getAbsolutePath());
 
 			for (int i = 0; i < server.length; i++) {
 				server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index f050e29..d980517 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -123,8 +123,8 @@ public class JobManagerHARecoveryTest {
 		ActorRef jobManager = null;
 		ActorRef taskManager = null;
 
-		flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, temporaryFolder.newFolder().toString());
+		flinkConfiguration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, temporaryFolder.newFolder().toString());
 		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/01ffe34c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index bdd019d..5c696ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.InstanceManager;
-import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -171,7 +171,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 
 	private Props createJobManagerProps(Configuration configuration) throws Exception {
 		LeaderElectionService leaderElectionService;
-		if (RecoveryMode.fromConfig(configuration) == RecoveryMode.STANDALONE) {
+		if (HighAvailabilityMode.fromConfig(configuration) == HighAvailabilityMode.NONE) {
 			leaderElectionService = new StandaloneLeaderElectionService();
 		}
 		else {