You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/25 18:48:59 UTC

[56/89] [abbrv] flink git commit: [FLINK-4253] [config] Clean up renaming of 'recovery.mode'

[FLINK-4253] [config] Clean up renaming of 'recovery.mode'

- Renamed config keys and default values to be consistent
- Updated default flink-conf.yaml
- Cleaned up code occurrences of recovery mode

This closes #2342.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58165d69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58165d69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58165d69

Branch: refs/heads/flip-6
Commit: 58165d69fb4cd49018031f14bfaf1e17039b36f7
Parents: 01ffe34
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Aug 22 14:59:14 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Aug 24 12:09:31 2016 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  34 ++--
 docs/setup/jobmanager_high_availability.md      |  16 +-
 .../flink/configuration/ConfigConstants.java    | 157 +++++++++--------
 flink-dist/src/main/flink-bin/bin/config.sh     |  33 ++--
 .../src/main/flink-bin/bin/start-cluster.sh     |   2 +-
 .../src/main/flink-bin/bin/stop-cluster.sh      |   2 +-
 flink-dist/src/main/resources/flink-conf.yaml   |  16 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |   4 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  11 +-
 .../apache/flink/runtime/blob/BlobUtils.java    |   3 +-
 .../flink/runtime/blob/FileSystemBlobStore.java |  20 ++-
 .../StandaloneCheckpointIDCounter.java          |   3 +-
 .../jobmanager/HighAvailabilityMode.java        |  40 ++---
 .../runtime/util/LeaderRetrievalUtils.java      |   2 +-
 .../flink/runtime/util/ZooKeeperUtils.java      | 175 +++++++++----------
 .../flink/runtime/jobmanager/JobManager.scala   |  17 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  14 +-
 .../flink/runtime/blob/BlobRecoveryITCase.java  |   4 +-
 .../client/JobClientActorRecoveryITCase.java    |   3 +-
 .../BlobLibraryCacheRecoveryITCase.java         |   4 +-
 .../jobmanager/HighAvailabilityModeTest.java    |  71 ++++++++
 .../jobmanager/JobManagerHARecoveryTest.java    |   4 +-
 .../JobManagerLeaderElectionTest.java           |   4 +-
 .../ZooKeeperLeaderElectionTest.java            |  12 +-
 .../ZooKeeperLeaderRetrievalTest.java           |  46 +----
 .../runtime/testutils/JobManagerProcess.java    |   2 +-
 .../runtime/testutils/TaskManagerProcess.java   |   2 +-
 .../runtime/testutils/ZooKeeperTestUtils.java   |  10 +-
 .../runtime/testingUtils/TestingUtils.scala     |   8 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   2 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |   2 +-
 .../flink/test/recovery/ChaosMonkeyITCase.java  |   4 +-
 .../JobManagerHACheckpointRecoveryITCase.java   |   4 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |   6 +-
 ...agerHAProcessFailureBatchRecoveryITCase.java |   4 +-
 .../ZooKeeperLeaderElectionITCase.java          |   8 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |   2 +-
 37 files changed, 389 insertions(+), 362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index e6a335b..51475cc 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -134,7 +134,7 @@ will be used under the directory specified by jobmanager.web.tmpdir.
 
 - `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups.
 
-- `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this config was named as `recovery.zookeeper.storageDir`.
+- `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named `recovery.zookeeper.storageDir`.
 
 - `blob.storage.directory`: Directory for storing blobs (such as user jar's) on the TaskManagers.
 
@@ -283,31 +283,37 @@ of the JobManager, because the same ActorSystem is used. Its not possible to use
   For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.
 
 
-## High Availability Mode
+## High Availability (HA)
 
-- `high-availability`: (Default 'none') Defines the recovery mode used for the cluster execution. Currently, Flink supports the 'none' mode where only a single JobManager runs and no JobManager state is checkpointed. The high availability mode 'zookeeper' supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the 'zookeeper' mode, it is mandatory to also define the `recovery.zookeeper.quorum` configuration value.  Previously this config was named 'recovery.mode' and the default config was 'standalone'.
+- `high-availability`: Defines the high availability mode used for the cluster execution. Currently, Flink supports the following modes:
+  - `none` (default): No high availability. A single JobManager runs and no JobManager state is checkpointed.
+  - `zookeeper`: Supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the 'zookeeper' mode, it is mandatory to also define the `high-availability.zookeeper.quorum` configuration value.
 
-- `high-availability.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected.  Previously this config was name as `recovery.zookeeper.quorum`.
+Previously this key was named `recovery.mode` and the default value was `standalone`.
 
-- `high-availability.zookeeper.path.root`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create namespace directories.  Previously this config was name as `recovery.zookeeper.path.root`.
+### ZooKeeper-based HA Mode
 
-- `high-availability.zookeeper.path.namespace`: (Default '/default_ns' in standalone mode, or the <yarn-application-id> under Yarn) Defines the subdirectory under the root dir where the ZooKeeper recovery mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper.  Previously this config was named as `recovery.zookeeper.path.namespace`.
+- `high-availability.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connect to the ZooKeeper cluster when the 'zookeeper' HA mode is selected. Previously this key was name `recovery.zookeeper.quorum`.
 
-- `high-availability.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.  Previously this config was named as `recovery.zookeeper.path.latch`.
+- `high-availability.zookeeper.path.root`: (Default `/flink`) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this ket was named `recovery.zookeeper.path.root`.
 
