You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/09/25 17:38:16 UTC
[bookkeeper] branch master updated: [TOOLS] Add a perf tool for
benchmarking bookkeeper
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new c02e0fb [TOOLS] Add a perf tool for benchmarking bookkeeper
c02e0fb is described below
commit c02e0fbd2a0618b9a4e40c995a186f003ab9070b
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Tue Sep 25 10:38:12 2018 -0700
[TOOLS] Add a perf tool for benchmarking bookkeeper
Descriptions of the changes in this PR:
### Motivation
When investigating the performance of scanning entries in BookKeeper, it is hard to know what throughput an application should expect. We need a perf tool to establish the baseline performance that BookKeeper can provide.
### Changes
This tool follows the approach of the Pulsar perf tool and uses the dlog library to evaluate write and read throughput on BookKeeper using one (or a few) dlog streams.
Author: Sijie Guo <si...@apache.org>
Author: Qi Wang <42...@users.noreply.github.com>
Author: Charan Reddy Guttapalem <re...@gmail.com>
Author: Sijie Guo <gu...@gmail.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
This closes #1697 from sijie/bkperf
---
bin/bkperf | 66 ++++
conf/log4j.cli.properties | 1 +
pom.xml | 8 +
tools/perf/README.md | 100 +++++
tools/perf/pom.xml | 43 +++
.../org/apache/bookkeeper/tools/perf/BKPerf.java | 46 +++
.../tools/perf/DlogPerfCommandGroup.java | 43 +++
.../bookkeeper/tools/perf/PerfCommandGroup.java | 25 ++
.../bookkeeper/tools/perf/dlog/PerfReader.java | 290 +++++++++++++++
.../bookkeeper/tools/perf/dlog/PerfWriter.java | 413 +++++++++++++++++++++
.../bookkeeper/tools/perf/dlog/ReadCommand.java | 58 +++
.../bookkeeper/tools/perf/dlog/WriteCommand.java | 58 +++
.../bookkeeper/tools/perf/dlog/package-info.java | 18 +
.../apache/bookkeeper/tools/perf/package-info.java | 18 +
.../tools/perf/utils/PaddingDecimalFormat.java | 85 +++++
.../bookkeeper/tools/perf/utils/package-info.java | 18 +
tools/pom.xml | 1 +
17 files changed, 1291 insertions(+)
diff --git a/bin/bkperf b/bin/bkperf
new file mode 100755
index 0000000..6ec2909
--- /dev/null
+++ b/bin/bkperf
@@ -0,0 +1,66 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+# BookKeeper Perf Tool (experimental)
+
+BINDIR=`dirname "$0"`
+BK_HOME=`cd ${BINDIR}/..;pwd`
+
+source ${BK_HOME}/bin/common.sh
+source ${BK_HOME}/conf/bk_cli_env.sh
+
+CLI_MODULE_PATH=tools/perf
+CLI_MODULE_NAME="(org.apache.bookkeeper-)?bookkeeper-perf"
+CLI_MODULE_HOME=${BK_HOME}/${CLI_MODULE_PATH}
+
+# find the module jar
+CLI_JAR=$(find_module_jar ${CLI_MODULE_PATH} ${CLI_MODULE_NAME})
+
+# set up the classpath
+CLI_CLASSPATH=$(set_module_classpath ${CLI_MODULE_PATH})
+
+DEFAULT_CONF=${BK_HOME}/conf/bk_server.conf
+if [ -z "${CLI_CONF}" ]; then
+ CLI_CONF=${DEFAULT_CONF}
+fi
+
+DEFAULT_LOG_CONF=${BK_HOME}/conf/log4j.cli.properties
+if [ -z "${CLI_LOG_CONF}" ]; then
+ CLI_LOG_CONF=${DEFAULT_LOG_CONF}
+fi
+CLI_LOG_DIR=${CLI_LOG_DIR:-"$BK_HOME/logs"}
+CLI_LOG_FILE=${CLI_LOG_FILE:-"bkperf.log"}
+CLI_ROOT_LOGGER=${CLI_ROOT_LOGGER:-"INFO,ROLLINGFILE"}
+
+# Configure the classpath
+CLI_CLASSPATH="$CLI_JAR:$CLI_CLASSPATH:$CLI_EXTRA_CLASSPATH"
+CLI_CLASSPATH="`dirname $CLI_LOG_CONF`:$CLI_CLASSPATH"
+
+# Build the OPTs
+BOOKIE_OPTS=$(build_bookie_opts)
+GC_OPTS=$(build_cli_jvm_opts ${CLI_LOG_DIR} "bkperf-gc.log")
+NETTY_OPTS=$(build_netty_opts)
+LOGGING_OPTS=$(build_cli_logging_opts ${CLI_LOG_CONF} ${CLI_LOG_DIR} ${CLI_LOG_FILE} ${CLI_ROOT_LOGGER})
+
+OPTS="${OPTS} -cp ${CLI_CLASSPATH} ${BOOKIE_OPTS} ${GC_OPTS} ${NETTY_OPTS} ${LOGGING_OPTS} ${CLI_EXTRA_OPTS}"
+
+#Change to BK_HOME to support relative paths
+cd "$BK_HOME"
+exec ${JAVA} ${OPTS} org.apache.bookkeeper.tools.perf.BKPerf --conf ${CLI_CONF} $@
diff --git a/conf/log4j.cli.properties b/conf/log4j.cli.properties
index 51c95f5..ceb77cc 100644
--- a/conf/log4j.cli.properties
+++ b/conf/log4j.cli.properties
@@ -55,5 +55,6 @@ log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{
log4j.logger.verbose=INFO,VERBOSECONSOLE
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.apache.bookkeeper=ERROR
+log4j.logger.org.apache.bookkeeper.tools=INFO
log4j.logger.org.apache.bookkeeper.bookie.BookieShell=INFO
log4j.logger.org.apache.bookkeeper.client.BookKeeperAdmin=INFO
diff --git a/pom.xml b/pom.xml
index e5a3ffd..5570259 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,6 +130,7 @@
<guava.version>21.0</guava.version>
<hadoop.version>2.7.3</hadoop.version>
<hamcrest.version>1.3</hamcrest.version>
+ <hdrhistogram.version>2.1.10</hdrhistogram.version>
<jackson.version>2.8.9</jackson.version>
<jackson-mapper-asl.version>1.9.11</jackson-mapper-asl.version>
<jcommander.version>1.48</jcommander.version>
@@ -543,6 +544,13 @@
<version>${jcommander.version}</version>
</dependency>
+ <!-- perf dependencies -->
+ <dependency>
+ <groupId>org.hdrhistogram</groupId>
+ <artifactId>HdrHistogram</artifactId>
+ <version>${hdrhistogram.version}</version>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
<groupId>junit</groupId>
diff --git a/tools/perf/README.md b/tools/perf/README.md
new file mode 100644
index 0000000..945d80c
--- /dev/null
+++ b/tools/perf/README.md
@@ -0,0 +1,100 @@
+## BookKeeper Perf Tool
+
+### Dlog
+
+```shell
+$ bin/bkperf dlog
+Commands on evaluating performance of distributedlog library
+
+Usage: bkperf dlog [command] [command options]
+
+Commands:
+
+ read Read log records from distributedlog streams
+ write Write log records to distributedlog streams
+
+ help Display help information about it
+```
+
+#### Write records to logs
+
+```shell
+$ bin/bkperf dlog write -h
+Write log records to distributedlog streams
+
+Usage: bkperf dlog write [flags]
+
+Flags:
+
+ -a, --ack-quorum-size
+ Ledger ack quorum size
+
+ -e, --ensemble-size
+ Ledger ensemble size
+
+ -ln, --log-name
+ Log name or log name pattern if more than 1 log is specified at
+ `--num-logs`
+
+ -b, --num-bytes
+ Number of bytes to write in total. If 0, it will keep writing
+
+ -l, --num-logs
+ Number of log streams
+
+ -n, --num-records
+ Number of records to write in total. If 0, it will keep writing
+
+ -r, --rate
+ Write rate bytes/s across log streams
+
+ -rs, --record-size
+ Log record size
+
+ --threads
+ Number of threads writing
+
+ -w, --write-quorum-size
+ Ledger write quorum size
+
+
+ -h, --help
+ Display help information
+```
+
+Example: write to log stream `test-log` at `100 MB/second` (104857600 bytes/s), using a 1-bookie ensemble.
+
+```shell
+$ bin/bkperf dlog write -w 1 -a 1 -e 1 -r 104857600 --log-name test-log
+```
+
+#### Read records from logs
+
+```shell
+$ bin/bkperf dlog read -h
+Read log records from distributedlog streams
+
+Usage: bkperf dlog read [flags]
+
+Flags:
+
+ -ln, --log-name
+ Log name or log name pattern if more than 1 log is specified at
+ `--num-logs`
+
+ -l, --num-logs
+ Number of log streams
+
+ --threads
+ Number of threads reading
+
+
+ -h, --help
+ Display help information
+```
+
+Example: read from log stream `test-log-000000`.
+
+```shell
+$ bin/bkperf dlog read --log-name test-log-000000
+```
diff --git a/tools/perf/pom.xml b/tools/perf/pom.xml
new file mode 100644
index 0000000..d3a617f
--- /dev/null
+++ b/tools/perf/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper-tools-parent</artifactId>
+ <version>4.9.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>bookkeeper-perf</artifactId>
+ <name>Apache BookKeeper :: Tools :: Perf</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper-tools-framework</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.distributedlog</groupId>
+ <artifactId>distributedlog-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hdrhistogram</groupId>
+ <artifactId>HdrHistogram</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java
new file mode 100644
index 0000000..23021ee
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.Cli;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+
+/**
+ * Command-line entry point of <b>bkperf</b>, the tool that evaluates the performance of an
+ * <i>Apache BookKeeper</i> cluster.
+ */
+@Slf4j
+public class BKPerf {
+
+    public static final String NAME = "bkperf";
+
+    /**
+     * Builds the top-level cli spec, runs it against {@code args} and exits the JVM with the
+     * return code produced by the cli.
+     *
+     * @param args raw command-line arguments
+     */
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) {
+        final String usage = NAME + " [flags] [command group] [commands]";
+        final String description = NAME + " evaluates the performance of Apache BookKeeper clusters";
+
+        CliSpec.Builder<BKFlags> builder = CliSpec.<BKFlags>newBuilder()
+            .withName(NAME)
+            .withUsage(usage)
+            .withDescription(description)
+            .withFlags(new BKFlags())
+            .withConsole(System.out)
+            .addCommand(new DlogPerfCommandGroup());
+
+        final int exitCode = Cli.runCli(builder.build(), args);
+        Runtime.getRuntime().exit(exitCode);
+    }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
new file mode 100644
index 0000000..75adc4b
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf;
+
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliCommandGroup;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.tools.perf.dlog.ReadCommand;
+import org.apache.bookkeeper.tools.perf.dlog.WriteCommand;
+
+/**
+ * The {@code dlog} command group: commands that evaluate performance of the distributedlog library.
+ */
+public class DlogPerfCommandGroup extends CliCommandGroup<BKFlags> implements PerfCommandGroup<BKFlags> {
+
+    private static final String NAME = "dlog";
+    private static final String DESC = "Commands on evaluating performance of distributedlog library";
+
+    // A single spec instance is shared by every instance of this command group.
+    private static final CliSpec<BKFlags> spec = newSpec();
+
+    public DlogPerfCommandGroup() {
+        super(spec);
+    }
+
+    /**
+     * Assembles the spec of the group: its name, description, parent cli and the
+     * write/read sub-commands (registered in that order).
+     */
+    private static CliSpec<BKFlags> newSpec() {
+        return CliSpec.<BKFlags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withParent(BKPerf.NAME)
+            .addCommand(new WriteCommand())
+            .addCommand(new ReadCommand())
+            .build();
+    }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/PerfCommandGroup.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/PerfCommandGroup.java
new file mode 100644
index 0000000..c64d77f
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/PerfCommandGroup.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf;
+
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CommandGroup;
+
+/**
+ * A command group that groups together commands used for performance evaluations.
+ *
+ * <p>The interface declares no members of its own; it only extends {@link CommandGroup}.
+ *
+ * @param <GlobalFlagsT> the flags type passed through to {@link CommandGroup}
+ */
+public interface PerfCommandGroup<GlobalFlagsT extends CliFlags>
+ extends CommandGroup<GlobalFlagsT> {
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
new file mode 100644
index 0000000..7497d7d
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import com.beust.jcommander.Parameter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.Recorder;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+
+/**
+ * A perf writer to evaluate write performance.
+ */
+@Slf4j
+public class PerfReader implements Runnable {
+
+ /**
+ * Flags for the write command.
+ */
+ public static class Flags extends CliFlags {
+
+ @Parameter(
+ names = {
+ "-ln", "--log-name"
+ },
+ description = "Log name or log name pattern if more than 1 log is specified at `--num-logs`")
+ public String logName = "test-log-%06d";
+
+ @Parameter(
+ names = {
+ "-l", "--num-logs"
+ },
+ description = "Number of log streams")
+ public int numLogs = 1;
+
+ @Parameter(
+ names = {
+ "-t", "--threads"
+ },
+ description = "Number of threads reading")
+ public int numThreads = 1;
+
+ @Parameter(
+ names = {
+ "-mr", "--max-readahead-records"
+ },
+ description = "Max readhead records")
+ public int maxReadAheadRecords = 1000000;
+
+ @Parameter(
+ names = {
+ "-bs", "--readahead-batch-size"
+ },
+ description = "ReadAhead Batch Size, in entries"
+ )
+ public int readAheadBatchSize = 4;
+
+ }
+
+
+ // stats
+ private final LongAdder recordsRead = new LongAdder();
+ private final LongAdder bytesRead = new LongAdder();
+
+ private final ServiceURI serviceURI;
+ private final Flags flags;
+ private final Recorder recorder = new Recorder(
+ TimeUnit.SECONDS.toMillis(120000), 5
+ );
+ private final Recorder cumulativeRecorder = new Recorder(
+ TimeUnit.SECONDS.toMillis(120000), 5
+ );
+ private final AtomicBoolean isDone = new AtomicBoolean(false);
+
+ PerfReader(ServiceURI serviceURI, Flags flags) {
+ this.serviceURI = serviceURI;
+ this.flags = flags;
+ }
+
+ @Override
+ public void run() {
+ try {
+ execute();
+ } catch (Exception e) {
+ log.error("Encountered exception at running dlog perf writer", e);
+ }
+ }
+
+ void execute() throws Exception {
+ ObjectMapper m = new ObjectMapper();
+ ObjectWriter w = m.writerWithDefaultPrettyPrinter();
+ log.info("Starting dlog perf reader with config : {}", w.writeValueAsString(flags));
+
+ DistributedLogConfiguration conf = newDlogConf(flags);
+ try (Namespace namespace = NamespaceBuilder.newBuilder()
+ .conf(conf)
+ .uri(serviceURI.getUri())
+ .build()) {
+ execute(namespace);
+ }
+ }
+
+ void execute(Namespace namespace) throws Exception {
+ List<Pair<Integer, DistributedLogManager>> managers = new ArrayList<>(flags.numLogs);
+ for (int i = 0; i < flags.numLogs; i++) {
+ String logName = String.format(flags.logName, i);
+ managers.add(Pair.of(i, namespace.openLog(logName)));
+ }
+ log.info("Successfully open {} logs", managers.size());
+
+ // register shutdown hook to aggregate stats
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ isDone.set(true);
+ printAggregatedStats(cumulativeRecorder);
+ }));
+
+ ExecutorService executor = Executors.newFixedThreadPool(flags.numThreads);
+ try {
+ for (int i = 0; i < flags.numThreads; i++) {
+ final int idx = i;
+ final List<DistributedLogManager> logsThisThread = managers
+ .stream()
+ .filter(pair -> pair.getLeft() % flags.numThreads == idx)
+ .map(pair -> pair.getRight())
+ .collect(Collectors.toList());
+ executor.submit(() -> {
+ try {
+ read(logsThisThread);
+ } catch (Exception e) {
+ log.error("Encountered error at writing records", e);
+ }
+ });
+ }
+ log.info("Started {} write threads", flags.numThreads);
+ reportStats();
+ } finally {
+ executor.shutdown();
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ }
+ managers.forEach(manager -> manager.getRight().asyncClose());
+ }
+ }
+
+ void read(List<DistributedLogManager> logs) throws Exception {
+ log.info("Read thread started with : logs = {}",
+ logs.stream().map(l -> l.getStreamName()).collect(Collectors.toList()));
+
+ List<LogReader> readers = logs.stream()
+ .map(manager -> {
+ try {
+ return manager.openLogReader(DLSN.InitialDLSN);
+ } catch (IOException e) {
+ log.error("Failed to open reader for log stream {}", manager.getStreamName(), e);
+ throw new UncheckedIOException(e);
+ }
+ })
+ .collect(Collectors.toList());
+
+ final int numLogs = logs.size();
+ while (true) {
+ for (int i = 0; i < numLogs; i++) {
+ LogRecordWithDLSN record = readers.get(i).readNext(true);
+ if (null != record) {
+ recordsRead.increment();
+ bytesRead.add(record.getPayloadBuf().readableBytes());
+ }
+ }
+ }
+ }
+
+ void reportStats() {
+ // Print report stats
+ long oldTime = System.nanoTime();
+
+ Histogram reportHistogram = null;
+
+ while (true) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ break;
+ }
+
+ if (isDone.get()) {
+ break;
+ }
+
+ long now = System.nanoTime();
+ double elapsed = (now - oldTime) / 1e9;
+
+ double rate = recordsRead.sumThenReset() / elapsed;
+ double throughput = bytesRead.sumThenReset() / elapsed / 1024 / 1024;
+
+ reportHistogram = recorder.getIntervalHistogram(reportHistogram);
+
+ log.info("Throughput read : {} records/s --- {} MB/s --- Latency: mean:"
+ + " {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+ throughputFormat.format(rate), throughputFormat.format(throughput),
+ dec.format(reportHistogram.getMean() / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+ dec.format(reportHistogram.getMaxValue() / 1000.0));
+
+ reportHistogram.reset();
+
+ oldTime = now;
+ }
+
+ }
+
+ private static DistributedLogConfiguration newDlogConf(Flags flags) {
+ return new DistributedLogConfiguration()
+ .setReadAheadBatchSize(flags.readAheadBatchSize)
+ .setReadAheadMaxRecords(flags.maxReadAheadRecords)
+ .setReadAheadWaitTime(200);
+ }
+
+
+ private static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
+ private static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+
+ private static void printAggregatedStats(Recorder recorder) {
+ Histogram reportHistogram = recorder.getIntervalHistogram();
+
+ log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {}"
+ + " - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
+ dec.format(reportHistogram.getMean() / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(99.999) / 1000.0),
+ dec.format(reportHistogram.getMaxValue() / 1000.0));
+ }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
new file mode 100644
index 0000000..e29dc64
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+
+import com.beust.jcommander.Parameter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.util.concurrent.RateLimiter;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.netty.buffer.Unpooled;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.Recorder;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+
+/**
+ * A perf writer to evaluate write performance.
+ */
+@Slf4j
+public class PerfWriter implements Runnable {
+
+ /**
+  * Flags for the write command.
+  *
+  * <p>Rate, record/byte totals and the outstanding-bytes budget are given for the whole run
+  * and divided by the number of threads when the write threads are started.
+  */
+ public static class Flags extends CliFlags {
+
+     @Parameter(
+         names = {
+             "-r", "--rate"
+         },
+         description = "Write rate bytes/s across log streams")
+     public int writeRate = 0;
+
+     @Parameter(
+         names = {
+             "-rs", "--record-size"
+         },
+         description = "Log record size")
+     public int recordSize = 1024;
+
+     @Parameter(
+         names = {
+             "-ln", "--log-name"
+         },
+         description = "Log name or log name pattern if more than 1 log is specified at `--num-logs`")
+     public String logName = "test-log-%06d";
+
+     @Parameter(
+         names = {
+             "-l", "--num-logs"
+         },
+         description = "Number of log streams")
+     public int numLogs = 1;
+
+     @Parameter(
+         names = {
+             "-t", "--threads"
+         },
+         description = "Number of threads writing")
+     public int numThreads = 1;
+
+     // The help text used to be a copy of the `--threads` description.
+     @Parameter(
+         names = {
+             "-mob", "--max-outstanding-megabytes"
+         },
+         description = "Max outstanding (unacknowledged) megabytes across all writing threads")
+     public long maxOutstandingMB = 200;
+
+     @Parameter(
+         names = {
+             "-n", "--num-records"
+         },
+         description = "Number of records to write in total. If 0, it will keep writing")
+     public long numRecords = 0;
+
+     @Parameter(
+         names = {
+             "-b", "--num-bytes"
+         },
+         description = "Number of bytes to write in total. If 0, it will keep writing")
+     public long numBytes = 0;
+
+     @Parameter(
+         names = {
+             "-e", "--ensemble-size"
+         },
+         description = "Ledger ensemble size")
+     public int ensembleSize = 1;
+
+     @Parameter(
+         names = {
+             "-w", "--write-quorum-size"
+         },
+         description = "Ledger write quorum size")
+     public int writeQuorumSize = 1;
+
+     @Parameter(
+         names = {
+             "-a", "--ack-quorum-size"
+         },
+         description = "Ledger ack quorum size")
+     public int ackQuorumSize = 1;
+
+ }
+
+
+ // stats
+ // Counters of completed writes; they are bumped from the write-completion callback.
+ private final LongAdder recordsWritten = new LongAdder();
+ private final LongAdder bytesWritten = new LongAdder();
+
+ // Record payload: flags.recordSize random bytes generated once in the constructor and
+ // wrapped into every record that is written.
+ private final byte[] payload;
+ private final ServiceURI serviceURI;
+ private final Flags flags;
+ // NOTE(review): the code recording values into these two recorders is not in the visible
+ // part of the file; cumulativeRecorder is the one reported by the shutdown hook.
+ private final Recorder recorder = new Recorder(
+ TimeUnit.SECONDS.toMillis(120000), 5
+ );
+ private final Recorder cumulativeRecorder = new Recorder(
+ TimeUnit.SECONDS.toMillis(120000), 5
+ );
+ // Set to true by the shutdown hook registered in execute(Namespace).
+ private final AtomicBoolean isDone = new AtomicBoolean(false);
+
+ PerfWriter(ServiceURI serviceURI, Flags flags) {
+ this.serviceURI = serviceURI;
+ this.flags = flags;
+ this.payload = new byte[flags.recordSize];
+ ThreadLocalRandom.current().nextBytes(payload);
+ }
+
+ @Override
+ public void run() {
+ try {
+ execute();
+ } catch (Exception e) {
+ log.error("Encountered exception at running dlog perf writer", e);
+ }
+ }
+
+ void execute() throws Exception {
+ ObjectMapper m = new ObjectMapper();
+ ObjectWriter w = m.writerWithDefaultPrettyPrinter();
+ log.info("Starting dlog perf writer with config : {}", w.writeValueAsString(flags));
+
+ DistributedLogConfiguration conf = newDlogConf(flags);
+ try (Namespace namespace = NamespaceBuilder.newBuilder()
+ .conf(conf)
+ .uri(serviceURI.getUri())
+ .build()) {
+ execute(namespace);
+ }
+ }
+
+ /**
+  * Opens the logs, starts the write threads and blocks in the stats reporting loop.
+  *
+  * <p>Log {@code i} is served by thread {@code i % numThreads}. The record/byte totals, the
+  * write rate and the outstanding-bytes budget from the flags are split evenly between the
+  * threads. Once the reporting loop returns, the executor is shut down and the log managers
+  * are closed asynchronously.
+  *
+  * @param namespace the namespace to open the logs from
+  * @throws Exception if a log cannot be opened or the shutdown wait is interrupted
+  */
+ void execute(Namespace namespace) throws Exception {
+     List<Pair<Integer, DistributedLogManager>> managers = new ArrayList<>(flags.numLogs);
+     for (int i = 0; i < flags.numLogs; i++) {
+         String logName = String.format(flags.logName, i);
+         managers.add(Pair.of(i, namespace.openLog(logName)));
+     }
+     log.info("Successfully open {} logs", managers.size());
+
+     // register shutdown hook to aggregate stats
+     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+         isDone.set(true);
+         printAggregatedStats(cumulativeRecorder);
+     }));
+
+     // Per-thread shares are the same for every thread, so compute them once.
+     final long numRecordsForThisThread = flags.numRecords / flags.numThreads;
+     final long numBytesForThisThread = flags.numBytes / flags.numThreads;
+     final double writeRateForThisThread = flags.writeRate / (double) flags.numThreads;
+     // write() takes the budget as an int. Clamp it into [0, Integer.MAX_VALUE] before
+     // narrowing: a plain cast wraps around for budgets of 2 GiB and more (e.g. 2048 MB on
+     // a single thread), turning the value negative and silently disabling the limit.
+     final int maxOutstandingBytesForThisThread = (int) Math.max(0L, Math.min(
+         Integer.MAX_VALUE,
+         flags.maxOutstandingMB * 1024 * 1024 / flags.numThreads));
+
+     ExecutorService executor = Executors.newFixedThreadPool(flags.numThreads);
+     try {
+         for (int i = 0; i < flags.numThreads; i++) {
+             final int idx = i;
+             final List<DistributedLogManager> logsThisThread = managers
+                 .stream()
+                 .filter(pair -> pair.getLeft() % flags.numThreads == idx)
+                 .map(pair -> pair.getRight())
+                 .collect(Collectors.toList());
+             executor.submit(() -> {
+                 try {
+                     write(
+                         logsThisThread,
+                         writeRateForThisThread,
+                         maxOutstandingBytesForThisThread,
+                         numRecordsForThisThread,
+                         numBytesForThisThread);
+                 } catch (Exception e) {
+                     log.error("Encountered error at writing records", e);
+                 }
+             });
+         }
+         log.info("Started {} write threads", flags.numThreads);
+         reportStats();
+     } finally {
+         executor.shutdown();
+         if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+             executor.shutdownNow();
+         }
+         managers.forEach(manager -> manager.getRight().asyncClose());
+     }
+ }
+
+ void write(List<DistributedLogManager> logs,
+ double writeRate,
+ int maxOutstandingBytesForThisThread,
+ long numRecordsForThisThread,
+ long numBytesForThisThread) throws Exception {
+ log.info("Write thread started with : logs = {}, rate = {},"
+ + " num records = {}, num bytes = {}, max outstanding bytes = {}",
+ logs.stream().map(l -> l.getStreamName()).collect(Collectors.toList()),
+ writeRate,
+ numRecordsForThisThread,
+ numBytesForThisThread,
+ maxOutstandingBytesForThisThread);
+
+ List<CompletableFuture<AsyncLogWriter>> writerFutures = logs.stream()
+ .map(manager -> manager.openAsyncLogWriter())
+ .collect(Collectors.toList());
+ List<AsyncLogWriter> writers = result(FutureUtils.collect(writerFutures));
+
+ long txid = writers
+ .stream()
+ .mapToLong(writer -> writer.getLastTxId())
+ .max()
+ .orElse(0L);
+ txid = Math.max(0L, txid);
+
+ RateLimiter limiter;
+ if (writeRate > 0) {
+ limiter = RateLimiter.create(writeRate);
+ } else {
+ limiter = null;
+ }
+ final Semaphore semaphore;
+ if (maxOutstandingBytesForThisThread > 0) {
+ semaphore = new Semaphore(maxOutstandingBytesForThisThread);
+ } else {
+ semaphore = null;
+ }
+
+ // Acquire 1 second worth of records to have a slower ramp-up
+ if (limiter != null) {
+ limiter.acquire((int) writeRate);
+ }
+
+ long totalWritten = 0L;
+ long totalBytesWritten = 0L;
+ final int numLogs = logs.size();
+ while (true) {
+ for (int i = 0; i < numLogs; i++) {
+ if (numRecordsForThisThread > 0
+ && totalWritten >= numRecordsForThisThread) {
+ markPerfDone();
+ }
+ if (numBytesForThisThread > 0
+ && totalBytesWritten >= numBytesForThisThread) {
+ markPerfDone();
+ }
+ if (null != semaphore) {
+ semaphore.acquire(payload.length);
+ }
+
+ totalWritten++;
+ totalBytesWritten += payload.length;
+ if (null != limiter) {
+ limiter.acquire(payload.length);
+ }
+ final long sendTime = System.nanoTime();
+ writers.get(i).write(
+ new LogRecord(++txid, Unpooled.wrappedBuffer(payload))
+ ).thenAccept(dlsn -> {
+ if (null != semaphore) {
+ semaphore.release(payload.length);
+ }
+
+ recordsWritten.increment();
+ bytesWritten.add(payload.length);
+
+ long latencyMicros = TimeUnit.NANOSECONDS.toMicros(
+ System.nanoTime() - sendTime
+ );
+ recorder.recordValue(latencyMicros);
+ cumulativeRecorder.recordValue(latencyMicros);
+ }).exceptionally(cause -> {
+ log.warn("Error at writing records", cause);
+ System.exit(-1);
+ return null;
+ });
+ }
+ }
+ }
+
+ @SuppressFBWarnings("DM_EXIT") // the System.exit call below is intentional
+ void markPerfDone() throws Exception { // prints the final stats, flags completion, then terminates the JVM
+ log.info("------------------- DONE -----------------------");
+ printAggregatedStats(cumulativeRecorder);
+ isDone.set(true); // reportStats() polls this flag and leaves its loop once it is set
+ Thread.sleep(5000); // pause before exiting - presumably lets in-flight work and log output settle; TODO confirm
+ System.exit(0);
+ }
+
+ void reportStats() {
+ // Print report stats
+ long oldTime = System.nanoTime();
+
+ Histogram reportHistogram = null;
+
+ while (true) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ break;
+ }
+
+ if (isDone.get()) {
+ break;
+ }
+
+ long now = System.nanoTime();
+ double elapsed = (now - oldTime) / 1e9;
+
+ double rate = recordsWritten.sumThenReset() / elapsed;
+ double throughput = bytesWritten.sumThenReset() / elapsed / 1024 / 1024;
+
+ reportHistogram = recorder.getIntervalHistogram(reportHistogram);
+
+ log.info(
+ "Throughput written : {} records/s --- {} MB/s --- Latency: mean:"
+ + " {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+ throughputFormat.format(rate), throughputFormat.format(throughput),
+ dec.format(reportHistogram.getMean() / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+ dec.format(reportHistogram.getMaxValue() / 1000.0));
+
+ reportHistogram.reset();
+
+ oldTime = now;
+ }
+
+ }
+
+ private static DistributedLogConfiguration newDlogConf(Flags flags) {
+ return new DistributedLogConfiguration()
+ .setEnsembleSize(flags.ensembleSize)
+ .setWriteQuorumSize(flags.writeQuorumSize)
+ .setAckQuorumSize(flags.ackQuorumSize)
+ .setOutputBufferSize(512 * 1024)
+ .setPeriodicFlushFrequencyMilliSeconds(2)
+ .setWriteLockEnabled(false)
+ .setMaxLogSegmentBytes(512 * 1024 * 1024) // 512MB
+ .setExplicitTruncationByApplication(true);
+ }
+
+
+ private static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8); // rates, min width 8
+ private static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7); // latencies, min width 7
+
+ private static void printAggregatedStats(Recorder recorder) {
+ Histogram reportHistogram = recorder.getIntervalHistogram();
+
+ log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {}"
+ + " - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
+ dec.format(reportHistogram.getMean() / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+ dec.format(reportHistogram.getValueAtPercentile(99.999) / 1000.0),
+ dec.format(reportHistogram.getMaxValue() / 1000.0));
+ }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
new file mode 100644
index 0000000..001cacb
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.tools.perf.dlog.PerfReader.Flags;
+import org.apache.commons.configuration.CompositeConfiguration;
+
+/**
+ * Command to read log records from distributedlog streams.
+ */
+@Slf4j
+public class ReadCommand extends BKCommand<Flags> {
+
+    private static final String NAME = "read";
+    private static final String DESC = "Read log records from distributedlog streams";
+
+    public ReadCommand() {
+        super(newSpec());
+    }
+
+    /** Describes the command line surface of this command: its name, description and flags. */
+    private static CliSpec<Flags> newSpec() {
+        return CliSpec.<Flags>newBuilder().withName(NAME).withDescription(DESC).withFlags(new Flags()).build();
+    }
+
+    /** Runs a {@link PerfReader}; uses {@code distributedlog://localhost/distributedlog} when no uri is given. */
+    @Override
+    protected boolean apply(ServiceURI serviceURI,
+                            CompositeConfiguration conf,
+                            BKFlags globalFlags, Flags cmdFlags) {
+        final ServiceURI target;
+        if (null != serviceURI) {
+            target = serviceURI;
+        } else {
+            log.warn("No service uri is provided. Use default 'distributedlog://localhost/distributedlog'.");
+            target = ServiceURI.create("distributedlog://localhost/distributedlog");
+        }
+        new PerfReader(target, cmdFlags).run();
+        return true;
+    }
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/WriteCommand.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/WriteCommand.java
new file mode 100644
index 0000000..aa7c92e
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/WriteCommand.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.tools.perf.dlog.PerfWriter.Flags;
+import org.apache.commons.configuration.CompositeConfiguration;
+
+/**
+ * Command to write log records to distributedlog streams.
+ */
+@Slf4j
+public class WriteCommand extends BKCommand<Flags> {
+
+    private static final String NAME = "write";
+    private static final String DESC = "Write log records to distributedlog streams";
+
+    public WriteCommand() {
+        super(newSpec());
+    }
+
+    /** Describes the command line surface of this command: its name, description and flags. */
+    private static CliSpec<Flags> newSpec() {
+        return CliSpec.<Flags>newBuilder().withName(NAME).withDescription(DESC).withFlags(new Flags()).build();
+    }
+
+    /** Runs a {@link PerfWriter}; uses {@code distributedlog://localhost/distributedlog} when no uri is given. */
+    @Override
+    protected boolean apply(ServiceURI serviceURI,
+                            CompositeConfiguration conf,
+                            BKFlags globalFlags, Flags cmdFlags) {
+        final ServiceURI target;
+        if (null != serviceURI) {
+            target = serviceURI;
+        } else {
+            log.warn("No service uri is provided. Use default 'distributedlog://localhost/distributedlog'.");
+            target = ServiceURI.create("distributedlog://localhost/distributedlog");
+        }
+        new PerfWriter(target, cmdFlags).run();
+        return true;
+    }
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/package-info.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/package-info.java
new file mode 100644
index 0000000..596d419
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Distributedlog related perf commands.
+ */
+package org.apache.bookkeeper.tools.perf.dlog;
\ No newline at end of file
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/package-info.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/package-info.java
new file mode 100644
index 0000000..ca7aee6
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * BookKeeper Perf Tool.
+ */
+package org.apache.bookkeeper.tools.perf;
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/PaddingDecimalFormat.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/PaddingDecimalFormat.java
new file mode 100644
index 0000000..0bc92af
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/PaddingDecimalFormat.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.utils;
+
+import java.text.DecimalFormat;
+import java.text.DecimalFormatSymbols;
+import java.text.FieldPosition;
+
+/**
+ * A decimal format that left-pads the formatted number with spaces up to a minimum length.
+ */
+public class PaddingDecimalFormat extends DecimalFormat {
+    private final int minimumLength;
+
+    /** Creates a format from the pattern and minimum length, using the symbols of the default locale. */
+    public PaddingDecimalFormat(String pattern, int minLength) {
+        super(pattern);
+        minimumLength = minLength;
+    }
+
+    /**
+     * Creates a PaddingDecimalFormat using the given pattern, symbols and minimum length.
+     */
+    public PaddingDecimalFormat(String pattern, DecimalFormatSymbols symbols, int minLength) {
+        super(pattern, symbols);
+        minimumLength = minLength;
+    }
+
+    @Override
+    public StringBuffer format(double number, StringBuffer toAppendTo, FieldPosition pos) {
+        int initLength = toAppendTo.length();
+        super.format(number, toAppendTo, pos);
+        return pad(toAppendTo, initLength, pos);
+    }
+
+    @Override
+    public StringBuffer format(long number, StringBuffer toAppendTo, FieldPosition pos) {
+        int initLength = toAppendTo.length();
+        super.format(number, toAppendTo, pos);
+        return pad(toAppendTo, initLength, pos);
+    }
+
+    /** Inserts the missing spaces at {@code initLength} and shifts the field reported in {@code pos} with them. */
+    private StringBuffer pad(StringBuffer toAppendTo, int initLength, FieldPosition pos) {
+        int padLength = minimumLength - (toAppendTo.length() - initLength);
+        if (padLength > 0) {
+            StringBuilder pad = new StringBuilder(padLength);
+            for (int i = 0; i < padLength; i++) {
+                pad.append(' ');
+            }
+            toAppendTo.insert(initLength, pad);
+            // a non-empty field was reported: move its indices so they still cover the same characters
+            if (pos.getEndIndex() > pos.getBeginIndex()) {
+                pos.setBeginIndex(pos.getBeginIndex() + padLength);
+                pos.setEndIndex(pos.getEndIndex() + padLength);
+            }
+        }
+        return toAppendTo;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof PaddingDecimalFormat)) {
+            return false;
+        }
+        return minimumLength == ((PaddingDecimalFormat) obj).minimumLength && super.equals(obj);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * super.hashCode() + minimumLength;
+    }
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/package-info.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/package-info.java
new file mode 100644
index 0000000..76a7e42
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utils used in the perf tool.
+ */
+package org.apache.bookkeeper.tools.perf.utils;
\ No newline at end of file
diff --git a/tools/pom.xml b/tools/pom.xml
index 988de81..37e6375 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -27,6 +27,7 @@
<packaging>pom</packaging>
<modules>
<module>framework</module>
+ <module>perf</module>
<module>all</module>
</modules>
<profiles>