Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/24 12:51:50 UTC

[2/2] flink git commit: [FLINK-6690] Fix meta-data restore in RocksDBKeyedStateBackend under rescaling

[FLINK-6690] Fix meta-data restore in RocksDBKeyedStateBackend under rescaling
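
Background, condensed from the diff below: under rescaling, a restore consumes several key-group state handles, so the same state name can appear in the meta data of more than one handle. The previous restore path created a column family and appended it to the per-handle column family list only on the first encounter of a name; for any later handle the append was skipped, leaving that handle's column family list misaligned with its meta data. The fix makes the meta-info maps final, clears them at the start of every restore, and turns registration into a get-or-create that records the column family for the current handle unconditionally. A stand-alone sketch of that pattern (plain Java; an int stands in for RocksDB's ColumnFamilyHandle, and all names here are illustrative, not Flink API):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class GetOrCreateRestoreSketch {
        public static void main(String[] args) {
            // state name -> registered "column family" (modeled as an int id)
            Map<String, Integer> kvStateInformation = new HashMap<>();

            // two restored state handles that both contain state "counts",
            // which is exactly the situation a rescaled restore produces
            String[][] stateHandles = {{"counts", "windows"}, {"counts"}};

            for (String[] handle : stateHandles) {
                List<Integer> currentHandleColumnFamilies = new ArrayList<>();
                for (String stateName : handle) {
                    Integer columnFamily = kvStateInformation.get(stateName);
                    if (columnFamily == null) {
                        // first encounter: "createColumnFamily" and register it
                        columnFamily = kvStateInformation.size();
                        kvStateInformation.put(stateName, columnFamily);
                    }
                    // before the fix this add only happened in the creation branch,
                    // so a later handle ended up with a misaligned list
                    currentHandleColumnFamilies.add(columnFamily);
                }
                System.out.println(currentHandleColumnFamilies); // prints [0, 1], then [0]
            }
        }
    }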


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36830ada
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36830ada
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36830ada

Branch: refs/heads/master
Commit: 36830adacecc94beb8968eaacabda11cd91bb2de
Parents: 7639d49
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed May 24 10:15:19 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Wed May 24 14:51:27 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 51 ++++++++++--------
 .../state/heap/HeapKeyedStateBackend.java       |  2 +-
 .../test/checkpointing/RescalingITCase.java     | 55 +++++++++++++-------
 3 files changed, 65 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/36830ada/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 51255ab..053c820 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -155,7 +155,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * Information about the k/v states as we create them. This is used to retrieve the
 	 * column family that is used for a state and also for sanity checks when restoring.
 	 */
-	private Map<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation;
+	private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation;
 
 	/**
 	 * Map of state names to their corresponding restored state meta info.
@@ -163,7 +163,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * TODO this map can be removed when eager-state registration is in place.
 	 * TODO we currently need this cached to check state migration strategies when new serializers are registered.
 	 */
-	private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
+	private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
 
 	/** Number of bytes required to prefix the key groups. */
 	private final int keyGroupPrefixBytes;
@@ -172,7 +172,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private final boolean enableIncrementalCheckpointing;
 
 	/** The state handle ids of all sst files materialized in snapshots for previous checkpoints */
-	private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new TreeMap<>();
+	private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
 
 	/** The identifier of the last completed checkpoint */
 	private long lastCompletedCheckpointId = -1;
@@ -221,8 +221,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			throw new IOException("Error cleaning RocksDB data directory.", e);
 		}
 
-		keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1;
-		kvStateInformation = new HashMap<>();
+		this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1;
+		this.kvStateInformation = new HashMap<>();
+		this.restoredKvStateMetaInfos = new HashMap<>();
+		this.materializedSstFiles = new TreeMap<>();
 	}
 
 	/**
@@ -249,6 +251,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				}
 
 				kvStateInformation.clear();
+				restoredKvStateMetaInfos.clear();
 
 				try {
 					db.close();
@@ -826,7 +829,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));
 
 			// use the last completed checkpoint as the comparison base.
-			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+			synchronized (stateBackend.materializedSstFiles) {
+				baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+			}
 
 			// save meta data
 			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
@@ -885,7 +890,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 
 
-			synchronized (stateBackend.asyncSnapshotLock) {
+			synchronized (stateBackend.materializedSstFiles) {
 				stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
 			}
 
@@ -943,6 +948,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			LOG.debug("Restoring snapshot from state handles: {}.", restoreState);
 		}
 
+		// clear all meta data
+		kvStateInformation.clear();
+		restoredKvStateMetaInfos.clear();
+
 		try {
 			if (restoreState == null || restoreState.isEmpty()) {
 				createDB();
@@ -964,7 +973,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	@Override
 	public void notifyCheckpointComplete(long completedCheckpointId) {
-		synchronized (asyncSnapshotLock) {
+		synchronized (materializedSstFiles) {
 			if (completedCheckpointId < lastCompletedCheckpointId) {
 				return;
 			}
@@ -1125,13 +1134,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
 					serializationProxy.getStateMetaInfoSnapshots();
-
 			currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
-			rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size());
+			// restoredKvStateMetaInfos is now final; it is cleared and re-populated during restore instead of being re-created here
 
 			for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {
 
-				if (!rocksDBKeyedStateBackend.kvStateInformation.containsKey(restoredMetaInfo.getName())) {
+				Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
+					rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
+
+				if (registeredColumn == null) {
 					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
 						restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
 						rocksDBKeyedStateBackend.columnOptions);
@@ -1147,14 +1158,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 					ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
 
-					rocksDBKeyedStateBackend.kvStateInformation.put(
-						stateMetaInfo.getName(),
-						new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(columnFamily, stateMetaInfo));
+					registeredColumn = new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(columnFamily, stateMetaInfo);
+					rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn);
 
-					currentStateHandleKVStateColumnFamilies.add(columnFamily);
 				} else {
 					// TODO with eager state registration in place, check here for serializer migration strategies
 				}
+				currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
 			}
 		}
 
@@ -1313,8 +1323,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
 
-				stateBackend.restoredKvStateMetaInfos = new HashMap<>(stateMetaInfoSnapshots.size());
-
 				for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
 
 					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
@@ -1424,7 +1432,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 
 					// use the restore sst files as the base for succeeding checkpoints
-					stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet());
+					synchronized (stateBackend.materializedSstFiles) {
+						stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet());
+					}
 
 					stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
 				}
@@ -1890,11 +1900,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		Preconditions.checkState(1 == namedStates.size(), "Only one element expected here.");
 		DataInputView inputView = namedStates.values().iterator().next().stateHandle.getState(userCodeClassLoader);
 
-		// clear k/v state information before filling it
-		kvStateInformation.clear();
-
-		restoredKvStateMetaInfos = new HashMap<>(namedStates.size());
-
 		// first get the column family mapping
 		int numColumns = inputView.readInt();
 		Map<Byte, StateDescriptor<?, ?>> columnFamilyMapping = new HashMap<>(numColumns);

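The locking changes in this file also narrow the guard for materializedSstFiles from the backend-wide asyncSnapshotLock to the map's own monitor, so checkpoint-completion notifications no longer contend with snapshot materialization. A minimal sketch of that convention (the class skeleton and the pruning body are assumptions for illustration, not the verbatim Flink code):

    import java.util.Set;
    import java.util.SortedMap;
    import java.util.TreeMap;

    class MaterializedFilesRegistry {
        // guarded by its own monitor, as in the diff above
        private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
        private long lastCompletedCheckpointId = -1;

        Set<String> baseFilesFor(long checkpointId) {
            synchronized (materializedSstFiles) { // read under the map's monitor
                return materializedSstFiles.get(checkpointId);
            }
        }

        void register(long checkpointId, Set<String> sstFiles) {
            synchronized (materializedSstFiles) { // write under the same monitor
                materializedSstFiles.put(checkpointId, sstFiles);
            }
        }

        void notifyCheckpointComplete(long completedCheckpointId) {
            synchronized (materializedSstFiles) {
                if (completedCheckpointId < lastCompletedCheckpointId) {
                    return; // stale notification, ignore
                }
                // assumed pruning step: entries for older checkpoints are no longer
                // needed as a comparison base once a newer checkpoint completed
                materializedSstFiles.headMap(completedCheckpointId).clear();
                lastCompletedCheckpointId = completedCheckpointId;
            }
        }
    }
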
http://git-wip-us.apache.org/repos/asf/flink/blob/36830ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index ada6377..d4ba204 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -399,7 +399,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						.isRequiresMigration()) {
 
 						// TODO replace with state migration; note that key hash codes need to remain the same after migration
-						throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+						throw new IllegalStateException("The new key serializer is not compatible to read previous keys. " +
 							"Aborting now since state migration is currently not available");
 					}
 

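The single change above narrows the thrown type: failing the key-serializer compatibility check leaves the backend in a well-defined illegal state, which IllegalStateException expresses more precisely than a bare RuntimeException. A minimal sketch of the guard, where the boolean parameter stands in for the CompatibilityResult#isRequiresMigration() outcome from the surrounding code:

    // hypothetical free-standing version of the guard above
    static void checkKeySerializerCompatibility(boolean requiresMigration) {
        if (requiresMigration) {
            throw new IllegalStateException("The new key serializer is not compatible to read previous keys. " +
                "Aborting now since state migration is currently not available");
        }
    }
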
http://git-wip-us.apache.org/repos/asf/flink/blob/36830ada/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 88dd1dd..9df0d1a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -57,10 +56,12 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -81,15 +82,23 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-/**
- * TODO : parameterize to test all different state backends!
- */
+@RunWith(Parameterized.class)
 public class RescalingITCase extends TestLogger {
 
 	private static final int numTaskManagers = 2;
 	private static final int slotsPerTaskManager = 2;
 	private static final int numSlots = numTaskManagers * slotsPerTaskManager;
 
+	@Parameterized.Parameters
+	public static Object[] data() {
+		return new Object[]{"filesystem", "rocksdb"};
+	}
+
+	@Parameterized.Parameter
+	public String backend;
+
+	private static String currentBackend = null;
+
 	enum OperatorCheckpointMethod {
 		NON_PARTITIONED, CHECKPOINTED_FUNCTION, CHECKPOINTED_FUNCTION_BROADCAST, LIST_CHECKPOINTED
 	}
@@ -99,25 +108,32 @@ public class RescalingITCase extends TestLogger {
 	@ClassRule
 	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-	@BeforeClass
-	public static void setup() throws Exception {
-		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager);
+	@Before
+	public void setup() throws Exception {
+		// detect parameter change (value comparison; backend comes from the test parameters and is never null)
+		if (!backend.equals(currentBackend)) {
+			shutDownExistingCluster();
+
+			currentBackend = backend;
 
-		final File checkpointDir = temporaryFolder.newFolder();
-		final File savepointDir = temporaryFolder.newFolder();
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager);
 
-		config.setString(CoreOptions.STATE_BACKEND, "filesystem");
-		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
-		config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString());
+			final File checkpointDir = temporaryFolder.newFolder();
+			final File savepointDir = temporaryFolder.newFolder();
 
-		cluster = new TestingCluster(config);
-		cluster.start();
+			config.setString(CoreOptions.STATE_BACKEND, currentBackend);
+			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
+			config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString());
+
+			cluster = new TestingCluster(config);
+			cluster.start();
+		}
 	}
 
 	@AfterClass
-	public static void teardown() {
+	public static void shutDownExistingCluster() {
 		if (cluster != null) {
 			cluster.shutdown();
 			cluster.awaitTermination();
@@ -867,6 +883,7 @@ public class RescalingITCase extends TestLogger {
 
 	private static class StateSourceBase extends RichParallelSourceFunction<Integer> {
 
+		private static final long serialVersionUID = 7512206069681177940L;
 		private static volatile CountDownLatch workStartedLatch = new CountDownLatch(1);
 
 		protected volatile int counter = 0;
@@ -959,7 +976,7 @@ public class RescalingITCase extends TestLogger {
 		private static final long serialVersionUID = -359715965103593462L;
 		private static final int NUM_PARTITIONS = 7;
 
-		private ListState<Integer> counterPartitions;
+		private transient ListState<Integer> counterPartitions;
 		private boolean broadcast;
 
 		private static int[] CHECK_CORRECT_SNAPSHOT;
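
For reference, the JUnit 4 Parameterized pattern the test adopts, reduced to a self-contained example; the class and method names are hypothetical, while the annotations, the Object[] parameters method, and the backend names mirror the diff above. Note the static currentBackend field: the runner creates a fresh test instance per test method, so an instance field could never carry the previous parameter value.

    import org.junit.Before;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    @RunWith(Parameterized.class)
    public class BackendParameterExample {

        @Parameterized.Parameters
        public static Object[] data() {
            return new Object[]{"filesystem", "rocksdb"};
        }

        @Parameterized.Parameter
        public String backend;

        private static String currentBackend;

        @Before
        public void setup() {
            // rebuild expensive fixtures only when the parameter actually changes,
            // mirroring the cluster restart logic in RescalingITCase.setup()
            if (!backend.equals(currentBackend)) {
                currentBackend = backend;
                System.out.println("(re)starting fixture for backend: " + backend);
            }
        }

        @Test
        public void runsOncePerBackend() {
            System.out.println("running against " + backend);
        }
    }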