-- `high-availability.zookeeper.path.leader`: (Default '/leader') Defines the znode of the leader which contains the URL to the leader and the current leader session ID  Previously this config was named as `recovery.zookeeper.path.leader`.
+- `high-availability.zookeeper.path.namespace`: (Default `/default_ns` in standalone cluster mode, or the <yarn-application-id> under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This allows to isolate multiple applications on the same ZooKeeper. Previously this key was named `recovery.zookeeper.path.namespace`.
 
-- `high-availability.zookeeper.storageDir`: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA.  Previously this config was named as `recovery.zookeeper.storageDir`.
+- `high-availability.zookeeper.path.latch`: (Default `/leaderlatch`) Defines the znode of the leader latch which is used to elect the leader. Previously this key was named `recovery.zookeeper.path.latch`.
 
-- `high-availability.zookeeper.client.session-timeout`: (Default '60000') Defines the session timeout for the ZooKeeper session in ms.  Previously this config was named as `recovery.zookeeper.client.session-timeout`
+- `high-availability.zookeeper.path.leader`: (Default `/leader`) Defines the znode of the leader which contains the URL to the leader and the current leader session ID. Previously this key was named `recovery.zookeeper.path.leader`.
 
-- `high-availability.zookeeper.client.connection-timeout`: (Default '15000') Defines the connection timeout for ZooKeeper in ms.  Previously this config was named as `recovery.zookeeper.client.connection-timeout`.
+- `high-availability.zookeeper.storageDir`: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA. Previously this key was named `recovery.zookeeper.storageDir`.
 
-- `high-availability.zookeeper.client.retry-wait`: (Default '5000') Defines the pause between consecutive retries in ms.  Previously this config was named as `recovery.zookeeper.client.retry-wait`.
+- `high-availability.zookeeper.client.session-timeout`: (Default `60000`) Defines the session timeout for the ZooKeeper session in ms. Previously this key was named `recovery.zookeeper.client.session-timeout`
 
-- `high-availability.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up.  Previously this config was named as `recovery.zookeeper.client.max-retry-attempts`.
+- `high-availability.zookeeper.client.connection-timeout`: (Default `15000`) Defines the connection timeout for ZooKeeper in ms. Previously this key was named `recovery.zookeeper.client.connection-timeout`.
 
-- `high-availability.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation.  Previously this config was named as `recovery.job.delay`.
+- `high-availability.zookeeper.client.retry-wait`: (Default `5000`) Defines the pause between consecutive retries in ms. Previously this key was named `recovery.zookeeper.client.retry-wait`.
+
+- `high-availability.zookeeper.client.max-retry-attempts`: (Default `3`) Defines the number of connection retries before the client gives up. Previously this key was named `recovery.zookeeper.client.max-retry-attempts`.
+
+- `high-availability.job.delay`: (Default `akka.ask.timeout`) Defines the delay before persisted jobs are recovered in case of a master recovery situation. Previously this key was named `recovery.job.delay`.
 
 ## Environment
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/docs/setup/jobmanager_high_availability.md
----------------------------------------------------------------------
diff --git a/docs/setup/jobmanager_high_availability.md b/docs/setup/jobmanager_high_availability.md
index dd6782d..9dcc7cc 100644
--- a/docs/setup/jobmanager_high_availability.md
+++ b/docs/setup/jobmanager_high_availability.md
@@ -44,7 +44,7 @@ As an example, consider the following setup with three JobManager instances:
 
 To enable JobManager High Availability you have to set the **high-availability mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters file** with all JobManagers hosts and their web UI ports.
 
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for  *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more information about ZooKeeper. Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) installation.
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more information about ZooKeeper. Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) installation.
 
 #### Masters File (masters)
 
@@ -67,7 +67,6 @@ In order to start an HA-cluster add the following configuration keys to `conf/fl
 - **high-availability mode** (required): The *high-availability mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode.
 
   <pre>high-availability: zookeeper</pre>
-- **Previously this config was named 'recovery.mode' and the default config was 'standalone'.
 
 - **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service.
 
@@ -83,14 +82,14 @@ In order to start an HA-cluster add the following configuration keys to `conf/fl
 
   <pre>high-availability.zookeeper.path.namespace: /default_ns # important: customize per cluster</pre>
 
-  **Important**: if you are running multiple Flink HA clusters, you have to manually configure separate namespaces for each cluster. By default, the Yarn cluster and the Yarn session automatically generate namespaces based on Yarn application id. A manual configuration overrides this behaviour in Yarn. Specifying a namespace with the -z CLI option, in turn, overrides manual configuration. 
+  **Important**: if you are running multiple Flink HA clusters, you have to manually configure separate namespaces for each cluster. By default, the Yarn cluster and the Yarn session automatically generate namespaces based on Yarn application id. A manual configuration overrides this behaviour in Yarn. Specifying a namespace with the -z CLI option, in turn, overrides manual configuration.
 
 - **State backend and storage directory** (required): JobManager meta data is persisted in the *state backend* and only a pointer to this state is stored in ZooKeeper. Currently, only the file system state backend is supported in HA mode.
 
     <pre>
+high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>
 state.backend: filesystem
 state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
-high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>
 
     The `storageDir` stores all meta data needed to recover a JobManager failure.
 
@@ -98,16 +97,17 @@ After configuring the masters and the ZooKeeper quorum, you can use the provided
 
 #### Example: Standalone Cluster with 2 JobManagers
 
-1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
+1. **Configure high availability mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
 
    <pre>
 high-availability: zookeeper
 high-availability.zookeeper.quorum: localhost:2181
 high-availability.zookeeper.path.root: /flink
 high-availability.zookeeper.path.namespace: /cluster_one # important: customize per cluster
+high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>
+
 state.backend: filesystem
 state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
-high-availability.zookeeper.storageDir: hdfs:///flink/recovery</pre>
 
 2. **Configure masters** in `conf/masters`:
 
@@ -184,16 +184,16 @@ This means that the application can be restarted 10 times before YARN fails the
 
 #### Example: Highly Available YARN Session
 
-1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
+1. **Configure HA mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
 
    <pre>
 high-availability: zookeeper
 high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.storageDir: hdfs:///flink/recovery
 high-availability.zookeeper.path.root: /flink
 high-availability.zookeeper.path.namespace: /cluster_one # important: customize per cluster
 state.backend: filesystem
 state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
-high-availability.zookeeper.storageDir: hdfs:///flink/recovery
 yarn.application-attempts: 10</pre>
 
 3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine):

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 5cc1161..514c730 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -624,129 +624,134 @@ public final class ConfigConstants {
 	
 	public static final String FLINK_JVM_OPTIONS = "env.java.opts";
 
-	// --------------------------- Recovery -----------------------------------
+	// --------------------------- High Availability --------------------------
 
-	/** Defines recovery mode used for the cluster execution ("standalone", "zookeeper")
-	 *  Use {@link #HIGH_AVAILABILITY} instead
-	 * */
+	/** Defines high availabilty mode used for the cluster execution ("NONE", "ZOOKEEPER") */
+	@PublicEvolving
+	public static final String HA_MODE = "high-availability";
+
+	/** Ports used by the job manager if not in 'none' recovery mode */
+	@PublicEvolving
+	public static final String HA_JOB_MANAGER_PORT = "high-availability.jobmanager.port";
+
+	/** The time before the JobManager recovers persisted jobs */
+	@PublicEvolving
+	public static final String HA_JOB_DELAY = "high-availability.job.delay";
+
+	/** Deprecated in favour of {@link #HA_MODE}. */
 	@Deprecated
 	public static final String RECOVERY_MODE = "recovery.mode";
 
-	/** Defines recovery mode used for the cluster execution ("NONE", "ZOOKEEPER") */
-	public static final String HIGH_AVAILABILITY = "high-availability";
-
-	/** Ports used by the job manager if not in standalone recovery mode */
+	/** Deprecated in favour of {@link #HA_JOB_MANAGER_PORT}. */
 	@Deprecated
 	public static final String RECOVERY_JOB_MANAGER_PORT = "recovery.jobmanager.port";
 
-	/** Ports used by the job manager if not in 'none' recovery mode */
-	public static final String HA_JOB_MANAGER_PORT =
-		"high-availability.jobmanager.port";
-
-	/** The time before the JobManager recovers persisted jobs */
+	/** Deprecated in favour of {@link #HA_JOB_DELAY}. */
 	@Deprecated
 	public static final String RECOVERY_JOB_DELAY = "recovery.job.delay";
 
-	/** The time before the JobManager recovers persisted jobs */
-	public static final String HA_JOB_DELAY = "high-availability.job.delay";
-
 	// --------------------------- ZooKeeper ----------------------------------
 
 	/** ZooKeeper servers. */
-	@Deprecated
-	public static final String ZOOKEEPER_QUORUM_KEY = "recovery.zookeeper.quorum";
-
-	/** ZooKeeper servers. */
-	public static final String HA_ZOOKEEPER_QUORUM_KEY =
-		"high-availability.zookeeper.quorum";
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_QUORUM_KEY = "high-availability.zookeeper.quorum";
 
 	/**
 	 * File system state backend base path for recoverable state handles. Recovery state is written
 	 * to this path and the file state handles are persisted for recovery.
 	 */
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_STORAGE_PATH = "high-availability.zookeeper.storageDir";
+
+	/** ZooKeeper root path. */
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_DIR_KEY = "high-availability.zookeeper.path.root";
+
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_NAMESPACE_KEY = "high-availability.zookeeper.path.namespace";
+
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_LATCH_PATH = "high-availability.zookeeper.path.latch";
+
+	/** ZooKeeper root path (ZNode) for job graphs. */
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH = "high-availability.zookeeper.path.jobgraphs";
+
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader";
+
+	/** ZooKeeper root path (ZNode) for completed checkpoints. */
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH = "high-availability.zookeeper.path.checkpoints";
+
+	/** ZooKeeper root path (ZNode) for checkpoint counters. */
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "high-availability.zookeeper.path.checkpoint-counter";
+
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_SESSION_TIMEOUT = "high-availability.zookeeper.client.session-timeout";
+
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_CONNECTION_TIMEOUT = "high-availability.zookeeper.client.connection-timeout";
+
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_RETRY_WAIT = "high-availability.zookeeper.client.retry-wait";
+
+	@PublicEvolving
+	public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS = "high-availability.zookeeper.client.max-retry-attempts";
+
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_QUORUM_KEY}. */
 	@Deprecated
