You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/09/11 13:45:38 UTC

[flink] branch release-1.5 updated: [FLINK-10267][state] Fix arbitrary iterator access on RocksDBMapIterator

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
     new d8e7741  [FLINK-10267][state] Fix arbitrary iterator access on RocksDBMapIterator
d8e7741 is described below

commit d8e7741bf4fa5d625d381dea0d5b7512b2a56372
Author: Yun Tang <my...@live.com>
AuthorDate: Thu Aug 30 19:30:56 2018 +0200

    [FLINK-10267][state] Fix arbitrary iterator access on RocksDBMapIterator
---
 .../flink/runtime/state/StateBackendTestBase.java  | 47 ++++++++++++++++++++++
 .../contrib/streaming/state/RocksDBMapState.java   | 30 +++++++-------
 2 files changed, 63 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 5aec36f..d897360 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -111,6 +111,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
@@ -2749,6 +2750,52 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	/**
+	 * Verifies that the iterator of {@link MapState} supports arbitrary interleavings of hasNext(), next() and remove(); see [FLINK-10267].
+	 */
+	@Test
+	public void testMapStateIteratorArbitraryAccess() throws Exception {
+		MapStateDescriptor<Integer, Long> kvId = new MapStateDescriptor<>("id", Integer.class, Long.class);
+
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		try {
+			MapState<Integer, Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			backend.setCurrentKey(1);
+			int stateSize = 4096;
+			for (int i = 0; i < stateSize; i++) {
+				state.put(i, i * 2L);
+			}
+			Iterator<Map.Entry<Integer, Long>> iterator = state.iterator();
+			int iteratorCount = 0;
+			while (iterator.hasNext()) {
+				Map.Entry<Integer, Long> entry = iterator.next();
+				assertEquals(iteratorCount, (int) entry.getKey());
+				switch (ThreadLocalRandom.current().nextInt(3)) { // bounded draw: "nextInt() % 3" may be negative and match no case
+					case 0: // remove twice: the second call must be rejected
+						iterator.remove();
+						try {
+							iterator.remove();
+							fail("A second remove() without an intervening next() must throw IllegalStateException.");
+						} catch (IllegalStateException expected) {
+							// expected: the current entry has already been removed
+						}
+						break;
+					case 1: // hasNext -> remove: hasNext() must not change which entry remove() targets
+						iterator.hasNext();
+						iterator.remove();
+						break;
+					case 2: // nothing to do: keep the entry
+						break;
+				}
+				iteratorCount++;
+			}
+			assertEquals(stateSize, iteratorCount);
+		} finally {
+			backend.dispose();
+		}
+	}
+
+	/**
 	 * Verify that {@link ValueStateDescriptor} allows {@code null} as default.
 	 */
 	@Test
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 04b4af3..d1e95a0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -506,6 +506,7 @@ public class RocksDBMapState<K, N, UK, UV>
 		 * have the same prefix, hence we can stop iterating once coming across an
 		 * entry with a different prefix.
 		 */
+		@Nonnull
 		private final byte[] keyPrefixBytes;
 
 		/**
@@ -516,6 +517,9 @@ public class RocksDBMapState<K, N, UK, UV>
 
 		/** A in-memory cache for the entries in the rocksdb. */
 		private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();
+
+		/** The entry at the current position, i.e. the one most recently returned by {@link #nextEntry()}. */
+		private RocksDBMapEntry currentEntry;
 		private int cacheIndex = 0;
 
 		private final TypeSerializer<UK> keySerializer;
@@ -542,12 +546,11 @@ public class RocksDBMapState<K, N, UK, UV>
 
 		@Override
 		public void remove() {
-			if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) {
+			if (currentEntry == null || currentEntry.deleted) {
 				throw new IllegalStateException("The remove operation must be called after a valid next operation.");
 			}
 
-			RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1);
-			lastEntry.remove();
+			currentEntry.remove();
 		}
 
 		final RocksDBMapEntry nextEntry() {
@@ -561,10 +564,10 @@ public class RocksDBMapState<K, N, UK, UV>
 				return null;
 			}
 
-			RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
+			this.currentEntry = cacheEntries.get(cacheIndex);
 			cacheIndex++;
 
-			return entry;
+			return currentEntry;
 		}
 
 		private void loadCache() {
@@ -582,12 +585,11 @@ public class RocksDBMapState<K, N, UK, UV>
 			try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(db, columnFamily)) {
 
 				/*
-				 * The iteration starts from the prefix bytes at the first loading. The cache then is
-				 * reloaded when the next entry to return is the last one in the cache. At that time,
-				 * we will start the iterating from the last returned entry.
- 				 */
-				RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
-				byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
+				 * The iteration starts from the prefix bytes on the first load. Once #nextEntry() has been called,
+				 * currentEntry refers to the most recently returned entry, so whenever the cache has to be
+				 * reloaded, iteration resumes from currentEntry.
+				 */
+				byte[] startBytes = (currentEntry == null ? keyPrefixBytes : currentEntry.rawKeyBytes);
 
 				cacheEntries.clear();
 				cacheIndex = 0;
@@ -595,10 +597,10 @@ public class RocksDBMapState<K, N, UK, UV>
 				iterator.seek(startBytes);
 
 				/*
-				 * If the last returned entry is not deleted, it will be the first entry in the
-				 * iterating. Skip it to avoid redundant access in such cases.
+				 * If the current entry has not been removed, it will be the first entry the new
+				 * iteration encounters. Skip it to avoid returning it a second time.
 				 */
-				if (lastEntry != null && !lastEntry.deleted) {
+				if (currentEntry != null && !currentEntry.deleted) {
 					iterator.next();
 				}