You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/17 13:15:45 UTC

flink git commit: [FLINK-9373][statebackend] Introduce RocksIteratorWrapper to wrap `Seek(), Next(), SeekToFirst(), SeekToLast(), SeekForPrev(), and Prev()` to check the iterator status.

Repository: flink
Updated Branches:
  refs/heads/master 3df780902 -> 105b30686


[FLINK-9373][statebackend] Introduce RocksIteratorWrapper to wrap `Seek(), Next(), SeekToFirst(), SeekToLast(), SeekForPrev(), and Prev()` to check the iterator status.

This closes #6020.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/105b3068
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/105b3068
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/105b3068

Branch: refs/heads/master
Commit: 105b30686f76dc2d6affcf6cdf3eb41f322df53a
Parents: 3df7809
Author: sihuazhou <su...@163.com>
Authored: Wed May 16 15:47:05 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu May 17 15:15:13 2018 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  68 ++++++---
 .../streaming/state/RocksDBMapState.java        |   3 +-
 .../streaming/state/RocksIteratorWrapper.java   | 103 +++++++++++++
 .../state/RocksDBMergeIteratorTest.java         |   8 +-
 .../RocksDBRocksIteratorForKeysWrapperTest.java | 152 ++++++++++++++++++
 .../state/RocksDBRocksIteratorWrapperTest.java  | 153 -------------------
 .../state/benchmark/RocksDBPerformanceTest.java |   4 +-
 7 files changed, 304 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/105b3068/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 0ec2ef0..0de16c2 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -99,7 +99,6 @@ import org.rocksdb.DBOptions;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
 import org.rocksdb.Snapshot;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
@@ -322,7 +321,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
 		}
 
-		RocksIterator iterator = db.newIterator(columnInfo.f0);
+		RocksIteratorWrapper iterator = getRocksIterator(db, columnInfo.f0);
 		iterator.seekToFirst();
 
 		final RocksIteratorForKeysWrapper<K> iteratorWrapper = new RocksIteratorForKeysWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes,
@@ -734,7 +733,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					restoreInstance((IncrementalLocalKeyedStateHandle) rawStateHandle);
 				} else {
 					throw new IllegalStateException("Unexpected state handle type, " +
-						"expected " + IncrementalKeyedStateHandle.class +
+						"expected " + IncrementalKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
 						", but found " + rawStateHandle.getClass());
 				}
 			}
@@ -1079,7 +1078,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 						ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
 
