Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/24 12:51:50 UTC
[2/2] flink git commit: [FLINK-6690] Fix meta-data restore in RocksDBKeyedStateBackend under rescaling
[FLINK-6690] Fix meta-data restore in RocksDBKeyedStateBackend under rescaling
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36830ada
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36830ada
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36830ada
Branch: refs/heads/master
Commit: 36830adacecc94beb8968eaacabda11cd91bb2de
Parents: 7639d49
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed May 24 10:15:19 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Wed May 24 14:51:27 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 51 ++++++++++--------
.../state/heap/HeapKeyedStateBackend.java | 2 +-
.../test/checkpointing/RescalingITCase.java | 55 +++++++++++++-------
3 files changed, 65 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/36830ada/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 51255ab..053c820 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -155,7 +155,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* Information about the k/v states as we create them. This is used to retrieve the
* column family that is used for a state and also for sanity checks when restoring.
*/
- private Map<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation;
+ private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation;
/**
* Map of state names to their corresponding restored state meta info.
@@ -163,7 +163,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* TODO this map can be removed when eager-state registration is in place.
* TODO we currently need this cached to check state migration strategies when new serializers are registered.
*/
- private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
+ private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
/** Number of bytes required to prefix the key groups. */
private final int keyGroupPrefixBytes;
@@ -172,7 +172,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final boolean enableIncrementalCheckpointing;
/** The state handle ids of all sst files materialized in snapshots for previous checkpoints */
- private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new TreeMap<>();
+ private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
/** The identifier of the last completed checkpoint */
private long lastCompletedCheckpointId = -1;
@@ -221,8 +221,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
throw new IOException("Error cleaning RocksDB data directory.", e);
}
- keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1;
- kvStateInformation = new HashMap<>();
+ this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1;
+ this.kvStateInformation = new HashMap<>();
+ this.restoredKvStateMetaInfos = new HashMap<>();
+ this.materializedSstFiles = new TreeMap<>();
}
/**
@@ -249,6 +251,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
kvStateInformation.clear();
+ restoredKvStateMetaInfos.clear();
try {
db.close();
@@ -826,7 +829,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));
// use the last completed checkpoint as the comparison base.
- baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+ synchronized (stateBackend.materializedSstFiles) {
+ baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+ }
// save meta data
for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
@@ -885,7 +890,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
- synchronized (stateBackend.asyncSnapshotLock) {
+ synchronized (stateBackend.materializedSstFiles) {
stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
}
@@ -943,6 +948,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
LOG.debug("Restoring snapshot from state handles: {}.", restoreState);
}
+ // clear all meta data
+ kvStateInformation.clear();
+ restoredKvStateMetaInfos.clear();
+
try {
if (restoreState == null || restoreState.isEmpty()) {
createDB();
@@ -964,7 +973,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
- synchronized (asyncSnapshotLock) {
+ synchronized (materializedSstFiles) {
if (completedCheckpointId < lastCompletedCheckpointId) {
return;
}
@@ -1125,13 +1134,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
serializationProxy.getStateMetaInfoSnapshots();
-
currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
- rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size());
+ //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size());
for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {
- if (!rocksDBKeyedStateBackend.kvStateInformation.containsKey(restoredMetaInfo.getName())) {
+ Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
+ rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
+
+ if (registeredColumn == null) {
ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
rocksDBKeyedStateBackend.columnOptions);
@@ -1147,14 +1158,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
- rocksDBKeyedStateBackend.kvStateInformation.put(
- stateMetaInfo.getName(),
- new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(columnFamily, stateMetaInfo));
+ registeredColumn = new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(columnFamily, stateMetaInfo);
+ rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn);
- currentStateHandleKVStateColumnFamilies.add(columnFamily);
} else {
// TODO with eager state registration in place, check here for serializer migration strategies
}
+ currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
}
}
@@ -1313,8 +1323,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
- stateBackend.restoredKvStateMetaInfos = new HashMap<>(stateMetaInfoSnapshots.size());
-
for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
@@ -1424,7 +1432,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// use the restore sst files as the base for succeeding checkpoints
- stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet());
+ synchronized (stateBackend.materializedSstFiles) {
+ stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet());
+ }
stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
}
@@ -1890,11 +1900,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
Preconditions.checkState(1 == namedStates.size(), "Only one element expected here.");
DataInputView inputView = namedStates.values().iterator().next().stateHandle.getState(userCodeClassLoader);
- // clear k/v state information before filling it
- kvStateInformation.clear();
-
- restoredKvStateMetaInfos = new HashMap<>(namedStates.size());
-
// first get the column family mapping
int numColumns = inputView.readInt();
Map<Byte, StateDescriptor<?, ?>> columnFamilyMapping = new HashMap<>(numColumns);
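
Taken together, the hunks above make kvStateInformation and restoredKvStateMetaInfos final fields that are cleared (rather than re-created) at the start of every restore, and they move access to materializedSstFiles from asyncSnapshotLock to synchronizing on the map itself. Below is a minimal, self-contained sketch of that second pattern; the class and field names are invented for illustration and are not the actual Flink types.

import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Minimal sketch of the locking pattern the hunks above apply: a shared map of
// materialized files per checkpoint is guarded by synchronizing on the map itself,
// so the snapshot path, the restore path, and the checkpoint-complete callback
// never race on it. All names here are illustrative, not Flink API.
class MaterializedFilesRegistry {

    private final SortedMap<Long, Set<String>> filesPerCheckpoint = new TreeMap<>();
    private long lastCompletedCheckpointId = -1;

    /** Record the files materialized for a checkpoint (snapshot or restore path). */
    void register(long checkpointId, Set<String> fileIds) {
        synchronized (filesPerCheckpoint) {
            filesPerCheckpoint.put(checkpointId, fileIds);
        }
    }

    /** Invoked when the checkpoint coordinator confirms completion. */
    void notifyCheckpointComplete(long completedCheckpointId) {
        synchronized (filesPerCheckpoint) {
            if (completedCheckpointId < lastCompletedCheckpointId) {
                return;
            }
            lastCompletedCheckpointId = completedCheckpointId;
            // illustrative cleanup: older checkpoints can no longer serve as a base
            filesPerCheckpoint.headMap(completedCheckpointId).clear();
        }
    }

    /** Files of the last completed checkpoint, used as the comparison base for the next incremental snapshot. */
    Set<String> baseFiles() {
        synchronized (filesPerCheckpoint) {
            return filesPerCheckpoint.get(lastCompletedCheckpointId);
        }
    }
}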
http://git-wip-us.apache.org/repos/asf/flink/blob/36830ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index ada6377..d4ba204 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -399,7 +399,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
.isRequiresMigration()) {
// TODO replace with state migration; note that key hash codes need to remain the same after migration
- throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+ throw new IllegalStateException("The new key serializer is not compatible to read previous keys. " +
"Aborting now since state migration is currently not available");
}
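
The single-line change above only tightens the failure mode: a restored key serializer that would require migration now raises an IllegalStateException instead of a generic RuntimeException. A hedged sketch of such a guard, where the requiresMigration flag stands in for the result of Flink's compatibility check rather than the real API:

// Illustrative only: `requiresMigration` stands in for the outcome of the
// serializer compatibility check; it is not the actual Flink API.
final class KeySerializerCompatibilityGuard {

    static void check(boolean requiresMigration) {
        if (requiresMigration) {
            // Key hash codes must remain stable across migration, so this cannot
            // be handled transparently yet (see the TODO in the hunk above).
            throw new IllegalStateException("The new key serializer is not compatible to read previous keys. " +
                "Aborting now since state migration is currently not available");
        }
    }

    private KeySerializerCompatibilityGuard() {}
}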
http://git-wip-us.apache.org/repos/asf/flink/blob/36830ada/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 88dd1dd..9df0d1a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -57,10 +56,12 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
@@ -81,15 +82,23 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-/**
- * TODO : parameterize to test all different state backends!
- */
+@RunWith(Parameterized.class)
public class RescalingITCase extends TestLogger {
private static final int numTaskManagers = 2;
private static final int slotsPerTaskManager = 2;
private static final int numSlots = numTaskManagers * slotsPerTaskManager;
+ @Parameterized.Parameters
+ public static Object[] data() {
+ return new Object[]{"filesystem", "rocksdb"};
+ }
+
+ @Parameterized.Parameter
+ public String backend;
+
+ private String currentBackend = null;
+
enum OperatorCheckpointMethod {
NON_PARTITIONED, CHECKPOINTED_FUNCTION, CHECKPOINTED_FUNCTION_BROADCAST, LIST_CHECKPOINTED
}
@@ -99,25 +108,32 @@ public class RescalingITCase extends TestLogger {
@ClassRule
public static TemporaryFolder temporaryFolder = new TemporaryFolder();
- @BeforeClass
- public static void setup() throws Exception {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager);
+ @Before
+ public void setup() throws Exception {
+ // detect parameter change
+ if (currentBackend != backend) {
+ shutDownExistingCluster();
+
+ currentBackend = backend;
- final File checkpointDir = temporaryFolder.newFolder();
- final File savepointDir = temporaryFolder.newFolder();
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager);
- config.setString(CoreOptions.STATE_BACKEND, "filesystem");
- config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
- config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString());
+ final File checkpointDir = temporaryFolder.newFolder();
+ final File savepointDir = temporaryFolder.newFolder();
- cluster = new TestingCluster(config);
- cluster.start();
+ config.setString(CoreOptions.STATE_BACKEND, currentBackend);
+ config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
+ config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString());
+
+ cluster = new TestingCluster(config);
+ cluster.start();
+ }
}
@AfterClass
- public static void teardown() {
+ public static void shutDownExistingCluster() {
if (cluster != null) {
cluster.shutdown();
cluster.awaitTermination();
@@ -867,6 +883,7 @@ public class RescalingITCase extends TestLogger {
private static class StateSourceBase extends RichParallelSourceFunction<Integer> {
+ private static final long serialVersionUID = 7512206069681177940L;
private static volatile CountDownLatch workStartedLatch = new CountDownLatch(1);
protected volatile int counter = 0;
@@ -959,7 +976,7 @@ public class RescalingITCase extends TestLogger {
private static final long serialVersionUID = -359715965103593462L;
private static final int NUM_PARTITIONS = 7;
- private ListState<Integer> counterPartitions;
+ private transient ListState<Integer> counterPartitions;
private boolean broadcast;
private static int[] CHECK_CORRECT_SNAPSHOT;
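
The test changes above turn RescalingITCase into a JUnit Parameterized test that runs once per state backend and restarts the mini cluster only when the backend parameter changes. A minimal, self-contained sketch of that runner pattern follows; ClusterHandle is a hypothetical stand-in for Flink's TestingCluster and the assertions are illustrative only.

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.junit.Assert.assertNotNull;

// Minimal sketch of the parameterized-backend pattern used in the diff above.
// ClusterHandle is a stand-in for the real TestingCluster and is not Flink API.
@RunWith(Parameterized.class)
public class BackendParameterizedTestSketch {

    @Parameterized.Parameters
    public static Object[] data() {
        return new Object[]{"filesystem", "rocksdb"};
    }

    @Parameterized.Parameter
    public String backend;

    private static String currentBackend = null;
    private static ClusterHandle cluster;

    @Before
    public void setup() {
        // Rebuild the cluster only when the backend parameter actually changes,
        // mirroring the currentBackend guard introduced in the diff.
        if (!backend.equals(currentBackend)) {
            shutDownExistingCluster();
            currentBackend = backend;
            cluster = new ClusterHandle(backend); // the real test sets STATE_BACKEND here
        }
    }

    @AfterClass
    public static void shutDownExistingCluster() {
        if (cluster != null) {
            cluster.shutdown();
            cluster = null;
        }
    }

    @Test
    public void runsOncePerBackend() {
        assertNotNull(cluster);
    }

    /** Hypothetical stand-in for a test cluster configured with the given backend. */
    static class ClusterHandle {
        final String backend;
        ClusterHandle(String backend) { this.backend = backend; }
        void shutdown() { /* no-op in this sketch */ }
    }
}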