You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/14 15:51:35 UTC

[1/2] flink git commit: [hotfix][tests] Fix minor mocking issues in AbstractStreamOperatorTest

Repository: flink
Updated Branches:
  refs/heads/master 14518067c -> bcd028d75


[hotfix][tests] Fix minor mocking issues in AbstractStreamOperatorTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bcd028d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bcd028d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bcd028d7

Branch: refs/heads/master
Commit: bcd028d75b0e5c5c691e24640a2196b2fdaf85e0
Parents: 7f42259
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon May 14 15:09:50 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon May 14 17:50:52 2018 +0200

----------------------------------------------------------------------
 .../streaming/api/operators/AbstractStreamOperatorTest.java   | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bcd028d7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 904ff64..f0195a2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -495,7 +495,7 @@ public class AbstractStreamOperatorTest {
 
 		final CloseableRegistry closeableRegistry = new CloseableRegistry();
 
-		StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
+		StateSnapshotContextSynchronousImpl context = spy(new StateSnapshotContextSynchronousImpl(0L, 0L));
 
 		whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
 
@@ -573,7 +573,7 @@ public class AbstractStreamOperatorTest {
 		RunnableFuture<SnapshotResult<KeyedStateHandle>> futureKeyedStateHandle = mock(RunnableFuture.class);
 		RunnableFuture<SnapshotResult<OperatorStateHandle>> futureOperatorStateHandle = mock(RunnableFuture.class);
 
-		StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
+		StateSnapshotContextSynchronousImpl context = spy(new StateSnapshotContextSynchronousImpl(checkpointId, timestamp));
 		when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle);
 		when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);
 
