You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/17 13:15:45 UTC
flink git commit: [FLINK-9373][statebackend] Introduce
RocksIteratorWrapper to wrap `Seek(), Next(), SeekToFirst(), SeekToLast(),
SeekForPrev(), and Prev()` to check the iterator status.
Repository: flink
Updated Branches:
refs/heads/master 3df780902 -> 105b30686
[FLINK-9373][statebackend] Introduce RocksIteratorWrapper to wrap `Seek(), Next(), SeekToFirst(), SeekToLast(), SeekForPrev(), and Prev()` to check the iterator status.
This closes #6020.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/105b3068
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/105b3068
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/105b3068
Branch: refs/heads/master
Commit: 105b30686f76dc2d6affcf6cdf3eb41f322df53a
Parents: 3df7809
Author: sihuazhou <su...@163.com>
Authored: Wed May 16 15:47:05 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu May 17 15:15:13 2018 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 68 ++++++---
.../streaming/state/RocksDBMapState.java | 3 +-
.../streaming/state/RocksIteratorWrapper.java | 103 +++++++++++++
.../state/RocksDBMergeIteratorTest.java | 8 +-
.../RocksDBRocksIteratorForKeysWrapperTest.java | 152 ++++++++++++++++++
.../state/RocksDBRocksIteratorWrapperTest.java | 153 -------------------
.../state/benchmark/RocksDBPerformanceTest.java | 4 +-
7 files changed, 304 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/105b3068/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 0ec2ef0..0de16c2 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -99,7 +99,6 @@ import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
@@ -322,7 +321,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
}
- RocksIterator iterator = db.newIterator(columnInfo.f0);
+ RocksIteratorWrapper iterator = getRocksIterator(db, columnInfo.f0);
iterator.seekToFirst();
final RocksIteratorForKeysWrapper<K> iteratorWrapper = new RocksIteratorForKeysWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes,
@@ -734,7 +733,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
restoreInstance((IncrementalLocalKeyedStateHandle) rawStateHandle);
} else {
throw new IllegalStateException("Unexpected state handle type, " +
- "expected " + IncrementalKeyedStateHandle.class +
+ "expected " + IncrementalKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class +
", but found " + rawStateHandle.getClass());
}
}
@@ -1079,7 +1078,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
- try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) {
+ try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {
int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
@@ -1309,7 +1308,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
int count = 0;
for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) {
- try (RocksIterator rocksIterator = db.newIterator(column.f0)) {
+ try (RocksIteratorWrapper rocksIterator = getRocksIterator(db, column.f0)) {
rocksIterator.seekToFirst();
while (rocksIterator.isValid()) {
@@ -1357,7 +1356,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
- RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) {
+ RocksDBMergeIterator(List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) throws RocksDBException {
Preconditions.checkNotNull(kvStateIterators);
Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
@@ -1369,8 +1368,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
PriorityQueue<MergeIterator> iteratorPriorityQueue =
new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
- for (Tuple2<RocksIterator, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
- final RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0;
+ for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
+ final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
rocksIterator.seekToFirst();
if (rocksIterator.isValid()) {
iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
@@ -1398,15 +1397,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* Advance the iterator. Should only be called if {@link #isValid()} returned true. Valid can only chance after
* calls to {@link #next()}.
*/
- public void next() {
+ public void next() throws RocksDBException {
newKeyGroup = false;
newKVState = false;
- final RocksIterator rocksIterator = currentSubIterator.getIterator();
+ final RocksIteratorWrapper rocksIterator = currentSubIterator.getIterator();
rocksIterator.next();
byte[] oldKey = currentSubIterator.getCurrentKey();
if (rocksIterator.isValid()) {
+
currentSubIterator.currentKey = rocksIterator.key();
if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
@@ -1523,13 +1523,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* @param iterator The #RocksIterator to wrap .
* @param kvStateId Id of the K/V state to which this iterator belongs.
*/
- MergeIterator(RocksIterator iterator, int kvStateId) {
+ MergeIterator(RocksIteratorWrapper iterator, int kvStateId) {
this.iterator = Preconditions.checkNotNull(iterator);
this.currentKey = iterator.key();
this.kvStateId = kvStateId;
}
- private final RocksIterator iterator;
+ private final RocksIteratorWrapper iterator;
private byte[] currentKey;
private final int kvStateId;
@@ -1541,7 +1541,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.currentKey = currentKey;
}
- public RocksIterator getIterator() {
+ public RocksIteratorWrapper getIterator() {
return iterator;
}
@@ -1556,13 +1556,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
/**
- * Adapter class to bridge between {@link RocksIterator} and {@link Iterator} to iterate over the keys. This class
+ * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class
* is not thread safe.
*
* @param <K> the type of the iterated objects, which are keys in RocksDB.
*/
static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, AutoCloseable {
- private final RocksIterator iterator;
+ private final RocksIteratorWrapper iterator;
private final String state;
private final TypeSerializer<K> keySerializer;
private final int keyGroupPrefixBytes;
@@ -1571,7 +1571,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private K nextKey;
RocksIteratorForKeysWrapper(
- RocksIterator iterator,
+ RocksIteratorWrapper iterator,
String state,
TypeSerializer<K> keySerializer,
int keyGroupPrefixBytes,
@@ -1588,8 +1588,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
@Override
public boolean hasNext() {
- while (nextKey == null && iterator.isValid()) {
- try {
+ try {
+ while (nextKey == null && iterator.isValid()) {
+
byte[] key = iterator.key();
if (isMatchingNameSpace(key)) {
ByteArrayInputStreamWithPos inputStream =
@@ -1603,9 +1604,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
nextKey = value;
}
iterator.next();
- } catch (IOException e) {
- throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
}
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
}
return nextKey != null;
}
@@ -1869,7 +1870,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
*/
private List<ColumnFamilyHandle> copiedColumnFamilyHandles;
- private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
+ private List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators;
private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider;
private DataOutputView outputView;
@@ -1928,7 +1929,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
*
* @throws IOException
*/
- public void writeDBSnapshot() throws IOException, InterruptedException {
+ public void writeDBSnapshot() throws IOException, InterruptedException, RocksDBException {
if (null == snapshot) {
throw new IOException("No snapshot available. Might be released due to cancellation.");
@@ -1966,7 +1967,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
checkpointStreamWithResultProvider = null;
if (null != kvStateIterators) {
- for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) {
+ for (Tuple2<RocksIteratorWrapper, Integer> kvStateIterator : kvStateIterators) {
IOUtils.closeQuietly(kvStateIterator.f0);
}
kvStateIterators = null;
@@ -2001,7 +2002,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
for (ColumnFamilyHandle columnFamilyHandle : copiedColumnFamilyHandles) {
kvStateIterators.add(
- new Tuple2<>(stateBackend.db.newIterator(columnFamilyHandle, readOptions), kvStateId));
+ new Tuple2<>(getRocksIterator(stateBackend.db, columnFamilyHandle, readOptions), kvStateId));
++kvStateId;
}
@@ -2019,7 +2020,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
serializationProxy.write(outputView);
}
- private void writeKVStateData() throws IOException, InterruptedException {
+ private void writeKVStateData() throws IOException, InterruptedException, RocksDBException {
byte[] previousKey = null;
byte[] previousValue = null;
@@ -2519,4 +2520,21 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
}
+
+ public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
+ return new RocksIteratorWrapper(db.newIterator());
+ }
+
+ public static RocksIteratorWrapper getRocksIterator(
+ RocksDB db,
+ ColumnFamilyHandle columnFamilyHandle) {
+ return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
+ }
+
+ public static RocksIteratorWrapper getRocksIterator(
+ RocksDB db,
+ ColumnFamilyHandle columnFamilyHandle,
+ ReadOptions readOptions) {
+ return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/105b3068/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 219f3ae..b84b7a2 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -34,7 +34,6 @@ import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -549,7 +548,7 @@ public class RocksDBMapState<K, N, UK, UV>
// use try-with-resources to ensure RocksIterator can be release even some runtime exception
// occurred in the below code block.
- try (RocksIterator iterator = db.newIterator(columnFamily)) {
+ try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(db, columnFamily)) {
/*
* The iteration starts from the prefix bytes at the first loading. The cache then is
http://git-wip-us.apache.org/repos/asf/flink/blob/105b3068/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java
new file mode 100644
index 0000000..f98e3f5
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.RocksIteratorInterface;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+
+/**
+ * This is a wrapper around {@link RocksIterator} to check the iterator status for all the methods mentioned
+ * to require this check in the wiki documentation: seek, next, seekToFirst, seekToLast, seekForPrev, and prev.
+ * This is required because the iterator may pass the blocks or files it had difficulties in reading (because
+ * of IO error, data corruption or other issues) and continue with the next available keys. The status flag may not be
+ * OK, even if the iterator is valid. More information can be found
+ * <a href="https://github.com/facebook/rocksdb/wiki/Iterator#error-handling">here</a>.
+ */
+public class RocksIteratorWrapper implements RocksIteratorInterface, Closeable {
+
+ private RocksIterator iterator;
+
+ public RocksIteratorWrapper(@Nonnull RocksIterator iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public boolean isValid() {
+ return this.iterator.isValid();
+ }
+
+ @Override
+ public void seekToFirst() {
+ iterator.seekToFirst();
+ status();
+ }
+
+ @Override
+ public void seekToLast() {
+ iterator.seekToFirst();
+ status();
+ }
+
+ @Override
+ public void seek(byte[] target) {
+ iterator.seek(target);
+ status();
+ }
+
+ @Override
+ public void next() {
+ iterator.next();
+ status();
+ }
+
+ @Override
+ public void prev() {
+ iterator.prev();
+ status();
+ }
+
+ @Override
+ public void status() {
+ try {
+ iterator.status();
+ } catch (RocksDBException ex) {
+ throw new FlinkRuntimeException("Internal exception found in RocksDB", ex);
+ }
+ }
+
+ public byte[] key() {
+ return iterator.key();
+ }
+
+ public byte[] value() {
+ return iterator.value();
+ }
+
+ @Override
+ public void close() {
+ iterator.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/105b3068/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
index 19f49f8..cb2b202 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
@@ -30,10 +30,8 @@ import org.junit.rules.TemporaryFolder;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
-import org.rocksdb.RocksIterator;
import java.io.DataOutputStream;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -52,7 +50,7 @@ public class RocksDBMergeIteratorTest {
public TemporaryFolder tempFolder = new TemporaryFolder();
@Test
- public void testEmptyMergeIterator() throws IOException {
+ public void testEmptyMergeIterator() throws Exception {
RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator =
new RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.emptyList(), 2);
Assert.assertFalse(emptyIterator.isValid());
@@ -76,7 +74,7 @@ public class RocksDBMergeIteratorTest {
Random random = new Random(1234);
try (RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
- List<Tuple2<RocksIterator, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
+ List<Tuple2<RocksIteratorWrapper, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>();
int totalKeysExpected = 0;
@@ -109,7 +107,7 @@ public class RocksDBMergeIteratorTest {
int id = 0;
for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount) {
- rocksIteratorsWithKVStateId.add(new Tuple2<>(rocksDB.newIterator(columnFamilyHandle.f0), id));
+ rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBKeyedStateBackend.getRocksIterator(rocksDB, columnFamilyHandle.f0), id));
++id;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/105b3068/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
new file mode 100644
index 0000000..f560998
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the RocksIteratorWrapper.
+ */
+public class RocksDBRocksIteratorForKeysWrapperTest {
+
+ @Rule
+ public final TemporaryFolder tmp = new TemporaryFolder();
+
+ @Test
+ public void testIterator() throws Exception{
+
+ // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == false
+ testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> i);
+
+ // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == true
+ testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> String.valueOf(i));
+
+ // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == false
+ testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> i);
+
+ // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == true
+ testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> String.valueOf(i));
+ }
+
+ <K> void testIteratorHelper(
+ TypeSerializer<K> keySerializer,
+ TypeSerializer namespaceSerializer,
+ int maxKeyGroupNumber,
+ Function<Integer, K> getKeyFunc) throws Exception {
+
+ String testStateName = "aha";
+ String namespace = "ns";
+
+ String dbPath = tmp.newFolder().getAbsolutePath();
+ String checkpointPath = tmp.newFolder().toURI().toString();
+ RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true);
+ backend.setDbStoragePath(dbPath);
+
+ Environment env = new DummyEnvironment("TestTask", 1, 0);
+ RocksDBKeyedStateBackend<K> keyedStateBackend = (RocksDBKeyedStateBackend<K>) backend.createKeyedStateBackend(
+ env,
+ new JobID(),
+ "Test",
+ keySerializer,
+ maxKeyGroupNumber,
+ new KeyGroupRange(0, maxKeyGroupNumber - 1),
+ mock(TaskKvStateRegistry.class));
+
+ try {
+ keyedStateBackend.restore(null);
+ ValueState<String> testState = keyedStateBackend.getPartitionedState(
+ namespace,
+ namespaceSerializer,
+ new ValueStateDescriptor<String>(testStateName, String.class));
+
+ // insert record
+ for (int i = 0; i < 1000; ++i) {
+ keyedStateBackend.setCurrentKey(getKeyFunc.apply(i));
+ testState.update(String.valueOf(i));
+ }
+
+ ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
+ boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
+ RocksDBKeySerializationUtils.writeNameSpace(
+ namespace,
+ namespaceSerializer,
+ outputStream,
+ new DataOutputViewStreamWrapper(outputStream),
+ ambiguousKeyPossible);
+
+ byte[] nameSpaceBytes = outputStream.toByteArray();
+
+ try (
+ ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
+ RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(keyedStateBackend.db, handle);
+ RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<K> iteratorWrapper =
+ new RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<>(
+ iterator,
+ testStateName,
+ keySerializer,
+ keyedStateBackend.getKeyGroupPrefixBytes(),
+ ambiguousKeyPossible,
+ nameSpaceBytes)) {
+
+ iterator.seekToFirst();
+
+ // valid record
+ List<Integer> fetchedKeys = new ArrayList<>(1000);
+ while (iteratorWrapper.hasNext()) {
+ fetchedKeys.add(Integer.parseInt(iteratorWrapper.next().toString()));
+ }
+
+ fetchedKeys.sort(Comparator.comparingInt(a -> a));
+ Assert.assertEquals(1000, fetchedKeys.size());
+
+ for (int i = 0; i < 1000; ++i) {
+ Assert.assertEquals(i, fetchedKeys.get(i).intValue());
+ }
+ }
+ } finally {
+ if (keyedStateBackend != null) {
+ keyedStateBackend.dispose();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/105b3068/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
deleted file mode 100644
index 0cdda4b..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksIterator;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.function.Function;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the RocksIteratorWrapper.
- */
-public class RocksDBRocksIteratorWrapperTest {
-
- @Rule
- public final TemporaryFolder tmp = new TemporaryFolder();
-
- @Test
- public void testIterator() throws Exception{
-
- // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == false
- testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> i);
-
- // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == true
- testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> String.valueOf(i));
-
- // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == false
- testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> i);
-
- // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == true
- testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> String.valueOf(i));
- }
-
- <K> void testIteratorHelper(
- TypeSerializer<K> keySerializer,
- TypeSerializer namespaceSerializer,
- int maxKeyGroupNumber,
- Function<Integer, K> getKeyFunc) throws Exception {
-
- String testStateName = "aha";
- String namespace = "ns";
-
- String dbPath = tmp.newFolder().getAbsolutePath();
- String checkpointPath = tmp.newFolder().toURI().toString();
- RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true);
- backend.setDbStoragePath(dbPath);
-
- Environment env = new DummyEnvironment("TestTask", 1, 0);
- RocksDBKeyedStateBackend<K> keyedStateBackend = (RocksDBKeyedStateBackend<K>) backend.createKeyedStateBackend(
- env,
- new JobID(),
- "Test",
- keySerializer,
- maxKeyGroupNumber,
- new KeyGroupRange(0, maxKeyGroupNumber - 1),
- mock(TaskKvStateRegistry.class));
-
- try {
- keyedStateBackend.restore(null);
- ValueState<String> testState = keyedStateBackend.getPartitionedState(
- namespace,
- namespaceSerializer,
- new ValueStateDescriptor<String>(testStateName, String.class));
-
- // insert record
- for (int i = 0; i < 1000; ++i) {
- keyedStateBackend.setCurrentKey(getKeyFunc.apply(i));
- testState.update(String.valueOf(i));
- }
-
- ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
- boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
- RocksDBKeySerializationUtils.writeNameSpace(
- namespace,
- namespaceSerializer,
- outputStream,
- new DataOutputViewStreamWrapper(outputStream),
- ambiguousKeyPossible);
-
- byte[] nameSpaceBytes = outputStream.toByteArray();
-
- try (
- ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
- RocksIterator iterator = keyedStateBackend.db.newIterator(handle);
- RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<K> iteratorWrapper =
- new RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<>(
- iterator,
- testStateName,
- keySerializer,
- keyedStateBackend.getKeyGroupPrefixBytes(),
- ambiguousKeyPossible,
- nameSpaceBytes)) {
-
- iterator.seekToFirst();
-
- // valid record
- List<Integer> fetchedKeys = new ArrayList<>(1000);
- while (iteratorWrapper.hasNext()) {
- fetchedKeys.add(Integer.parseInt(iteratorWrapper.next().toString()));
- }
-
- fetchedKeys.sort(Comparator.comparingInt(a -> a));
- Assert.assertEquals(1000, fetchedKeys.size());
-
- for (int i = 0; i < 1000; ++i) {
- Assert.assertEquals(i, fetchedKeys.get(i).intValue());
- }
- }
- } finally {
- if (keyedStateBackend != null) {
- keyedStateBackend.dispose();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/105b3068/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index e05e7ae..8724615 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.contrib.streaming.state.benchmark;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.core.memory.MemoryUtils;
import org.apache.flink.testutils.junit.RetryOnFailure;
import org.apache.flink.testutils.junit.RetryRule;
@@ -33,7 +34,6 @@ import org.rocksdb.CompactionStyle;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
-import org.rocksdb.RocksIterator;
import org.rocksdb.WriteOptions;
import sun.misc.Unsafe;
@@ -166,7 +166,7 @@ public class RocksDBPerformanceTest extends TestLogger {
int pos = 0;
- try (final RocksIterator iterator = rocksDB.newIterator()) {
+ try (final RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(rocksDB)) {
// seek to start
unsafe.putInt(keyTemplate, offset, 0);
iterator.seek(keyTemplate);