You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by si...@apache.org on 2018/07/12 14:30:17 UTC

flink git commit: [FLINK-9804][state] Fix KeyedStateBackend.getKeys() for RocksDBMapState.

Repository: flink
Updated Branches:
  refs/heads/master f1ac0f279 -> def2aed5c


[FLINK-9804][state] Fix KeyedStateBackend.getKeys() for RocksDBMapState.

This closes #6306.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/def2aed5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/def2aed5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/def2aed5

Branch: refs/heads/master
Commit: def2aed5c75b5a00815186d3343e66cb1dc01ac0
Parents: f1ac0f2
Author: sihuazhou <su...@163.com>
Authored: Wed Jul 11 20:35:22 2018 +0800
Committer: sihuazhou <su...@163.com>
Committed: Thu Jul 12 22:23:54 2018 +0800

----------------------------------------------------------------------
 .../runtime/state/StateBackendTestBase.java     | 63 ++++++++++++++++++++
 .../state/RocksDBKeyedStateBackend.java         | 35 +++++++----
 2 files changed, 85 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/def2aed5/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index ad67171..3c5756b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -3841,6 +3841,69 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
+	public void testMapStateGetKeys() throws Exception {
+		final int namespace1ElementsNum = 1000;
+		final int namespace2ElementsNum = 1000;
+		String fieldName = "get-keys-test";
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+		try {
+			final String ns1 = "ns1";
+			MapState<String, Integer> keyedState1 = backend.getPartitionedState(
+				ns1,
+				StringSerializer.INSTANCE,
+				new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
+			);
+
+			for (int key = 0; key < namespace1ElementsNum; key++) {
+				backend.setCurrentKey(key);
+				keyedState1.put("he", key * 2);
+				keyedState1.put("ho", key * 2);
+			}
+
+			final String ns2 = "ns2";
+			MapState<String, Integer> keyedState2 = backend.getPartitionedState(
+				ns2,
+				StringSerializer.INSTANCE,
+				new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
+			);
+
+			for (int key = namespace1ElementsNum; key < namespace1ElementsNum + namespace2ElementsNum; key++) {
+				backend.setCurrentKey(key);
+				keyedState2.put("he", key * 2);
+				keyedState2.put("ho", key * 2);
+			}
+
+			// validate the keys returned for namespace1
+			try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns1).sorted()) {
+				PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
+
+				for (int expectedKey = 0; expectedKey < namespace1ElementsNum; expectedKey++) {
+					assertTrue(actualIterator.hasNext());
+					assertEquals(expectedKey, actualIterator.nextInt());
+				}
+
+				assertFalse(actualIterator.hasNext());
+			}
+
+			// validate the keys returned for namespace2
+			try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns2).sorted()) {
+				PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
+
+				for (int expectedKey = namespace1ElementsNum; expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
+					assertTrue(actualIterator.hasNext());
+					assertEquals(expectedKey, actualIterator.nextInt());
+				}
+
+				assertFalse(actualIterator.hasNext());
+			}
+		}
+		finally {
+			IOUtils.closeQuietly(backend);
+			backend.dispose();
+		}
+	}
+
+	@Test
 	public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() throws Exception {
 
 		CheckpointStreamFactory streamFactory = createStreamFactory();

http://git-wip-us.apache.org/repos/asf/flink/blob/def2aed5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index ad26b9f..6b4e3e9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1641,6 +1641,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private final byte[] namespaceBytes;
 		private final boolean ambiguousKeyPossible;
 		private K nextKey;
+		private K previousKey;
 
 		RocksIteratorForKeysWrapper(
 			RocksIteratorWrapper iterator,
@@ -1655,6 +1656,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes);
 			this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes);
 			this.nextKey = null;
+			this.previousKey = null;
 			this.ambiguousKeyPossible = ambiguousKeyPossible;
 		}
 
@@ -1664,15 +1666,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				while (nextKey == null && iterator.isValid()) {
 
 					byte[] key = iterator.key();
-					if (isMatchingNameSpace(key)) {
-						ByteArrayInputStreamWithPos inputStream =
-							new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);
-						DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);
-						K value = RocksDBKeySerializationUtils.readKey(
-							keySerializer,
-							inputStream,
-							dataInput,
-							ambiguousKeyPossible);
+
+					ByteArrayInputStreamWithPos inputStream =
+						new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);
+
+					DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);
+
+					K value = RocksDBKeySerializationUtils.readKey(
+						keySerializer,
+						inputStream,
+						dataInput,
+						ambiguousKeyPossible);
+
+					int namespaceByteStartPos = inputStream.getPosition();
+
+					if (isMatchingNameSpace(key, namespaceByteStartPos) && !Objects.equals(previousKey, value)) {
+						previousKey = value;
 						nextKey = value;
 					}
 					iterator.next();
@@ -1694,12 +1703,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			return tmpKey;
 		}
 
-		private boolean isMatchingNameSpace(@Nonnull byte[] key) {
+		private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
 			final int namespaceBytesLength = namespaceBytes.length;
-			final int basicLength = namespaceBytesLength + keyGroupPrefixBytes;
+			final int basicLength = namespaceBytesLength + beginPos;
 			if (key.length >= basicLength) {
-				for (int i = 1; i <= namespaceBytesLength; ++i) {
-					if (key[key.length - i] != namespaceBytes[namespaceBytesLength - i]) {
+				for (int i = 0; i < namespaceBytesLength; ++i) {
+					if (key[beginPos + i] != namespaceBytes[i]) {
 						return false;
 					}
 				}