You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/09/25 17:38:16 UTC

[bookkeeper] branch master updated: [TOOLS] Add a perf tool for benchmarking bookkeeper

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new c02e0fb  [TOOLS] Add a perf tool for benchmarking bookkeeper
c02e0fb is described below

commit c02e0fbd2a0618b9a4e40c995a186f003ab9070b
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Tue Sep 25 10:38:12 2018 -0700

    [TOOLS] Add a perf tool for benchmarking bookkeeper
    
    Descriptions of the changes in this PR:
    
    ### Motivation
    
    When investigating the performance of scanning entries in bookkeeper, it is hard to know what throughput an application should expect. We need a perf tool to understand the baseline performance that bookkeeper can provide.
    
    ### Changes
    
    This tool follows the approach of the pulsar perf tool and uses the dlog library to evaluate the write and read throughput of bookkeeper using one (or a few) dlog streams.
    
    
    
    
    Author: Sijie Guo <si...@apache.org>
    Author: Qi Wang <42...@users.noreply.github.com>
    Author: Charan Reddy Guttapalem <re...@gmail.com>
    Author: Sijie Guo <gu...@gmail.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
    
    This closes #1697 from sijie/bkperf
---
 bin/bkperf                                         |  66 ++++
 conf/log4j.cli.properties                          |   1 +
 pom.xml                                            |   8 +
 tools/perf/README.md                               | 100 +++++
 tools/perf/pom.xml                                 |  43 +++
 .../org/apache/bookkeeper/tools/perf/BKPerf.java   |  46 +++
 .../tools/perf/DlogPerfCommandGroup.java           |  43 +++
 .../bookkeeper/tools/perf/PerfCommandGroup.java    |  25 ++
 .../bookkeeper/tools/perf/dlog/PerfReader.java     | 290 +++++++++++++++
 .../bookkeeper/tools/perf/dlog/PerfWriter.java     | 413 +++++++++++++++++++++
 .../bookkeeper/tools/perf/dlog/ReadCommand.java    |  58 +++
 .../bookkeeper/tools/perf/dlog/WriteCommand.java   |  58 +++
 .../bookkeeper/tools/perf/dlog/package-info.java   |  18 +
 .../apache/bookkeeper/tools/perf/package-info.java |  18 +
 .../tools/perf/utils/PaddingDecimalFormat.java     |  85 +++++
 .../bookkeeper/tools/perf/utils/package-info.java  |  18 +
 tools/pom.xml                                      |   1 +
 17 files changed, 1291 insertions(+)

