You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/09/30 01:10:32 UTC

[bookkeeper] branch master updated: [TOOLS] [PERF] Add a `segread` perf command to read distributedlog streams in segment splits

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f2564c  [TOOLS] [PERF] Add a `segread` perf command to read distributedlog streams in segment splits
7f2564c is described below

commit 7f2564c49d6ad7795908886433d3afcc4155c9d9
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Sat Sep 29 18:10:27 2018 -0700

    [TOOLS] [PERF] Add a `segread` perf command to read distributedlog streams in segment splits
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    Find what is the bottleneck/throughput limit of reading a single ledger.
    
    *Changes*
    
    Provide a `segread` perf command to split a stream into a list of segments and split segments into splits,
    and test reading segment splits in parallel.
    
    
    
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1713 from sijie/segment_readers
---
 .../tools/perf/DlogPerfCommandGroup.java           |   2 +
 .../bookkeeper/tools/perf/dlog/PerfReader.java     | 166 +----------------
 .../bookkeeper/tools/perf/dlog/PerfReaderBase.java | 202 ++++++++++++++++++++
 .../tools/perf/dlog/PerfSegmentReader.java         | 204 +++++++++++++++++++++
 .../bookkeeper/tools/perf/dlog/PerfWriter.java     |  10 +-
 .../bookkeeper/tools/perf/dlog/ReadCommand.java    |   2 +-
 .../{ReadCommand.java => SegmentReadCommand.java}  |  16 +-
 7 files changed, 429 insertions(+), 173 deletions(-)

diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
index 75adc4b..d38c839 100644
--- a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
@@ -18,6 +18,7 @@ import org.apache.bookkeeper.tools.common.BKFlags;
 import org.apache.bookkeeper.tools.framework.CliCommandGroup;
 import org.apache.bookkeeper.tools.framework.CliSpec;
 import org.apache.bookkeeper.tools.perf.dlog.ReadCommand;
+import org.apache.bookkeeper.tools.perf.dlog.SegmentReadCommand;
 import org.apache.bookkeeper.tools.perf.dlog.WriteCommand;
 
 /**
@@ -34,6 +35,7 @@ public class DlogPerfCommandGroup extends CliCommandGroup<BKFlags> implements Pe
         .withParent(BKPerf.NAME)
         .addCommand(new WriteCommand())
         .addCommand(new ReadCommand())
+        .addCommand(new SegmentReadCommand())
         .build();
 
     public DlogPerfCommandGroup() {
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
index 7497d7d..ba0ea7b 100644
--- a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
@@ -28,128 +28,35 @@
 
 package org.apache.bookkeeper.tools.perf.dlog;
 
-import com.beust.jcommander.Parameter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.LongAdder;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
-import org.HdrHistogram.Histogram;
-import org.HdrHistogram.Recorder;
 import org.apache.bookkeeper.common.net.ServiceURI;
-import org.apache.bookkeeper.tools.framework.CliFlags;
-import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.LogRecordWithDLSN;
 import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.api.namespace.Namespace;
-import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 
 /**
- * A perf writer to evaluate write performance.
+ * A perf reader to evaluate read performance.
  */
 @Slf4j