-						try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) {
+						try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {
 
 							int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
 							byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
@@ -1309,7 +1308,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		int count = 0;
 
 		for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) {
-			try (RocksIterator rocksIterator = db.newIterator(column.f0)) {
+			try (RocksIteratorWrapper rocksIterator = getRocksIterator(db, column.f0)) {
 				rocksIterator.seekToFirst();
 
 				while (rocksIterator.isValid()) {
@@ -1357,7 +1356,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			}
 		}
 
-		RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) {
+		RocksDBMergeIterator(List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) throws RocksDBException {
 			Preconditions.checkNotNull(kvStateIterators);
 			Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
 
@@ -1369,8 +1368,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				PriorityQueue<MergeIterator> iteratorPriorityQueue =
 					new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
 
-				for (Tuple2<RocksIterator, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
-					final RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0;
+				for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
+					final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
 					rocksIterator.seekToFirst();
 					if (rocksIterator.isValid()) {
 						iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
@@ -1398,15 +1397,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * Advance the iterator. Should only be called if {@link #isValid()} returned true. Valid can only chance after
 		 * calls to {@link #next()}.
 		 */
-		public void next() {
+		public void next() throws RocksDBException {
 			newKeyGroup = false;
 			newKVState = false;
 
-			final RocksIterator rocksIterator = currentSubIterator.getIterator();
+			final RocksIteratorWrapper rocksIterator = currentSubIterator.getIterator();
 			rocksIterator.next();
 
 			byte[] oldKey = currentSubIterator.getCurrentKey();
 			if (rocksIterator.isValid()) {
+
 				currentSubIterator.currentKey = rocksIterator.key();
 
 				if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
@@ -1523,13 +1523,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @param iterator  The #RocksIterator to wrap .
 		 * @param kvStateId Id of the K/V state to which this iterator belongs.
 		 */
-		MergeIterator(RocksIterator iterator, int kvStateId) {
+		MergeIterator(RocksIteratorWrapper iterator, int kvStateId) {
 			this.iterator = Preconditions.checkNotNull(iterator);
 			this.currentKey = iterator.key();
 			this.kvStateId = kvStateId;
 		}
 
-		private final RocksIterator iterator;
+		private final RocksIteratorWrapper iterator;
 		private byte[] currentKey;
 		private final int kvStateId;
 
@@ -1541,7 +1541,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			this.currentKey = currentKey;
 		}
 
-		public RocksIterator getIterator() {
+		public RocksIteratorWrapper getIterator() {
 			return iterator;
 		}
 
@@ -1556,13 +1556,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	/**
-	 * Adapter class to bridge between {@link RocksIterator} and {@link Iterator} to iterate over the keys. This class
+	 * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class
 	 * is not thread safe.
 	 *
 	 * @param <K> the type of the iterated objects, which are keys in RocksDB.
 	 */
 	static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, AutoCloseable {
-		private final RocksIterator iterator;
+		private final RocksIteratorWrapper iterator;
 		private final String state;
 		private final TypeSerializer<K> keySerializer;
 		private final int keyGroupPrefixBytes;
@@ -1571,7 +1571,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private K nextKey;
 
 		RocksIteratorForKeysWrapper(
-			RocksIterator iterator,
+			RocksIteratorWrapper iterator,
 			String state,
 			TypeSerializer<K> keySerializer,
 			int keyGroupPrefixBytes,
@@ -1588,8 +1588,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		@Override
 		public boolean hasNext() {
-			while (nextKey == null && iterator.isValid()) {
-				try {
+			try {
+				while (nextKey == null && iterator.isValid()) {
+
 					byte[] key = iterator.key();
 					if (isMatchingNameSpace(key)) {
 						ByteArrayInputStreamWithPos inputStream =
@@ -1603,9 +1604,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						nextKey = value;
 					}
 					iterator.next();
-				} catch (IOException e) {
-					throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
 				}
+			} catch (Exception e) {
+				throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
 			}
 			return nextKey != null;
 		}
@@ -1869,7 +1870,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 */
 		private List<ColumnFamilyHandle> copiedColumnFamilyHandles;
 
-		private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
+		private List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators;
 
 		private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider;
 		private DataOutputView outputView;
@@ -1928,7 +1929,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 *
 		 * @throws IOException
 		 */
-		public void writeDBSnapshot() throws IOException, InterruptedException {
+		public void writeDBSnapshot() throws IOException, InterruptedException, RocksDBException {
 
 			if (null == snapshot) {
 				throw new IOException("No snapshot available. Might be released due to cancellation.");
@@ -1966,7 +1967,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			checkpointStreamWithResultProvider = null;
 
 			if (null != kvStateIterators) {
-				for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) {
+				for (Tuple2<RocksIteratorWrapper, Integer> kvStateIterator : kvStateIterators) {
 					IOUtils.closeQuietly(kvStateIterator.f0);
 				}
 				kvStateIterators = null;
@@ -2001,7 +2002,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			for (ColumnFamilyHandle columnFamilyHandle : copiedColumnFamilyHandles) {
 
 				kvStateIterators.add(
-					new Tuple2<>(stateBackend.db.newIterator(columnFamilyHandle, readOptions), kvStateId));
+					new Tuple2<>(getRocksIterator(stateBackend.db, columnFamilyHandle, readOptions), kvStateId));
 
 				++kvStateId;
 			}
@@ -2019,7 +2020,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			serializationProxy.write(outputView);
 		}
 
-		private void writeKVStateData() throws IOException, InterruptedException {
+		private void writeKVStateData() throws IOException, InterruptedException, RocksDBException {
 
 			byte[] previousKey = null;
 			byte[] previousValue = null;
@@ -2519,4 +2520,21 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			}
 		}
 	}
+
+	public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
+		return new RocksIteratorWrapper(db.newIterator());
+	}
+
+	public static RocksIteratorWrapper getRocksIterator(
+		RocksDB db,
+		ColumnFamilyHandle columnFamilyHandle) {
+		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
+	}
+
+	public static RocksIteratorWrapper getRocksIterator(
+		RocksDB db,
+		ColumnFamilyHandle columnFamilyHandle,
+		ReadOptions readOptions) {
+		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/105b3068/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 219f3ae..b84b7a2 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -34,7 +34,6 @@ import org.apache.flink.util.Preconditions;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -549,7 +548,7 @@ public class RocksDBMapState<K, N, UK, UV>
 
 			// use try-with-resources to ensure RocksIterator can be release even some runtime exception
 			// occurred in the below code block.
-			try (RocksIterator iterator = db.newIterator(columnFamily)) {
+			try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(db, columnFamily)) {
 
 				/*
 				 * The iteration starts from the prefix bytes at the first loading. The cache then is

http://git-wip-us.apache.org/repos/asf/flink/blob/105b3068/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java
new file mode 100644
index 0000000..f98e3f5
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.RocksIteratorInterface;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+
+/**
+ * This is a wrapper around {@link RocksIterator} to check the iterator status for all the methods mentioned
+ * to require this check in the wiki documentation: seek, next, seekToFirst, seekToLast, seekForPrev, and prev.
+ * This is required because the iterator may pass the blocks or files it had difficulties in reading (because
+ * of IO error, data corruption or other issues) and continue with the next available keys. The status flag may not be
+ * OK, even if the iterator is valid. More information can be found
+ * <a href="https://github.com/facebook/rocksdb/wiki/Iterator#error-handling">here</a>.
+ */
+public class RocksIteratorWrapper implements RocksIteratorInterface, Closeable {
+
+	private final RocksIterator iterator;
+
+	public RocksIteratorWrapper(@Nonnull RocksIterator iterator) {
+		this.iterator = iterator;
+	}
+
+	@Override
+	public boolean isValid() {
+		return this.iterator.isValid();
+	}
+
+	@Override
+	public void seekToFirst() {
+		iterator.seekToFirst();
+		status();
+	}
+
+	@Override
+	public void seekToLast() {
+		iterator.seekToLast();
+		status();
+	}
+
+	@Override
+	public void seek(byte[] target) {
+		iterator.seek(target);
+		status();
+	}
+
+	@Override
+	public void next() {
+		iterator.next();
+		status();
+	}
+
+	@Override
+	public void prev() {
+		iterator.prev();
+		status();
+	}
+
+	@Override
+	public void status() {
+		try {
+			iterator.status();
+		} catch (RocksDBException ex) {
+			throw new FlinkRuntimeException("Internal exception found in RocksDB", ex);
+		}
+	}
+
+	public byte[] key() {
+		return iterator.key();
+	}
+
+	public byte[] value() {
+		return iterator.value();
+	}
+
+	@Override
+	public void close() {
+		iterator.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/105b3068/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
index 19f49f8..cb2b202 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
@@ -30,10 +30,8 @@ import org.junit.rules.TemporaryFolder;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
-import org.rocksdb.RocksIterator;
 
 import java.io.DataOutputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -52,7 +50,7 @@ public class RocksDBMergeIteratorTest {
 	public TemporaryFolder tempFolder = new TemporaryFolder();
 
 	@Test
-	public void testEmptyMergeIterator() throws IOException {
+	public void testEmptyMergeIterator() throws Exception {
 		RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator =
 				new RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.emptyList(), 2);
 		Assert.assertFalse(emptyIterator.isValid());
@@ -76,7 +74,7 @@ public class RocksDBMergeIteratorTest {
 		Random random = new Random(1234);
 
 		try (RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
-			List<Tuple2<RocksIterator, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
+			List<Tuple2<RocksIteratorWrapper, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
 			List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>();
 
 			int totalKeysExpected = 0;
@@ -109,7 +107,7 @@ public class RocksDBMergeIteratorTest {
 
 			int id = 0;
 			for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount) {
-				rocksIteratorsWithKVStateId.add(new Tuple2<>(rocksDB.newIterator(columnFamilyHandle.f0), id));
+				rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBKeyedStateBackend.getRocksIterator(rocksDB, columnFamilyHandle.f0), id));
 				++id;
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/105b3068/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
new file mode 100644
index 0000000..f560998
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests that {@link RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper} iterates all inserted keys via {@link RocksIteratorWrapper}.
+ */
+public class RocksDBRocksIteratorForKeysWrapperTest {
+
+	@Rule
+	public final TemporaryFolder tmp = new TemporaryFolder();
+
+	@Test
+	public void testIterator() throws Exception{
+
+		// test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == false
+		testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> i);
+
+		// test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == true
+		testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> String.valueOf(i));
+
+		// test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == false
+		testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> i);
+
+		// test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == true
+		testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> String.valueOf(i));
+	}
+
+	<K> void testIteratorHelper(
+		TypeSerializer<K> keySerializer,
+		TypeSerializer namespaceSerializer,
+		int maxKeyGroupNumber,
+		Function<Integer, K> getKeyFunc) throws Exception {
+
+		String testStateName = "aha";
+		String namespace = "ns";
+
+		String dbPath = tmp.newFolder().getAbsolutePath();
+		String checkpointPath = tmp.newFolder().toURI().toString();
+		RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true);
+		backend.setDbStoragePath(dbPath);
+
+		Environment env = new DummyEnvironment("TestTask", 1, 0);
+		RocksDBKeyedStateBackend<K> keyedStateBackend = (RocksDBKeyedStateBackend<K>) backend.createKeyedStateBackend(
+			env,
+			new JobID(),
+			"Test",
+			keySerializer,
+			maxKeyGroupNumber,
+			new KeyGroupRange(0, maxKeyGroupNumber - 1),
+			mock(TaskKvStateRegistry.class));
+
+		try {
+			keyedStateBackend.restore(null);
+			ValueState<String> testState = keyedStateBackend.getPartitionedState(
+				namespace,
+				namespaceSerializer,
+				new ValueStateDescriptor<String>(testStateName, String.class));
+
+			// insert 1000 records keyed 0..999 so every key group is likely populated
+			for (int i = 0; i < 1000; ++i) {
+				keyedStateBackend.setCurrentKey(getKeyFunc.apply(i));
+				testState.update(String.valueOf(i));
+			}
+
+			ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
+			boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
+			RocksDBKeySerializationUtils.writeNameSpace(
+				namespace,
+				namespaceSerializer,
+				outputStream,
+				new DataOutputViewStreamWrapper(outputStream),
+				ambiguousKeyPossible);
+
+			byte[] nameSpaceBytes = outputStream.toByteArray();
+
+			try (
+				ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
+				RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(keyedStateBackend.db, handle);
+				RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<K> iteratorWrapper =
+					new RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<>(
+						iterator,
+						testStateName,
+						keySerializer,
+						keyedStateBackend.getKeyGroupPrefixBytes(),
+						ambiguousKeyPossible,
+						nameSpaceBytes)) {
+
+				iterator.seekToFirst();
+
+				// read back every key through the wrapper and verify all 1000 are present
+				List<Integer> fetchedKeys = new ArrayList<>(1000);
+				while (iteratorWrapper.hasNext()) {
+					fetchedKeys.add(Integer.parseInt(iteratorWrapper.next().toString()));
+				}
+
+				fetchedKeys.sort(Comparator.comparingInt(a -> a));
+				Assert.assertEquals(1000, fetchedKeys.size());
+
+				for (int i = 0; i < 1000; ++i) {
+					Assert.assertEquals(i, fetchedKeys.get(i).intValue());
+				}
+			}
+		} finally {
+			if (keyedStateBackend != null) {
+				keyedStateBackend.dispose();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/105b3068/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
deleted file mode 100644
index 0cdda4b..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksIterator;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.function.Function;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the RocksIteratorWrapper.
- */
-public class RocksDBRocksIteratorWrapperTest {
-
-	@Rule
-	public final TemporaryFolder tmp = new TemporaryFolder();
-
-	@Test
-	public void testIterator() throws Exception{
-
-		// test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == false
-		testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> i);
-
-		// test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == true
-		testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> String.valueOf(i));
-
-		// test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == false
-		testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> i);
-
-		// test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == true
-		testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> String.valueOf(i));
-	}
-
-	<K> void testIteratorHelper(
-		TypeSerializer<K> keySerializer,
-		TypeSerializer namespaceSerializer,
-		int maxKeyGroupNumber,
-		Function<Integer, K> getKeyFunc) throws Exception {
-
-		String testStateName = "aha";
-		String namespace = "ns";
-
-		String dbPath = tmp.newFolder().getAbsolutePath();
-		String checkpointPath = tmp.newFolder().toURI().toString();
-		RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true);
-		backend.setDbStoragePath(dbPath);
-
-		Environment env = new DummyEnvironment("TestTask", 1, 0);
-		RocksDBKeyedStateBackend<K> keyedStateBackend = (RocksDBKeyedStateBackend<K>) backend.createKeyedStateBackend(
-			env,
-			new JobID(),
-			"Test",
-			keySerializer,
-			maxKeyGroupNumber,
-			new KeyGroupRange(0, maxKeyGroupNumber - 1),
-			mock(TaskKvStateRegistry.class));
-
-		try {
-			keyedStateBackend.restore(null);
-			ValueState<String> testState = keyedStateBackend.getPartitionedState(
-				namespace,
-				namespaceSerializer,
-				new ValueStateDescriptor<String>(testStateName, String.class));
-
-			// insert record
-			for (int i = 0; i < 1000; ++i) {
-				keyedStateBackend.setCurrentKey(getKeyFunc.apply(i));
-				testState.update(String.valueOf(i));
-			}
-
-			ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
-			boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
-			RocksDBKeySerializationUtils.writeNameSpace(
-				namespace,
-				namespaceSerializer,
-				outputStream,
-				new DataOutputViewStreamWrapper(outputStream),
-				ambiguousKeyPossible);
-
-			byte[] nameSpaceBytes = outputStream.toByteArray();
-
-			try (
-				ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
-				RocksIterator iterator = keyedStateBackend.db.newIterator(handle);
-				RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<K> iteratorWrapper =
-					new RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<>(
-						iterator,
-						testStateName,
-						keySerializer,
-						keyedStateBackend.getKeyGroupPrefixBytes(),
-						ambiguousKeyPossible,
-						nameSpaceBytes)) {
-
-				iterator.seekToFirst();
-
-				// valid record
-				List<Integer> fetchedKeys = new ArrayList<>(1000);
-				while (iteratorWrapper.hasNext()) {
-					fetchedKeys.add(Integer.parseInt(iteratorWrapper.next().toString()));
-				}
-
-				fetchedKeys.sort(Comparator.comparingInt(a -> a));
-				Assert.assertEquals(1000, fetchedKeys.size());
-
-				for (int i = 0; i < 1000; ++i) {
-					Assert.assertEquals(i, fetchedKeys.get(i).intValue());
-				}
-			}
-		} finally {
-			if (keyedStateBackend != null) {
-				keyedStateBackend.dispose();
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/105b3068/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index e05e7ae..8724615 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.contrib.streaming.state.benchmark;
 
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
 import org.apache.flink.core.memory.MemoryUtils;
 import org.apache.flink.testutils.junit.RetryOnFailure;
 import org.apache.flink.testutils.junit.RetryRule;
@@ -33,7 +34,6 @@ import org.rocksdb.CompactionStyle;
 import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
-import org.rocksdb.RocksIterator;
 import org.rocksdb.WriteOptions;
 import sun.misc.Unsafe;
 
@@ -166,7 +166,7 @@ public class RocksDBPerformanceTest extends TestLogger {
 
 			int pos = 0;
 
-			try (final RocksIterator iterator = rocksDB.newIterator()) {
+			try (final RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(rocksDB)) {
 				// seek to start
 				unsafe.putInt(keyTemplate, offset, 0);
 				iterator.seek(keyTemplate);