You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/16 14:05:46 UTC

[5/8] flink git commit: [FLINK-5756] [rocksdb] Add mini benchmarks to reproduce 'merge' performance problems

[FLINK-5756] [rocksdb] Add mini benchmarks to reproduce 'merge' performance problems


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/677b508a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/677b508a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/677b508a

Branch: refs/heads/master
Commit: 677b508a962c5c7df9308ac3531e799cddec27f6
Parents: d160b5e
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 19:13:07 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100

----------------------------------------------------------------------
 .../ListViaMergeSpeedMiniBenchmark.java         | 104 ++++++++++++++++
 .../ListViaRangeSpeedMiniBenchmark.java         | 121 +++++++++++++++++++
 2 files changed, 225 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/677b508a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
new file mode 100644
index 0000000..2a530e1
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.util.FileUtils;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Mini benchmark that appends the same value many times to a single RocksDB key via
+ * {@code merge} (using the {@link StringAppendOperator}) and then measures how long it
+ * takes to read that key back: twice directly after the inserts, and once more after a
+ * full compaction.
+ *
+ * <p>The benchmark works on the fixed directory {@code /tmp/rdb}, which is deleted on startup.
+ */
+public class ListViaMergeSpeedMiniBenchmark {
+
+	/**
+	 * Sets up the RocksDB instance, runs the benchmark and releases all native handles
+	 * afterwards, also when the benchmark fails half way through.
+	 *
+	 * @param args Ignored.
+	 * @throws Exception Forwarded from the file system cleanup or from RocksDB.
+	 */
+	public static void main(String[] args) throws Exception {
+		final File rocksDir = new File("/tmp/rdb");
+		FileUtils.deleteDirectory(rocksDir);
+
+		final Options options = new Options()
+				.setCompactionStyle(CompactionStyle.LEVEL)
+				.setLevelCompactionDynamicLevelBytes(true)
+				.setIncreaseParallelism(4)
+				.setUseFsync(false)
+				.setMaxOpenFiles(-1)
+				.setDisableDataSync(true)
+				.setCreateIfMissing(true)
+				.setMergeOperator(new StringAppendOperator());
+
+		final WriteOptions write_options = new WriteOptions()
+				.setSync(false)
+				.setDisableWAL(true);
+
+		try {
+			final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
+			try {
+				runBenchmark(rocksDB, write_options);
+			}
+			finally {
+				// close the database before the options objects it was opened with
+				rocksDB.close();
+			}
+		}
+		finally {
+			// these objects wrap native memory that the garbage collector does not reclaim reliably
+			write_options.dispose();
+			options.dispose();
+		}
+	}
+
+	/**
+	 * Runs the timed phases (insert, two reads, compaction, third read) and prints the
+	 * duration of each phase in milliseconds to standard out.
+	 */
+	private static void runBenchmark(RocksDB rocksDB, WriteOptions write_options) throws Exception {
+		final String key = "key";
+		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+		final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+		final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+
+		final int num = 50000;
+
+		// ----- insert -----
+		System.out.println("begin insert");
+
+		final long beginInsert = System.nanoTime();
+		for (int i = 0; i < num; i++) {
+			rocksDB.merge(write_options, keyBytes, valueBytes);
+		}
+		final long endInsert = System.nanoTime();
+		System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
+
+		// ----- read (attempt 1) -----
+
+		// pre-allocated outside the timed section; the two extra bytes per element
+		// presumably leave room for the delimiter the merge operator inserts
+		final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
+		final long beginGet1 = System.nanoTime();
+		rocksDB.get(keyBytes, resultHolder);
+		final long endGet1 = System.nanoTime();
+
+		System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms");
+
+		// ----- read (attempt 2) -----
+
+		final long beginGet2 = System.nanoTime();
+		rocksDB.get(keyBytes, resultHolder);
+		final long endGet2 = System.nanoTime();
+
+		System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms");
+
+		// ----- compact -----
+		System.out.println("compacting...");
+		final long beginCompact = System.nanoTime();
+		rocksDB.compactRange();
+		final long endCompact = System.nanoTime();
+
+		System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms");
+
+		// ----- read (attempt 3) -----
+
+		final long beginGet3 = System.nanoTime();
+		rocksDB.get(keyBytes, resultHolder);
+		final long endGet3 = System.nanoTime();
+
+		System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/677b508a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
new file mode 100644
index 0000000..793a35b
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.core.memory.MemoryUtils;
+import org.apache.flink.util.FileUtils;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WriteOptions;
+import sun.misc.Unsafe;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+public class ListViaRangeSpeedMiniBenchmark {
+	
+	public static void main(String[] args) throws Exception {
+		final File rocksDir = new File("/tmp/rdb");
+		FileUtils.deleteDirectory(rocksDir);
+
+		final Options options = new Options()
+				.setCompactionStyle(CompactionStyle.LEVEL)
+				.setLevelCompactionDynamicLevelBytes(true)
+				.setIncreaseParallelism(4)
+				.setUseFsync(false)
+				.setMaxOpenFiles(-1)
+				.setDisableDataSync(true)
+				.setCreateIfMissing(true)
+				.setMergeOperator(new StringAppendOperator());
+
+		final WriteOptions write_options = new WriteOptions()
+				.setSync(false)
+				.setDisableWAL(true);
+
+		final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
+
+		final String key = "key";
+		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+		final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+		final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+
+		final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
+
+		final Unsafe unsafe = MemoryUtils.UNSAFE;
+		final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
+
+		final int num = 50000;
+		System.out.println("begin insert");
+
+		final long beginInsert = System.nanoTime();
+		for (int i = 0; i < num; i++) {
+			unsafe.putInt(keyTemplate, offset, i);
+			rocksDB.put(write_options, keyTemplate, valueBytes);
+		}
+		final long endInsert = System.nanoTime();
+		System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
+
+		final byte[] resultHolder = new byte[num * valueBytes.length];
+
+		final long beginGet = System.nanoTime();
+
+		final RocksIterator iterator = rocksDB.newIterator();
+		int pos = 0;
+
+		// seek to start
+		unsafe.putInt(keyTemplate, offset, 0);
+		iterator.seek(keyTemplate);
+
+		// mark end
+		unsafe.putInt(keyTemplate, offset, -1);
+
+		// iterate
+		while (iterator.isValid()) {
+			byte[] currKey = iterator.key();
+			if (samePrefix(keyBytes, currKey)) {
+				byte[] currValue = iterator.value();
+				System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
+				pos += currValue.length;
+				iterator.next();
+			}
+			else {
+				break;
+			}
+		}
+
+		final long endGet = System.nanoTime();
+
+		System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
+	}
+
+	private static boolean samePrefix(byte[] prefix, byte[] key) {
+		for (int i = 0; i < prefix.length; i++) {
+			if (prefix[i] != key [i]) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+}