You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/01/13 14:08:45 UTC
[flink-benchmarks] 02/03: [FLINK-24918][Runtime/State Backends]Support to specify the data dir for state benchmark
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
commit 9af64a13fa94a5ca2c4767948823ed3ea8513805
Author: 愚鲤 <yu...@alibaba-inc.com>
AuthorDate: Sun Nov 21 16:14:05 2021 +0800
[FLINK-24918][Runtime/State Backends]Support to specify the data dir for state benchmark
---
README.md | 28 +++++-
.../java/org/apache/flink/config/ConfigUtil.java | 102 +++++++++++++++++++++
.../apache/flink/config/StateBenchmarkOptions.java | 32 +++++++
.../flink/state/benchmark/ListStateBenchmark.java | 3 +-
.../flink/state/benchmark/MapStateBenchmark.java | 3 +-
.../flink/state/benchmark/StateBenchmarkBase.java | 21 ++++-
.../flink/state/benchmark/ValueStateBenchmark.java | 3 +-
src/main/resources/benchmark-conf.yaml | 21 +++++
.../org/apache/flink/config/ConfigUtilTest.java | 34 +++++++
src/test/resources/benchmark-conf.yaml | 19 ++++
10 files changed, 257 insertions(+), 9 deletions(-)
diff --git a/README.md b/README.md
index 9dc583c..3af9581 100644
--- a/README.md
+++ b/README.md
@@ -21,7 +21,7 @@ mvn clean package exec:exec \
```
If you want to execute just one benchmark, the best approach is to execute selected main function manually.
-There're mainly two ways:
+There are three main ways:
1. From your IDE (hint there is a plugin for Intellij IDEA).
* In this case don't forget about selecting `flink.version`, default value for the property is defined in pom.xml.
@@ -31,7 +31,14 @@ There're mainly two ways:
mvn -Dflink.version=<FLINK_VERSION> clean package exec:exec \
-Dbenchmarks="<benchmark_class>"
```
-An example flink version can be -Dflink.version=1.12-SNAPSHOT.
+
+ An example flink version can be -Dflink.version=1.12-SNAPSHOT.
+
+3. Run the uber jar directly like:
+
+ ```
+ java -jar target/benchmarks.jar -rf csv "<benchmark_class>"
+ ```
We also support to run each benchmark once (with only one fork and one iteration) for testing, with below command:
@@ -39,6 +46,23 @@ We also support to run each benchmark once (with only one fork and one iteration
mvn test -P test
```
+## Parameters
+
+There are some built-in parameters for running different benchmarks; these can be shown or overridden from the command line.
+
+```
+# show all parameter combinations for the <benchmark_class>
+java -jar target/benchmarks.jar "<benchmark_class>" -lp
+
+# run benchmark for rocksdb state backend type
+java -jar target/benchmarks.jar "org.apache.flink.state.benchmark.*" -p "backendType=ROCKSDB"
+```
+
+## Configuration
+
+Besides the parameters, there is also a benchmark config file `benchmark-conf.yaml` to tune some basic parameters.
+For example, we can change the state data dir by putting `benchmark.state.data-dir: /data` in the config file. For more options, you can refer to the code in the `org.apache.flink.config` package.
+
## Prerequisites
The recent addition of OpenSSL-based benchmarks require one of two modes to be active:
diff --git a/src/main/java/org/apache/flink/config/ConfigUtil.java b/src/main/java/org/apache/flink/config/ConfigUtil.java
new file mode 100644
index 0000000..b4e670f
--- /dev/null
+++ b/src/main/java/org/apache/flink/config/ConfigUtil.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.config;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+/** Utility for loading the benchmark configuration file from the classpath. */
+public class ConfigUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigUtil.class);
+
+    private static final String BENCHMARK_CONF = "benchmark-conf.yaml";
+
+    /**
+     * Loads the benchmark configuration from the {@code benchmark-conf.yaml} classpath resource.
+     *
+     * @return the parsed configuration, or an empty configuration if the resource is not on the
+     *     classpath
+     * @throws RuntimeException if reading the resource fails
+     */
+    public static Configuration loadBenchMarkConf() {
+        InputStream inputStream =
+                ConfigUtil.class.getClassLoader().getResourceAsStream(BENCHMARK_CONF);
+        if (inputStream == null) {
+            // getResourceAsStream returns null for a missing resource. Fall back to an empty
+            // configuration (no overrides) instead of failing later with an opaque
+            // NullPointerException.
+            LOG.warn(
+                    "Benchmark configuration file {} not found on the classpath, "
+                            + "using an empty configuration.",
+                    BENCHMARK_CONF);
+            return new Configuration();
+        }
+        return loadYAMLResource(inputStream);
+    }
+
+    /**
+     * Parses a flat configuration of {@code key: value} lines into a {@link Configuration}.
+     *
+     * <p>Everything from the first {@code #} on a line is treated as a comment. Lines without a
+     * {@code ": "} separator, or with an empty key or value, are skipped with a warning.
+     *
+     * <p>This is copied from {@code GlobalConfiguration#loadYAMLResource} to avoid depending
+     * on @Internal api.
+     *
+     * @param inputStream the stream to read from; it is closed by this method
+     * @return the configuration holding all valid key-value pairs
+     * @throws RuntimeException if reading from the stream fails
+     */
+    private static Configuration loadYAMLResource(InputStream inputStream) {
+        final Configuration config = new Configuration();
+
+        // Decode explicitly as UTF-8 so the result does not depend on the platform charset.
+        try (BufferedReader reader =
+                new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+
+            String line;
+            int lineNo = 0;
+            while ((line = reader.readLine()) != null) {
+                lineNo++;
+                // 1. check for comments
+                String[] comments = line.split("#", 2);
+                String conf = comments[0].trim();
+
+                // 2. get key and value
+                if (conf.length() > 0) {
+                    String[] kv = conf.split(": ", 2);
+
+                    // skip line with no valid key-value pair
+                    if (kv.length == 1) {
+                        LOG.warn(
+                                "Error while trying to split key and value in configuration file "
+                                        + "{}:{}: \"{}\"",
+                                BENCHMARK_CONF,
+                                lineNo,
+                                line);
+                        continue;
+                    }
+
+                    String key = kv[0].trim();
+                    String value = kv[1].trim();
+
+                    // sanity check
+                    if (key.length() == 0 || value.length() == 0) {
+                        LOG.warn(
+                                "Error after splitting key and value in configuration file "
+                                        + "{}:{}: \"{}\"",
+                                BENCHMARK_CONF,
+                                lineNo,
+                                line);
+                        continue;
+                    }
+
+                    LOG.info("Loading configuration property: {}, {}", key, value);
+                    config.setString(key, value);
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error parsing YAML configuration.", e);
+        }
+
+        return config;
+    }
+}
diff --git a/src/main/java/org/apache/flink/config/StateBenchmarkOptions.java b/src/main/java/org/apache/flink/config/StateBenchmarkOptions.java
new file mode 100644
index 0000000..d54e31a
--- /dev/null
+++ b/src/main/java/org/apache/flink/config/StateBenchmarkOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.config;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Configuration options that can be set for the state benchmarks. */
+public class StateBenchmarkOptions {
+
+    /** Key {@code benchmark.state.data-dir}: the dir to put state data; has no default value. */
+    public static final ConfigOption<String> STATE_DATA_DIR =
+            key("benchmark.state.data-dir")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The dir to put state data.");
+}
diff --git a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
index 0690eb2..7fea1eb 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys;
import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.compactState;
-import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.createKeyedStateBackend;
import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.getListState;
import static org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount;
import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
@@ -64,7 +63,7 @@ public class ListStateBenchmark extends StateBenchmarkBase {
@Setup
public void setUp() throws Exception {
- keyedStateBackend = createKeyedStateBackend(backendType);
+ keyedStateBackend = createKeyedStateBackend();
listState = getListState(keyedStateBackend, STATE_DESC);
dummyLists = new ArrayList<>(listValueCount);
for (int i = 0; i < listValueCount; ++i) {
diff --git a/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java
index 52494c0..044d6b1 100644
--- a/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java
@@ -36,7 +36,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.createKeyedStateBackend;
import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.getMapState;
import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeyCount;
import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeys;
@@ -59,7 +58,7 @@ public class MapStateBenchmark extends StateBenchmarkBase {
@Setup
public void setUp() throws Exception {
- keyedStateBackend = createKeyedStateBackend(backendType);
+ keyedStateBackend = createKeyedStateBackend();
mapState =
getMapState(
keyedStateBackend,
diff --git a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
index c053f29..61f1c2c 100644
--- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
@@ -18,6 +18,9 @@
package org.apache.flink.state.benchmark;
import org.apache.flink.benchmark.BenchmarkBase;
+import org.apache.flink.config.ConfigUtil;
+import org.apache.flink.config.StateBenchmarkOptions;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils;
import org.apache.flink.runtime.state.KeyedStateBackend;
@@ -28,7 +31,10 @@ import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
+import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
@@ -52,10 +58,23 @@ public class StateBenchmarkBase extends BenchmarkBase {
final ThreadLocalRandom random = ThreadLocalRandom.current();
@Param({"HEAP", "ROCKSDB"})
- protected StateBackendBenchmarkUtils.StateBackendType backendType;
+ private StateBackendBenchmarkUtils.StateBackendType backendType;
KeyedStateBackend<Long> keyedStateBackend;
+ /**
+  * Creates the keyed state backend for the benchmark's {@code backendType}.
+  *
+  * <p>If {@code benchmark.state.data-dir} is set in the benchmark configuration, that
+  * directory is created when missing and handed to the backend as its data dir; otherwise
+  * {@code null} is passed as the data dir.
+  *
+  * @return the newly created keyed state backend
+  * @throws Exception if the data dir cannot be created (for example because the path exists
+  *     but is not a directory) or the backend cannot be created
+  */
+ protected KeyedStateBackend<Long> createKeyedStateBackend() throws Exception {
+     Configuration benchMarkConfig = ConfigUtil.loadBenchMarkConf();
+     String stateDataDirPath = benchMarkConfig.getString(StateBenchmarkOptions.STATE_DATA_DIR);
+     File dataDir = null;
+     if (stateDataDirPath != null) {
+         // createDirectories does nothing for an existing directory and throws if the path
+         // exists but is not a directory, so no separate (racy) exists() check is needed and
+         // a regular file is never handed to the backend as its data dir.
+         Files.createDirectories(Paths.get(stateDataDirPath));
+         dataDir = new File(stateDataDirPath);
+     }
+     return StateBackendBenchmarkUtils.createKeyedStateBackend(backendType, dataDir);
+ }
+
private static int getCurrentIndex() {
int currentIndex = keyIndex.getAndIncrement();
if (currentIndex == Integer.MAX_VALUE) {
diff --git a/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java
index 92350d0..eacd12c 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java
@@ -32,7 +32,6 @@ import org.openjdk.jmh.runner.options.VerboseMode;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.createKeyedStateBackend;
import static org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils.getValueState;
import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
@@ -52,7 +51,7 @@ public class ValueStateBenchmark extends StateBenchmarkBase {
@Setup
public void setUp() throws Exception {
- keyedStateBackend = createKeyedStateBackend(backendType);
+ keyedStateBackend = createKeyedStateBackend();
valueState =
getValueState(keyedStateBackend, new ValueStateDescriptor<>("kvState", Long.class));
for (int i = 0; i < setupKeyCount; ++i) {
diff --git a/src/main/resources/benchmark-conf.yaml b/src/main/resources/benchmark-conf.yaml
new file mode 100644
index 0000000..174acc5
--- /dev/null
+++ b/src/main/resources/benchmark-conf.yaml
@@ -0,0 +1,21 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# The directory in which state data is stored while the benchmarks run. If not set, the system default temp dir will be used.
+
+#benchmark.state.data-dir: /tmp/flink-benchmark
\ No newline at end of file
diff --git a/src/test/java/org/apache/flink/config/ConfigUtilTest.java b/src/test/java/org/apache/flink/config/ConfigUtilTest.java
new file mode 100644
index 0000000..9f3cbaa
--- /dev/null
+++ b/src/test/java/org/apache/flink/config/ConfigUtilTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.config;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests for {@link ConfigUtil}. */
+public class ConfigUtilTest {
+
+    /** Checks that the state data dir from the test {@code benchmark-conf.yaml} is loaded. */
+    @Test
+    public void testLoadConf() {
+        final String expectedDir = "/tmp/data";
+        final Configuration loadedConf = ConfigUtil.loadBenchMarkConf();
+        Assert.assertEquals(
+                expectedDir, loadedConf.getString(StateBenchmarkOptions.STATE_DATA_DIR));
+    }
+}
diff --git a/src/test/resources/benchmark-conf.yaml b/src/test/resources/benchmark-conf.yaml
new file mode 100644
index 0000000..267da86
--- /dev/null
+++ b/src/test/resources/benchmark-conf.yaml
@@ -0,0 +1,19 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+benchmark.state.data-dir: /tmp/data
\ No newline at end of file