You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by si...@apache.org on 2018/07/12 14:30:17 UTC
flink git commit: [FLINK-9804][state] Fix KeyedStateBackend.getKeys()
for RocksDBMapState.
Repository: flink
Updated Branches:
refs/heads/master f1ac0f279 -> def2aed5c
[FLINK-9804][state] Fix KeyedStateBackend.getKeys() for RocksDBMapState.
This closes #6306.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/def2aed5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/def2aed5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/def2aed5
Branch: refs/heads/master
Commit: def2aed5c75b5a00815186d3343e66cb1dc01ac0
Parents: f1ac0f2
Author: sihuazhou <su...@163.com>
Authored: Wed Jul 11 20:35:22 2018 +0800
Committer: sihuazhou <su...@163.com>
Committed: Thu Jul 12 22:23:54 2018 +0800
----------------------------------------------------------------------
.../runtime/state/StateBackendTestBase.java | 63 ++++++++++++++++++++
.../state/RocksDBKeyedStateBackend.java | 35 +++++++----
2 files changed, 85 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/def2aed5/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index ad67171..3c5756b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -3841,6 +3841,69 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
@Test
+ public void testMapStateGetKeys() throws Exception {
+ final int namespace1ElementsNum = 1000;
+ final int namespace2ElementsNum = 1000;
+ String fieldName = "get-keys-test";
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+ try {
+ final String ns1 = "ns1";
+ MapState<String, Integer> keyedState1 = backend.getPartitionedState(
+ ns1,
+ StringSerializer.INSTANCE,
+ new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
+ );
+
+ for (int key = 0; key < namespace1ElementsNum; key++) {
+ backend.setCurrentKey(key);
+ keyedState1.put("he", key * 2);
+ keyedState1.put("ho", key * 2);
+ }
+
+ final String ns2 = "ns2";
+ MapState<String, Integer> keyedState2 = backend.getPartitionedState(
+ ns2,
+ StringSerializer.INSTANCE,
+ new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
+ );
+
+ for (int key = namespace1ElementsNum; key < namespace1ElementsNum + namespace2ElementsNum; key++) {
+ backend.setCurrentKey(key);
+ keyedState2.put("he", key * 2);
+ keyedState2.put("ho", key * 2);
+ }
+
+ // valid for namespace1
+ try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns1).sorted()) {
+ PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
+
+ for (int expectedKey = 0; expectedKey < namespace1ElementsNum; expectedKey++) {
+ assertTrue(actualIterator.hasNext());
+ assertEquals(expectedKey, actualIterator.nextInt());
+ }
+
+ assertFalse(actualIterator.hasNext());
+ }
+
+ // valid for namespace2
+ try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns2).sorted()) {
+ PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
+
+ for (int expectedKey = namespace1ElementsNum; expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
+ assertTrue(actualIterator.hasNext());
+ assertEquals(expectedKey, actualIterator.nextInt());
+ }
+
+ assertFalse(actualIterator.hasNext());
+ }
+ }
+ finally {
+ IOUtils.closeQuietly(backend);
+ backend.dispose();
+ }
+ }
+
+ @Test
public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
http://git-wip-us.apache.org/repos/asf/flink/blob/def2aed5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index ad26b9f..6b4e3e9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1641,6 +1641,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final byte[] namespaceBytes;
private final boolean ambiguousKeyPossible;
private K nextKey;
+ private K previousKey;
RocksIteratorForKeysWrapper(
RocksIteratorWrapper iterator,
@@ -1655,6 +1656,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes);
this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes);
this.nextKey = null;
+ this.previousKey = null;
this.ambiguousKeyPossible = ambiguousKeyPossible;
}
@@ -1664,15 +1666,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
while (nextKey == null && iterator.isValid()) {
byte[] key = iterator.key();
- if (isMatchingNameSpace(key)) {
- ByteArrayInputStreamWithPos inputStream =
- new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);
- DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);
- K value = RocksDBKeySerializationUtils.readKey(
- keySerializer,
- inputStream,
- dataInput,
- ambiguousKeyPossible);
+
+ ByteArrayInputStreamWithPos inputStream =
+ new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);
+
+ DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);
+
+ K value = RocksDBKeySerializationUtils.readKey(
+ keySerializer,
+ inputStream,
+ dataInput,
+ ambiguousKeyPossible);
+
+ int namespaceByteStartPos = inputStream.getPosition();
+
+ if (isMatchingNameSpace(key, namespaceByteStartPos) && !Objects.equals(previousKey, value)) {
+ previousKey = value;
nextKey = value;
}
iterator.next();
@@ -1694,12 +1703,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return tmpKey;
}
- private boolean isMatchingNameSpace(@Nonnull byte[] key) {
+ private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
final int namespaceBytesLength = namespaceBytes.length;
- final int basicLength = namespaceBytesLength + keyGroupPrefixBytes;
+ final int basicLength = namespaceBytesLength + beginPos;
if (key.length >= basicLength) {
- for (int i = 1; i <= namespaceBytesLength; ++i) {
- if (key[key.length - i] != namespaceBytes[namespaceBytesLength - i]) {
+ for (int i = 0; i < namespaceBytesLength; ++i) {
+ if (key[beginPos + i] != namespaceBytes[i]) {
return false;
}
}