You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/11 13:25:42 UTC

[GitHub] asfgit closed pull request #6638: [FLINK-10267][State] Fix arbitrary iterator access on RocksDBMapIterator

asfgit closed pull request #6638: [FLINK-10267][State] Fix arbitrary iterator access on RocksDBMapIterator
URL: https://github.com/apache/flink/pull/6638
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 649c6d03a6a..2634268c947 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -117,6 +117,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
@@ -2916,6 +2917,52 @@ public void testMapState() throws Exception {
 		backend.dispose();
 	}
 
+	/**
+	 * Verify iterator of {@link MapState} supporting arbitrary access, see [FLINK-10267] to know more details.
+	 */
+	@Test
+	public void testMapStateIteratorArbitraryAccess() throws Exception {
+		MapStateDescriptor<Integer, Long> kvId = new MapStateDescriptor<>("id", Integer.class, Long.class);
+
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		try {
+			MapState<Integer, Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			backend.setCurrentKey(1);
+			int stateSize = 4096;
+			for (int i = 0; i < stateSize; i++) {
+				state.put(i, i * 2L);
+			}
+			Iterator<Map.Entry<Integer, Long>> iterator = state.iterator();
+			int iteratorCount = 0;
+			while (iterator.hasNext()) {
+				Map.Entry<Integer, Long> entry = iterator.next();
+				assertEquals(iteratorCount, (int) entry.getKey());
+				switch (ThreadLocalRandom.current().nextInt() % 3) {
+					case 0: // remove twice
+						iterator.remove();
+						try {
+							iterator.remove();
+							fail();
+						} catch (IllegalStateException e) {
+							// ignore expected exception
+						}
+						break;
+					case 1: // hasNext -> remove
+						iterator.hasNext();
+						iterator.remove();
+						break;
+					case 2: // nothing to do
+						break;
+				}
+				iteratorCount++;
+			}
+			assertEquals(stateSize, iteratorCount);
+		} finally {
+			backend.dispose();
+		}
+	}
+
 	/**
 	 * Verify that {@link ValueStateDescriptor} allows {@code null} as default.
 	 */
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 5c9f7f9f30c..cb656b53b1b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -498,6 +498,7 @@ public UV setValue(UV value) {
 		 * have the same prefix, hence we can stop iterating once coming across an
 		 * entry with a different prefix.
 		 */
+		@Nonnull
 		private final byte[] keyPrefixBytes;
 
 		/**
@@ -508,6 +509,9 @@ public UV setValue(UV value) {
 
 		/** A in-memory cache for the entries in the rocksdb. */
 		private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();
+
+		/** The entry pointing to the current position which is last returned by calling {@link #nextEntry()}. */
+		private RocksDBMapEntry currentEntry;
 		private int cacheIndex = 0;
 
 		private final TypeSerializer<UK> keySerializer;
@@ -537,12 +541,11 @@ public boolean hasNext() {
 
 		@Override
 		public void remove() {
-			if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) {
+			if (currentEntry == null || currentEntry.deleted) {
 				throw new IllegalStateException("The remove operation must be called after a valid next operation.");
 			}
 
-			RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1);
-			lastEntry.remove();
+			currentEntry.remove();
 		}
 
 		final RocksDBMapEntry nextEntry() {
@@ -556,10 +559,10 @@ final RocksDBMapEntry nextEntry() {
 				return null;
 			}
 
-			RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
+			this.currentEntry = cacheEntries.get(cacheIndex);
 			cacheIndex++;
 
-			return entry;
+			return currentEntry;
 		}
 
 		private void loadCache() {
@@ -577,12 +580,11 @@ private void loadCache() {
 			try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(db, columnFamily)) {
 
 				/*
-				 * The iteration starts from the prefix bytes at the first loading. The cache then is
-				 * reloaded when the next entry to return is the last one in the cache. At that time,
-				 * we will start the iterating from the last returned entry.
- 				 */
-				RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
-				byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
+				 * The iteration starts from the prefix bytes at the first loading. After #nextEntry() is called,
+				 * the currentEntry points to the last returned entry, and at that time, we will start
+				 * the iterating from currentEntry if reloading cache is needed.
+				 */
+				byte[] startBytes = (currentEntry == null ? keyPrefixBytes : currentEntry.rawKeyBytes);
 
 				cacheEntries.clear();
 				cacheIndex = 0;
@@ -590,10 +592,10 @@ private void loadCache() {
 				iterator.seek(startBytes);
 
 				/*
-				 * If the last returned entry is not deleted, it will be the first entry in the
-				 * iterating. Skip it to avoid redundant access in such cases.
+				 * If the entry pointing to the current position is not removed, it will be the first entry in the
+				 * new iterating. Skip it to avoid redundant access in such cases.
 				 */
-				if (lastEntry != null && !lastEntry.deleted) {
+				if (currentEntry != null && !currentEntry.deleted) {
 					iterator.next();
 				}
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services