You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/09/11 13:23:55 UTC

[flink] branch master updated: [FLINK-10267][state] Fix arbitrary iterator access on RocksDBMapIterator

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 16dc597  [FLINK-10267][state] Fix arbitrary iterator access on RocksDBMapIterator
16dc597 is described below

commit 16dc5978c7a845a598b672dacca565e870f1a5c7
Author: Yun Tang <my...@live.com>
AuthorDate: Fri Aug 31 01:30:56 2018 +0800

    [FLINK-10267][state] Fix arbitrary iterator access on RocksDBMapIterator
    
    This closes #6638.
---
 .../flink/runtime/state/StateBackendTestBase.java  | 47 ++++++++++++++++++++++
 .../contrib/streaming/state/RocksDBMapState.java   | 30 +++++++-------
 2 files changed, 63 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 649c6d0..2634268 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -117,6 +117,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
@@ -2917,6 +2918,52 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	/**
+	 * Verifies that the iterator of {@link MapState} supports arbitrary access patterns (see FLINK-10267 for details).
+	 */
+	@Test
+	public void testMapStateIteratorArbitraryAccess() throws Exception {
+		MapStateDescriptor<Integer, Long> kvId = new MapStateDescriptor<>("id", Integer.class, Long.class);
+
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		try {
+			MapState<Integer, Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			backend.setCurrentKey(1);
+			int stateSize = 4096;
+			for (int i = 0; i < stateSize; i++) {
+				state.put(i, i * 2L);
+			}
+			Iterator<Map.Entry<Integer, Long>> iterator = state.iterator();
+			int iteratorCount = 0;
+			while (iterator.hasNext()) {
+				Map.Entry<Integer, Long> entry = iterator.next();
+				assertEquals(iteratorCount, (int) entry.getKey());
+				switch (ThreadLocalRandom.current().nextInt(3)) { // bounded draw: "nextInt() % 3" can be negative and match no case
+					case 0: // remove twice
+						iterator.remove();
+						try {
+							iterator.remove();
+							fail("A second remove() without an intervening next() must throw IllegalStateException.");
+						} catch (IllegalStateException e) {
+							// ignore expected exception
+						}
+						break;
+					case 1: // hasNext -> remove
+						iterator.hasNext();
+						iterator.remove();
+						break;
+					case 2: // nothing to do
+						break;
+				}
+				iteratorCount++;
+			}
+			assertEquals(stateSize, iteratorCount);
+		} finally {
+			backend.dispose();
+		}
+	}
+
+	/**
 	 * Verify that {@link ValueStateDescriptor} allows {@code null} as default.
 	 */
 	@Test
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 5c9f7f9..cb656b5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -498,6 +498,7 @@ class RocksDBMapState<K, N, UK, UV>
 		 * have the same prefix, hence we can stop iterating once coming across an
 		 * entry with a different prefix.
 		 */
+		@Nonnull
 		private final byte[] keyPrefixBytes;
 
 		/**
@@ -508,6 +509,9 @@ class RocksDBMapState<K, N, UK, UV>
 
 		/** A in-memory cache for the entries in the rocksdb. */
 		private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();
+
+		/** The entry at the current position, i.e. the one last returned by {@link #nextEntry()}. */
+		private RocksDBMapEntry currentEntry;
 		private int cacheIndex = 0;
 
 		private final TypeSerializer<UK> keySerializer;
@@ -537,12 +541,11 @@ class RocksDBMapState<K, N, UK, UV>
 
 		@Override
 		public void remove() {
-			if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) {
+			if (currentEntry == null || currentEntry.deleted) {
 				throw new IllegalStateException("The remove operation must be called after a valid next operation.");
 			}
 
-			RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1);
-			lastEntry.remove();
+			currentEntry.remove();
 		}
 
 		final RocksDBMapEntry nextEntry() {
@@ -556,10 +559,10 @@ class RocksDBMapState<K, N, UK, UV>
 				return null;
 			}
 
-			RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
+			this.currentEntry = cacheEntries.get(cacheIndex);
 			cacheIndex++;
 
-			return entry;
+			return currentEntry;
 		}
 
 		private void loadCache() {
@@ -577,12 +580,11 @@ class RocksDBMapState<K, N, UK, UV>
 			try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(db, columnFamily)) {
 
 				/*
-				 * The iteration starts from the prefix bytes at the first loading. The cache then is
-				 * reloaded when the next entry to return is the last one in the cache. At that time,
-				 * we will start the iterating from the last returned entry.
- 				 */
-				RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
-				byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
+				 * On the first load, iteration starts from the key prefix bytes. Once #nextEntry() has been
+				 * called, currentEntry points to the last returned entry, so any later cache reload
+				 * resumes iterating from currentEntry.
+				 */
+				byte[] startBytes = (currentEntry == null ? keyPrefixBytes : currentEntry.rawKeyBytes);
 
 				cacheEntries.clear();
 				cacheIndex = 0;
@@ -590,10 +592,10 @@ class RocksDBMapState<K, N, UK, UV>
 				iterator.seek(startBytes);
 
 				/*
-				 * If the last returned entry is not deleted, it will be the first entry in the
-				 * iterating. Skip it to avoid redundant access in such cases.
+				 * If the entry at the current position has not been removed, it will be the first entry of the
+				 * new iteration. Skip it to avoid redundant access in such cases.
 				 */
-				if (lastEntry != null && !lastEntry.deleted) {
+				if (currentEntry != null && !currentEntry.deleted) {
 					iterator.next();
 				}