You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/16 14:05:46 UTC
[5/8] flink git commit: [FLINK-5756] [rocksdb] Add mini benchmarks to
reproduce 'merge' performance problems
[FLINK-5756] [rocksdb] Add mini benchmarks to reproduce 'merge' performance problems
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/677b508a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/677b508a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/677b508a
Branch: refs/heads/master
Commit: 677b508a962c5c7df9308ac3531e799cddec27f6
Parents: d160b5e
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 19:13:07 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100
----------------------------------------------------------------------
.../ListViaMergeSpeedMiniBenchmark.java | 104 ++++++++++++++++
.../ListViaRangeSpeedMiniBenchmark.java | 121 +++++++++++++++++++
2 files changed, 225 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/677b508a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
new file mode 100644
index 0000000..2a530e1
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.util.FileUtils;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+
+public class ListViaMergeSpeedMiniBenchmark {
+
+ public static void main(String[] args) throws Exception {
+ final File rocksDir = new File("/tmp/rdb");
+ FileUtils.deleteDirectory(rocksDir);
+
+ final Options options = new Options()
+ .setCompactionStyle(CompactionStyle.LEVEL)
+ .setLevelCompactionDynamicLevelBytes(true)
+ .setIncreaseParallelism(4)
+ .setUseFsync(false)
+ .setMaxOpenFiles(-1)
+ .setDisableDataSync(true)
+ .setCreateIfMissing(true)
+ .setMergeOperator(new StringAppendOperator());
+
+ final WriteOptions write_options = new WriteOptions()
+ .setSync(false)
+ .setDisableWAL(true);
+
+ final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
+
+ final String key = "key";
+ final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+ final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+ final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+
+ final int num = 50000;
+
+ // ----- insert -----
+ System.out.println("begin insert");
+
+ final long beginInsert = System.nanoTime();
+ for (int i = 0; i < num; i++) {
+ rocksDB.merge(write_options, keyBytes, valueBytes);
+ }
+ final long endInsert = System.nanoTime();
+ System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
+
+ // ----- read (attempt 1) -----
+
+ final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
+ final long beginGet1 = System.nanoTime();
+ rocksDB.get(keyBytes, resultHolder);
+ final long endGet1 = System.nanoTime();
+
+ System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms");
+
+ // ----- read (attempt 2) -----
+
+ final long beginGet2 = System.nanoTime();
+ rocksDB.get(keyBytes, resultHolder);
+ final long endGet2 = System.nanoTime();
+
+ System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms");
+
+ // ----- compact -----
+ System.out.println("compacting...");
+ final long beginCompact = System.nanoTime();
+ rocksDB.compactRange();
+ final long endCompact = System.nanoTime();
+
+ System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms");
+
+ // ----- read (attempt 3) -----
+
+ final long beginGet3 = System.nanoTime();
+ rocksDB.get(keyBytes, resultHolder);
+ final long endGet3 = System.nanoTime();
+
+ System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/677b508a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
new file mode 100644
index 0000000..793a35b
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.core.memory.MemoryUtils;
+import org.apache.flink.util.FileUtils;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WriteOptions;
+import sun.misc.Unsafe;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+public class ListViaRangeSpeedMiniBenchmark {
+
+ public static void main(String[] args) throws Exception {
+ final File rocksDir = new File("/tmp/rdb");
+ FileUtils.deleteDirectory(rocksDir);
+
+ final Options options = new Options()
+ .setCompactionStyle(CompactionStyle.LEVEL)
+ .setLevelCompactionDynamicLevelBytes(true)
+ .setIncreaseParallelism(4)
+ .setUseFsync(false)
+ .setMaxOpenFiles(-1)
+ .setDisableDataSync(true)
+ .setCreateIfMissing(true)
+ .setMergeOperator(new StringAppendOperator());
+
+ final WriteOptions write_options = new WriteOptions()
+ .setSync(false)
+ .setDisableWAL(true);
+
+ final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
+
+ final String key = "key";
+ final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+ final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+ final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+
+ final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
+
+ final Unsafe unsafe = MemoryUtils.UNSAFE;
+ final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
+
+ final int num = 50000;
+ System.out.println("begin insert");
+
+ final long beginInsert = System.nanoTime();
+ for (int i = 0; i < num; i++) {
+ unsafe.putInt(keyTemplate, offset, i);
+ rocksDB.put(write_options, keyTemplate, valueBytes);
+ }
+ final long endInsert = System.nanoTime();
+ System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
+
+ final byte[] resultHolder = new byte[num * valueBytes.length];
+
+ final long beginGet = System.nanoTime();
+
+ final RocksIterator iterator = rocksDB.newIterator();
+ int pos = 0;
+
+ // seek to start
+ unsafe.putInt(keyTemplate, offset, 0);
+ iterator.seek(keyTemplate);
+
+ // mark end
+ unsafe.putInt(keyTemplate, offset, -1);
+
+ // iterate
+ while (iterator.isValid()) {
+ byte[] currKey = iterator.key();
+ if (samePrefix(keyBytes, currKey)) {
+ byte[] currValue = iterator.value();
+ System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
+ pos += currValue.length;
+ iterator.next();
+ }
+ else {
+ break;
+ }
+ }
+
+ final long endGet = System.nanoTime();
+
+ System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
+ }
+
+ private static boolean samePrefix(byte[] prefix, byte[] key) {
+ for (int i = 0; i < prefix.length; i++) {
+ if (prefix[i] != key [i]) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+}