You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/23 10:29:15 UTC
flink git commit: [FLINK-8845][state] Introduce
RocksDBWriteBatchWrapper to improve batched write performance in RocksDB
backend.
Repository: flink
Updated Branches:
refs/heads/master 5563681bc -> 1c7341ad1
[FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to improve batched write performance in RocksDB backend.
This closes #5650.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c7341ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c7341ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c7341ad
Branch: refs/heads/master
Commit: 1c7341ad1ad323746b88a4a2975dc91a7d9c5c0d
Parents: 5563681
Author: sihuazhou <su...@163.com>
Authored: Wed Mar 7 13:58:45 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Wed May 23 12:27:54 2018 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 74 +++++-----
.../streaming/state/RocksDBMapState.java | 8 +-
.../state/RocksDBWriteBatchWrapper.java | 101 +++++++++++++
.../state/RocksDBWriteBatchWrapperTest.java | 68 +++++++++
.../RocksDBWriteBatchPerformanceTest.java | 145 +++++++++++++++++++
5 files changed, 358 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1c7341ad/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 0de16c2..52362c1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -658,41 +658,43 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
*/
private void restoreKVStateData() throws IOException, RocksDBException {
//for all key-groups in the current state handle...
- for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
- int keyGroup = keyGroupOffset.f0;
-
- // Check that restored key groups all belong to the backend
- Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
- "The key group must belong to the backend");
-
- long offset = keyGroupOffset.f1;
- //not empty key-group?
- if (0L != offset) {
- currentStateHandleInStream.seek(offset);
- try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
- DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- int kvStateId = compressedKgInputView.readShort();
- ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
- //insert all k/v pairs into DB
- boolean keyGroupHasMoreKeys = true;
- while (keyGroupHasMoreKeys) {
- byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
- byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
- if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
- //clear the signal bit in the key to make it ready for insertion again
- RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
- rocksDBKeyedStateBackend.db.put(handle, key, value);
- //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
- kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
- & compressedKgInputView.readShort();
- if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
- keyGroupHasMoreKeys = false;
+ try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(rocksDBKeyedStateBackend.db)) {
+ for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
+ int keyGroup = keyGroupOffset.f0;
+
+ // Check that restored key groups all belong to the backend
+ Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
+ "The key group must belong to the backend");
+
+ long offset = keyGroupOffset.f1;
+ //not empty key-group?
+ if (0L != offset) {
+ currentStateHandleInStream.seek(offset);
+ try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
+ DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ int kvStateId = compressedKgInputView.readShort();
+ ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+ //insert all k/v pairs into DB
+ boolean keyGroupHasMoreKeys = true;
+ while (keyGroupHasMoreKeys) {
+ byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+ byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+ if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
+ //clear the signal bit in the key to make it ready for insertion again
+ RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
+ writeBatchWrapper.put(handle, key, value);
+ //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+ kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
+ & compressedKgInputView.readShort();
+ if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
+ keyGroupHasMoreKeys = false;
+ } else {
+ handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+ }
} else {
- handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+ writeBatchWrapper.put(handle, key, value);
}
- } else {
- rocksDBKeyedStateBackend.db.put(handle, key, value);
}
}
}
@@ -1042,7 +1044,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
try (RocksDB restoreDb = stateBackend.openDB(
restoreInstancePath.getPath(),
columnFamilyDescriptors,
- columnFamilyHandles)) {
+ columnFamilyHandles);
+ RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db)) {
final ColumnFamilyHandle defaultColumnFamily = columnFamilyHandles.remove(0);
@@ -1096,8 +1099,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
if (stateBackend.keyGroupRange.contains(keyGroup)) {
- stateBackend.db.put(targetColumnFamilyHandle,
- iterator.key(), iterator.value());
+ writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
}
iterator.next();
http://git-wip-us.apache.org/repos/asf/flink/blob/1c7341ad/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 5474a1c..04b4af3 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -134,8 +134,12 @@ public class RocksDBMapState<K, N, UK, UV>
return;
}
- for (Map.Entry<UK, UV> entry : map.entrySet()) {
- put(entry.getKey(), entry.getValue());
+ try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, writeOptions)) {
+ for (Map.Entry<UK, UV> entry : map.entrySet()) {
+ byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(entry.getKey());
+ byte[] rawValueBytes = serializeUserValue(entry.getValue());
+ writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1c7341ad/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
new file mode 100644
index 0000000..9a89259
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * It's a wrapper class around RocksDB's {@link WriteBatch} for writing in bulk.
+ *
+ * <p>IMPORTANT: This class is not thread safe.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+ private static final int MIN_CAPACITY = 100;
+ private static final int MAX_CAPACITY = 1000;
+ private static final int PER_RECORD_BYTES = 100;
+
+ private final RocksDB db;
+
+ private final WriteBatch batch;
+
+ private final WriteOptions options;
+
+ private final int capacity;
+
+ public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB) {
+ this(rocksDB, null, 500);
+ }
+
+ public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options) {
+ this(rocksDB, options, 500);
+ }
+
+ public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options, int capacity) {
+ Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+ "capacity should be between " + MIN_CAPACITY + " and " + MAX_CAPACITY);
+
+ this.db = rocksDB;
+ this.options = options;
+ this.capacity = capacity;
+ this.batch = new WriteBatch(this.capacity * PER_RECORD_BYTES);
+ }
+
+ public void put(
+ @Nonnull ColumnFamilyHandle handle,
+ @Nonnull byte[] key,
+ @Nonnull byte[] value) throws RocksDBException {
+
+ batch.put(handle, key, value);
+
+ if (batch.count() == capacity) {
+ flush();
+ }
+ }
+
+ public void flush() throws RocksDBException {
+ if (options != null) {
+ db.write(options, batch);
+ } else {
+ // use the default WriteOptions, if wasn't provided.
+ try (WriteOptions writeOptions = new WriteOptions()) {
+ db.write(writeOptions, batch);
+ }
+ }
+ batch.clear();
+ }
+
+ @Override
+ public void close() throws RocksDBException {
+ if (batch.count() != 0) {
+ flush();
+ }
+ IOUtils.closeQuietly(batch);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1c7341ad/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
new file mode 100644
index 0000000..47983fb
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Test
+ public void basicTest() throws Exception {
+
+ List<Tuple2<byte[], byte[]>> data = new ArrayList<>(10000);
+ for (int i = 0; i < 10000; ++i) {
+ data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
+ }
+
+ try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(true);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+ RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+
+ // insert data
+ for (Tuple2<byte[], byte[]> item : data) {
+ writeBatchWrapper.put(handle, item.f0, item.f1);
+ }
+ writeBatchWrapper.flush();
+
+ // valid result
+ for (Tuple2<byte[], byte[]> item : data) {
+ Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1c7341ad/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java
new file mode 100644
index 0000000..991c88c
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBMapState;
+import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
+import org.apache.flink.testutils.junit.RetryOnFailure;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test that validates that the performance of RocksDB's WriteBatch as expected.
+ *
+ * <p>Benchmarking:
+ * Computer: MacbookPro (Mid 2015), Flash Storage, Processor 2.5GHz Intel Core i5, Memory 16GB 1600MHz DDR3
+ *
+ * <p>With disableWAL is false
+ * Number of values added | time for Put | time for WriteBach | performance improvement of WriteBatch over Put
+ * 1000 10146397 ns 3546287 ns 2.86x
+ * 10000 118227077 ns 26040222 ns 4.54x
+ * 100000 1838593196 ns 375053755 ns 4.9x
+ * 1000000 8844612079 ns 2014077396 ns 4.39x
+ *
+ * <p>With disableWAL is true
+ * 1000 3955204 ns 2429725 ns 1.62x
+ * 10000 25618237 ns 16440113 ns 1.55x
+ * 100000 289153346 ns 183712685 ns 1.57x
+ * 1000000 2886298967 ns 1768688571 ns 1.63x
+ *
+ * <p>In summary:
+ *
+ * <p>WriteBatch gives users 2.5x-5x performance improvements when disableWAL is false(This is useful when
+ * restoring from savepoint, because we need to set disableWAL=true to avoid segfault bug, see FLINK-8859 for detail).
+ *
+ * <p>Write gives user 1.5x performance improvements when disableWAL is true, this is useful for batch writing scenario,
+ * e.g. {@link RocksDBMapState#putAll(Map)} & {@link RocksDBMapState#clear()}.
+ */
+public class RocksDBWriteBatchPerformanceTest extends TestLogger {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ private static final String KEY_PREFIX = "key";
+
+ private static final String VALUE = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+ @Test(timeout = 2000)
+ @RetryOnFailure(times = 3)
+ public void benchMark() throws Exception {
+
+ int num = 10000;
+
+ List<Tuple2<byte[], byte[]>> data = new ArrayList<>(num);
+ for (int i = 0; i < num; ++i) {
+ data.add(new Tuple2<>((KEY_PREFIX + i).getBytes(), VALUE.getBytes()));
+ }
+
+ log.info("--------------> put VS WriteBatch with disableWAL=false <--------------");
+
+ long t1 = benchMarkHelper(data, false, WRITETYPE.PUT);
+ long t2 = benchMarkHelper(data, false, WRITETYPE.WRITE_BATCH);
+
+ log.info("Single Put with disableWAL is false for {} records costs {}" , num, t1);
+ log.info("WriteBatch with disableWAL is false for {} records costs {}" , num, t2);
+
+ Assert.assertTrue(t2 < t1);
+
+ log.info("--------------> put VS WriteBatch with disableWAL=true <--------------");
+
+ t1 = benchMarkHelper(data, true, WRITETYPE.PUT);
+ t2 = benchMarkHelper(data, true, WRITETYPE.WRITE_BATCH);
+
+ log.info("Single Put with disableWAL is true for {} records costs {}" , num, t1);
+ log.info("WriteBatch with disableWAL is true for {} records costs {}" , num, t2);
+
+ Assert.assertTrue(t2 < t1);
+ }
+
+ private enum WRITETYPE {PUT, WRITE_BATCH}
+
+ private long benchMarkHelper(List<Tuple2<byte[], byte[]>> data, boolean disableWAL, WRITETYPE type) throws Exception {
+ final File rocksDir = folder.newFolder();
+
+ // ensure the RocksDB library is loaded to a distinct location each retry
+ NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
+
+ switch (type) {
+ case PUT:
+ try (RocksDB db = RocksDB.open(rocksDir.getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(disableWAL);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()))) {
+ long t1 = System.nanoTime();
+ for (Tuple2<byte[], byte[]> item : data) {
+ db.put(handle, options, item.f0, item.f1);
+ }
+ return System.nanoTime() - t1;
+ }
+ case WRITE_BATCH:
+ try (RocksDB db = RocksDB.open(rocksDir.getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(disableWAL);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+ RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options)) {
+ long t1 = System.nanoTime();
+ for (Tuple2<byte[], byte[]> item : data) {
+ writeBatchWrapper.put(handle, item.f0, item.f1);
+ }
+ writeBatchWrapper.flush();
+ return System.nanoTime() - t1;
+ }
+ default:
+ throw new RuntimeException("Unknown benchmark type:" + type);
+ }
+ }
+}