-	public static final String ZOOKEEPER_RECOVERY_PATH = "recovery.zookeeper.storageDir";
+	public static final String ZOOKEEPER_QUORUM_KEY = "recovery.zookeeper.quorum";
 
-	/**
-	 * File system state backend base path for recoverable state handles. Recovery state is written
-	 * to this path and the file state handles are persisted for recovery.
-	 */
-	public static final String ZOOKEEPER_HA_PATH =
-		"high-availability.zookeeper.storageDir";
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_STORAGE_PATH}. */
+	@Deprecated
+	public static final String ZOOKEEPER_RECOVERY_PATH = "recovery.zookeeper.storageDir";
 
-	/** ZooKeeper root path. */
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_DIR_KEY}. */
 	@Deprecated
 	public static final String ZOOKEEPER_DIR_KEY = "recovery.zookeeper.path.root";
 
-	/** ZooKeeper root path. */
-	public static final String HA_ZOOKEEPER_DIR_KEY =
-		"high-availability.zookeeper.path.root";
-
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_NAMESPACE_KEY}. */
 	@Deprecated
 	public static final String ZOOKEEPER_NAMESPACE_KEY = "recovery.zookeeper.path.namespace";
 
-	public static final String HA_ZOOKEEPER_NAMESPACE_KEY =
-		"high-availability.zookeeper.path.namespace";
-
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_LATCH_PATH}. */
 	@Deprecated
 	public static final String ZOOKEEPER_LATCH_PATH = "recovery.zookeeper.path.latch";
 
-	public static final String HA_ZOOKEEPER_LATCH_PATH =
-		"high-availability.zookeeper.path.latch";
-
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_LEADER_PATH}. */
 	@Deprecated
 	public static final String ZOOKEEPER_LEADER_PATH = "recovery.zookeeper.path.leader";
 
-	public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader";
-
-	/** ZooKeeper root path (ZNode) for job graphs. */
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_JOBGRAPHS_PATH}. */
 	@Deprecated
 	public static final String ZOOKEEPER_JOBGRAPHS_PATH = "recovery.zookeeper.path.jobgraphs";
 
-	/** ZooKeeper root path (ZNode) for job graphs. */
-	public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH =
-		"high-availability.zookeeper.path.jobgraphs";
-
-	/** ZooKeeper root path (ZNode) for completed checkpoints. */
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_CHECKPOINTS_PATH}. */
 	@Deprecated
 	public static final String ZOOKEEPER_CHECKPOINTS_PATH = "recovery.zookeeper.path.checkpoints";
 
-	/** ZooKeeper root path (ZNode) for completed checkpoints. */
-	public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH =
-		"high-availability.zookeeper.path.checkpoints";
-
-	/** ZooKeeper root path (ZNode) for checkpoint counters. */
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}. */
 	@Deprecated
 	public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "recovery.zookeeper.path.checkpoint-counter";
 
-	/** ZooKeeper root path (ZNode) for checkpoint counters. */
-	public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
-		"high-availability.zookeeper.path.checkpoint-counter";
-
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_SESSION_TIMEOUT}. */
 	@Deprecated
 	public static final String ZOOKEEPER_SESSION_TIMEOUT = "recovery.zookeeper.client.session-timeout";
 
-	public static final String HA_ZOOKEEPER_SESSION_TIMEOUT =
-		"high-availability.zookeeper.client.session-timeout";
-
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_CONNECTION_TIMEOUT}. */
 	@Deprecated
 	public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "recovery.zookeeper.client.connection-timeout";
 
-	public static final String HA_ZOOKEEPER_CONNECTION_TIMEOUT =
-		"high-availability.zookeeper.client.connection-timeout";
-
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_RETRY_WAIT}. */
 	@Deprecated
 	public static final String ZOOKEEPER_RETRY_WAIT = "recovery.zookeeper.client.retry-wait";
 
-	public static final String HA_ZOOKEEPER_RETRY_WAIT =
-		"high-availability.zookeeper.client.retry-wait";
-
+	/** Deprecated in favour of {@link #HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS}. */
 	@Deprecated
 	public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "recovery.zookeeper.client.max-retry-attempts";
 
-	public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS =
-		"high-availability.zookeeper.client.max-retry-attempts";
-
 	// ---------------------------- Metrics -----------------------------------
 
 	/**
@@ -1090,16 +1095,24 @@ public final class ConfigConstants {
 
 	public static final String LOCAL_START_WEBSERVER = "local.start-webserver";
 
-	// --------------------------- Recovery ---------------------------------
+	// --------------------------- High Availability ---------------------------------
+
+	@PublicEvolving
+	public static String DEFAULT_HA_MODE = "none";
+
+	/** Deprecated in favour of {@link #DEFAULT_HA_MODE} */
 	@Deprecated
 	public static String DEFAULT_RECOVERY_MODE = "standalone";
 
-	public static String DEFAULT_HIGH_AVAILABILTY = "none";
-
 	/**
 	 * Default port used by the job manager if not in standalone recovery mode. If <code>0</code>
 	 * the OS picks a random port port.
 	 */
+	@PublicEvolving
+	public static final String DEFAULT_HA_JOB_MANAGER_PORT = "0";
+
+	/** Deprecated in favour of {@link #DEFAULT_HA_JOB_MANAGER_PORT} */
+	@Deprecated
 	public static final String DEFAULT_RECOVERY_JOB_MANAGER_PORT = "0";
 
 	// --------------------------- ZooKeeper ----------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 687a39c..f7e7d58 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -104,8 +104,6 @@ KEY_ENV_JAVA_OPTS="env.java.opts"
 KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
 KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
 KEY_ENV_SSH_OPTS="env.ssh.opts"
