You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/01/13 14:08:45 UTC

[flink-benchmarks] 02/03: [FLINK-24918][Runtime/State Backends]Support to specify the data dir for state benchmark

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git

commit 9af64a13fa94a5ca2c4767948823ed3ea8513805
Author: 愚鲤 <yu...@alibaba-inc.com>
AuthorDate: Sun Nov 21 16:14:05 2021 +0800

    [FLINK-24918][Runtime/State Backends]Support to specify the data dir for state benchmark
---
 README.md                                          |  28 +++++-
 .../java/org/apache/flink/config/ConfigUtil.java   | 102 +++++++++++++++++++++
 .../apache/flink/config/StateBenchmarkOptions.java |  32 +++++++
 .../flink/state/benchmark/ListStateBenchmark.java  |   3 +-
 .../flink/state/benchmark/MapStateBenchmark.java   |   3 +-
 .../flink/state/benchmark/StateBenchmarkBase.java  |  21 ++++-
 .../flink/state/benchmark/ValueStateBenchmark.java |   3 +-
 src/main/resources/benchmark-conf.yaml             |  21 +++++
 .../org/apache/flink/config/ConfigUtilTest.java    |  34 +++++++
 src/test/resources/benchmark-conf.yaml             |  19 ++++
 10 files changed, 257 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index 9dc583c..3af9581 100644
--- a/README.md
+++ b/README.md
@@ -21,7 +21,7 @@ mvn clean package exec:exec \
 ```
 
 If you want to execute just one benchmark, the best approach is to execute selected main function manually.
-There're mainly two ways:
+There are mainly three ways:
 
 1. From your IDE (hint there is a plugin for Intellij IDEA).
    * In this case don't forget about selecting `flink.version`, default value for the property is defined in pom.xml.
@@ -31,7 +31,14 @@ There're mainly two ways:
    mvn -Dflink.version=<FLINK_VERSION> clean package exec:exec \
     -Dbenchmarks="<benchmark_class>"
    ```
-An example flink version can be -Dflink.version=1.12-SNAPSHOT.
+
+    An example flink version can be -Dflink.version=1.12-SNAPSHOT.
+
+3. Run the uber jar directly like:
+
+    ```
+    java -jar target/benchmarks.jar -rf csv "<benchmark_class>"
+    ```
 
 We also support to run each benchmark once (with only one fork and one iteration) for testing, with below command:
 