-public class PerfReader implements Runnable {
-
-    /**
-     * Flags for the write command.
-     */
-    public static class Flags extends CliFlags {
-
-        @Parameter(
-            names = {
-                "-ln", "--log-name"
-            },
-            description = "Log name or log name pattern if more than 1 log is specified at `--num-logs`")
-        public String logName = "test-log-%06d";
-
-        @Parameter(
-            names = {
-                "-l", "--num-logs"
-            },
-            description = "Number of log streams")
-        public int numLogs = 1;
-
-        @Parameter(
-            names = {
-                "-t", "--threads"
-            },
-            description = "Number of threads reading")
-        public int numThreads = 1;
-
-        @Parameter(
-            names = {
-                "-mr", "--max-readahead-records"
-            },
-            description = "Max readhead records")
-        public int maxReadAheadRecords = 1000000;
-
-        @Parameter(
-            names = {
-                "-bs", "--readahead-batch-size"
-            },
-            description = "ReadAhead Batch Size, in entries"
-        )
-        public int readAheadBatchSize = 4;
-
-    }
-
-
-    // stats
-    private final LongAdder recordsRead = new LongAdder();
-    private final LongAdder bytesRead = new LongAdder();
-
-    private final ServiceURI serviceURI;
-    private final Flags flags;
-    private final Recorder recorder = new Recorder(
-        TimeUnit.SECONDS.toMillis(120000), 5
-    );
-    private final Recorder cumulativeRecorder = new Recorder(
-        TimeUnit.SECONDS.toMillis(120000), 5
-    );
-    private final AtomicBoolean isDone = new AtomicBoolean(false);
+public class PerfReader extends PerfReaderBase {
 
     PerfReader(ServiceURI serviceURI, Flags flags) {
-        this.serviceURI = serviceURI;
-        this.flags = flags;
+        super(serviceURI, flags);
     }
 
     @Override
-    public void run() {
-        try {
-            execute();
-        } catch (Exception e) {
-            log.error("Encountered exception at running dlog perf writer", e);
-        }
-    }
-
-    void execute() throws Exception {
-        ObjectMapper m = new ObjectMapper();
-        ObjectWriter w = m.writerWithDefaultPrettyPrinter();
-        log.info("Starting dlog perf reader with config : {}", w.writeValueAsString(flags));
-
-        DistributedLogConfiguration conf = newDlogConf(flags);
-        try (Namespace namespace = NamespaceBuilder.newBuilder()
-             .conf(conf)
-             .uri(serviceURI.getUri())
-             .build()) {
-            execute(namespace);
-        }
-    }
-
-    void execute(Namespace namespace) throws Exception {
+    protected void execute(Namespace namespace) throws Exception {
         List<Pair<Integer, DistributedLogManager>> managers = new ArrayList<>(flags.numLogs);
         for (int i = 0; i < flags.numLogs; i++) {
             String logName = String.format(flags.logName, i);
@@ -218,73 +125,8 @@ public class PerfReader implements Runnable {
         }
     }
 
-    void reportStats() {
-        // Print report stats
-        long oldTime = System.nanoTime();
 
-        Histogram reportHistogram = null;
 
-        while (true) {
-            try {
-                Thread.sleep(10000);
-            } catch (InterruptedException e) {
-                break;
-            }
-
-            if (isDone.get()) {
-                break;
-            }
-
-            long now = System.nanoTime();
-            double elapsed = (now - oldTime) / 1e9;
 
-            double rate = recordsRead.sumThenReset() / elapsed;
-            double throughput = bytesRead.sumThenReset() / elapsed / 1024 / 1024;
-
-            reportHistogram = recorder.getIntervalHistogram(reportHistogram);
-
-            log.info("Throughput read : {}  records/s --- {} MB/s --- Latency: mean:"
-                        + " {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
-                    throughputFormat.format(rate), throughputFormat.format(throughput),
-                    dec.format(reportHistogram.getMean() / 1000.0),
-                    dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
-                    dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
-                    dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
-                    dec.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
-                    dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
-                    dec.format(reportHistogram.getMaxValue() / 1000.0));
-
-            reportHistogram.reset();
-
-            oldTime = now;
-        }
-
-    }
-
-    private static DistributedLogConfiguration newDlogConf(Flags flags) {
-        return new DistributedLogConfiguration()
-            .setReadAheadBatchSize(flags.readAheadBatchSize)
-            .setReadAheadMaxRecords(flags.maxReadAheadRecords)
-            .setReadAheadWaitTime(200);
-    }
-
-
-    private static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
-    private static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
-
-    private static void printAggregatedStats(Recorder recorder) {
-        Histogram reportHistogram = recorder.getIntervalHistogram();
-
-        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {}"
-                + " - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
-                dec.format(reportHistogram.getMean() / 1000.0),
-                dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
-                dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
-                dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
-                dec.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
-                dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
-                dec.format(reportHistogram.getValueAtPercentile(99.999) / 1000.0),
-                dec.format(reportHistogram.getMaxValue() / 1000.0));
-    }
 
 }
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReaderBase.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReaderBase.java
new file mode 100644
index 0000000..9520ef9
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReaderBase.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import com.beust.jcommander.Parameter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import java.text.DecimalFormat;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import lombok.extern.slf4j.Slf4j;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.Recorder;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+
+@Slf4j
+abstract class PerfReaderBase implements Runnable {
+
+    /**
+     * Flags for the write command.
+     */
+    public static class Flags extends CliFlags {
+
+        @Parameter(
+            names = {
+                "-ln", "--log-name"
+            },
+            description = "Log name or log name pattern if more than 1 log is specified at `--num-logs`")
+        public String logName = "test-log-%06d";
+
+        @Parameter(
+            names = {
+                "-l", "--num-logs"
+            },
+            description = "Number of log streams")
+        public int numLogs = 1;
+
+        @Parameter(
+            names = {
+                "-t", "--threads"
+            },
+            description = "Number of threads reading")
+        public int numThreads = 1;
+
+        @Parameter(
+            names = {
+                "-mr", "--max-readahead-records"
+            },
+            description = "Max readhead records")
+        public int maxReadAheadRecords = 1000000;
+
+        @Parameter(
+            names = {
+                "-ns", "--num-splits-per-segment"
+            },
+            description = "Num splits per segment")
+        public int numSplitsPerSegment = 1;
+
+        @Parameter(
+            names = {
+                "-bs", "--readahead-batch-size"
+            },
+            description = "ReadAhead Batch Size, in entries"
+        )
+        public int readAheadBatchSize = 4;
+
+    }
+
+
+    // stats
+    protected final LongAdder recordsRead = new LongAdder();
+    protected final LongAdder bytesRead = new LongAdder();
+
+    protected final ServiceURI serviceURI;
+    protected final Flags flags;
+    protected final Recorder recorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    protected final Recorder cumulativeRecorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    protected final AtomicBoolean isDone = new AtomicBoolean(false);
+
+    PerfReaderBase(ServiceURI serviceURI, Flags flags) {
+        this.serviceURI = serviceURI;
+        this.flags = flags;
+    }
+
+    protected void execute() throws Exception {
+        ObjectMapper m = new ObjectMapper();
+        ObjectWriter w = m.writerWithDefaultPrettyPrinter();
+        log.info("Starting dlog perf reader with config : {}", w.writeValueAsString(flags));
+
+        DistributedLogConfiguration conf = newDlogConf(flags);
+        try (Namespace namespace = NamespaceBuilder.newBuilder()
+             .conf(conf)
+             .uri(serviceURI.getUri())
+             .build()) {
+            execute(namespace);
+        }
+    }
+
+    protected void reportStats() {
+        // Print report stats
+        long oldTime = System.nanoTime();
+
+        Histogram reportHistogram = null;
+
+        while (true) {
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                break;
+            }
+
+            if (isDone.get()) {
+                break;
+            }
+
+            long now = System.nanoTime();
+            double elapsed = (now - oldTime) / 1e9;
+
+            double rate = recordsRead.sumThenReset() / elapsed;
+            double throughput = bytesRead.sumThenReset() / elapsed / 1024 / 1024;
+
+            reportHistogram = recorder.getIntervalHistogram(reportHistogram);
+
+            log.info("Throughput read : {}  records/s --- {} MB/s --- Latency: mean:"
+                        + " {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                    THROUGHPUT_FORMAT.format(rate), THROUGHPUT_FORMAT.format(throughput),
+                    PADDING_DECIMAL_FORMAT.format(reportHistogram.getMean() / 1000.0),
+                    PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+                    PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+                    PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+                    PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+                    PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+                    PADDING_DECIMAL_FORMAT.format(reportHistogram.getMaxValue() / 1000.0));
+
+            reportHistogram.reset();
+
+            oldTime = now;
+        }
+
+    }
+
+    protected abstract void execute(Namespace namespace) throws Exception;
+
+    @Override
+    public void run() {
+        try {
+            execute();
+        } catch (Exception e) {
+            log.error("Encountered exception at running dlog perf writer", e);
+        }
+    }
+
+    private static DistributedLogConfiguration newDlogConf(Flags flags) {
+        DistributedLogConfiguration conf = new DistributedLogConfiguration()
+            .setReadAheadBatchSize(flags.readAheadBatchSize)
+            .setReadAheadMaxRecords(flags.maxReadAheadRecords)
+            .setReadAheadWaitTime(200);
+        conf.setProperty("bkc.numChannelsPerBookie", 8);
+        return conf;
+    }
+
+    protected static final DecimalFormat THROUGHPUT_FORMAT = new PaddingDecimalFormat("0.0", 8);
+    protected static final DecimalFormat PADDING_DECIMAL_FORMAT = new PaddingDecimalFormat("0.000", 7);
+
+    protected static void printAggregatedStats(Recorder recorder) {
+        Histogram reportHistogram = recorder.getIntervalHistogram();
+
+        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {}"
+                + " - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getMean() / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99.999) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getMaxValue() / 1000.0));
+    }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfSegmentReader.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfSegmentReader.java
new file mode 100644
index 0000000..1d77b8d
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfSegmentReader.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.distributedlog.Entry;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.exceptions.EndOfLogSegmentException;
+import org.apache.distributedlog.logsegment.LogSegmentEntryReader;
+import org.apache.distributedlog.namespace.NamespaceDriver.Role;
+
+/**
+ * A perf writer to evaluate write performance.
+ */
+@Slf4j
+public class PerfSegmentReader extends PerfReaderBase {
+
+    @Data
+    static class Split {
+        final DistributedLogManager manager;
+        final LogSegmentMetadata segment;
+        final long startEntryId;
+        final long endEntryId;
+    }
+
+    PerfSegmentReader(ServiceURI serviceURI, Flags flags) {
+        super(serviceURI, flags);
+    }
+
+    @Override
+    protected void execute(Namespace namespace) throws Exception {
+        List<DistributedLogManager> managers = new ArrayList<>(flags.numLogs);
+        for (int i = 0; i < flags.numLogs; i++) {
+            String logName = String.format(flags.logName, i);
+            managers.add(namespace.openLog(logName));
+        }
+        log.info("Successfully open {} logs", managers.size());
+
+        // Get all the log segments
+        final List<Pair<DistributedLogManager, LogSegmentMetadata>> segments = managers.stream()
+            .flatMap(manager -> {
+                try {
+                    return manager.getLogSegments().stream().map(segment -> Pair.of(manager, segment));
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+            })
+            .collect(Collectors.toList());
+
+        final List<Split> splits = segments.stream()
+            .flatMap(entry -> getNumSplits(entry.getLeft(), entry.getRight()).stream())
+            .collect(Collectors.toList());
+
+        // register shutdown hook to aggregate stats
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            isDone.set(true);
+            printAggregatedStats(cumulativeRecorder);
+        }));
+
+        ExecutorService executor = Executors.newFixedThreadPool(flags.numThreads);
+        try {
+            for (int i = 0; i < flags.numThreads; i++) {
+                final int idx = i;
+                final List<Split> splitsThisThread = splits
+                    .stream()
+                    .filter(split -> splits.indexOf(split) % flags.numThreads == idx)
+                    .collect(Collectors.toList());
+                executor.submit(() -> {
+                    try {
+                        read(splitsThisThread);
+                    } catch (Exception e) {
+                        log.error("Encountered error at writing records", e);
+                    }
+                });
+            }
+            log.info("Started {} write threads", flags.numThreads);
+            reportStats();
+        } finally {
+            executor.shutdown();
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
+            }
+            managers.forEach(manager -> manager.asyncClose());
+        }
+    }
+
+    void read(List<Split> splits) throws Exception {
+        log.info("Read thread started with : splits = {}",
+            splits.stream()
+                .map(l -> "(log = " + l.manager.getStreamName() + ", segment = "
+                    + l.segment.getLogSegmentSequenceNumber() + " ["  + l.startEntryId + ", " + l.endEntryId + "])")
+                .collect(Collectors.toList()));
+
+        splits.forEach(entry -> {
+            try {
+                readSegmentSplit(entry);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    void readSegmentSplit(Split split) throws Exception {
+        LogSegmentEntryReader reader = result(split.manager.getNamespaceDriver().getLogSegmentEntryStore(Role.READER)
+            .openReader(split.segment, split.getStartEntryId()));
+        reader.start();
+
+        try {
+            MutableBoolean isDone = new MutableBoolean(false);
+            while (!isDone.booleanValue()) {
+                // 100 is just an indicator
+                List<Entry.Reader> entries = result(reader.readNext(100));
+                entries.forEach(entry -> {
+                    LogRecordWithDLSN record;
+                    try {
+                        while ((record = entry.nextRecord()) != null) {
+                            recordsRead.increment();
+                            bytesRead.add(record.getPayloadBuf().readableBytes());
+                        }
+                    } catch (IOException ioe) {
+                        throw new UncheckedIOException(ioe);
+                    } finally {
+                        entry.release();
+                    }
+                    if (split.getEndEntryId() >= 0 && entry.getEntryId() >= split.getEndEntryId()) {
+                        isDone.setValue(true);
+                    }
+                });
+            }
+        } catch (EndOfLogSegmentException e) {
+            // we reached end of log segment
+            return;
+        } finally {
+            reader.asyncClose();
+        }
+
+    }
+
+    List<Split> getNumSplits(DistributedLogManager manager, LogSegmentMetadata segment) {
+        if (flags.numSplitsPerSegment <= 1) {
+            // do split
+            return Lists.newArrayList(
+                new Split(
+                    manager,
+                    segment,
+                    0L,
+                    -1L)
+            );
+        } else {
+            long lastEntryId = segment.getLastEntryId();
+            long numEntriesPerSplit = (lastEntryId + 1) / 2;
+            long nextEntryId = 0L;
+            List<Split> splitsInSegment = new ArrayList<>(flags.numSplitsPerSegment);
+            for (int i = 0; i < flags.numSplitsPerSegment; i++) {
+                long startEntryId = nextEntryId;
+                long endEntryId;
+                if (i == flags.numSplitsPerSegment - 1) {
+                    endEntryId = lastEntryId;
+                } else {
+                    endEntryId = nextEntryId + numEntriesPerSplit - 1;
+                }
+                splitsInSegment.add(new Split(
+                    manager,
+                    segment,
+                    startEntryId,
+                    endEntryId
+                ));
+                nextEntryId = endEntryId + 1;
+            }
+            return splitsInSegment;
+        }
+    }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
index e29dc64..946616f 100644
--- a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
@@ -111,6 +111,13 @@ public class PerfWriter implements Runnable {
 
         @Parameter(
             names = {
+                "-mlss", "--max-log-segment-size"
+            },
+            description = "Max log segment size")
+        public int maxLogSegmentSize = 64 * 1024 * 1024;
+
+        @Parameter(
+            names = {
                 "-b", "--num-bytes"
             },
             description = "Number of bytes to write in total. If 0, it will keep writing")
@@ -387,7 +394,8 @@ public class PerfWriter implements Runnable {
             .setOutputBufferSize(512 * 1024)
             .setPeriodicFlushFrequencyMilliSeconds(2)
             .setWriteLockEnabled(false)
-            .setMaxLogSegmentBytes(512 * 1024 * 1024) // 512MB
+            .setMaxLogSegmentBytes(flags.maxLogSegmentSize)
+            .setLogSegmentRollingIntervalMinutes(1)
             .setExplicitTruncationByApplication(true);
     }
 
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
index 001cacb..3ec8e9c 100644
--- a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
@@ -19,7 +19,7 @@ import org.apache.bookkeeper.common.net.ServiceURI;
 import org.apache.bookkeeper.tools.common.BKCommand;
 import org.apache.bookkeeper.tools.common.BKFlags;
 import org.apache.bookkeeper.tools.framework.CliSpec;
-import org.apache.bookkeeper.tools.perf.dlog.PerfReader.Flags;
+import org.apache.bookkeeper.tools.perf.dlog.PerfReaderBase.Flags;
 import org.apache.commons.configuration.CompositeConfiguration;
 
 /**
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/SegmentReadCommand.java
similarity index 80%
copy from tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
copy to tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/SegmentReadCommand.java
index 001cacb..f75955a 100644
--- a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/SegmentReadCommand.java
@@ -19,19 +19,19 @@ import org.apache.bookkeeper.common.net.ServiceURI;
 import org.apache.bookkeeper.tools.common.BKCommand;
 import org.apache.bookkeeper.tools.common.BKFlags;
 import org.apache.bookkeeper.tools.framework.CliSpec;
-import org.apache.bookkeeper.tools.perf.dlog.PerfReader.Flags;
+import org.apache.bookkeeper.tools.perf.dlog.PerfReaderBase.Flags;
 import org.apache.commons.configuration.CompositeConfiguration;
 
 /**
- * Command to read log records to distributedlog streams.
+ * Command to read log records to bookkeeper segments.
  */
 @Slf4j
-public class ReadCommand extends BKCommand<Flags> {
+public class SegmentReadCommand extends BKCommand<Flags> {
 
-    private static final String NAME = "read";
-    private static final String DESC = "Read log records from distributedlog streams";
+    private static final String NAME = "segread";
+    private static final String DESC = "Read log records from distributedlog streams by breaking it down to segments";
 
-    public ReadCommand() {
+    public SegmentReadCommand() {
         super(CliSpec.<Flags>newBuilder()
             .withName(NAME)
             .withDescription(DESC)
@@ -43,14 +43,12 @@ public class ReadCommand extends BKCommand<Flags> {
     protected boolean apply(ServiceURI serviceURI,
                             CompositeConfiguration conf,
                             BKFlags globalFlags, Flags cmdFlags) {
-
-
         if (serviceURI == null) {
             log.warn("No service uri is provided. Use default 'distributedlog://localhost/distributedlog'.");
             serviceURI = ServiceURI.create("distributedlog://localhost/distributedlog");
         }
 
-        PerfReader reader = new PerfReader(serviceURI, cmdFlags);
+        PerfSegmentReader reader = new PerfSegmentReader(serviceURI, cmdFlags);
         reader.run();
         return true;
     }