You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/05/14 12:31:31 UTC

flink git commit: [FLINK-9336] [state] Set the userKeyOffset in every MapEntry.

Repository: flink
Updated Branches:
  refs/heads/master e2cc83176 -> 14518067c


[FLINK-9336] [state] Set the userKeyOffset in every MapEntry.

Previously the userKeyOffset in RocksDBMapState was set when serializing
the key and namespace. This was prone to failure, as some code paths
used by queryable state did not set it appropriately.

This closes #5993.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14518067
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14518067
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14518067

Branch: refs/heads/master
Commit: 14518067cc22b928ba8e230413d0a74237a3ceec
Parents: e2cc831
Author: sihuazhou <su...@163.com>
Authored: Fri May 11 17:15:07 2018 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Mon May 14 14:30:31 2018 +0200

----------------------------------------------------------------------
 .../runtime/state/StateBackendTestBase.java     | 84 +++++++++++---------
 .../streaming/state/RocksDBMapState.java        | 14 ++--
 2 files changed, 54 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/14518067/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 784b628..b809d84 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -2591,45 +2591,53 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	@SuppressWarnings("unchecked,rawtypes")
 	public void testMapState() throws Exception {
 		CheckpointStreamFactory streamFactory = createStreamFactory();
-		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+		AbstractKeyedStateBackend<String> backend = createKeyedBackend(StringSerializer.INSTANCE);
 
 		MapStateDescriptor<Integer, String> kvId = new MapStateDescriptor<>("id", Integer.class, String.class);
 
-		TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+		TypeSerializer<String> keySerializer = StringSerializer.INSTANCE;
 		TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
 
 		MapState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<Integer, VoidNamespace, Map<Integer, String>> kvState = (InternalKvState<Integer, VoidNamespace, Map<Integer, String>>) state;
+		InternalKvState<String, VoidNamespace, Map<Integer, String>> kvState = (InternalKvState<String, VoidNamespace, Map<Integer, String>>) state;
 
 		// these are only available after the backend initialized the serializer
 		TypeSerializer<Integer> userKeySerializer = kvId.getKeySerializer();
 		TypeSerializer<String> userValueSerializer = kvId.getValueSerializer();
 
 		// some modifications to the state
-		backend.setCurrentKey(1);
+		backend.setCurrentKey("1");
 		assertNull(state.get(1));
-		assertNull(getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		assertNull(getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 		state.put(1, "1");
-		backend.setCurrentKey(2);
+		backend.setCurrentKey("2");
 		assertNull(state.get(2));
-		assertNull(getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		assertNull(getSerializedMap(kvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 		state.put(2, "2");
-		backend.setCurrentKey(1);
+
+		// put entry with different userKeyOffset
+		backend.setCurrentKey("11");
+		state.put(11, "11");
+
+		backend.setCurrentKey("1");
 		assertTrue(state.contains(1));
 		assertEquals("1", state.get(1));
 		assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
-				getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+				getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+
+		assertEquals(new HashMap<Integer, String>() {{ put (11, "11"); }},
+			getSerializedMap(kvState, "11", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
 		// draw a snapshot
 		KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
 
 		// make some more modifications
-		backend.setCurrentKey(1);
+		backend.setCurrentKey("1");
 		state.put(1, "101");
-		backend.setCurrentKey(2);
+		backend.setCurrentKey("2");
 		state.put(102, "102");
-		backend.setCurrentKey(3);
+		backend.setCurrentKey("3");
 		state.put(103, "103");
 		state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }});
 
@@ -2637,19 +2645,19 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
 
 		// validate the original state
-		backend.setCurrentKey(1);
+		backend.setCurrentKey("1");
 		assertEquals("101", state.get(1));
 		assertEquals(new HashMap<Integer, String>() {{ put(1, "101"); }},
-				getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-		backend.setCurrentKey(2);
+				getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		backend.setCurrentKey("2");
 		assertEquals("102", state.get(102));
 		assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put(102, "102"); }},
-				getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-		backend.setCurrentKey(3);
+				getSerializedMap(kvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		backend.setCurrentKey("3");
 		assertTrue(state.contains(103));
 		assertEquals("103", state.get(103));
 		assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
-				getSerializedMap(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+				getSerializedMap(kvState, "3", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
 		List<Integer> keys = new ArrayList<>();
 		for (Integer key : state.keys()) {
@@ -2670,11 +2678,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		assertTrue(values.isEmpty());
 
 		// make some more modifications
-		backend.setCurrentKey(1);
+		backend.setCurrentKey("1");
 		state.clear();
-		backend.setCurrentKey(2);
+		backend.setCurrentKey("2");
 		state.remove(102);
-		backend.setCurrentKey(3);
+		backend.setCurrentKey("3");
 		final String updateSuffix = "_updated";
 		Iterator<Map.Entry<Integer, String>> iterator = state.iterator();
 		while (iterator.hasNext()) {
@@ -2687,10 +2695,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		}
 
 		// validate the state
-		backend.setCurrentKey(1);
-		backend.setCurrentKey(2);
+		backend.setCurrentKey("1");
+		backend.setCurrentKey("2");
 		assertFalse(state.contains(102));
-		backend.setCurrentKey(3);
+		backend.setCurrentKey("3");
 		for (Map.Entry<Integer, String> entry : state.entries()) {
 			assertEquals(4 + updateSuffix.length(), entry.getValue().length());
 			assertTrue(entry.getValue().endsWith(updateSuffix));
@@ -2698,44 +2706,44 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		backend.dispose();
 		// restore the first snapshot and validate it
-		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
+		backend = restoreKeyedBackend(StringSerializer.INSTANCE, snapshot1);
 		snapshot1.discardState();
 
 		MapState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<Integer, VoidNamespace, Map<Integer, String>> restoredKvState1 = (InternalKvState<Integer, VoidNamespace, Map<Integer, String>>) restored1;
+		InternalKvState<String, VoidNamespace, Map<Integer, String>> restoredKvState1 = (InternalKvState<String, VoidNamespace, Map<Integer, String>>) restored1;
 
-		backend.setCurrentKey(1);
+		backend.setCurrentKey("1");
 		assertEquals("1", restored1.get(1));
 		assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
-				getSerializedMap(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-		backend.setCurrentKey(2);
+				getSerializedMap(restoredKvState1, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		backend.setCurrentKey("2");
 		assertEquals("2", restored1.get(2));
 		assertEquals(new HashMap<Integer, String>() {{ put (2, "2"); }},
-				getSerializedMap(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+				getSerializedMap(restoredKvState1, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
 		backend.dispose();
 		// restore the second snapshot and validate it
-		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
+		backend = restoreKeyedBackend(StringSerializer.INSTANCE, snapshot2);
 		snapshot2.discardState();
 
 		@SuppressWarnings("unchecked")
 		MapState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
-		InternalKvState<Integer, VoidNamespace, Map<Integer, String>> restoredKvState2 = (InternalKvState<Integer, VoidNamespace, Map<Integer, String>>) restored2;
+		InternalKvState<String, VoidNamespace, Map<Integer, String>> restoredKvState2 = (InternalKvState<String, VoidNamespace, Map<Integer, String>>) restored2;
 
-		backend.setCurrentKey(1);
+		backend.setCurrentKey("1");
 		assertEquals("101", restored2.get(1));
 		assertEquals(new HashMap<Integer, String>() {{ put (1, "101"); }},
-				getSerializedMap(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-		backend.setCurrentKey(2);
+				getSerializedMap(restoredKvState2, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		backend.setCurrentKey("2");
 		assertEquals("102", restored2.get(102));
 		assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put (102, "102"); }},
-				getSerializedMap(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-		backend.setCurrentKey(3);
+				getSerializedMap(restoredKvState2, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		backend.setCurrentKey("3");
 		assertEquals("103", restored2.get(103));
 		assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
-				getSerializedMap(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+				getSerializedMap(restoredKvState2, "3", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
 		backend.dispose();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/14518067/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 56a7cc4..219f3ae 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -67,9 +67,6 @@ public class RocksDBMapState<K, N, UK, UV>
 	private final TypeSerializer<UK> userKeySerializer;
 	private final TypeSerializer<UV> userValueSerializer;
 
-	/** The offset of User Key offset in raw key bytes. */
-	private int userKeyOffset;
-
 	/**
 	 * Creates a new {@code RocksDBMapState}.
 	 *
@@ -305,7 +302,6 @@ public class RocksDBMapState<K, N, UK, UV>
 
 	private byte[] serializeCurrentKeyAndNamespace() throws IOException {
 		writeCurrentKeyWithGroupAndNamespace();
-		userKeyOffset = keySerializationStream.getPosition();
 
 		return keySerializationStream.toByteArray();
 	}
@@ -338,7 +334,7 @@ public class RocksDBMapState<K, N, UK, UV>
 		return keySerializationStream.toByteArray();
 	}
 
-	private UK deserializeUserKey(byte[] rawKeyBytes, TypeSerializer<UK> keySerializer) throws IOException {
+	private UK deserializeUserKey(int userKeyOffset, byte[] rawKeyBytes, TypeSerializer<UK> keySerializer) throws IOException {
 		ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes);
 		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
 
@@ -380,18 +376,23 @@ public class RocksDBMapState<K, N, UK, UV>
 
 		private UV userValue;
 
+		/** The offset of the user key in the raw key bytes. */
+		private final int userKeyOffset;
+
 		private TypeSerializer<UK> keySerializer;
 
 		private TypeSerializer<UV> valueSerializer;
 
 		RocksDBMapEntry(
 				@Nonnull final RocksDB db,
+				@Nonnull final int userKeyOffset,
 				@Nonnull final byte[] rawKeyBytes,
 				@Nonnull final byte[] rawValueBytes,
 				@Nonnull final TypeSerializer<UK> keySerializer,
 				@Nonnull final TypeSerializer<UV> valueSerializer) {
 			this.db = db;
 
+			this.userKeyOffset = userKeyOffset;
 			this.keySerializer = keySerializer;
 			this.valueSerializer = valueSerializer;
 
@@ -415,7 +416,7 @@ public class RocksDBMapState<K, N, UK, UV>
 		public UK getKey() {
 			if (userKey == null) {
 				try {
-					userKey = deserializeUserKey(rawKeyBytes, keySerializer);
+					userKey = deserializeUserKey(userKeyOffset, rawKeyBytes, keySerializer);
 				} catch (IOException e) {
 					throw new RuntimeException("Error while deserializing the user key.", e);
 				}
@@ -583,6 +584,7 @@ public class RocksDBMapState<K, N, UK, UV>
 
 					RocksDBMapEntry entry = new RocksDBMapEntry(
 						db,
+						keyPrefixBytes.length,
 						iterator.key(),
 						iterator.value(),
 						keySerializer,