You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/14 15:51:36 UTC
[2/2] flink git commit: [FLINK-9355][checkpointing] Simplify configuration of local recovery to a simple on/off switch
[FLINK-9355][checkpointing] Simplify configuration of local recovery to a simple on/off switch
This closes #6006.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f422598
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f422598
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f422598
Branch: refs/heads/master
Commit: 7f4225987a690e85284bb356dd5e63a996f136d0
Parents: 1451806
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon May 14 11:14:47 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon May 14 17:50:52 2018 +0200
----------------------------------------------------------------------
.../generated/checkpointing_configuration.html | 2 +-
.../configuration/CheckpointingOptions.java | 6 +--
.../runtime/state/LocalRecoveryConfig.java | 52 +++-----------------
.../TaskExecutorLocalStateStoresManager.java | 12 ++---
.../state/heap/HeapKeyedStateBackend.java | 3 +-
.../taskexecutor/TaskManagerServices.java | 8 +--
.../TaskManagerServicesConfiguration.java | 17 ++++---
...TaskExecutorLocalStateStoresManagerTest.java | 16 +++---
.../state/TaskLocalStateStoreImplTest.java | 3 +-
.../runtime/state/TaskStateManagerImplTest.java | 6 +--
.../runtime/state/TestLocalRecoveryConfig.java | 2 +-
.../NetworkBufferCalculationTest.java | 3 +-
.../taskexecutor/TaskExecutorITCase.java | 3 +-
.../runtime/taskexecutor/TaskExecutorTest.java | 27 +++++-----
...askManagerComponentsStartupShutdownTest.java | 3 +-
.../state/RocksDBKeyedStateBackend.java | 17 ++-----
.../StreamOperatorSnapshotRestoreTest.java | 4 +-
.../runtime/tasks/LocalStateForwardingTest.java | 4 +-
.../AbstractLocalRecoveryITCase.java | 11 ++---
.../checkpointing/LocalRecoveryHeapITCase.java | 5 +-
.../LocalRecoveryRocksDBFullITCase.java | 5 +-
.../LocalRecoveryRocksDBIncrementalITCase.java | 5 +-
.../ResumeCheckpointManuallyITCase.java | 8 +--
23 files changed, 72 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/docs/_includes/generated/checkpointing_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/checkpointing_configuration.html b/docs/_includes/generated/checkpointing_configuration.html
index 8b4233f..2894c16 100644
--- a/docs/_includes/generated/checkpointing_configuration.html
+++ b/docs/_includes/generated/checkpointing_configuration.html
@@ -29,7 +29,7 @@
</tr>
<tr>
<td><h5>state.backend.local-recovery</h5></td>
- <td style="word-wrap: break-word;">"DISABLED"</td>
+ <td style="word-wrap: break-word;">false</td>
<td></td>
</tr>
<tr>
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index c6af7dd..596e8dd 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -67,11 +67,11 @@ public class CheckpointingOptions {
" this option.");
/**
- * This option configures local recovery for this state backend.
+ * This option configures local recovery for this state backend. By default, local recovery is deactivated.
*/
- public static final ConfigOption<String> LOCAL_RECOVERY = ConfigOptions
+ public static final ConfigOption<Boolean> LOCAL_RECOVERY = ConfigOptions
.key("state.backend.local-recovery")
- .defaultValue("DISABLED");
+ .defaultValue(false);
/**
* The config parameter defining the root directories for storing file-based state for local recovery.
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
index c97fa0b..fc15f5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
@@ -18,11 +18,6 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.Configuration;
-
-import org.slf4j.LoggerFactory;
-
import javax.annotation.Nonnull;
/**
@@ -31,57 +26,22 @@ import javax.annotation.Nonnull;
*/
public class LocalRecoveryConfig {
- /**
- * Enum over modes of local recovery:
- * <p><ul>
- * <li>DISABLED: disables local recovery.
- * <li>ENABLE_FILE_BASED: enables local recovery in a variant that is based on local files.
- * </ul>
- */
- public enum LocalRecoveryMode {
- DISABLED,
- ENABLE_FILE_BASED;
-
- /**
- * Extracts the {@link LocalRecoveryMode} from the given configuration. Defaults to LocalRecoveryMode.DISABLED
- * if no configuration value is specified or parsing the value resulted in an exception.
- *
- * @param configuration the configuration that specifies the value for the local recovery mode.
- * @return the local recovery mode as found in the config, or LocalRecoveryMode.DISABLED if no mode was
- * configured or the specified mode could not be parsed.
- */
- @Nonnull
- public static LocalRecoveryMode fromConfig(@Nonnull Configuration configuration) {
- String localRecoveryConfString = configuration.getString(CheckpointingOptions.LOCAL_RECOVERY);
- try {
- return LocalRecoveryConfig.LocalRecoveryMode.valueOf(localRecoveryConfString);
- } catch (IllegalArgumentException ex) {
- LoggerFactory.getLogger(LocalRecoveryConfig.class).warn(
- "Exception while parsing configuration of local recovery mode. Local recovery will be disabled.",
- ex);
- return LocalRecoveryConfig.LocalRecoveryMode.DISABLED;
- }
- }
- }
-
/** The local recovery mode. */
- @Nonnull
- private final LocalRecoveryMode localRecoveryMode;
+ private final boolean localRecoveryEnabled;
/** Encapsulates the root directories and the subtask-specific path. */
@Nonnull
private final LocalRecoveryDirectoryProvider localStateDirectories;
public LocalRecoveryConfig(
- @Nonnull LocalRecoveryMode localRecoveryMode,
+ boolean localRecoveryEnabled,
@Nonnull LocalRecoveryDirectoryProvider directoryProvider) {
- this.localRecoveryMode = localRecoveryMode;
+ this.localRecoveryEnabled = localRecoveryEnabled;
this.localStateDirectories = directoryProvider;
}
- @Nonnull
- public LocalRecoveryMode getLocalRecoveryMode() {
- return localRecoveryMode;
+ public boolean isLocalRecoveryEnabled() {
+ return localRecoveryEnabled;
}
@Nonnull
@@ -92,7 +52,7 @@ public class LocalRecoveryConfig {
@Override
public String toString() {
return "LocalRecoveryConfig{" +
- "localRecoveryMode=" + localRecoveryMode +
+ "localRecoveryMode=" + localRecoveryEnabled +
", localStateDirectories=" + localStateDirectories +
'}';
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index 518ad81..4919f80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -54,7 +54,7 @@ public class TaskExecutorLocalStateStoresManager {
private final Map<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl>> taskStateStoresByAllocationID;
/** The configured mode for local recovery on this task manager. */
- private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode;
+ private final boolean localRecoveryEnabled;
/** This is the root directory for all local state of this task manager / executor. */
private final File[] localStateRootDirectories;
@@ -71,12 +71,12 @@ public class TaskExecutorLocalStateStoresManager {
private boolean closed;
public TaskExecutorLocalStateStoresManager(
- @Nonnull LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode,
+ boolean localRecoveryEnabled,
@Nonnull File[] localStateRootDirectories,
@Nonnull Executor discardExecutor) throws IOException {
this.taskStateStoresByAllocationID = new HashMap<>();
- this.localRecoveryMode = localRecoveryMode;
+ this.localRecoveryEnabled = localRecoveryEnabled;
this.localStateRootDirectories = localStateRootDirectories;
this.discardExecutor = discardExecutor;
this.lock = new Object();
@@ -140,7 +140,7 @@ public class TaskExecutorLocalStateStoresManager {
subtaskIndex);
LocalRecoveryConfig localRecoveryConfig =
- new LocalRecoveryConfig(localRecoveryMode, directoryProvider);
+ new LocalRecoveryConfig(localRecoveryEnabled, directoryProvider);
taskLocalStateStore = new TaskLocalStateStoreImpl(
jobId,
@@ -217,8 +217,8 @@ public class TaskExecutorLocalStateStoresManager {
}
@VisibleForTesting
- public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() {
- return localRecoveryMode;
+ boolean isLocalRecoveryEnabled() {
+ return localRecoveryEnabled;
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index a02edb0..ab91ee1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -622,8 +622,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier =
- LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.equals(
- localRecoveryConfig.getLocalRecoveryMode()) ?
+ localRecoveryConfig.isLocalRecoveryEnabled() ?
() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
checkpointId,
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ad5e22d..ad19a57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.query.QueryableStateUtils;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -255,7 +254,6 @@ public class TaskManagerServices {
final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
- LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode = taskManagerServicesConfiguration.getLocalRecoveryMode();
final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
@@ -265,8 +263,10 @@ public class TaskManagerServices {
stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
}
- final TaskExecutorLocalStateStoresManager taskStateManager =
- new TaskExecutorLocalStateStoresManager(localRecoveryMode, stateRootDirectoryFiles, taskIOExecutor);
+ final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
+ taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
+ stateRootDirectoryFiles,
+ taskIOExecutor);
return new TaskManagerServices(
taskManagerLocation,
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index b80320c..bf1494e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.taskexecutor;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
@@ -29,7 +30,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.NetUtils;
@@ -78,13 +78,13 @@ public class TaskManagerServicesConfiguration {
private final long timerServiceShutdownTimeout;
- private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode;
+ private final boolean localRecoveryEnabled;
public TaskManagerServicesConfiguration(
InetAddress taskManagerAddress,
String[] tmpDirPaths,
String[] localRecoveryStateRootDirectories,
- LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode,
+ boolean localRecoveryEnabled,
NetworkEnvironmentConfiguration networkConfig,
QueryableStateConfiguration queryableStateConfig,
int numberOfSlots,
@@ -97,7 +97,7 @@ public class TaskManagerServicesConfiguration {
this.taskManagerAddress = checkNotNull(taskManagerAddress);
this.tmpDirPaths = checkNotNull(tmpDirPaths);
this.localRecoveryStateRootDirectories = checkNotNull(localRecoveryStateRootDirectories);
- this.localRecoveryMode = checkNotNull(localRecoveryMode);
+ this.localRecoveryEnabled = checkNotNull(localRecoveryEnabled);
this.networkConfig = checkNotNull(networkConfig);
this.queryableStateConfig = checkNotNull(queryableStateConfig);
this.numberOfSlots = checkNotNull(numberOfSlots);
@@ -128,8 +128,8 @@ public class TaskManagerServicesConfiguration {
return localRecoveryStateRootDirectories;
}
- public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() {
- return localRecoveryMode;
+ public boolean isLocalRecoveryEnabled() {
+ return localRecoveryEnabled;
}
public NetworkEnvironmentConfiguration getNetworkConfig() {
@@ -209,8 +209,9 @@ public class TaskManagerServicesConfiguration {
localStateRootDir = tmpDirs;
}
- LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode =
- LocalRecoveryConfig.LocalRecoveryMode.fromConfig(configuration);
+ boolean localRecoveryMode = configuration.getBoolean(
+ CheckpointingOptions.LOCAL_RECOVERY.key(),
+ CheckpointingOptions.LOCAL_RECOVERY.defaultValue());
final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
configuration,
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 2e9b107..97539bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -62,7 +62,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
config.setString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS, rootDirString);
// test configuration of the local state mode
- config.setString(CheckpointingOptions.LOCAL_RECOVERY, "ENABLE_FILE_BASED");
+ config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
final ResourceID tmResourceID = ResourceID.generate();
@@ -88,9 +88,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
}
// verify local recovery mode
- Assert.assertEquals(
- LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED,
- taskStateManager.getLocalRecoveryMode());
+ Assert.assertTrue(taskStateManager.isLocalRecoveryEnabled());
Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
for (File rootDirectory : rootDirectories) {
@@ -130,9 +128,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
localStateRootDirectories[i]);
}
- Assert.assertEquals(
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
- taskStateManager.getLocalRecoveryMode());
+ Assert.assertFalse(taskStateManager.isLocalRecoveryEnabled());
}
/**
@@ -150,7 +146,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
File[] rootDirs = {temporaryFolder.newFolder(), temporaryFolder.newFolder(), temporaryFolder.newFolder()};
TaskExecutorLocalStateStoresManager storesManager = new TaskExecutorLocalStateStoresManager(
- LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED,
+ true,
rootDirs,
Executors.directExecutor());
@@ -187,8 +183,8 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
// test that local recovery mode is forwarded to the created store
Assert.assertEquals(
- storesManager.getLocalRecoveryMode(),
- taskLocalStateStore.getLocalRecoveryConfig().getLocalRecoveryMode());
+ storesManager.isLocalRecoveryEnabled(),
+ taskLocalStateStore.getLocalRecoveryConfig().isLocalRecoveryEnabled());
Assert.assertTrue(testFile.exists());
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
index 618320e..7531783 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
@@ -64,8 +64,7 @@ public class TaskLocalStateStoreImplTest {
LocalRecoveryDirectoryProviderImpl directoryProvider =
new LocalRecoveryDirectoryProviderImpl(allocationBaseDirs, jobID, jobVertexID, subtaskIdx);
- LocalRecoveryConfig localRecoveryConfig =
- new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, directoryProvider);
+ LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(false, directoryProvider);
this.taskLocalStateStore = new TaskLocalStateStoreImpl(
jobID,
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
index f58f3f4..71038c1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
@@ -193,7 +193,7 @@ public class TaskStateManagerImplTest extends TestLogger {
new LocalRecoveryDirectoryProviderImpl(allocBaseDirs, jobID, jobVertexID, 0);
LocalRecoveryConfig localRecoveryConfig =
- new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, directoryProvider);
+ new LocalRecoveryConfig(true, directoryProvider);
TaskLocalStateStore taskLocalStateStore =
new TaskLocalStateStoreImpl(jobID, allocationID, jobVertexID, 13, localRecoveryConfig, directExecutor);
@@ -220,8 +220,8 @@ public class TaskStateManagerImplTest extends TestLogger {
}
Assert.assertEquals(
- localRecoveryConfFromTaskLocalStateStore.getLocalRecoveryMode(),
- localRecoveryConfFromTaskStateManager.getLocalRecoveryMode());
+ localRecoveryConfFromTaskLocalStateStore.isLocalRecoveryEnabled(),
+ localRecoveryConfFromTaskStateManager.isLocalRecoveryEnabled());
} finally {
tmpFolder.delete();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
index 7801720..58affc5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
@@ -28,7 +28,7 @@ public class TestLocalRecoveryConfig {
private static final LocalRecoveryDirectoryProvider INSTANCE = new TestDummyLocalDirectoryProvider();
public static LocalRecoveryConfig disabled() {
- return new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, INSTANCE);
+ return new LocalRecoveryConfig(false, INSTANCE);
}
public static class TestDummyLocalDirectoryProvider implements LocalRecoveryDirectoryProvider {
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
index 2116c2f..e88f9da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.testutils.category.LegacyAndNew;
import org.apache.flink.util.TestLogger;
@@ -102,7 +101,7 @@ public class NetworkBufferCalculationTest extends TestLogger {
InetAddress.getLoopbackAddress(),
new String[] {},
new String[] {},
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+ false,
networkConfig,
QueryableStateConfiguration.disabled(),
1,
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 885d99f..a740bff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -50,7 +50,6 @@ import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -138,7 +137,7 @@ public class TaskExecutorITCase extends TestLogger {
new File[]{new File(System.getProperty("java.io.tmpdir"), "localRecovery")};
final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+ false,
taskExecutorLocalStateRootDirs,
rpcService.getExecutor());
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 465619e..7dedb9a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -59,9 +59,9 @@ import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
-import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -75,7 +75,6 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -236,7 +235,7 @@ public class TaskExecutorTest extends TestLogger {
CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId)));
TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+ false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
@@ -325,7 +324,7 @@ public class TaskExecutorTest extends TestLogger {
HeartbeatServices heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+ false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
@@ -460,7 +459,7 @@ public class TaskExecutorTest extends TestLogger {
);
TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+ false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
@@ -545,7 +544,7 @@ public class TaskExecutorTest extends TestLogger {
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+ false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
@@ -608,7 +607,7 @@ public class TaskExecutorTest extends TestLogger {
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+ false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
@@ -729,7 +728,7 @@ public class TaskExecutorTest extends TestLogger {
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+ false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
@@ -832,7 +831,7 @@ public class TaskExecutorTest extends TestLogger {
final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+ false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
@@ -942,7 +941,7 @@ public class TaskExecutorTest extends TestLogger {
rpc.registerGateway(jobManagerAddress, jobMasterGateway);
TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+ false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
@@ -1074,7 +1073,7 @@ public class TaskExecutorTest extends TestLogger {
final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+ false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
@@ -1193,7 +1192,7 @@ public class TaskExecutorTest extends TestLogger {
final JobManagerTable jobManagerTableMock = spy(new JobManagerTable());
TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+ false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
@@ -1267,7 +1266,7 @@ public class TaskExecutorTest extends TestLogger {
rpc.registerGateway(rmAddress, rmGateway);
TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+ false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
@@ -1323,7 +1322,7 @@ public class TaskExecutorTest extends TestLogger {
timerService);
TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+ false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 581d8ed..8289930 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -46,7 +46,6 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -162,7 +161,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
network.start();
TaskExecutorLocalStateStoresManager storesManager = new TaskExecutorLocalStateStoresManager(
- LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+ false,
ioManager.getSpillingDirectories(),
Executors.directExecutor());
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 90d0fc6..0ec2ef0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1663,9 +1663,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
final SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier =
- isWithLocalRecovery(
- checkpointOptions.getCheckpointType(),
- localRecoveryConfig.getLocalRecoveryMode()) ?
+ localRecoveryConfig.isLocalRecoveryEnabled() &&
+ (CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType()) ?
() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
checkpointId,
@@ -1745,14 +1744,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
return AsyncStoppableTaskWithCallback.from(ioCallable);
}
-
- private boolean isWithLocalRecovery(
- CheckpointType checkpointType,
- LocalRecoveryConfig.LocalRecoveryMode recoveryMode) {
- // we use local recovery when it is activated and we are not taking a savepoint.
- return LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED == recoveryMode
- && CheckpointType.SAVEPOINT != checkpointType;
- }
}
private class IncrementalSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
@@ -1792,7 +1783,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
SnapshotDirectory snapshotDirectory;
- if (LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED == localRecoveryConfig.getLocalRecoveryMode()) {
+ if (localRecoveryConfig.isLocalRecoveryEnabled()) {
// create a "permanent" snapshot directory for local recovery.
LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
@@ -2299,7 +2290,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
CheckpointStreamWithResultProvider streamWithResultProvider =
- LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED == localRecoveryConfig.getLocalRecoveryMode() ?
+ localRecoveryConfig.isLocalRecoveryEnabled() ?
CheckpointStreamWithResultProvider.createDuplicatingStream(
checkpointId,
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index b2b568e..6d011a3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -146,9 +146,7 @@ public class StreamOperatorSnapshotRestoreTest extends TestLogger {
new LocalRecoveryDirectoryProviderImpl(temporaryFolder.newFolder(), jobID, jobVertexID, subtaskIdx);
LocalRecoveryConfig localRecoveryConfig =
- mode != ONLY_JM_RECOVERY ?
- new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, directoryProvider) :
- new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, directoryProvider);
+ new LocalRecoveryConfig(mode != ONLY_JM_RECOVERY, directoryProvider);
MockEnvironment mockEnvironment = new MockEnvironment(
jobID,
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
index e35f97c..bc864a2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -177,9 +177,7 @@ public class LocalStateForwardingTest extends TestLogger {
jobVertexID,
subtaskIdx);
- LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(
- LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED,
- directoryProvider);
+ LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(true, directoryProvider);
TaskLocalStateStore taskLocalStateStore =
new TaskLocalStateStoreImpl(jobID, allocationID, jobVertexID, subtaskIdx, localRecoveryConfig, executor) {
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
index 13040c9..4e454d7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
@@ -28,7 +28,6 @@ import org.junit.rules.TestName;
import java.io.IOException;
-import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode;
import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum;
/**
@@ -40,14 +39,14 @@ import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo
public abstract class AbstractLocalRecoveryITCase extends TestLogger {
private final StateBackendEnum backendEnum;
- private final LocalRecoveryMode recoveryMode;
+ private final boolean localRecoveryEnabled;
@Rule
public TestName testName = new TestName();
- AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, LocalRecoveryMode recoveryMode) {
+ AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, boolean localRecoveryEnabled) {
this.backendEnum = backendEnum;
- this.recoveryMode = recoveryMode;
+ this.localRecoveryEnabled = localRecoveryEnabled;
}
@Test
@@ -64,9 +63,9 @@ public abstract class AbstractLocalRecoveryITCase extends TestLogger {
protected Configuration createClusterConfig() throws IOException {
Configuration config = super.createClusterConfig();
- config.setString(
+ config.setBoolean(
CheckpointingOptions.LOCAL_RECOVERY,
- recoveryMode.toString());
+ localRecoveryEnabled);
return config;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
index 2c0c294..6749366 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.checkpointing;
-import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED;
import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
/**
@@ -26,8 +25,6 @@ import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo
*/
public class LocalRecoveryHeapITCase extends AbstractLocalRecoveryITCase {
public LocalRecoveryHeapITCase() {
- super(
- FILE_ASYNC,
- ENABLE_FILE_BASED);
+ super(FILE_ASYNC, true);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
index 16bbbfc..2d12ae2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.checkpointing;
-import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED;
import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
/**
@@ -26,8 +25,6 @@ import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo
*/
public class LocalRecoveryRocksDBFullITCase extends AbstractLocalRecoveryITCase {
public LocalRecoveryRocksDBFullITCase() {
- super(
- ROCKSDB_FULLY_ASYNC,
- ENABLE_FILE_BASED);
+ super(ROCKSDB_FULLY_ASYNC, true);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
index fa8e139..718d4a3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.checkpointing;
-import static org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED;
import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
/**
@@ -26,8 +25,6 @@ import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpo
*/
public class LocalRecoveryRocksDBIncrementalITCase extends AbstractLocalRecoveryITCase {
public LocalRecoveryRocksDBIncrementalITCase() {
- super(
- ROCKSDB_INCREMENTAL_ZK,
- ENABLE_FILE_BASED);
+ super(ROCKSDB_INCREMENTAL_ZK, true);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7f422598/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 6f5bbac..aebaa63 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -28,7 +28,6 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -253,12 +252,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
-
- if (localRecovery) {
- config.setString(
- CheckpointingOptions.LOCAL_RECOVERY,
- LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.toString());
- }
+ config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, localRecovery);
// ZooKeeper recovery mode?
if (zooKeeperQuorum != null) {