-#deprecated
-KEY_RECOVERY_MODE="recovery.mode"
 KEY_HIGH_AVAILABILITY="high-availability"
 KEY_ZK_HEAP_MB="zookeeper.heap.mb"
 
@@ -259,25 +257,22 @@ if [ -z "${ZK_HEAP}" ]; then
     ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
 fi
 
-# for backward compatability
-if [ -z "${OLD_RECOVERY_MODE}" ]; then
-    OLD_RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" "${YAML_CONF}")
-fi
-
-if [ -z "${RECOVERY_MODE}" ]; then
-     # Read the new config
-     RECOVERY_MODE=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
-     if [ -z "${RECOVERY_MODE}" ]; then
-        #no new config found. So old config should be used
-        if [ -z "${OLD_RECOVERY_MODE}" ]; then
-            # If old config is also not found, use the 'none' as the default config
-            RECOVERY_MODE="none"
-        elif [ ${OLD_RECOVERY_MODE} = "standalone" ]; then
-            # if oldconfig is 'standalone', rename to 'none'
-            RECOVERY_MODE="none"
+# High availability
+if [ -z "${HIGH_AVAILABILITY}" ]; then
+     HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
+     if [ -z "${HIGH_AVAILABILITY}" ]; then
+        # Try deprecated value
+        DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
+        if [ -z "${DEPRECATED_HA}" ]; then
+            HIGH_AVAILABILITY="none"
+        elif [ ${DEPRECATED_HA} == "standalone" ]; then
+            # Standalone is now 'none'
+            HIGH_AVAILABILITY="none"
         else
-            RECOVERY_MODE=${OLD_RECOVERY_MODE}
+            HIGH_AVAILABILITY=${DEPRECATED_HA}
         fi
+     else
+         HIGH_AVAILABILITY="none"
      fi
 fi
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index 77bff1e..7611189 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -29,7 +29,7 @@ bin=`cd "$bin"; pwd`
 
 # Start the JobManager instance(s)
 shopt -s nocasematch
-if [[ $RECOVERY_MODE == "zookeeper" ]]; then
+if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
     # HA Mode
     readMasters
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
index c4d9086..bc86291 100755
--- a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
@@ -37,7 +37,7 @@ fi
 
 # Stop JobManager instance(s)
 shopt -s nocasematch
-if [[ $RECOVERY_MODE == "zookeeper" ]]; then
+if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
     # HA Mode
     readMasters
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index a2586ce..27fd84a 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -131,18 +131,14 @@ jobmanager.web.port: 8081
 
 
 #==============================================================================
-# Master High Availability (required configuration)
+# High Availability
 #==============================================================================
 
-# The list of ZooKepper quorum peers that coordinate the high-availability
+# The list of ZooKeeper quorum peers that coordinate the high-availability
 # setup. This must be a list of the form:
-# "host1:clientPort,host2[:clientPort],..." (default clientPort: 2181)
 #
-# high-availability: zookeeper
-#
-# recovery.zookeeper.quorum: localhost:2181,...
+# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
 #
-# Note: You need to set the state backend to 'filesystem' and the checkpoint
-# directory (see above) before configuring the storageDir.
-#
-# recovery.zookeeper.storageDir: hdfs:///recovery
+# high-availability: zookeeper
+# high-availability.zookeeper.quorum: localhost:2181
+# high-availability.zookeeper.storageDir: hdfs:///flink/ha/

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index d9edafe..54c5e76 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -146,7 +146,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		List<LeaderRetrievalService> leaderRetrievalServices = new ArrayList<>();
 
 		try (TestingServer zooKeeper = new TestingServer()) {
-			final Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+			final Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
 				zooKeeper.getConnectString(),
 				temporaryFolder.getRoot().getPath());
 
@@ -296,7 +296,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			final Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
 			config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
