Posted to commits@flink.apache.org by sr...@apache.org on 2018/05/23 10:29:15 UTC

flink git commit: [FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to improve batched write performance in RocksDB backend.

Repository: flink
Updated Branches:
  refs/heads/master 5563681bc -> 1c7341ad1


[FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to improve batched write performance in RocksDB backend.

This closes #5650.
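
For reference, a minimal usage sketch of the new wrapper, mirroring how the unit test below exercises it (the database path, column family name, and record count are illustrative, not part of the commit):

import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteOptions;

public class WriteBatchWrapperExample {

	public static void main(String[] args) throws Exception {
		RocksDB.loadLibrary();

		// capacity 500 matches the wrapper's default; valid values are 100 to 1000
		try (RocksDB db = RocksDB.open("/tmp/write-batch-example");
			WriteOptions options = new WriteOptions().setDisableWAL(true);
			ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
			RocksDBWriteBatchWrapper writer = new RocksDBWriteBatchWrapper(db, options, 500)) {

			for (int i = 0; i < 10_000; i++) {
				// records are buffered in the underlying WriteBatch and written out every 500 puts
				writer.put(handle, ("key:" + i).getBytes(), ("value:" + i).getBytes());
			}
			// close() flushes whatever is still buffered before releasing the batch
		}
	}
}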


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c7341ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c7341ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c7341ad

Branch: refs/heads/master
Commit: 1c7341ad1ad323746b88a4a2975dc91a7d9c5c0d
Parents: 5563681
Author: sihuazhou <su...@163.com>
Authored: Wed Mar 7 13:58:45 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Wed May 23 12:27:54 2018 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  74 +++++-----
 .../streaming/state/RocksDBMapState.java        |   8 +-
 .../state/RocksDBWriteBatchWrapper.java         | 101 +++++++++++++
 .../state/RocksDBWriteBatchWrapperTest.java     |  68 +++++++++
 .../RocksDBWriteBatchPerformanceTest.java       | 145 +++++++++++++++++++
 5 files changed, 358 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1c7341ad/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 0de16c2..52362c1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -658,41 +658,43 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 */
 		private void restoreKVStateData() throws IOException, RocksDBException {
 			//for all key-groups in the current state handle...
-			for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
-				int keyGroup = keyGroupOffset.f0;
-
-				// Check that restored key groups all belong to the backend
-				Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
-					"The key group must belong to the backend");
-
-				long offset = keyGroupOffset.f1;
-				//not empty key-group?
-				if (0L != offset) {
-					currentStateHandleInStream.seek(offset);
-					try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
-						DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
-						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-						int kvStateId = compressedKgInputView.readShort();
-						ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
-						//insert all k/v pairs into DB
-						boolean keyGroupHasMoreKeys = true;
-						while (keyGroupHasMoreKeys) {
-							byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-							byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-							if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
-								//clear the signal bit in the key to make it ready for insertion again
-								RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
-								rocksDBKeyedStateBackend.db.put(handle, key, value);
-								//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-								kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
-									& compressedKgInputView.readShort();
-								if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
-									keyGroupHasMoreKeys = false;
+			try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(rocksDBKeyedStateBackend.db)) {
+				for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
+					int keyGroup = keyGroupOffset.f0;
+
+					// Check that restored key groups all belong to the backend
+					Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
+						"The key group must belong to the backend");
+
+					long offset = keyGroupOffset.f1;
+					//not empty key-group?
+					if (0L != offset) {
+						currentStateHandleInStream.seek(offset);
+						try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
+							DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
+							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+							int kvStateId = compressedKgInputView.readShort();
+							ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+							//insert all k/v pairs into DB
+							boolean keyGroupHasMoreKeys = true;
+							while (keyGroupHasMoreKeys) {
+								byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+								byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+								if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
+									//clear the signal bit in the key to make it ready for insertion again
+									RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
+									writeBatchWrapper.put(handle, key, value);
+									//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+									kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
+										& compressedKgInputView.readShort();
+									if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
+										keyGroupHasMoreKeys = false;
+									} else {
+										handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+									}
 								} else {
-									handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+									writeBatchWrapper.put(handle, key, value);
 								}
-							} else {
-								rocksDBKeyedStateBackend.db.put(handle, key, value);
 							}
 						}
 					}
@@ -1042,7 +1044,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			try (RocksDB restoreDb = stateBackend.openDB(
 				restoreInstancePath.getPath(),
 				columnFamilyDescriptors,
-				columnFamilyHandles)) {
+				columnFamilyHandles);
+				RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db)) {
 
 				final ColumnFamilyHandle defaultColumnFamily = columnFamilyHandles.remove(0);
 
@@ -1096,8 +1099,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 								}
 
 								if (stateBackend.keyGroupRange.contains(keyGroup)) {
-									stateBackend.db.put(targetColumnFamilyHandle,
-										iterator.key(), iterator.value());
+									writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
 								}
 
 								iterator.next();

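Both restore paths above now route their puts through a RocksDBWriteBatchWrapper opened in the surrounding try-with-resources block, so writes are applied in batches of up to 500 records (the wrapper's default capacity) and any final partial batch is flushed automatically when the wrapper closes at the end of the restore.
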
http://git-wip-us.apache.org/repos/asf/flink/blob/1c7341ad/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 5474a1c..04b4af3 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -134,8 +134,12 @@ public class RocksDBMapState<K, N, UK, UV>
 			return;
 		}
 
-		for (Map.Entry<UK, UV> entry : map.entrySet()) {
-			put(entry.getKey(), entry.getValue());
+		try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, writeOptions)) {
+			for (Map.Entry<UK, UV> entry : map.entrySet()) {
+				byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(entry.getKey());
+				byte[] rawValueBytes = serializeUserValue(entry.getValue());
+				writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
+			}
 		}
 	}
 

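A note on the putAll() change above: instead of delegating to put() for each entry (one RocksDB write per entry), the method now serializes each entry itself and routes it through a single RocksDBWriteBatchWrapper, so writing N entries issues roughly N/500 RocksDB write calls with the wrapper's default capacity.
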
http://git-wip-us.apache.org/repos/asf/flink/blob/1c7341ad/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
new file mode 100644
index 0000000..9a89259
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around RocksDB's {@link WriteBatch} for writing in bulk.
+ *
+ * <p>IMPORTANT: This class is not thread safe.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+	private static final int MIN_CAPACITY = 100;
+	private static final int MAX_CAPACITY = 1000;
+	private static final int PER_RECORD_BYTES = 100;
+
+	private final RocksDB db;
+
+	private final WriteBatch batch;
+
+	private final WriteOptions options;
+
+	private final int capacity;
+
+	public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB) {
+		this(rocksDB, null, 500);
+	}
+
+	public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options) {
+		this(rocksDB, options, 500);
+	}
+
+	public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB, @Nullable WriteOptions options, int capacity) {
+		Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+			"capacity should be between " + MIN_CAPACITY + " and " + MAX_CAPACITY);
+
+		this.db = rocksDB;
+		this.options = options;
+		this.capacity = capacity;
+		this.batch = new WriteBatch(this.capacity * PER_RECORD_BYTES);
+	}
+
+	public void put(
+		@Nonnull ColumnFamilyHandle handle,
+		@Nonnull byte[] key,
+		@Nonnull byte[] value) throws RocksDBException {
+
+		batch.put(handle, key, value);
+
+		if (batch.count() == capacity) {
+			flush();
+		}
+	}
+
+	public void flush() throws RocksDBException {
+		if (options != null) {
+			db.write(options, batch);
+		} else {
+			// use the default WriteOptions if none were provided.
+			try (WriteOptions writeOptions = new WriteOptions()) {
+				db.write(writeOptions, batch);
+			}
+		}
+		batch.clear();
+	}
+
+	@Override
+	public void close() throws RocksDBException {
+		if (batch.count() != 0) {
+			flush();
+		}
+		IOUtils.closeQuietly(batch);
+	}
+}

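Because puts accumulate in the batch until the capacity threshold is reached, reads will not see them until a flush happens. flush() is public, so callers that need the data visible earlier can force the buffered batch out, which is what the unit test below does before validating results. Continuing inside the try block of the sketch near the top of this mail, with a hypothetical key/value pair:

			byte[] key = "k".getBytes();
			byte[] value = "v".getBytes();
			writer.put(handle, key, value);
			writer.flush();                       // force the buffered records into the DB
			byte[] stored = db.get(handle, key);  // the value is now visible to reads
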
http://git-wip-us.apache.org/repos/asf/flink/blob/1c7341ad/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
new file mode 100644
index 0000000..47983fb
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+	@Rule
+	public TemporaryFolder folder = new TemporaryFolder();
+
+	@Test
+	public void basicTest() throws Exception {
+
+		List<Tuple2<byte[], byte[]>> data = new ArrayList<>(10000);
+		for (int i = 0; i < 10000; ++i) {
+			data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
+		}
+
+		try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+			WriteOptions options = new WriteOptions().setDisableWAL(true);
+			ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+			RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+
+			// insert data
+			for (Tuple2<byte[], byte[]> item : data) {
+				writeBatchWrapper.put(handle, item.f0, item.f1);
+			}
+			writeBatchWrapper.flush();
+
+			// validate the results
+			for (Tuple2<byte[], byte[]> item : data) {
+				Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1c7341ad/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java
new file mode 100644
index 0000000..991c88c
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBWriteBatchPerformanceTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBMapState;
+import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
+import org.apache.flink.testutils.junit.RetryOnFailure;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test that validates that the performance of RocksDB's WriteBatch is as expected.
+ *
+ * <p>Benchmarking:
+ * Computer: MacBook Pro (Mid 2015), Flash Storage, Processor 2.5GHz Intel Core i5, Memory 16GB 1600MHz DDR3
+ *
+ * <p>With disableWAL is false
+ * Number of values added | time for Put		|  time for WriteBatch | performance improvement of WriteBatch over Put
+ * 1000						10146397 ns			3546287 ns				2.86x
+ * 10000					118227077 ns		26040222 ns				4.54x
+ * 100000					1838593196 ns		375053755 ns			4.9x
+ * 1000000					8844612079 ns		2014077396 ns			4.39x
+ *
+ * <p>With disableWAL is true (same columns as above)
+ * 1000						3955204 ns			2429725 ns				1.62x
+ * 10000					25618237 ns			16440113 ns				1.55x
+ * 100000					289153346 ns		183712685 ns			1.57x
+ * 1000000					2886298967 ns		1768688571 ns			1.63x
+ *
+ * <p>In summary:
+ *
+ * <p>WriteBatch gives users a 2.5x-5x performance improvement when disableWAL is false (this is useful when
+ * restoring from a savepoint, because we need to set disableWAL=true to avoid a segfault bug; see FLINK-8859 for details).
+ *
+ * <p>WriteBatch gives users a 1.5x performance improvement when disableWAL is true, which is useful for batch writing
+ * scenarios, e.g. {@link RocksDBMapState#putAll(Map)} and {@link RocksDBMapState#clear()}.
+ */
+public class RocksDBWriteBatchPerformanceTest extends TestLogger {
+
+	@Rule
+	public TemporaryFolder folder = new TemporaryFolder();
+
+	private static final String KEY_PREFIX = "key";
+
+	private static final String VALUE = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+	@Test(timeout = 2000)
+	@RetryOnFailure(times = 3)
+	public void benchmark() throws Exception {
+
+		int num = 10000;
+
+		List<Tuple2<byte[], byte[]>> data = new ArrayList<>(num);
+		for (int i = 0; i < num; ++i) {
+			data.add(new Tuple2<>((KEY_PREFIX + i).getBytes(), VALUE.getBytes()));
+		}
+
+		log.info("--------------> Put vs. WriteBatch with disableWAL=false <--------------");
+
+		long t1 = benchmarkHelper(data, false, WriteType.PUT);
+		long t2 = benchmarkHelper(data, false, WriteType.WRITE_BATCH);
+
+		log.info("Single put with disableWAL=false for {} records took {} ns", num, t1);
+		log.info("WriteBatch with disableWAL=false for {} records took {} ns", num, t2);
+
+		Assert.assertTrue(t2 < t1);
+
+		log.info("--------------> Put vs. WriteBatch with disableWAL=true <--------------");
+
+		t1 = benchmarkHelper(data, true, WriteType.PUT);
+		t2 = benchmarkHelper(data, true, WriteType.WRITE_BATCH);
+
+		log.info("Single put with disableWAL=true for {} records took {} ns", num, t1);
+		log.info("WriteBatch with disableWAL=true for {} records took {} ns", num, t2);
+
+		Assert.assertTrue(t2 < t1);
+	}
+
+	private enum WriteType { PUT, WRITE_BATCH }
+
+	private long benchmarkHelper(List<Tuple2<byte[], byte[]>> data, boolean disableWAL, WriteType type) throws Exception {
+		final File rocksDir = folder.newFolder();
+
+		// ensure the RocksDB library is loaded to a distinct location each retry
+		NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
+
+		switch (type) {
+			case PUT:
+				try (RocksDB db = RocksDB.open(rocksDir.getAbsolutePath());
+					WriteOptions options = new WriteOptions().setDisableWAL(disableWAL);
+					ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()))) {
+					long t1 = System.nanoTime();
+					for (Tuple2<byte[], byte[]> item : data) {
+						db.put(handle, options, item.f0, item.f1);
+					}
+					return System.nanoTime() - t1;
+				}
+			case WRITE_BATCH:
+				try (RocksDB db = RocksDB.open(rocksDir.getAbsolutePath());
+					WriteOptions options = new WriteOptions().setDisableWAL(disableWAL);
+					ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+					RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options)) {
+					long t1 = System.nanoTime();
+					for (Tuple2<byte[], byte[]> item : data) {
+						writeBatchWrapper.put(handle, item.f0, item.f1);
+					}
+					writeBatchWrapper.flush();
+					return System.nanoTime() - t1;
+				}
+			default:
+				throw new RuntimeException("Unknown benchmark type: " + type);
+		}
+	}
+}