@@ -39,6 +46,23 @@ We also support to run each benchmark once (with only one fork and one iteration
 mvn test -P test
 ```
 
+## Parameters
+
+There are some built-in parameters to run different benchmarks; these can be shown or overridden from the command line.
+
+```
+# show all the parameter combinations for the <benchmark_class>
+java -jar target/benchmarks.jar "<benchmark_class>" -lp
+
+# run benchmark for rocksdb state backend type 
+java -jar target/benchmarks.jar "org.apache.flink.state.benchmark.*" -p "backendType=ROCKSDB" 
+```
+
+## Configuration
+
+Besides the parameters, there is also a benchmark config file `benchmark-conf.yaml` to tune some basic parameters. 
+For example, we can change the state data dir by putting `benchmark.state.data-dir: /data` in the config file. For more options, you can refer to the code in the `org.apache.flink.config` package. 
+
 ## Prerequisites
 
 The recent addition of OpenSSL-based benchmarks require one of two modes to be active:
diff --git a/src/main/java/org/apache/flink/config/ConfigUtil.java b/src/main/java/org/apache/flink/config/ConfigUtil.java
new file mode 100644
index 0000000..b4e670f
--- /dev/null
+++ b/src/main/java/org/apache/flink/config/ConfigUtil.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.config;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+/** Loads the benchmark configuration file {@code benchmark-conf.yaml} from the classpath. */
+public class ConfigUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigUtil.class);
+
+    private static final String BENCHMARK_CONF = "benchmark-conf.yaml";
+
+    /**
+     * Load benchmark conf from classpath.
+     *
+     * @return the parsed configuration, or an empty configuration if {@code benchmark-conf.yaml}
+     *     is not on the classpath
+     * @throws RuntimeException if the resource cannot be read
+     */
+    public static Configuration loadBenchMarkConf() {
+        InputStream inputStream =
+                ConfigUtil.class.getClassLoader().getResourceAsStream(BENCHMARK_CONF);
+        if (inputStream == null) {
+            // getResourceAsStream signals a missing resource with null; wrapping that in a
+            // reader would fail with an uninformative NullPointerException.
+            LOG.warn(
+                    "Could not find {} on the classpath, falling back to an empty configuration.",
+                    BENCHMARK_CONF);
+            return new Configuration();
+        }
+        return loadYAMLResource(inputStream);
+    }
+
+    /**
+     * This is copied from {@code GlobalConfiguration#loadYAMLResource} to avoid depending
+     * on @Internal api.
+     *
+     * <p>Only flat {@code key: value} lines are understood. Everything after the first {@code #}
+     * on a line is dropped as a comment, and lines without a {@code ": "} separator or with an
+     * empty key or value are skipped with a warning.
+     *
+     * @param inputStream stream of the config file; must not be null, closed before returning
+     * @return the configuration holding every valid key-value pair as a string entry
+     * @throws RuntimeException if reading from the stream fails
+     */
+    private static Configuration loadYAMLResource(InputStream inputStream) {
+        final Configuration config = new Configuration();
+
+        // Decode as UTF-8 explicitly so parsing does not depend on the platform default charset.
+        try (BufferedReader reader =
+                new BufferedReader(
+                        new InputStreamReader(
+                                inputStream, java.nio.charset.StandardCharsets.UTF_8))) {
+
+            String line;
+            int lineNo = 0;
+            while ((line = reader.readLine()) != null) {
+                lineNo++;
+                // 1. check for comments
+                String[] comments = line.split("#", 2);
+                String conf = comments[0].trim();
+
+                // 2. get key and value
+                if (conf.length() > 0) {
+                    String[] kv = conf.split(": ", 2);
+
+                    // skip line with no valid key-value pair
+                    if (kv.length == 1) {
+                        LOG.warn(
+                                "Error while trying to split key and value in configuration file "
+                                        + "{}:{}: \"{}\"",
+                                BENCHMARK_CONF,
+                                lineNo,
+                                line);
+                        continue;
+                    }
+
+                    String key = kv[0].trim();
+                    String value = kv[1].trim();
+
+                    // sanity check
+                    if (key.length() == 0 || value.length() == 0) {
+                        LOG.warn(
+                                "Error after splitting key and value in configuration file "
+                                        + "{}:{}: \"{}\"",
+                                BENCHMARK_CONF,
+                                lineNo,
+                                line);
+                        continue;
+                    }
+
+                    LOG.info("Loading configuration property: {}, {}", key, value);
+                    config.setString(key, value);
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error parsing YAML configuration.", e);
+        }
+
+        return config;
+    }
+}
diff --git a/src/main/java/org/apache/flink/config/StateBenchmarkOptions.java b/src/main/java/org/apache/flink/config/StateBenchmarkOptions.java
new file mode 100644
index 0000000..d54e31a
--- /dev/null
+++ b/src/main/java/org/apache/flink/config/StateBenchmarkOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.config;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Config options for the state benchmarks, looked up under the {@code benchmark.state} prefix. */
+public class StateBenchmarkOptions {
+
+    /**
+     * Directory in which the state benchmarks place their state data.
+     *
+     * <p>The option is a string with no default value, so reading it from a configuration that
+     * does not set {@code benchmark.state.data-dir} yields {@code null}.
+     */
+    public static final ConfigOption<String> STATE_DATA_DIR =
+            key("benchmark.state.data-dir")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The dir to put state data.");
+}
diff --git a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
index 0690eb2..7fea1eb 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys;
 import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.compactState;
-import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.createKeyedStateBackend;
 import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.getListState;
 import static org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount;
 import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
@@ -64,7 +63,7 @@ public class ListStateBenchmark extends StateBenchmarkBase {
 
     @Setup
     public void setUp() throws Exception {
-        keyedStateBackend = createKeyedStateBackend(backendType);
+        keyedStateBackend = createKeyedStateBackend();
         listState = getListState(keyedStateBackend, STATE_DESC);
         dummyLists = new ArrayList<>(listValueCount);
         for (int i = 0; i < listValueCount; ++i) {
diff --git a/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java
index 52494c0..044d6b1 100644
--- a/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java
@@ -36,7 +36,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.createKeyedStateBackend;
 import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.getMapState;
 import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeyCount;
 import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeys;
@@ -59,7 +58,7 @@ public class MapStateBenchmark extends StateBenchmarkBase {
 
     @Setup
     public void setUp() throws Exception {
-        keyedStateBackend = createKeyedStateBackend(backendType);
+        keyedStateBackend = createKeyedStateBackend();
         mapState =
                 getMapState(
                         keyedStateBackend,
diff --git a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
index c053f29..61f1c2c 100644
--- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
@@ -18,6 +18,9 @@
 package org.apache.flink.state.benchmark;
 
 import org.apache.flink.benchmark.BenchmarkBase;
+import org.apache.flink.config.ConfigUtil;
+import org.apache.flink.config.StateBenchmarkOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 
@@ -28,7 +31,10 @@ import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.TearDown;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
@@ -52,10 +58,23 @@ public class StateBenchmarkBase extends BenchmarkBase {
     final ThreadLocalRandom random = ThreadLocalRandom.current();
 
     @Param({"HEAP", "ROCKSDB"})
-    protected StateBackendBenchmarkUtils.StateBackendType backendType;
+    private StateBackendBenchmarkUtils.StateBackendType backendType;
 
     KeyedStateBackend<Long> keyedStateBackend;
 
+    protected KeyedStateBackend<Long> createKeyedStateBackend() throws Exception {
+        Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf();
+        String stateDataDirPath = benchMarkConfig.getString(StateBenchmarkOptions.STATE_DATA_DIR);
+        File dataDir = null;
+        if (stateDataDirPath != null) {
+            dataDir = new File(stateDataDirPath);
+            if (!dataDir.exists()) {
+                Files.createDirectories(Paths.get(stateDataDirPath));
+            }
+        }
+        return StateBackendBenchmarkUtils.createKeyedStateBackend(backendType, dataDir);
+    }
+
     private static int getCurrentIndex() {
         int currentIndex = keyIndex.getAndIncrement();
         if (currentIndex == Integer.MAX_VALUE) {
diff --git a/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java
index 92350d0..eacd12c 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java
@@ -32,7 +32,6 @@ import org.openjdk.jmh.runner.options.VerboseMode;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.createKeyedStateBackend;
 import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.getValueState;
 import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
 
@@ -52,7 +51,7 @@ public class ValueStateBenchmark extends StateBenchmarkBase {
 
     @Setup
     public void setUp() throws Exception {
-        keyedStateBackend = createKeyedStateBackend(backendType);
+        keyedStateBackend = createKeyedStateBackend();
         valueState =
                 getValueState(keyedStateBackend, new ValueStateDescriptor<>("kvState", Long.class));
         for (int i = 0; i < setupKeyCount; ++i) {
diff --git a/src/main/resources/benchmark-conf.yaml b/src/main/resources/benchmark-conf.yaml
new file mode 100644
index 0000000..174acc5
--- /dev/null
+++ b/src/main/resources/benchmark-conf.yaml
@@ -0,0 +1,21 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# The dir to put state data during test. If not set, the system default temp dir will be used.
+
+#benchmark.state.data-dir: /tmp/flink-benchmark
\ No newline at end of file
diff --git a/src/test/java/org/apache/flink/config/ConfigUtilTest.java b/src/test/java/org/apache/flink/config/ConfigUtilTest.java
new file mode 100644
index 0000000..9f3cbaa
--- /dev/null
+++ b/src/test/java/org/apache/flink/config/ConfigUtilTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.config;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that {@link ConfigUtil} reads the {@code benchmark-conf.yaml} found on the test
+ * classpath.
+ */
+public class ConfigUtilTest {
+
+    /** The loaded config must expose the state data dir under its option key. */
+    @Test
+    public void testLoadConf() {
+        final String expectedDir = "/tmp/data";
+        final Configuration loaded = ConfigUtil.loadBenchMarkConf();
+        Assert.assertEquals(expectedDir, loaded.getString(StateBenchmarkOptions.STATE_DATA_DIR));
+    }
+}
diff --git a/src/test/resources/benchmark-conf.yaml b/src/test/resources/benchmark-conf.yaml
new file mode 100644
index 0000000..267da86
--- /dev/null
+++ b/src/test/resources/benchmark-conf.yaml
@@ -0,0 +1,19 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+benchmark.state.data-dir: /tmp/data
\ No newline at end of file