diff --git a/bin/bkperf b/bin/bkperf
new file mode 100755
index 0000000..6ec2909
--- /dev/null
+++ b/bin/bkperf
@@ -0,0 +1,66 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+# BookKeeper Perf Tool (experimental)
+
+BINDIR=$(dirname "$0")
+BK_HOME=$(cd "${BINDIR}/.."; pwd)
+
+source "${BK_HOME}/bin/common.sh"
+source "${BK_HOME}/conf/bk_cli_env.sh"
+
+CLI_MODULE_PATH=tools/perf
+CLI_MODULE_NAME="(org.apache.bookkeeper-)?bookkeeper-perf"
+CLI_MODULE_HOME=${BK_HOME}/${CLI_MODULE_PATH}
+
+# find the module jar (the name pattern is quoted so `?` and parentheses are never glob-expanded)
+CLI_JAR=$(find_module_jar "${CLI_MODULE_PATH}" "${CLI_MODULE_NAME}")
+
+# set up the classpath
+CLI_CLASSPATH=$(set_module_classpath "${CLI_MODULE_PATH}")
+
+DEFAULT_CONF=${BK_HOME}/conf/bk_server.conf
+if [ -z "${CLI_CONF}" ]; then
+  CLI_CONF=${DEFAULT_CONF}
+fi
+
+DEFAULT_LOG_CONF=${BK_HOME}/conf/log4j.cli.properties
+if [ -z "${CLI_LOG_CONF}" ]; then
+  CLI_LOG_CONF=${DEFAULT_LOG_CONF}
+fi
+CLI_LOG_DIR=${CLI_LOG_DIR:-"$BK_HOME/logs"}
+CLI_LOG_FILE=${CLI_LOG_FILE:-"bkperf.log"}
+CLI_ROOT_LOGGER=${CLI_ROOT_LOGGER:-"INFO,ROLLINGFILE"}
+
+# Configure the classpath
+CLI_CLASSPATH="$CLI_JAR:$CLI_CLASSPATH:$CLI_EXTRA_CLASSPATH"
+CLI_CLASSPATH="$(dirname "$CLI_LOG_CONF"):$CLI_CLASSPATH"
+
+# Build the OPTs
+BOOKIE_OPTS=$(build_bookie_opts)
+GC_OPTS=$(build_cli_jvm_opts "${CLI_LOG_DIR}" "bkperf-gc.log")
+NETTY_OPTS=$(build_netty_opts)
+LOGGING_OPTS=$(build_cli_logging_opts "${CLI_LOG_CONF}" "${CLI_LOG_DIR}" "${CLI_LOG_FILE}" "${CLI_ROOT_LOGGER}")
+
+OPTS="${OPTS} -cp ${CLI_CLASSPATH} ${BOOKIE_OPTS} ${GC_OPTS} ${NETTY_OPTS} ${LOGGING_OPTS} ${CLI_EXTRA_OPTS}"
+
+# Run from BK_HOME so relative paths resolve; JAVA/OPTS stay unquoted on purpose so they word-split
+cd "$BK_HOME"
+exec ${JAVA} ${OPTS} org.apache.bookkeeper.tools.perf.BKPerf --conf "${CLI_CONF}" "$@"
diff --git a/conf/log4j.cli.properties b/conf/log4j.cli.properties
index 51c95f5..ceb77cc 100644
--- a/conf/log4j.cli.properties
+++ b/conf/log4j.cli.properties
@@ -55,5 +55,6 @@ log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{
 log4j.logger.verbose=INFO,VERBOSECONSOLE
 log4j.logger.org.apache.zookeeper=ERROR
 log4j.logger.org.apache.bookkeeper=ERROR
+log4j.logger.org.apache.bookkeeper.tools=INFO
 log4j.logger.org.apache.bookkeeper.bookie.BookieShell=INFO
 log4j.logger.org.apache.bookkeeper.client.BookKeeperAdmin=INFO
diff --git a/pom.xml b/pom.xml
index e5a3ffd..5570259 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,6 +130,7 @@
     <guava.version>21.0</guava.version>
     <hadoop.version>2.7.3</hadoop.version>
     <hamcrest.version>1.3</hamcrest.version>
+    <hdrhistogram.version>2.1.10</hdrhistogram.version>
     <jackson.version>2.8.9</jackson.version>
     <jackson-mapper-asl.version>1.9.11</jackson-mapper-asl.version>
     <jcommander.version>1.48</jcommander.version>
@@ -543,6 +544,13 @@
         <version>${jcommander.version}</version>
       </dependency>
 
+      <!-- perf dependencies -->
+      <dependency>
+        <groupId>org.hdrhistogram</groupId>
+        <artifactId>HdrHistogram</artifactId>
+        <version>${hdrhistogram.version}</version>
+      </dependency>
+
       <!-- test dependencies -->
       <dependency>
         <groupId>junit</groupId>
diff --git a/tools/perf/README.md b/tools/perf/README.md
new file mode 100644
index 0000000..945d80c
--- /dev/null
+++ b/tools/perf/README.md
@@ -0,0 +1,100 @@
+## BookKeeper Perf Tool
+
+### Dlog
+
+```shell
+$ bin/bkperf dlog
+Commands on evaluating performance of distributedlog library
+
+Usage:  bkperf dlog [command] [command options]
+
+Commands:
+
+    read        Read log records from distributedlog streams
+    write       Write log records to distributedlog streams
+
+    help        Display help information about it
+```
+
+#### Write records to logs
+
+```shell
+$ bin/bkperf dlog write -h
+Write log records to distributedlog streams
+
+Usage:  bkperf dlog write [flags]
+
+Flags:
+
+    -a, --ack-quorum-size
+        Ledger ack quorum size
+
+    -e, --ensemble-size
+        Ledger ensemble size
+
+    -ln, --log-name
+        Log name or log name pattern if more than 1 log is specified at
+        `--num-logs`
+
+    -b, --num-bytes
+        Number of bytes to write in total. If 0, it will keep writing
+
+    -l, --num-logs
+        Number of log streams
+
+    -n, --num-records
+        Number of records to write in total. If 0, it will keep writing
+
+    -r, --rate
+        Write rate bytes/s across log streams
+
+    -rs, --record-size
+        Log record size
+
+    --threads
+        Number of threads writing
+
+    -w, --write-quorum-size
+        Ledger write quorum size
+
+
+    -h, --help
+        Display help information
+```
+
+Example: write to log stream `test-log` at `100 MB/second`, using a 1-bookie ensemble.
+
+```shell
+$ bin/bkperf dlog write -w 1 -a 1 -e 1 -r 104857600 --log-name test-log
+```
+
+#### Read records from logs
+
+```shell
+$ bin/bkperf dlog read -h
+Read log records from distributedlog streams
+
+Usage:  bkperf dlog read [flags]
+
+Flags:
+
+    -ln, --log-name
+        Log name or log name pattern if more than 1 log is specified at
+        `--num-logs`
+
+    -l, --num-logs
+        Number of log streams
+
+    -t, --threads
+        Number of threads reading
+
+
+    -h, --help
+        Display help information
+```
+
+Example: read from log stream `test-log-000000`.
+
+```shell
+$ bin/bkperf dlog read --log-name test-log-000000
+```
diff --git a/tools/perf/pom.xml b/tools/perf/pom.xml
new file mode 100644
index 0000000..d3a617f
--- /dev/null
+++ b/tools/perf/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.bookkeeper</groupId>
+    <artifactId>bookkeeper-tools-parent</artifactId>
+    <version>4.9.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>bookkeeper-perf</artifactId>
+  <name>Apache BookKeeper :: Tools :: Perf</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-tools-framework</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.distributedlog</groupId>
+      <artifactId>distributedlog-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.hdrhistogram</groupId>
+      <artifactId>HdrHistogram</artifactId>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java
new file mode 100644
index 0000000..23021ee
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.Cli;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+
+/**
+ * <b>bkperf</b> evaluates the performance of <i>Apache BookKeeper</i> cluster.
+ */
+@Slf4j
+public class BKPerf {
+
+    public static final String NAME = "bkperf";
+
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) {
+        CliSpec.Builder<BKFlags> specBuilder = CliSpec.<BKFlags>newBuilder()
+            .withName(NAME)
+            .withUsage(NAME + " [flags] [command group] [commands]")
+            .withDescription(NAME + " evaluates the performance of Apache BookKeeper clusters")
+            .withFlags(new BKFlags())
+            .withConsole(System.out)
+            .addCommand(new DlogPerfCommandGroup());
+
+        CliSpec<BKFlags> spec = specBuilder.build();
+
+        int retCode = Cli.runCli(spec, args);
+        Runtime.getRuntime().exit(retCode);
+    }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
new file mode 100644
index 0000000..75adc4b
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf;
+
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliCommandGroup;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.tools.perf.dlog.ReadCommand;
+import org.apache.bookkeeper.tools.perf.dlog.WriteCommand;
+
+/**
+ * Commands that evaluate performance of distributedlog library.
+ */
+public class DlogPerfCommandGroup extends CliCommandGroup<BKFlags> implements PerfCommandGroup<BKFlags> {
+
+    private static final String NAME = "dlog";
+    private static final String DESC = "Commands on evaluating performance of distributedlog library";
+
+    private static final CliSpec<BKFlags> spec = CliSpec.<BKFlags>newBuilder()
+        .withName(NAME)
+        .withDescription(DESC)
+        .withParent(BKPerf.NAME)
+        .addCommand(new WriteCommand())
+        .addCommand(new ReadCommand())
+        .build();
+
+    public DlogPerfCommandGroup() {
+        super(spec);
+    }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/PerfCommandGroup.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/PerfCommandGroup.java
new file mode 100644
index 0000000..c64d77f
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/PerfCommandGroup.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf;
+
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CommandGroup;
+
+/**
+ * A command group that group commands together for performance evaluations.
+ */
+public interface PerfCommandGroup<GlobalFlagsT extends CliFlags>
+    extends CommandGroup<GlobalFlagsT> {
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
new file mode 100644
index 0000000..7497d7d
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import com.beust.jcommander.Parameter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.Recorder;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+
+/**
+ * A perf reader to evaluate read performance.
+ */
+@Slf4j
+public class PerfReader implements Runnable {
+
+    /**
+     * Flags for the read command.
+     */
+    public static class Flags extends CliFlags {
+
+        @Parameter(
+            names = {
+                "-ln", "--log-name"
+            },
+            description = "Log name or log name pattern if more than 1 log is specified at `--num-logs`")
+        public String logName = "test-log-%06d";
+
+        @Parameter(
+            names = {
+                "-l", "--num-logs"
+            },
+            description = "Number of log streams")
+        public int numLogs = 1;
+
+        @Parameter(
+            names = {
+                "-t", "--threads"
+            },
+            description = "Number of threads reading")
+        public int numThreads = 1;
+
+        @Parameter(
+            names = {
+                "-mr", "--max-readahead-records"
+            },
+            description = "Max readahead records")
+        public int maxReadAheadRecords = 1000000;
+
+        @Parameter(
+            names = {
+                "-bs", "--readahead-batch-size"
+            },
+            description = "ReadAhead Batch Size, in entries"
+        )
+        public int readAheadBatchSize = 4;
+
+    }
+
+
+    // stats
+    private final LongAdder recordsRead = new LongAdder();
+    private final LongAdder bytesRead = new LongAdder();
+
+    private final ServiceURI serviceURI;
+    private final Flags flags;
+    private final Recorder recorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    private final Recorder cumulativeRecorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    private final AtomicBoolean isDone = new AtomicBoolean(false);
+
+    PerfReader(ServiceURI serviceURI, Flags flags) {
+        this.serviceURI = serviceURI;
+        this.flags = flags;
+    }
+
+    @Override
+    public void run() {
+        try {
+            execute();
+        } catch (Exception e) { // Runnable.run() cannot propagate checked exceptions; log and return
+            log.error("Encountered exception at running dlog perf reader", e);
+        }
+    }
+
+    void execute() throws Exception {
+        ObjectMapper m = new ObjectMapper();
+        ObjectWriter w = m.writerWithDefaultPrettyPrinter();
+        log.info("Starting dlog perf reader with config : {}", w.writeValueAsString(flags));
+
+        DistributedLogConfiguration conf = newDlogConf(flags);
+        try (Namespace namespace = NamespaceBuilder.newBuilder()
+             .conf(conf)
+             .uri(serviceURI.getUri())
+             .build()) {
+            execute(namespace);
+        }
+    }
+
+    /** Opens the configured logs, reads them on a fixed-size thread pool and reports stats until stopped. */
+    void execute(Namespace namespace) throws Exception {
+        List<Pair<Integer, DistributedLogManager>> managers = new ArrayList<>(flags.numLogs);
+        for (int i = 0; i < flags.numLogs; i++) {
+            managers.add(Pair.of(i, namespace.openLog(String.format(flags.logName, i))));
+        }
+        log.info("Successfully open {} logs", managers.size());
+
+        // register shutdown hook to aggregate stats
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            isDone.set(true);
+            printAggregatedStats(cumulativeRecorder);
+        }));
+
+        ExecutorService executor = Executors.newFixedThreadPool(flags.numThreads);
+        try {
+            for (int i = 0; i < flags.numThreads; i++) {
+                final int idx = i;
+                final List<DistributedLogManager> logsThisThread = managers
+                    .stream()
+                    .filter(pair -> pair.getLeft() % flags.numThreads == idx)
+                    .map(pair -> pair.getRight())
+                    .collect(Collectors.toList());
+                executor.submit(() -> {
+                    try {
+                        read(logsThisThread);
+                    } catch (Exception e) {
+                        log.error("Encountered error at reading records", e);
+                    }
+                });
+            }
+            log.info("Started {} read threads", flags.numThreads);
+            reportStats();
+        } finally {
+            executor.shutdown();
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
+            }
+            managers.forEach(manager -> manager.getRight().asyncClose());
+        }
+    }
+
+    /** Reads the next record from each log's reader in turn, counting records and bytes, until done. */
+    void read(List<DistributedLogManager> logs) throws Exception {
+        log.info("Read thread started with : logs = {}",
+            logs.stream().map(l -> l.getStreamName()).collect(Collectors.toList()));
+        List<LogReader> readers = logs.stream()
+            .map(manager -> {
+                try {
+                    return manager.openLogReader(DLSN.InitialDLSN);
+                } catch (IOException e) {
+                    log.error("Failed to open reader for log stream {}", manager.getStreamName(), e);
+                    throw new UncheckedIOException(e);
+                }
+            })
+            .collect(Collectors.toList());
+
+        // stop once the shutdown hook flips isDone, so the pool can drain instead of spinning forever
+        while (!isDone.get()) {
+            for (LogReader reader : readers) {
+                LogRecordWithDLSN record = reader.readNext(true);
+                if (null != record) {
+                    recordsRead.increment();
+                    bytesRead.add(record.getPayloadBuf().readableBytes());
+                }
+            }
+        }
+    }
+
+    void reportStats() {
+        // Print report stats
+        long oldTime = System.nanoTime();
+
+        Histogram reportHistogram = null;
+
+        while (true) {
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                break;
+            }
+
+            if (isDone.get()) {
+                break;
+            }
+
+            long now = System.nanoTime();
+            double elapsed = (now - oldTime) / 1e9;
+
+            double rate = recordsRead.sumThenReset() / elapsed;
+            double throughput = bytesRead.sumThenReset() / elapsed / 1024 / 1024;
+
+            reportHistogram = recorder.getIntervalHistogram(reportHistogram);
+
+            log.info("Throughput read : {}  records/s --- {} MB/s --- Latency: mean:"
+                        + " {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                    throughputFormat.format(rate), throughputFormat.format(throughput),
+                    dec.format(reportHistogram.getMean() / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+                    dec.format(reportHistogram.getMaxValue() / 1000.0));
+
+            reportHistogram.reset();
+
+            oldTime = now;
+        }
+
+    }
+
+    private static DistributedLogConfiguration newDlogConf(Flags flags) {
+        return new DistributedLogConfiguration()
+            .setReadAheadBatchSize(flags.readAheadBatchSize)
+            .setReadAheadMaxRecords(flags.maxReadAheadRecords)
+            .setReadAheadWaitTime(200);
+    }
+
+
+    private static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
+    private static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+
+    private static void printAggregatedStats(Recorder recorder) {
+        Histogram reportHistogram = recorder.getIntervalHistogram();
+
+        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {}"
+                + " - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
+                dec.format(reportHistogram.getMean() / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.999) / 1000.0),
+                dec.format(reportHistogram.getMaxValue() / 1000.0));
+    }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
new file mode 100644
index 0000000..e29dc64
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+
+import com.beust.jcommander.Parameter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.util.concurrent.RateLimiter;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.netty.buffer.Unpooled;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.Recorder;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+
+/**
+ * A perf writer to evaluate write performance.
+ */
+@Slf4j
+public class PerfWriter implements Runnable {
+
+    /**
+     * Flags for the write command.
+     */
+    public static class Flags extends CliFlags {
+
+        @Parameter(
+            names = {
+                "-r", "--rate"
+            },
+            description = "Write rate bytes/s across log streams")
+        public int writeRate = 0;
+
+        @Parameter(
+            names = {
+                "-rs", "--record-size"
+            },
+            description = "Log record size")
+        public int recordSize = 1024;
+
+        @Parameter(
+            names = {
+                "-ln", "--log-name"
+            },
+            description = "Log name or log name pattern if more than 1 log is specified at `--num-logs`")
+        public String logName = "test-log-%06d";
+
+        @Parameter(
+            names = {
+                "-l", "--num-logs"
+            },
+            description = "Number of log streams")
+        public int numLogs = 1;
+
+        @Parameter(
+            names = {
+                "-t", "--threads"
+            },
+            description = "Number of threads writing")
+        public int numThreads = 1;
+
+        @Parameter(
+            names = {
+                "-mob", "--max-outstanding-megabytes"
+            },
+            description = "Max outstanding (unacknowledged) megabytes of writes across all write threads")
+        public long maxOutstandingMB = 200;
+
+        @Parameter(
+            names = {
+                "-n", "--num-records"
+            },
+            description = "Number of records to write in total. If 0, it will keep writing")
+        public long numRecords = 0;
+
+        @Parameter(
+            names = {
+                "-b", "--num-bytes"
+            },
+            description = "Number of bytes to write in total. If 0, it will keep writing")
+        public long numBytes = 0;
+
+        @Parameter(
+            names = {
+                "-e", "--ensemble-size"
+            },
+            description = "Ledger ensemble size")
+        public int ensembleSize = 1;
+
+        @Parameter(
+            names = {
+                "-w", "--write-quorum-size"
+            },
+            description = "Ledger write quorum size")
+        public int writeQuorumSize = 1;
+
+        @Parameter(
+            names = {
+                "-a", "--ack-quorum-size"
+            },
+            description = "Ledger ack quorum size")
+        public int ackQuorumSize = 1;
+
+    }
+
+
+    // stats
+    private final LongAdder recordsWritten = new LongAdder();
+    private final LongAdder bytesWritten = new LongAdder();
+
+    private final byte[] payload;
+    private final ServiceURI serviceURI;
+    private final Flags flags;
+    private final Recorder recorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    private final Recorder cumulativeRecorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    private final AtomicBoolean isDone = new AtomicBoolean(false);
+
+    PerfWriter(ServiceURI serviceURI, Flags flags) {
+        this.serviceURI = serviceURI;
+        this.flags = flags;
+        this.payload = new byte[flags.recordSize];
+        ThreadLocalRandom.current().nextBytes(payload);
+    }
+
+    @Override
+    public void run() {
+        try {
+            execute();
+        } catch (Exception e) {
+            log.error("Encountered exception at running dlog perf writer", e);
+        }
+    }
+
+    void execute() throws Exception {
+        ObjectMapper m = new ObjectMapper();
+        ObjectWriter w = m.writerWithDefaultPrettyPrinter();
+        log.info("Starting dlog perf writer with config : {}", w.writeValueAsString(flags));
+
+        DistributedLogConfiguration conf = newDlogConf(flags);
+        try (Namespace namespace = NamespaceBuilder.newBuilder()
+             .conf(conf)
+             .uri(serviceURI.getUri())
+             .build()) {
+            execute(namespace);
+        }
+    }
+
+    /**
+     * Opens the logs, spreads them and the write budgets over the write threads, and reports stats.
+     */
+    void execute(Namespace namespace) throws Exception {
+        List<Pair<Integer, DistributedLogManager>> managers = new ArrayList<>(flags.numLogs);
+        for (int i = 0; i < flags.numLogs; i++) {
+            managers.add(Pair.of(i, namespace.openLog(String.format(flags.logName, i))));
+        }
+        log.info("Successfully open {} logs", managers.size());
+
+        // Skip if markPerfDone() already printed: taking the interval histogram again yields only newer samples.
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            if (isDone.compareAndSet(false, true)) {
+                printAggregatedStats(cumulativeRecorder);
+            }
+        }));
+
+        ExecutorService executor = Executors.newFixedThreadPool(flags.numThreads);
+        try {
+            for (int i = 0; i < flags.numThreads; i++) {
+                final int idx = i;
+                final List<DistributedLogManager> logsThisThread = managers.stream()
+                    .filter(pair -> pair.getLeft() % flags.numThreads == idx)
+                    .map(pair -> pair.getRight())
+                    .collect(Collectors.toList());
+                final long numRecordsForThisThread = flags.numRecords / flags.numThreads;
+                final long numBytesForThisThread = flags.numBytes / flags.numThreads;
+                final double writeRateForThisThread = flags.writeRate / (double) flags.numThreads;
+                // Clamp to the int range: a wrapped cast <= 0 would make write() skip the outstanding-bytes limit.
+                final int maxOutstandingBytesForThisThread = (int) Math.min(
+                    Integer.MAX_VALUE, flags.maxOutstandingMB * 1024 * 1024 / flags.numThreads);
+                executor.submit(() -> {
+                    try {
+                        write(logsThisThread, writeRateForThisThread, maxOutstandingBytesForThisThread,
+                            numRecordsForThisThread, numBytesForThisThread);
+                    } catch (Exception e) {
+                        log.error("Encountered error at writing records", e);
+                    }
+                });
+            }
+            log.info("Started {} write threads", flags.numThreads);
+            reportStats();
+        } finally {
+            executor.shutdown();
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
+            }
+            managers.forEach(manager -> manager.getRight().asyncClose());
+        }
+    }
+
+    /**
+     * Writes records round-robin to the given logs until this thread's record or byte budget is used up.
+     *
+     * @param logs logs this thread writes to
+     * @param writeRate write rate in bytes/s for this thread; no rate limiting if not positive
+     * @param maxOutstandingBytesForThisThread max unacknowledged bytes; no limit if not positive
+     * @param numRecordsForThisThread number of records to write; unbounded if not positive
+     * @param numBytesForThisThread number of bytes to write; unbounded if not positive
+     */
+    void write(List<DistributedLogManager> logs,
+               double writeRate,
+               int maxOutstandingBytesForThisThread,
+               long numRecordsForThisThread,
+               long numBytesForThisThread) throws Exception {
+        log.info("Write thread started with : logs = {}, rate = {},"
+                + " num records = {}, num bytes = {}, max outstanding bytes = {}",
+            logs.stream().map(l -> l.getStreamName()).collect(Collectors.toList()),
+            writeRate, numRecordsForThisThread, numBytesForThisThread, maxOutstandingBytesForThisThread);
+
+        // With fewer logs than threads a thread gets no log, and the write loop below would spin forever.
+        if (logs.isEmpty()) {
+            log.warn("No log is assigned to this write thread, nothing to write");
+            return;
+        }
+
+        List<CompletableFuture<AsyncLogWriter>> writerFutures = logs.stream()
+            .map(manager -> manager.openAsyncLogWriter())
+            .collect(Collectors.toList());
+        List<AsyncLogWriter> writers = result(FutureUtils.collect(writerFutures));
+
+        long txid = writers.stream().mapToLong(writer -> writer.getLastTxId()).max().orElse(0L);
+        txid = Math.max(0L, txid);
+
+        final RateLimiter limiter = writeRate > 0 ? RateLimiter.create(writeRate) : null;
+        final Semaphore semaphore = maxOutstandingBytesForThisThread > 0
+            ? new Semaphore(maxOutstandingBytesForThisThread) : null;
+
+        // Acquire 1 second worth of bytes to have a slower ramp-up. RateLimiter rejects a
+        // non-positive permit count, so a rate below 1 byte/s still acquires one permit.
+        if (limiter != null) {
+            limiter.acquire(Math.max(1, (int) writeRate));
+        }
+
+        long totalWritten = 0L;
+        long totalBytesWritten = 0L;
+        final int numLogs = logs.size();
+        while (true) {
+            for (int i = 0; i < numLogs; i++) {
+                if (numRecordsForThisThread > 0
+                    && totalWritten >= numRecordsForThisThread) {
+                    markPerfDone();
+                }
+                if (numBytesForThisThread > 0
+                    && totalBytesWritten >= numBytesForThisThread) {
+                    markPerfDone();
+                }
+                if (null != semaphore) {
+                    semaphore.acquire(payload.length);
+                }
+
+                totalWritten++;
+                totalBytesWritten += payload.length;
+                if (null != limiter) {
+                    limiter.acquire(payload.length);
+                }
+                final long sendTime = System.nanoTime();
+                writers.get(i).write(
+                    new LogRecord(++txid, Unpooled.wrappedBuffer(payload))
+                ).thenAccept(dlsn -> {
+                    if (null != semaphore) {
+                        semaphore.release(payload.length);
+                    }
+
+                    recordsWritten.increment();
+                    bytesWritten.add(payload.length);
+
+                    long latencyMicros = TimeUnit.NANOSECONDS.toMicros(
+                        System.nanoTime() - sendTime
+                    );
+                    recorder.recordValue(latencyMicros);
+                    cumulativeRecorder.recordValue(latencyMicros);
+                }).exceptionally(cause -> {
+                    log.warn("Error at writing records", cause);
+                    System.exit(-1);
+                    return null;
+                });
+            }
+        }
+    }
+
+    @SuppressFBWarnings("DM_EXIT")
+    void markPerfDone() throws Exception {
+        log.info("------------------- DONE -----------------------");
+        printAggregatedStats(cumulativeRecorder);
+        isDone.set(true);
+        Thread.sleep(5000);
+        System.exit(0);
+    }
+
+    void reportStats() {
+        // Print report stats
+        long oldTime = System.nanoTime();
+
+        Histogram reportHistogram = null;
+
+        while (true) {
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                break;
+            }
+
+            if (isDone.get()) {
+                break;
+            }
+
+            long now = System.nanoTime();
+            double elapsed = (now - oldTime) / 1e9;
+
+            double rate = recordsWritten.sumThenReset() / elapsed;
+            double throughput = bytesWritten.sumThenReset() / elapsed / 1024 / 1024;
+
+            reportHistogram = recorder.getIntervalHistogram(reportHistogram);
+
+            log.info(
+                    "Throughput written : {}  records/s --- {} MB/s --- Latency: mean:"
+                        + " {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                    throughputFormat.format(rate), throughputFormat.format(throughput),
+                    dec.format(reportHistogram.getMean() / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+                    dec.format(reportHistogram.getMaxValue() / 1000.0));
+
+            reportHistogram.reset();
+
+            oldTime = now;
+        }
+
+    }
+
+    private static DistributedLogConfiguration newDlogConf(Flags flags) {
+        return new DistributedLogConfiguration()
+            .setEnsembleSize(flags.ensembleSize)
+            .setWriteQuorumSize(flags.writeQuorumSize)
+            .setAckQuorumSize(flags.ackQuorumSize)
+            .setOutputBufferSize(512 * 1024)
+            .setPeriodicFlushFrequencyMilliSeconds(2)
+            .setWriteLockEnabled(false)
+            .setMaxLogSegmentBytes(512 * 1024 * 1024) // 512MB
+            .setExplicitTruncationByApplication(true);
+    }
+
+
+    private static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
+    private static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+
+    private static void printAggregatedStats(Recorder recorder) {
+        Histogram reportHistogram = recorder.getIntervalHistogram();
+
+        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {}"
+                + " - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
+                dec.format(reportHistogram.getMean() / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.999) / 1000.0),
+                dec.format(reportHistogram.getMaxValue() / 1000.0));
+    }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
new file mode 100644
index 0000000..001cacb
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.tools.perf.dlog.PerfReader.Flags;
+import org.apache.commons.configuration.CompositeConfiguration;
+
+/**
+ * Command to read log records from distributedlog streams.
+ */
+@Slf4j
+public class ReadCommand extends BKCommand<Flags> {
+
+    private static final String NAME = "read";
+    private static final String DESC = "Read log records from distributedlog streams";
+
+    public ReadCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .build());
+    }
+
+    @Override
+    protected boolean apply(ServiceURI serviceURI,
+                            CompositeConfiguration conf,
+                            BKFlags globalFlags, Flags cmdFlags) {
+
+
+        if (serviceURI == null) {
+            log.warn("No service uri is provided. Use default 'distributedlog://localhost/distributedlog'.");
+            serviceURI = ServiceURI.create("distributedlog://localhost/distributedlog");
+        }
+
+        PerfReader reader = new PerfReader(serviceURI, cmdFlags);
+        reader.run();
+        return true;
+    }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/WriteCommand.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/WriteCommand.java
new file mode 100644
index 0000000..aa7c92e
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/WriteCommand.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.tools.perf.dlog.PerfWriter.Flags;
+import org.apache.commons.configuration.CompositeConfiguration;
+
+/**
+ * Command to write log records to distributedlog streams.
+ */
+@Slf4j
+public class WriteCommand extends BKCommand<Flags> {
+
+    private static final String NAME = "write";
+    private static final String DESC = "Write log records to distributedlog streams";
+
+    public WriteCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .build());
+    }
+
+    @Override
+    protected boolean apply(ServiceURI serviceURI,
+                            CompositeConfiguration conf,
+                            BKFlags globalFlags, Flags cmdFlags) {
+
+
+        if (serviceURI == null) {
+            log.warn("No service uri is provided. Use default 'distributedlog://localhost/distributedlog'.");
+            serviceURI = ServiceURI.create("distributedlog://localhost/distributedlog");
+        }
+
+        PerfWriter writer = new PerfWriter(serviceURI, cmdFlags);
+        writer.run();
+        return true;
+    }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/package-info.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/package-info.java
new file mode 100644
index 0000000..596d419
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Dlog related perf command.
+ */
+package org.apache.bookkeeper.tools.perf.dlog;
\ No newline at end of file
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/package-info.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/package-info.java
new file mode 100644
index 0000000..ca7aee6
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * BookKeeper Perf Tool.
+ */
+package org.apache.bookkeeper.tools.perf;
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/PaddingDecimalFormat.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/PaddingDecimalFormat.java
new file mode 100644
index 0000000..0bc92af
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/PaddingDecimalFormat.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.utils;
+
+import java.text.DecimalFormat;
+import java.text.DecimalFormatSymbols;
+import java.text.FieldPosition;
+
+/**
+ * A decimal format that left-pads the formatted number with spaces up to a minimum length.
+ */
+public class PaddingDecimalFormat extends DecimalFormat {
+    private final int minimumLength;
+
+    /**
+     * Creates a PaddingDecimalFormat using the given pattern and minimum length and the symbols for the default
+     * locale.
+     */
+    public PaddingDecimalFormat(String pattern, int minLength) {
+        super(pattern);
+        minimumLength = minLength;
+    }
+
+    /**
+     * Creates a PaddingDecimalFormat using the given pattern, symbols and minimum length.
+     */
+    public PaddingDecimalFormat(String pattern, DecimalFormatSymbols symbols, int minLength) {
+        super(pattern, symbols);
+        minimumLength = minLength;
+    }
+
+    @Override
+    public StringBuffer format(double number, StringBuffer toAppendTo, FieldPosition pos) {
+        int initLength = toAppendTo.length();
+        super.format(number, toAppendTo, pos);
+        return pad(toAppendTo, initLength);
+    }
+
+    @Override
+    public StringBuffer format(long number, StringBuffer toAppendTo, FieldPosition pos) {
+        int initLength = toAppendTo.length();
+        super.format(number, toAppendTo, pos);
+        return pad(toAppendTo, initLength);
+    }
+
+    /**
+     * Inserts spaces at {@code initLength} until the text appended after that position
+     * is at least {@code minimumLength} characters long.
+     */
+    private StringBuffer pad(StringBuffer toAppendTo, int initLength) {
+        int numLength = toAppendTo.length() - initLength;
+        int padLength = minimumLength - numLength;
+        for (int i = 0; i < padLength; i++) {
+            toAppendTo.insert(initLength, ' ');
+        }
+        return toAppendTo;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof PaddingDecimalFormat)) {
+            return false;
+        }
+
+        PaddingDecimalFormat other = (PaddingDecimalFormat) obj;
+        return minimumLength == other.minimumLength && super.equals(obj);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * super.hashCode() + minimumLength;
+    }
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/package-info.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/package-info.java
new file mode 100644
index 0000000..76a7e42
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utils used in the perf tool.
+ */
+package org.apache.bookkeeper.tools.perf.utils;
\ No newline at end of file
diff --git a/tools/pom.xml b/tools/pom.xml
index 988de81..37e6375 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -27,6 +27,7 @@
   <packaging>pom</packaging>
   <modules>
     <module>framework</module>
+    <module>perf</module>
     <module>all</module>
   </modules>
   <profiles>