-			config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+			config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
 			config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString());
 
 			actorSystem = AkkaUtils.createDefaultActorSystem();

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index d1b78a5..ff54b67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -66,7 +66,7 @@ public class BlobServer extends Thread implements BlobService {
 	/** Is the root directory for file storage */
 	private final File storageDir;
 
-	/** Blob store for recovery */
+	/** Blob store for HA */
 	private final BlobStore blobStore;
 
 	/** Set of currently running threads */
@@ -77,7 +77,7 @@ public class BlobServer extends Thread implements BlobService {
 
 	/**
 	 * Shutdown hook thread to ensure deletion of the storage directory (or <code>null</code> if
-	 * the configured recovery mode does not equal{@link HighAvailabilityMode#NONE})
+	 * the configured high availability mode does not equal{@link HighAvailabilityMode#NONE})
 	 */
 	private final Thread shutdownHook;
 
@@ -97,15 +97,12 @@ public class BlobServer extends Thread implements BlobService {
 		this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
 		LOG.info("Created BLOB server storage directory {}", storageDir);
 
-		// No recovery.
 		if (highAvailabilityMode == HighAvailabilityMode.NONE) {
 			this.blobStore = new VoidBlobStore();
-		}
-		// Recovery.
-		else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
+		} else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) {
 			this.blobStore = new FileSystemBlobStore(config);
 		} else {
-			throw new IllegalConfigurationException("Unexpected recovery mode '" + highAvailabilityMode + ".");
+			throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + ".");
 		}
 
 		// configure the maximum number of concurrent connections

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 6ba1944..e74fa6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -336,7 +336,8 @@ public class BlobUtils {
 	 *
 	 * <p>The returned path can be used with the state backend for recovery purposes.
 	 *
-	 * <p>This follows the same scheme as {@link #getStorageLocation(File, BlobKey)}.
+	 * <p>This follows the same scheme as {@link #getStorageLocation(File, BlobKey)}
+	 * and is used for HA.
 	 */
 	static String getRecoveryPath(String basePath, BlobKey blobKey) {
 		// format: $base/cache/blob_$key

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index f535c35..ee189d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.ConfigurationUtil;
 import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Blob store backed by {@link FileSystem}.
  *
- * <p>This is used in addition to the local blob storage
+ * <p>This is used in addition to the local blob storage for high availability.
  */
 class FileSystemBlobStore implements BlobStore {
 
@@ -51,18 +52,19 @@ class FileSystemBlobStore implements BlobStore {
 	private final String basePath;
 
 	FileSystemBlobStore(Configuration config) throws IOException {
-		String recoveryPath = config.getString(ConfigConstants.ZOOKEEPER_HA_PATH, null);
-		if(recoveryPath == null) {
-			recoveryPath = config.getString(ConfigConstants.ZOOKEEPER_HA_PATH, null);
-		}
+		String storagePath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				config,
+				ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH,
+				null,
+				ConfigConstants.ZOOKEEPER_RECOVERY_PATH);
 
-		if (recoveryPath == null) {
+		if (storagePath == null) {
 			throw new IllegalConfigurationException(String.format("Missing configuration for " +
-					"file system state backend recovery path. Please specify via " +
-					"'%s' key.", ConfigConstants.ZOOKEEPER_HA_PATH));
+					"ZooKeeper file system path. Please specify via " +
+					"'%s' key.", ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH));
 		}
 
-		this.basePath = recoveryPath + "/blob";
+		this.basePath = storagePath + "/blob";
 
 		FileSystem.get(new Path(basePath).toUri()).mkdirs(new Path(basePath));
 		LOG.info("Created blob directory {}.", basePath);

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
index c2f67f1..84cbe19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -25,8 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
 /**
  * {@link CheckpointIDCounter} instances for JobManagers running in {@link HighAvailabilityMode#NONE}.
  *
- * <p>Simple wrapper of an {@link AtomicLong}. This is sufficient, because job managers are not
- * recoverable in this recovery mode.
+ * <p>Simple wrapper around an {@link AtomicLong}.
  */
 public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
index 8e2efa8..087ad3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
@@ -20,18 +20,18 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.ConfigurationUtil;
 
 /**
- * Recovery mode for Flink's cluster execution. Currently supported modes are:
+ * High availability mode for Flink's cluster execution. Currently supported modes are:
  *
- * - Standalone: No recovery from JobManager failures
+ * - NONE: No high availability.
  * - ZooKeeper: JobManager high availability via ZooKeeper
  * ZooKeeper is used to select a leader among a group of JobManager. This JobManager
  * is responsible for the job execution. Upon failure of the leader a new leader is elected
  * which will take over the responsibilities of the old leader
  */
 public enum HighAvailabilityMode {
-	// STANDALONE mode renamed to NONE
 	NONE,
 	ZOOKEEPER;
 
@@ -39,30 +39,24 @@ public enum HighAvailabilityMode {
 	 * Return the configured {@link HighAvailabilityMode}.
 	 *
 	 * @param config The config to parse
-	 * @return Configured recovery mode or {@link ConfigConstants#DEFAULT_HIGH_AVAILABILTY} if not
+	 * @return Configured recovery mode or {@link ConfigConstants#DEFAULT_HA_MODE} if not
 	 * configured.
 	 */
 	public static HighAvailabilityMode fromConfig(Configuration config) {
-		// Not passing the default value here so that we could determine
-		// if there is an older config set
-		String recoveryMode = config.getString(
-			ConfigConstants.HIGH_AVAILABILITY, "");
-		if (recoveryMode.isEmpty()) {
-			// New config is not set.
-			// check the older one
-			// check for older 'recover.mode' config
-			recoveryMode = config.getString(
-				ConfigConstants.RECOVERY_MODE,
-				ConfigConstants.DEFAULT_RECOVERY_MODE);
-			if (recoveryMode.equalsIgnoreCase(ConfigConstants.DEFAULT_RECOVERY_MODE)) {
-				// There is no HA configured.
-				return HighAvailabilityMode.valueOf(ConfigConstants.DEFAULT_HIGH_AVAILABILTY.toUpperCase());
-			}
-		} else if (recoveryMode.equalsIgnoreCase(ConfigConstants.DEFAULT_HIGH_AVAILABILTY)) {
-			// The new config is found but with default value. So use this
-			return HighAvailabilityMode.valueOf(ConfigConstants.DEFAULT_HIGH_AVAILABILTY.toUpperCase());
+		String haMode = ConfigurationUtil.getStringWithDeprecatedKeys(
+				config,
+				ConfigConstants.HA_MODE,
+				null,
+				ConfigConstants.RECOVERY_MODE);
+
+		if (haMode == null) {
+			return HighAvailabilityMode.NONE;
+		} else if (haMode.equalsIgnoreCase(ConfigConstants.DEFAULT_RECOVERY_MODE)) {
+			// Map old default to new default
+			return HighAvailabilityMode.NONE;
+		} else {
+			return HighAvailabilityMode.valueOf(haMode.toUpperCase());
 		}
-		return HighAvailabilityMode.valueOf(recoveryMode.toUpperCase());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 7a656cf..b6d9306 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -282,7 +282,7 @@ public class LeaderRetrievalUtils {
 	}
 
 	/**
-	 * Gets the recovery mode as configured, based on the {@link ConfigConstants#HIGH_AVAILABILITY}
+	 * Gets the recovery mode as configured, based on the {@link ConfigConstants#HA_MODE}
 	 * config key.
 	 * 
 	 * @param config The configuration to read the recovery mode from.

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 5fd6f8c..bb48c81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
+import org.apache.flink.util.ConfigurationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,48 +57,57 @@ public class ZooKeeperUtils {
 	 * @return {@link CuratorFramework} instance
 	 */
 	public static CuratorFramework startCuratorFramework(Configuration configuration) {
-		String zkQuorum = configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "");
-		if (zkQuorum.isEmpty()) {
-			zkQuorum = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
-		}
+		String zkQuorum = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
+				null,
+				ConfigConstants.ZOOKEEPER_QUORUM_KEY);
+
 		if (zkQuorum == null || zkQuorum.equals("")) {
 			throw new RuntimeException("No valid ZooKeeper quorum has been specified. " +
 					"You can specify the quorum via the configuration key '" +
 					ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY + "'.");
 		}
 
-		int sessionTimeout = getConfiguredIntValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT,
-			ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT,
-			ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
-
-		int connectionTimeout = getConfiguredIntValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT,
-			ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
-			ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
-
-		int retryWait = getConfiguredIntValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_RETRY_WAIT,
-			ConfigConstants.ZOOKEEPER_RETRY_WAIT,
-			ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
-
-		int maxRetryAttempts = getConfiguredIntValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS,
-			ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS,
-			ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS);
-
-		String root = getConfiguredStringValue(configuration, ConfigConstants.HA_ZOOKEEPER_DIR_KEY,
-			ConfigConstants.ZOOKEEPER_DIR_KEY,
-			ConfigConstants.DEFAULT_ZOOKEEPER_DIR_KEY);
-
-		String namespace = getConfiguredStringValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY,
-			ConfigConstants.ZOOKEEPER_NAMESPACE_KEY,
-			ConfigConstants.DEFAULT_ZOOKEEPER_NAMESPACE_KEY);
+		int sessionTimeout = ConfigurationUtil.getIntegerWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_SESSION_TIMEOUT,
+				ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT,
+				ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT);
+
+		int connectionTimeout = ConfigurationUtil.getIntegerWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_CONNECTION_TIMEOUT,
+				ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT,
+				ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT);
+
+		int retryWait = ConfigurationUtil.getIntegerWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_RETRY_WAIT,
+				ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT,
+				ConfigConstants.ZOOKEEPER_RETRY_WAIT);
+
+		int maxRetryAttempts = ConfigurationUtil.getIntegerWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS,
+				ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS,
+				ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS);
+
+		String root = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_DIR_KEY,
+				ConfigConstants.DEFAULT_ZOOKEEPER_DIR_KEY,
+				ConfigConstants.ZOOKEEPER_DIR_KEY);
+
+		String namespace = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY,
+				ConfigConstants.DEFAULT_ZOOKEEPER_NAMESPACE_KEY,
+				ConfigConstants.ZOOKEEPER_NAMESPACE_KEY);
 
 		String rootWithNamespace = generateZookeeperPath(root, namespace);
 
-		LOG.info("Using '{}' as zookeeper namespace.", rootWithNamespace);
+		LOG.info("Using '{}' as Zookeeper namespace.", rootWithNamespace);
 
 		CuratorFramework cf = CuratorFrameworkFactory.builder()
 				.connectString(zkQuorum)
@@ -114,31 +124,6 @@ public class ZooKeeperUtils {
 		return cf;
 	}
 
-	private static int getConfiguredIntValue(Configuration configuration, String newConfigName, String oldConfigName, int defaultValue) {
-		int val = configuration.getInteger(newConfigName, -1);
-		if (val == -1) {
-			val = configuration.getInteger(
-				oldConfigName, -1);
-		}
-		// if still the val is not set use the default value
-		if (val == -1) {
-			return defaultValue;
-		}
-		return val;
-	}
-
-	private static String getConfiguredStringValue(Configuration configuration, String newConfigName, String oldConfigName, String defaultValue) {
-		String val = configuration.getString(newConfigName, "");
-		if (val.isEmpty()) {
-			val = configuration.getString(
-				oldConfigName, "");
-		}
-		// still no value found - use the default value
-		if (val.isEmpty()) {
-			return defaultValue;
-		}
-		return val;
-	}
 	/**
 	 * Returns whether {@link HighAvailabilityMode#ZOOKEEPER} is configured.
 	 */
@@ -153,10 +138,11 @@ public class ZooKeeperUtils {
 	public static String getZooKeeperEnsemble(Configuration flinkConf)
 			throws IllegalConfigurationException {
 
-		String zkQuorum = flinkConf.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "");
-		if (zkQuorum.isEmpty()) {
-			zkQuorum = flinkConf.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
-		}
+		String zkQuorum = ConfigurationUtil.getStringWithDeprecatedKeys(
+				flinkConf,
+				ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
+				"",
+				ConfigConstants.ZOOKEEPER_QUORUM_KEY);
 
 		if (zkQuorum == null || zkQuorum.equals("")) {
 			throw new IllegalConfigurationException("No ZooKeeper quorum specified in config.");
@@ -178,9 +164,11 @@ public class ZooKeeperUtils {
 	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
 			Configuration configuration) throws Exception {
 		CuratorFramework client = startCuratorFramework(configuration);
-		String leaderPath = getConfiguredStringValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, ConfigConstants.ZOOKEEPER_LEADER_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
+		String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
+				ConfigConstants.ZOOKEEPER_LEADER_PATH);
 
 		return new ZooKeeperLeaderRetrievalService(client, leaderPath);
 	}
@@ -213,12 +201,16 @@ public class ZooKeeperUtils {
 			CuratorFramework client,
 			Configuration configuration) throws Exception {
 
-		String latchPath = getConfiguredStringValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_LATCH_PATH, ConfigConstants.ZOOKEEPER_LATCH_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH);
-		String leaderPath = getConfiguredStringValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, ConfigConstants.ZOOKEEPER_LEADER_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
+		String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_LATCH_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH,
+				ConfigConstants.ZOOKEEPER_LATCH_PATH);
+		String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
+				ConfigConstants.ZOOKEEPER_LEADER_PATH);
 
 		return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
 	}