@@ -582,7 +582,6 @@ public class AbstractStreamOperatorTest {
 		whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
 		whenNew(OperatorSnapshotFutures.class).withAnyArguments().thenReturn(operatorSnapshotResult);
 
-		CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
 		StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
 		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
@@ -600,7 +599,7 @@ public class AbstractStreamOperatorTest {
 		when(operatorStateBackend.snapshot(
 			eq(checkpointId),
 			eq(timestamp),
-			eq(streamFactory),
+			any(CheckpointStreamFactory.class),
 			any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle);
 
 		AbstractKeyedStateBackend<?> keyedStateBackend = mock(AbstractKeyedStateBackend.class);


[2/2] flink git commit: [FLINK-9355][checkpointing] Simplify configuration of local recovery to a simple on/off switch

Posted by sr...@apache.org.
[FLINK-9355][checkpointing] Simplify configuration of local recovery to a simple on/off switch

This closes #6006.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f422598
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f422598
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f422598

Branch: refs/heads/master
Commit: 7f4225987a690e85284bb356dd5e63a996f136d0
Parents: 1451806
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon May 14 11:14:47 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon May 14 17:50:52 2018 +0200

----------------------------------------------------------------------
 .../generated/checkpointing_configuration.html  |  2 +-
 .../configuration/CheckpointingOptions.java     |  6 +--
 .../runtime/state/LocalRecoveryConfig.java      | 52 +++-----------------
 .../TaskExecutorLocalStateStoresManager.java    | 12 ++---
 .../state/heap/HeapKeyedStateBackend.java       |  3 +-
 .../taskexecutor/TaskManagerServices.java       |  8 +--
 .../TaskManagerServicesConfiguration.java       | 17 ++++---
 ...TaskExecutorLocalStateStoresManagerTest.java | 16 +++---
 .../state/TaskLocalStateStoreImplTest.java      |  3 +-
 .../runtime/state/TaskStateManagerImplTest.java |  6 +--
 .../runtime/state/TestLocalRecoveryConfig.java  |  2 +-
 .../NetworkBufferCalculationTest.java           |  3 +-
 .../taskexecutor/TaskExecutorITCase.java        |  3 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 27 +++++-----
 ...askManagerComponentsStartupShutdownTest.java |  3 +-
 .../state/RocksDBKeyedStateBackend.java         | 17 ++-----
 .../StreamOperatorSnapshotRestoreTest.java      |  4 +-
 .../runtime/tasks/LocalStateForwardingTest.java |  4 +-
 .../AbstractLocalRecoveryITCase.java            | 11 ++---
 .../checkpointing/LocalRecoveryHeapITCase.java  |  5 +-
 .../LocalRecoveryRocksDBFullITCase.java         |  5 +-
 .../LocalRecoveryRocksDBIncrementalITCase.java  |  5 +-
 .../ResumeCheckpointManuallyITCase.java         |  8 +--
 23 files changed, 72 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/docs/_includes/generated/checkpointing_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/checkpointing_configuration.html b/docs/_includes/generated/checkpointing_configuration.html
index 8b4233f..2894c16 100644
--- a/docs/_includes/generated/checkpointing_configuration.html
+++ b/docs/_includes/generated/checkpointing_configuration.html
@@ -29,7 +29,7 @@
         </tr>
         <tr>
             <td><h5>state.backend.local-recovery</h5></td>
-            <td style="word-wrap: break-word;">"DISABLED"</td>
+            <td style="word-wrap: break-word;">false</td>
             <td></td>
         </tr>
         <tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index c6af7dd..596e8dd 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -67,11 +67,11 @@ public class CheckpointingOptions {
 				" this option.");
 
 	/**
-	 * This option configures local recovery for this state backend.
+	 * This option configures local recovery for this state backend. By default, local recovery is deactivated.
 	 */
-	public static final ConfigOption<String> LOCAL_RECOVERY = ConfigOptions
+	public static final ConfigOption<Boolean> LOCAL_RECOVERY = ConfigOptions
 		.key("state.backend.local-recovery")
-		.defaultValue("DISABLED");
+		.defaultValue(false);
 
 	/**
 	 * The config parameter defining the root directories for storing file-based state for local recovery.

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
index c97fa0b..fc15f5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.Configuration;
-
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nonnull;
 
 /**
@@ -31,57 +26,22 @@ import javax.annotation.Nonnull;
  */
 public class LocalRecoveryConfig {
 
-	/**
-	 * Enum over modes of local recovery:
-	 * <p><ul>
-	 * <li>DISABLED: disables local recovery.
-	 * <li>ENABLE_FILE_BASED: enables local recovery in a variant that is based on local files.
-	 * </ul>
-	 */
-	public enum LocalRecoveryMode {
-		DISABLED,
-		ENABLE_FILE_BASED;
-
-		/**
-		 * Extracts the {@link LocalRecoveryMode} from the given configuration. Defaults to LocalRecoveryMode.DISABLED
-		 * if no configuration value is specified or parsing the value resulted in an exception.
-		 *
-		 * @param configuration the configuration that specifies the value for the local recovery mode.
-		 * @return the local recovery mode as found in the config, or LocalRecoveryMode.DISABLED if no mode was
-		 * configured or the specified mode could not be parsed.
-		 */
-		@Nonnull
-		public static LocalRecoveryMode fromConfig(@Nonnull Configuration configuration) {
-			String localRecoveryConfString = configuration.getString(CheckpointingOptions.LOCAL_RECOVERY);
-			try {
-				return LocalRecoveryConfig.LocalRecoveryMode.valueOf(localRecoveryConfString);
-			} catch (IllegalArgumentException ex) {
-				LoggerFactory.getLogger(LocalRecoveryConfig.class).warn(
-					"Exception while parsing configuration of local recovery mode. Local recovery will be disabled.",
-					ex);
-				return LocalRecoveryConfig.LocalRecoveryMode.DISABLED;
-			}
-		}
-	}
-
 	/** The local recovery mode. */
-	@Nonnull
-	private final LocalRecoveryMode localRecoveryMode;
+	private final boolean localRecoveryEnabled;
 
 	/** Encapsulates the root directories and the subtask-specific path. */
 	@Nonnull
 	private final LocalRecoveryDirectoryProvider localStateDirectories;
 
 	public LocalRecoveryConfig(
-		@Nonnull LocalRecoveryMode localRecoveryMode,
+		boolean localRecoveryEnabled,
 		@Nonnull LocalRecoveryDirectoryProvider directoryProvider) {
-		this.localRecoveryMode = localRecoveryMode;
+		this.localRecoveryEnabled = localRecoveryEnabled;
 		this.localStateDirectories = directoryProvider;
 	}
 
-	@Nonnull
-	public LocalRecoveryMode getLocalRecoveryMode() {
-		return localRecoveryMode;
+	public boolean isLocalRecoveryEnabled() {
+		return localRecoveryEnabled;
 	}
 
 	@Nonnull
@@ -92,7 +52,7 @@ public class LocalRecoveryConfig {
 	@Override
 	public String toString() {
 		return "LocalRecoveryConfig{" +
-			"localRecoveryMode=" + localRecoveryMode +
+			"localRecoveryMode=" + localRecoveryEnabled +
 			", localStateDirectories=" + localStateDirectories +
 			'}';
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index 518ad81..4919f80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -54,7 +54,7 @@ public class TaskExecutorLocalStateStoresManager {
 	private final Map<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl>> taskStateStoresByAllocationID;
 
 	/** The configured mode for local recovery on this task manager. */
-	private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode;
+	private final boolean localRecoveryEnabled;
 
 	/** This is the root directory for all local state of this task manager / executor. */
 	private final File[] localStateRootDirectories;
@@ -71,12 +71,12 @@ public class TaskExecutorLocalStateStoresManager {
 	private boolean closed;
 
 	public TaskExecutorLocalStateStoresManager(
-		@Nonnull LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode,
+		boolean localRecoveryEnabled,
 		@Nonnull File[] localStateRootDirectories,
 		@Nonnull Executor discardExecutor) throws IOException {
 
 		this.taskStateStoresByAllocationID = new HashMap<>();
-		this.localRecoveryMode = localRecoveryMode;
+		this.localRecoveryEnabled = localRecoveryEnabled;
 		this.localStateRootDirectories = localStateRootDirectories;
 		this.discardExecutor = discardExecutor;
 		this.lock = new Object();
@@ -140,7 +140,7 @@ public class TaskExecutorLocalStateStoresManager {
 					subtaskIndex);
 
 				LocalRecoveryConfig localRecoveryConfig =
-					new LocalRecoveryConfig(localRecoveryMode, directoryProvider);
+					new LocalRecoveryConfig(localRecoveryEnabled, directoryProvider);
 
 				taskLocalStateStore = new TaskLocalStateStoreImpl(
 					jobId,
@@ -217,8 +217,8 @@ public class TaskExecutorLocalStateStoresManager {
 	}
 
 	@VisibleForTesting
-	public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() {
-		return localRecoveryMode;
+	boolean isLocalRecoveryEnabled() {
+		return localRecoveryEnabled;
 	}
 
 	@VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index a02edb0..ab91ee1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -622,8 +622,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier =
 
-				LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.equals(
-					localRecoveryConfig.getLocalRecoveryMode()) ?
+				localRecoveryConfig.isLocalRecoveryEnabled() ?
 
 					() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
 						checkpointId,

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ad5e22d..ad19a57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.query.QueryableStateUtils;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -255,7 +254,6 @@ public class TaskManagerServices {
 
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 
-		LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode = taskManagerServicesConfiguration.getLocalRecoveryMode();
 
 		final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
 
@@ -265,8 +263,10 @@ public class TaskManagerServices {
 			stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
 		}
 
-		final TaskExecutorLocalStateStoresManager taskStateManager =
-			new TaskExecutorLocalStateStoresManager(localRecoveryMode, stateRootDirectoryFiles, taskIOExecutor);
+		final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
+			taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
+			stateRootDirectoryFiles,
+			taskIOExecutor);
 
 		return new TaskManagerServices(
 			taskManagerLocation,

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index b80320c..bf1494e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
@@ -29,7 +30,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.NetUtils;
@@ -78,13 +78,13 @@ public class TaskManagerServicesConfiguration {
 
 	private final long timerServiceShutdownTimeout;
 
-	private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode;
+	private final boolean localRecoveryEnabled;
 
 	public TaskManagerServicesConfiguration(
 			InetAddress taskManagerAddress,
 			String[] tmpDirPaths,
 			String[] localRecoveryStateRootDirectories,
-			LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode,
+			boolean localRecoveryEnabled,
 			NetworkEnvironmentConfiguration networkConfig,
 			QueryableStateConfiguration queryableStateConfig,
 			int numberOfSlots,
@@ -97,7 +97,7 @@ public class TaskManagerServicesConfiguration {
 		this.taskManagerAddress = checkNotNull(taskManagerAddress);
 		this.tmpDirPaths = checkNotNull(tmpDirPaths);
 		this.localRecoveryStateRootDirectories = checkNotNull(localRecoveryStateRootDirectories);
-		this.localRecoveryMode = checkNotNull(localRecoveryMode);
+		this.localRecoveryEnabled = checkNotNull(localRecoveryEnabled);
 		this.networkConfig = checkNotNull(networkConfig);
 		this.queryableStateConfig = checkNotNull(queryableStateConfig);
 		this.numberOfSlots = checkNotNull(numberOfSlots);
@@ -128,8 +128,8 @@ public class TaskManagerServicesConfiguration {
 		return localRecoveryStateRootDirectories;
 	}
 
-	public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() {
-		return localRecoveryMode;
+	public boolean isLocalRecoveryEnabled() {
+		return localRecoveryEnabled;
 	}
 
 	public NetworkEnvironmentConfiguration getNetworkConfig() {
@@ -209,8 +209,9 @@ public class TaskManagerServicesConfiguration {
 			localStateRootDir = tmpDirs;
 		}
 
-		LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode =
-			LocalRecoveryConfig.LocalRecoveryMode.fromConfig(configuration);
+		boolean localRecoveryMode = configuration.getBoolean(
+			CheckpointingOptions.LOCAL_RECOVERY.key(),
+			CheckpointingOptions.LOCAL_RECOVERY.defaultValue());
 
 		final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
 			configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 2e9b107..97539bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -62,7 +62,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 		config.setString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS, rootDirString);
 
 		// test configuration of the local state mode
-		config.setString(CheckpointingOptions.LOCAL_RECOVERY, "ENABLE_FILE_BASED");
+		config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
 
 		final ResourceID tmResourceID = ResourceID.generate();
 
@@ -88,9 +88,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 		}
 
 		// verify local recovery mode
-		Assert.assertEquals(
-			LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED,
-			taskStateManager.getLocalRecoveryMode());
+		Assert.assertTrue(taskStateManager.isLocalRecoveryEnabled());
 
 		Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
 		for (File rootDirectory : rootDirectories) {
@@ -130,9 +128,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 				localStateRootDirectories[i]);
 		}
 
-		Assert.assertEquals(
-			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
-			taskStateManager.getLocalRecoveryMode());
+		Assert.assertFalse(taskStateManager.isLocalRecoveryEnabled());
 	}
 
 	/**
@@ -150,7 +146,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 
 		File[] rootDirs = {temporaryFolder.newFolder(), temporaryFolder.newFolder(), temporaryFolder.newFolder()};
 		TaskExecutorLocalStateStoresManager storesManager = new TaskExecutorLocalStateStoresManager(
-			LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED,
+			true,
 			rootDirs,
 			Executors.directExecutor());
 
@@ -187,8 +183,8 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 
 		// test that local recovery mode is forwarded to the created store
 		Assert.assertEquals(
-			storesManager.getLocalRecoveryMode(),
-			taskLocalStateStore.getLocalRecoveryConfig().getLocalRecoveryMode());
+			storesManager.isLocalRecoveryEnabled(),
+			taskLocalStateStore.getLocalRecoveryConfig().isLocalRecoveryEnabled());
 
 		Assert.assertTrue(testFile.exists());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
index 618320e..7531783 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
@@ -64,8 +64,7 @@ public class TaskLocalStateStoreImplTest {
 		LocalRecoveryDirectoryProviderImpl directoryProvider =
 			new LocalRecoveryDirectoryProviderImpl(allocationBaseDirs, jobID, jobVertexID, subtaskIdx);
 
-		LocalRecoveryConfig localRecoveryConfig =
-			new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, directoryProvider);
+		LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(false, directoryProvider);
 
 		this.taskLocalStateStore = new TaskLocalStateStoreImpl(
 			jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
index f58f3f4..71038c1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
@@ -193,7 +193,7 @@ public class TaskStateManagerImplTest extends TestLogger {
 				new LocalRecoveryDirectoryProviderImpl(allocBaseDirs, jobID, jobVertexID, 0);
 
 			LocalRecoveryConfig localRecoveryConfig =
-				new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, directoryProvider);
+				new LocalRecoveryConfig(true, directoryProvider);
 
 			TaskLocalStateStore taskLocalStateStore =
 				new TaskLocalStateStoreImpl(jobID, allocationID, jobVertexID, 13, localRecoveryConfig, directExecutor);
@@ -220,8 +220,8 @@ public class TaskStateManagerImplTest extends TestLogger {
 			}
 
 			Assert.assertEquals(
-				localRecoveryConfFromTaskLocalStateStore.getLocalRecoveryMode(),
-				localRecoveryConfFromTaskStateManager.getLocalRecoveryMode());
+				localRecoveryConfFromTaskLocalStateStore.isLocalRecoveryEnabled(),
+				localRecoveryConfFromTaskStateManager.isLocalRecoveryEnabled());
 		} finally {
 			tmpFolder.delete();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
index 7801720..58affc5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
@@ -28,7 +28,7 @@ public class TestLocalRecoveryConfig {
 	private static final LocalRecoveryDirectoryProvider INSTANCE = new TestDummyLocalDirectoryProvider();
 
 	public static LocalRecoveryConfig disabled() {
-		return new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, INSTANCE);
+		return new LocalRecoveryConfig(false, INSTANCE);
 	}
 
 	public static class TestDummyLocalDirectoryProvider implements LocalRecoveryDirectoryProvider {

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
index 2116c2f..e88f9da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.testutils.category.LegacyAndNew;
 import org.apache.flink.util.TestLogger;
@@ -102,7 +101,7 @@ public class NetworkBufferCalculationTest extends TestLogger {
 			InetAddress.getLoopbackAddress(),
 			new String[] {},
 			new String[] {},
-			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+			false,
 			networkConfig,
 			QueryableStateConfiguration.disabled(),
 			1,

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 885d99f..a740bff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -50,7 +50,6 @@ import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -138,7 +137,7 @@ public class TaskExecutorITCase extends TestLogger {
 			new File[]{new File(System.getProperty("java.io.tmpdir"), "localRecovery")};
 
 		final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
-			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+			false,
 			taskExecutorLocalStateRootDirs,
 			rpcService.getExecutor());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 465619e..7dedb9a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -59,9 +59,9 @@ import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
-import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -75,7 +75,6 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -236,7 +235,7 @@ public class TaskExecutorTest extends TestLogger {
 			CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId)));
 
 		TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
-			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+			false,
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 
@@ -325,7 +324,7 @@ public class TaskExecutorTest extends TestLogger {
 		HeartbeatServices heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
 
 		TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
-			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+			false,
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 
@@ -460,7 +459,7 @@ public class TaskExecutorTest extends TestLogger {
 		);
 
 		TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
-			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+			false,
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 
@@ -545,7 +544,7 @@ public class TaskExecutorTest extends TestLogger {
 		when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
 
 		TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
-			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+			false,
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 
@@ -608,7 +607,7 @@ public class TaskExecutorTest extends TestLogger {
 		when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
 
 		TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
-			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+			false,
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 
@@ -729,7 +728,7 @@ public class TaskExecutorTest extends TestLogger {
 		when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
 		TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
-			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+			false,
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 
@@ -832,7 +831,7 @@ public class TaskExecutorTest extends TestLogger {
 		final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
 
 		TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
-			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+			false,
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 
@@ -942,7 +941,7 @@ public class TaskExecutorTest extends TestLogger {
 		rpc.registerGateway(jobManagerAddress, jobMasterGateway);
 
 		TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
-			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+			false,
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 
@@ -1074,7 +1073,7 @@ public class TaskExecutorTest extends TestLogger {
 		final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
 
 		TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
-			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+			false,
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 
@@ -1193,7 +1192,7 @@ public class TaskExecutorTest extends TestLogger {
 		final JobManagerTable jobManagerTableMock = spy(new JobManagerTable());
 
 		TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
-			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+			false,
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 
@@ -1267,7 +1266,7 @@ public class TaskExecutorTest extends TestLogger {
 		rpc.registerGateway(rmAddress, rmGateway);
 
 		TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
-			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+			false,
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 
@@ -1323,7 +1322,7 @@ public class TaskExecutorTest extends TestLogger {
 			timerService);
 
 		TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
-			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+			false,
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 581d8ed..8289930 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -46,7 +46,6 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -162,7 +161,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 			network.start();
 
 			TaskExecutorLocalStateStoresManager storesManager = new TaskExecutorLocalStateStoresManager(
-				LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+				false,
 				ioManager.getSpillingDirectories(),
 				Executors.directExecutor());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 90d0fc6..0ec2ef0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1663,9 +1663,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			final SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier =
 
-				isWithLocalRecovery(
-					checkpointOptions.getCheckpointType(),
-					localRecoveryConfig.getLocalRecoveryMode()) ?
+				localRecoveryConfig.isLocalRecoveryEnabled() &&
+					(CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType()) ?
 
 					() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
 						checkpointId,
@@ -1745,14 +1744,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
 			return AsyncStoppableTaskWithCallback.from(ioCallable);
 		}
-
-		private boolean isWithLocalRecovery(
-			CheckpointType checkpointType,
-			LocalRecoveryConfig.LocalRecoveryMode recoveryMode) {
-			// we use local recovery when it is activated and we are not taking a savepoint.
-			return LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED == recoveryMode
-				&& CheckpointType.SAVEPOINT != checkpointType;
-		}
 	}
 
 	private class IncrementalSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
@@ -1792,7 +1783,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			SnapshotDirectory snapshotDirectory;
 
-			if (LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED == localRecoveryConfig.getLocalRecoveryMode()) {
+			if (localRecoveryConfig.isLocalRecoveryEnabled()) {
 				// create a "permanent" snapshot directory for local recovery.
 				LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
 				File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
@@ -2299,7 +2290,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			CheckpointStreamWithResultProvider streamWithResultProvider =
 
-				LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED == localRecoveryConfig.getLocalRecoveryMode() ?
+				localRecoveryConfig.isLocalRecoveryEnabled() ?
 
 					CheckpointStreamWithResultProvider.createDuplicatingStream(
 						checkpointId,

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index b2b568e..6d011a3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -146,9 +146,7 @@ public class StreamOperatorSnapshotRestoreTest extends TestLogger {
 			new LocalRecoveryDirectoryProviderImpl(temporaryFolder.newFolder(), jobID, jobVertexID, subtaskIdx);
 
 		LocalRecoveryConfig localRecoveryConfig =
-			mode != ONLY_JM_RECOVERY ?
-				new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, directoryProvider) :
-				new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, directoryProvider);
+			new LocalRecoveryConfig(mode != ONLY_JM_RECOVERY, directoryProvider);
 
 		MockEnvironment mockEnvironment = new MockEnvironment(
 			jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
index e35f97c..bc864a2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -177,9 +177,7 @@ public class LocalStateForwardingTest extends TestLogger {
 			jobVertexID,
 			subtaskIdx);
 
-		LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(
-			LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED,
-			directoryProvider);
+		LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(true, directoryProvider);
 
 		TaskLocalStateStore taskLocalStateStore =
 			new TaskLocalStateStoreImpl(jobID, allocationID, jobVertexID, subtaskIdx, localRecoveryConfig, executor) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
index 13040c9..4e454d7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
@@ -28,7 +28,6 @@ import org.junit.rules.TestName;
 
 import java.io.IOException;
 
-import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode;
 import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum;
 
 /**
@@ -40,14 +39,14 @@ import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo
 public abstract class AbstractLocalRecoveryITCase extends TestLogger {
 
 	private final StateBackendEnum backendEnum;
-	private final LocalRecoveryMode recoveryMode;
+	private final boolean localRecoveryEnabled;
 
 	@Rule
 	public TestName testName = new TestName();
 
-	AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, LocalRecoveryMode recoveryMode) {
+	AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, boolean localRecoveryEnabled) {
 		this.backendEnum = backendEnum;
-		this.recoveryMode = recoveryMode;
+		this.localRecoveryEnabled = localRecoveryEnabled;
 	}
 
 	@Test
@@ -64,9 +63,9 @@ public abstract class AbstractLocalRecoveryITCase extends TestLogger {
 				protected Configuration createClusterConfig() throws IOException {
 					Configuration config = super.createClusterConfig();
 
-					config.setString(
+					config.setBoolean(
 						CheckpointingOptions.LOCAL_RECOVERY,
-						recoveryMode.toString());
+						localRecoveryEnabled);
 
 					return config;
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
index 2c0c294..6749366 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.checkpointing;
 
-import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED;
 import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
 
 /**
@@ -26,8 +25,6 @@ import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo
  */
 public class LocalRecoveryHeapITCase extends AbstractLocalRecoveryITCase {
 	public LocalRecoveryHeapITCase() {
-		super(
-			FILE_ASYNC,
-			ENABLE_FILE_BASED);
+		super(FILE_ASYNC, true);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
index 16bbbfc..2d12ae2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.checkpointing;
 
-import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED;
 import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
 
 /**
@@ -26,8 +25,6 @@ import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo
  */
 public class LocalRecoveryRocksDBFullITCase extends AbstractLocalRecoveryITCase {
 	public LocalRecoveryRocksDBFullITCase() {
-		super(
-			ROCKSDB_FULLY_ASYNC,
-			ENABLE_FILE_BASED);
+		super(ROCKSDB_FULLY_ASYNC, true);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
index fa8e139..718d4a3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.checkpointing;
 
-import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED;
 import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
 
 /**
@@ -26,8 +25,6 @@ import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo
  */
 public class LocalRecoveryRocksDBIncrementalITCase extends AbstractLocalRecoveryITCase {
 	public LocalRecoveryRocksDBIncrementalITCase() {
-		super(
-			ROCKSDB_INCREMENTAL_ZK,
-			ENABLE_FILE_BASED);
+		super(ROCKSDB_INCREMENTAL_ZK, true);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 6f5bbac..aebaa63 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -28,7 +28,6 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -253,12 +252,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
 
 		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
-
-		if (localRecovery) {
-			config.setString(
-				CheckpointingOptions.LOCAL_RECOVERY,
-				LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.toString());
-		}
+		config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, localRecovery);
 
 		// ZooKeeper recovery mode?
 		if (zooKeeperQuorum != null) {