You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/09/11 13:45:38 UTC
[flink] branch release-1.5 updated: [FLINK-10267][state] Fix
arbitrary iterator access on RocksDBMapIterator
This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.5 by this push:
new d8e7741 [FLINK-10267][state] Fix arbitrary iterator access on RocksDBMapIterator
d8e7741 is described below
commit d8e7741bf4fa5d625d381dea0d5b7512b2a56372
Author: Yun Tang <my...@live.com>
AuthorDate: Thu Aug 30 19:30:56 2018 +0200
[FLINK-10267][state] Fix arbitrary iterator access on RocksDBMapIterator
---
.../flink/runtime/state/StateBackendTestBase.java | 47 ++++++++++++++++++++++
.../contrib/streaming/state/RocksDBMapState.java | 30 +++++++-------
2 files changed, 63 insertions(+), 14 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 5aec36f..d897360 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -111,6 +111,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -2749,6 +2750,52 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
/**
+ * Verify that the iterator of {@link MapState} supports arbitrary access (e.g. {@code remove()} after {@code hasNext()}); see [FLINK-10267] for more details.
+ */
+ @Test
+ public void testMapStateIteratorArbitraryAccess() throws Exception {
+ MapStateDescriptor<Integer, Long> kvId = new MapStateDescriptor<>("id", Integer.class, Long.class);
+
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+ try {
+ MapState<Integer, Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+ backend.setCurrentKey(1);
+ int stateSize = 4096;
+ for (int i = 0; i < stateSize; i++) {
+ state.put(i, i * 2L);
+ }
+ Iterator<Map.Entry<Integer, Long>> iterator = state.iterator();
+ int iteratorCount = 0;
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, Long> entry = iterator.next();
+ assertEquals(iteratorCount, (int) entry.getKey()); // entries are expected in ascending key order
+ switch (ThreadLocalRandom.current().nextInt(3)) { // bounded draw in [0, 3); nextInt() % 3 can be negative and match no case
+ case 0: // remove twice
+ iterator.remove();
+ try {
+ iterator.remove();
+ fail();
+ } catch (IllegalStateException e) {
+ // ignore expected exception
+ }
+ break;
+ case 1: // hasNext -> remove
+ iterator.hasNext();
+ iterator.remove();
+ break;
+ case 2: // nothing to do
+ break;
+ }
+ iteratorCount++;
+ }
+ assertEquals(stateSize, iteratorCount);
+ } finally {
+ backend.dispose();
+ }
+ }
+
+ /**
* Verify that {@link ValueStateDescriptor} allows {@code null} as default.
*/
@Test
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 04b4af3..d1e95a0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -506,6 +506,7 @@ public class RocksDBMapState<K, N, UK, UV>
* have the same prefix, hence we can stop iterating once coming across an
* entry with a different prefix.
*/
+ @Nonnull
private final byte[] keyPrefixBytes;
/**
@@ -516,6 +517,9 @@ public class RocksDBMapState<K, N, UK, UV>
/** A in-memory cache for the entries in the rocksdb. */
private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();
+
+ /** The entry at the current position, i.e. the one most recently returned by {@link #nextEntry()}. */
+ private RocksDBMapEntry currentEntry;
private int cacheIndex = 0;
private final TypeSerializer<UK> keySerializer;
@@ -542,12 +546,11 @@ public class RocksDBMapState<K, N, UK, UV>
@Override
public void remove() {
- if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) {
+ if (currentEntry == null || currentEntry.deleted) {
throw new IllegalStateException("The remove operation must be called after a valid next operation.");
}
- RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1);
- lastEntry.remove();
+ currentEntry.remove();
}
final RocksDBMapEntry nextEntry() {
@@ -561,10 +564,10 @@ public class RocksDBMapState<K, N, UK, UV>
return null;
}
- RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
+ this.currentEntry = cacheEntries.get(cacheIndex);
cacheIndex++;
- return entry;
+ return currentEntry;
}
private void loadCache() {
@@ -582,12 +585,11 @@ public class RocksDBMapState<K, N, UK, UV>
try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(db, columnFamily)) {
/*
- * The iteration starts from the prefix bytes at the first loading. The cache then is
- * reloaded when the next entry to return is the last one in the cache. At that time,
- * we will start the iterating from the last returned entry.
- */
- RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
- byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
+ * The iteration starts from the prefix bytes on the first load. Once #nextEntry() has been called,
+ * currentEntry points to the last returned entry, and any later cache reload starts
+ * iterating from currentEntry.
+ */
+ byte[] startBytes = (currentEntry == null ? keyPrefixBytes : currentEntry.rawKeyBytes);
cacheEntries.clear();
cacheIndex = 0;
@@ -595,10 +597,10 @@ public class RocksDBMapState<K, N, UK, UV>
iterator.seek(startBytes);
/*
- * If the last returned entry is not deleted, it will be the first entry in the
- * iterating. Skip it to avoid redundant access in such cases.
+ * If the entry at the current position has not been removed, it will be the first entry of the
+ * new iteration. Skip it to avoid accessing it redundantly.
*/
- if (lastEntry != null && !lastEntry.deleted) {
+ if (currentEntry != null && !currentEntry.deleted) {
iterator.next();
}