@@ -239,9 +231,11 @@ public class ZooKeeperUtils {
 		StateStorageHelper<SubmittedJobGraph> stateStorage = createFileSystemStateStorage(configuration, "submittedJobGraph");
 
 		// ZooKeeper submitted jobs root dir
-		String zooKeeperSubmittedJobsPath = getConfiguredStringValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH, ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+		String zooKeeperSubmittedJobsPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH,
+				ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH);
 
 		return new ZooKeeperSubmittedJobGraphStore(
 				client, zooKeeperSubmittedJobsPath, stateStorage);
@@ -266,10 +260,11 @@ public class ZooKeeperUtils {
 
 		checkNotNull(configuration, "Configuration");
 
-		String checkpointsPath = getConfiguredStringValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_CHECKPOINTS_PATH,
-			ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH);
+		String checkpointsPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_CHECKPOINTS_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH,
+				ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH);
 
 		StateStorageHelper<CompletedCheckpoint> stateStorage = createFileSystemStateStorage(
 			configuration,
@@ -298,10 +293,11 @@ public class ZooKeeperUtils {
 			Configuration configuration,
 			JobID jobId) throws Exception {
 
-		String checkpointIdCounterPath = getConfiguredStringValue(configuration,
-			ConfigConstants.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
-			ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
+		String checkpointIdCounterPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+				ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+				ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
 
 		checkpointIdCounterPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
 
@@ -321,16 +317,15 @@ public class ZooKeeperUtils {
 			Configuration configuration,
 			String prefix) throws IOException {
 
-		String rootPath = configuration.getString(
-			ConfigConstants.ZOOKEEPER_HA_PATH, "");
-		if (rootPath.isEmpty()) {
-			rootPath = configuration.getString(
-				ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "");
-		}
+		String rootPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+				configuration,
+				ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH,
+				"",
+				ConfigConstants.ZOOKEEPER_RECOVERY_PATH);
 
 		if (rootPath.equals("")) {
 			throw new IllegalConfigurationException("Missing recovery path. Specify via " +
-				"configuration key '" + ConfigConstants.ZOOKEEPER_HA_PATH + "'.");
+				"configuration key '" + ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH + "'.");
 		} else {
 			return new FileSystemStateStorageHelper<T>(rootPath, prefix);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 5962afc..d172a2b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -82,7 +82,7 @@ import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
-import org.apache.flink.util.{InstantiationUtil, NetUtils}
+import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
 
 import org.jboss.netty.channel.ChannelException
 
@@ -155,7 +155,7 @@ class JobManager(
   /** Either running or not yet archived jobs (session hasn't been ended). */
   protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
 
-  protected val recoveryMode = HighAvailabilityMode.fromConfig(flinkConfiguration)
+  protected val haMode = HighAvailabilityMode.fromConfig(flinkConfiguration)
 
   var leaderSessionID: Option[UUID] = None
 
@@ -317,7 +317,7 @@ class JobManager(
 
         // TODO (critical next step) This needs to be more flexible and robust (e.g. wait for task
         // managers etc.)
-        if (recoveryMode != HighAvailabilityMode.NONE) {
+        if (haMode != HighAvailabilityMode.NONE) {
           log.info(s"Delaying recovery of all jobs by $jobRecoveryTimeout.")
 
           context.system.scheduler.scheduleOnce(
@@ -2462,7 +2462,7 @@ object JobManager {
         // The port range of allowed job manager ports or 0 for random
         configuration.getString(
           ConfigConstants.RECOVERY_JOB_MANAGER_PORT,
-          ConfigConstants.DEFAULT_RECOVERY_JOB_MANAGER_PORT)
+          ConfigConstants.DEFAULT_HA_JOB_MANAGER_PORT)
       }
       else {
         LOG.info("Starting JobManager without high-availability")
@@ -2594,10 +2594,11 @@ object JobManager {
 
     val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
 
-    var jobRecoveryTimeoutStr = configuration.getString(ConfigConstants.HA_JOB_DELAY, "");
-    if (jobRecoveryTimeoutStr.isEmpty) {
-      jobRecoveryTimeoutStr = configuration.getString(ConfigConstants.RECOVERY_JOB_DELAY, "");
-    }
+    val jobRecoveryTimeoutStr = ConfigurationUtil.getStringWithDeprecatedKeys(
+      configuration,
+      ConfigConstants.HA_JOB_DELAY,
+      null,
+      ConfigConstants.RECOVERY_JOB_DELAY)
 
     val jobRecoveryTimeout = if (jobRecoveryTimeoutStr == null || jobRecoveryTimeoutStr.isEmpty) {
       timeout

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 0778aae..a547d25 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -84,7 +84,7 @@ abstract class FlinkMiniCluster(
 
   implicit val timeout = AkkaUtils.getTimeout(configuration)
 
-  val recoveryMode = HighAvailabilityMode.fromConfig(configuration)
+  val haMode = HighAvailabilityMode.fromConfig(configuration)
 
   val numJobManagers = getNumberOfJobManagers
 
@@ -122,7 +122,7 @@ abstract class FlinkMiniCluster(
   // --------------------------------------------------------------------------
 
   def getNumberOfJobManagers: Int = {
-    if(recoveryMode == HighAvailabilityMode.NONE) {
+    if(haMode == HighAvailabilityMode.NONE) {
       1
     } else {
       configuration.getInteger(
@@ -133,7 +133,7 @@ abstract class FlinkMiniCluster(
   }
 
   def getNumberOfResourceManagers: Int = {
-    if(recoveryMode == HighAvailabilityMode.NONE) {
+    if(haMode == HighAvailabilityMode.NONE) {
       1
     } else {
       configuration.getInteger(
@@ -372,7 +372,7 @@ abstract class FlinkMiniCluster(
     webMonitor foreach {
       _.stop()
     }
-    
+
     val tmFutures = taskManagerActors map {
       _.map(gracefulStop(_, timeout))
     } getOrElse(Seq())
@@ -417,7 +417,7 @@ abstract class FlinkMiniCluster(
       _ foreach(_.awaitTermination())
     }
   }
-  
+
   def running = isRunning
 
   // --------------------------------------------------------------------------
@@ -466,7 +466,7 @@ abstract class FlinkMiniCluster(
   : JobExecutionResult = {
     submitJobAndWait(jobGraph, printUpdates, timeout, createLeaderRetrievalService())
   }
-  
+
   @throws(classOf[JobExecutionException])
   def submitJobAndWait(
       jobGraph: JobGraph,
@@ -524,7 +524,7 @@ abstract class FlinkMiniCluster(
   protected def createLeaderRetrievalService(): LeaderRetrievalService = {
     (jobManagerActorSystems, jobManagerActors) match {
       case (Some(jmActorSystems), Some(jmActors)) =>
-        if (recoveryMode == HighAvailabilityMode.NONE) {
+        if (haMode == HighAvailabilityMode.NONE) {
           new StandaloneLeaderRetrievalService(
             AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0)))
         } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index bd4723f..8464d68 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -68,9 +68,9 @@ public class BlobRecoveryITCase {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+			config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
 			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-			config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, recoveryDir.getPath());
+			config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, recoveryDir.getPath());
 
 			for (int i = 0; i < server.length; i++) {
 				server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
index cc1994a..e947744 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorRecoveryITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.client;
 
 import akka.actor.PoisonPill;
 import org.apache.curator.test.TestingServer;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -82,7 +81,7 @@ public class JobClientActorRecoveryITCase extends TestLogger {
 	public void testJobClientRecovery() throws Exception {
 		File rootFolder = tempFolder.getRoot();
 
-		Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
+		Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
 			zkServer.getConnectString(),
 			rootFolder.getPath());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index a3fe0d4..f6bed56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -63,9 +63,9 @@ public class BlobLibraryCacheRecoveryITCase {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+			config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
 			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-			config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, temporaryFolder.getRoot().getAbsolutePath());
+			config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath());
 
 			for (int i = 0; i < server.length; i++) {
 				server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
new file mode 100644
index 0000000..04c0e48
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class HighAvailabilityModeTest {
+
+	// Default HA mode
+	private final static HighAvailabilityMode DEFAULT_HA_MODE = HighAvailabilityMode.valueOf(
+			ConfigConstants.DEFAULT_HA_MODE.toUpperCase());
+
+	/**
+	 * Tests HA mode configuration.
+	 */
+	@Test
+	public void testFromConfig() throws Exception {
+		Configuration config = new Configuration();
+
+		// Check default
+		assertEquals(DEFAULT_HA_MODE, HighAvailabilityMode.fromConfig(config));
+
+		// Check not equals default
+		config.setString(ConfigConstants.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+		assertEquals(HighAvailabilityMode.ZOOKEEPER, HighAvailabilityMode.fromConfig(config));
+	}
+
+	/**
+	 * Tests HA mode configuration with deprecated config values.
+	 */
+	@Test
+	public void testDeprecatedFromConfig() throws Exception {
+		Configuration config = new Configuration();
+
+		// Check mapping of old default to new default
+		config.setString(ConfigConstants.RECOVERY_MODE, ConfigConstants.DEFAULT_RECOVERY_MODE);
+		assertEquals(DEFAULT_HA_MODE, HighAvailabilityMode.fromConfig(config));
+
+		// Check deprecated config
+		config.setString(ConfigConstants.RECOVERY_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+		assertEquals(HighAvailabilityMode.ZOOKEEPER, HighAvailabilityMode.fromConfig(config));
+
+		// Check precedence over deprecated config
+		config.setString(ConfigConstants.HA_MODE, HighAvailabilityMode.NONE.name().toLowerCase());
+		config.setString(ConfigConstants.RECOVERY_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+
+		assertEquals(HighAvailabilityMode.NONE, HighAvailabilityMode.fromConfig(config));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index d980517..0e1c7c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -123,8 +123,8 @@ public class JobManagerHARecoveryTest {
 		ActorRef jobManager = null;
 		ActorRef taskManager = null;
 
-		flinkConfiguration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
-		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_HA_PATH, temporaryFolder.newFolder().toString());
+		flinkConfiguration.setString(ConfigConstants.HA_MODE, "zookeeper");
+		flinkConfiguration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, temporaryFolder.newFolder().toString());
 		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 5c696ce..ed4d530 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -101,7 +101,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 	@Test
 	public void testLeaderElection() throws Exception {
 		final Configuration configuration = ZooKeeperTestUtils
-			.createZooKeeperRecoveryModeConfig(
+			.createZooKeeperHAConfig(
 				testingServer.getConnectString(),
 				tempFolder.getRoot().getPath());
 
@@ -130,7 +130,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 	@Test
 	public void testLeaderReelection() throws Exception {
 		final Configuration configuration = ZooKeeperTestUtils
-			.createZooKeeperRecoveryModeConfig(
+			.createZooKeeperHAConfig(
 				testingServer.getConnectString(),
 				tempFolder.getRoot().getPath());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 048fbee..e20985b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -90,7 +90,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	public void testZooKeeperLeaderElectionRetrieval() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -135,7 +135,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	public void testZooKeeperReelection() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 
 		Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
@@ -218,7 +218,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	public void testZooKeeperReelectionWithReplacement() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 
 		int num = 3;
 		int numTries = 30;
@@ -296,7 +296,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
@@ -380,7 +380,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	public void testExceptionForwarding() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -449,7 +449,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	public void testEphemeralZooKeeperNodes() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-		configuration.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
 
 		ZooKeeperLeaderElectionService leaderElectionService;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 5aace34..0fe0644 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -23,7 +23,6 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
@@ -44,7 +43,6 @@ import java.net.UnknownHostException;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
@@ -84,7 +82,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 
 		long sleepingTime = 1000;
 
-		config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		config.setString(ConfigConstants.HA_MODE, "zookeeper");
 		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
 
 		LeaderElectionService leaderElectionService = null;
@@ -181,7 +179,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 	@Test
 	public void testTimeoutOfFindConnectingAddress() throws Exception {
 		Configuration config = new Configuration();
-		config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+		config.setString(ConfigConstants.HA_MODE, "zookeeper");
 		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
 
 		FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
@@ -192,46 +190,6 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 		assertEquals(InetAddress.getLocalHost(), result);
 	}
 
-	@Test
-	public void testConnectionToZookeeperOverridingOldConfig() throws Exception {
-		Configuration config = new Configuration();
-		// The new config will be taken into effect
-		config.setString(ConfigConstants.RECOVERY_MODE, "standalone");
-		config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
-		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-
-		FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
-
-		LeaderRetrievalService leaderRetrievalService =
-			LeaderRetrievalUtils.createLeaderRetrievalService(config);
-		InetAddress result = LeaderRetrievalUtils.findConnectingAddress(leaderRetrievalService, timeout);
-
-		assertEquals(InetAddress.getLocalHost(), result);
-	}
-
-	@Test
-	public void testConnectionToStandAloneLeaderOverridingOldConfig() throws Exception {
-		Configuration config = new Configuration();
-		// The new config will be taken into effect
-		config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-		config.setString(ConfigConstants.HIGH_AVAILABILITY, "none");
-		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-
-		HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(config);
-		assertTrue(mode == HighAvailabilityMode.NONE);
-	}
-
-	@Test
-	public void testConnectionToZookeeperUsingOldConfig() throws Exception {
-		Configuration config = new Configuration();
-		// The new config will be taken into effect
-		config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
-		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
-
-		HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(config);
-		assertTrue(mode == HighAvailabilityMode.ZOOKEEPER);
-	}
-
 	class FindConnectingAddress implements Runnable {
 
 		private final Configuration config;

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
index 66d523f..e8981a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
@@ -212,7 +212,7 @@ public class JobManagerProcess extends TestJvmProcess {
 		 * <code>--port PORT</code>.
 		 *
 		 * <p>Other arguments are parsed to a {@link Configuration} and passed to the
-		 * JobManager, for instance: <code>--high-availability ZOOKEEPER --recovery.zookeeper.quorum
+		 * JobManager, for instance: <code>--high-availability ZOOKEEPER --high-availability.zookeeper.quorum
 		 * "xyz:123:456"</code>.
 		 */
 		public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
index 417dc88..58bc50e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
@@ -96,7 +96,7 @@ public class TaskManagerProcess extends TestJvmProcess {
 
 		/**
 		 * All arguments are parsed to a {@link Configuration} and passed to the Taskmanager,
-		 * for instance: <code>--high-availability ZOOKEEPER --recovery.zookeeper.quorum "xyz:123:456"</code>.
+		 * for instance: <code>--high-availability ZOOKEEPER --high-availability.zookeeper.quorum "xyz:123:456"</code>.
 		 */
 		public static void main(String[] args) throws Exception {
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index c94842f..7dd7067 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -38,10 +38,10 @@ public class ZooKeeperTestUtils {
 	 *                          recovery)
 	 * @return A new configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
 	 */
-	public static Configuration createZooKeeperRecoveryModeConfig(
+	public static Configuration createZooKeeperHAConfig(
 			String zooKeeperQuorum, String fsStateHandlePath) {
 
-		return setZooKeeperRecoveryMode(new Configuration(), zooKeeperQuorum, fsStateHandlePath);
+		return configureZooKeeperHA(new Configuration(), zooKeeperQuorum, fsStateHandlePath);
 	}
 
 	/**
@@ -53,7 +53,7 @@ public class ZooKeeperTestUtils {
 	 *                          recovery)
 	 * @return The modified configuration to operate in {@link HighAvailabilityMode#ZOOKEEPER}.
 	 */
-	public static Configuration setZooKeeperRecoveryMode(
+	public static Configuration configureZooKeeperHA(
 			Configuration config,
 			String zooKeeperQuorum,
 			String fsStateHandlePath) {
@@ -66,7 +66,7 @@ public class ZooKeeperTestUtils {
 		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
 
 		// ZooKeeper recovery mode
-		config.setString(ConfigConstants.HIGH_AVAILABILITY, "ZOOKEEPER");
+		config.setString(ConfigConstants.HA_MODE, "ZOOKEEPER");
 		config.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, zooKeeperQuorum);
 
 		int connTimeout = 5000;
@@ -81,7 +81,7 @@ public class ZooKeeperTestUtils {
 		// File system state backend
 		config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
-		config.setString(ConfigConstants.ZOOKEEPER_HA_PATH, fsStateHandlePath + "/recovery");
+		config.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, fsStateHandlePath + "/recovery");
 
 		// Akka failure detection and execution retries
 		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 7d2b86c..5628f3c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -423,8 +423,8 @@ object TestingUtils {
       prefix: String)
     : ActorGateway = {
 
-    configuration.setString(ConfigConstants.HIGH_AVAILABILITY,
-      ConfigConstants.DEFAULT_HIGH_AVAILABILTY)
+    configuration.setString(ConfigConstants.HA_MODE,
+      ConfigConstants.DEFAULT_HA_MODE)
 
       val (actor, _) = JobManager.startJobManagerActors(
         configuration,
@@ -503,8 +503,8 @@ object TestingUtils {
       configuration: Configuration)
   : ActorGateway = {
 
-    configuration.setString(ConfigConstants.HIGH_AVAILABILITY,
-      ConfigConstants.DEFAULT_HIGH_AVAILABILTY)
+    configuration.setString(ConfigConstants.HA_MODE,
+      ConfigConstants.DEFAULT_HA_MODE)
 
     val actor = FlinkResourceManager.startResourceManagerActors(
       configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/58165d69/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 7e5acee..4014b80 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -120,7 +120,7 @@ public class TestBaseUtils extends TestLogger {
 
 		if (startZooKeeper) {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
-			config.setString(ConfigConstants.HIGH_AVAILABILITY, "zookeeper");
+			config.setString(ConfigConstants.HA_MODE, "zookeeper");
 		}
 
 		return startCluster(config, singleActorSystem);