You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/09/30 01:10:32 UTC
[bookkeeper] branch master updated: [TOOLS] [PERF] Add a `segread`
perf command to read distributedlog streams in segment splits
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 7f2564c [TOOLS] [PERF] Add a `segread` perf command to read distributedlog streams in segment splits
7f2564c is described below
commit 7f2564c49d6ad7795908886433d3afcc4155c9d9
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Sat Sep 29 18:10:27 2018 -0700
[TOOLS] [PERF] Add a `segread` perf command to read distributedlog streams in segment splits
Descriptions of the changes in this PR:
*Motivation*
Find the bottleneck and throughput limit of reading a single ledger.
*Changes*
Provide a `segread` perf command that splits a stream into its log segments, splits each segment into entry ranges,
and tests reading those segment splits in parallel.
Author: Sijie Guo <si...@apache.org>
Reviewers: Jia Zhai <None>
This closes #1713 from sijie/segment_readers
---
.../tools/perf/DlogPerfCommandGroup.java | 2 +
.../bookkeeper/tools/perf/dlog/PerfReader.java | 166 +----------------
.../bookkeeper/tools/perf/dlog/PerfReaderBase.java | 202 ++++++++++++++++++++
.../tools/perf/dlog/PerfSegmentReader.java | 204 +++++++++++++++++++++
.../bookkeeper/tools/perf/dlog/PerfWriter.java | 10 +-
.../bookkeeper/tools/perf/dlog/ReadCommand.java | 2 +-
.../{ReadCommand.java => SegmentReadCommand.java} | 16 +-
7 files changed, 429 insertions(+), 173 deletions(-)
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
index 75adc4b..d38c839 100644
--- a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
@@ -18,6 +18,7 @@ import org.apache.bookkeeper.tools.common.BKFlags;
import org.apache.bookkeeper.tools.framework.CliCommandGroup;
import org.apache.bookkeeper.tools.framework.CliSpec;
import org.apache.bookkeeper.tools.perf.dlog.ReadCommand;
+import org.apache.bookkeeper.tools.perf.dlog.SegmentReadCommand;
import org.apache.bookkeeper.tools.perf.dlog.WriteCommand;
/**
@@ -34,6 +35,7 @@ public class DlogPerfCommandGroup extends CliCommandGroup<BKFlags> implements Pe
.withParent(BKPerf.NAME)
.addCommand(new WriteCommand())
.addCommand(new ReadCommand())
+ .addCommand(new SegmentReadCommand())
.build();
public DlogPerfCommandGroup() {
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
index 7497d7d..ba0ea7b 100644
--- a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
@@ -28,128 +28,35 @@
package org.apache.bookkeeper.tools.perf.dlog;
-import com.beust.jcommander.Parameter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
-import org.HdrHistogram.Histogram;
-import org.HdrHistogram.Recorder;
import org.apache.bookkeeper.common.net.ServiceURI;
-import org.apache.bookkeeper.tools.framework.CliFlags;
-import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.api.namespace.Namespace;
-import org.apache.distributedlog.api.namespace.NamespaceBuilder;
/**
- * A perf writer to evaluate write performance.
+ * A perf reader to evaluate read performance.
*/
@Slf4j
-public class PerfReader implements Runnable {
-
- /**
- * Flags for the write command.
- */
- public static class Flags extends CliFlags {
-
- @Parameter(
- names = {
- "-ln", "--log-name"
- },
- description = "Log name or log name pattern if more than 1 log is specified at `--num-logs`")
- public String logName = "test-log-%06d";
-
- @Parameter(
- names = {
- "-l", "--num-logs"
- },
- description = "Number of log streams")
- public int numLogs = 1;
-
- @Parameter(
- names = {
- "-t", "--threads"
- },
- description = "Number of threads reading")
- public int numThreads = 1;
-
- @Parameter(
- names = {
- "-mr", "--max-readahead-records"
- },
- description = "Max readhead records")
- public int maxReadAheadRecords = 1000000;
-
- @Parameter(
- names = {
- "-bs", "--readahead-batch-size"
- },
- description = "ReadAhead Batch Size, in entries"
- )
- public int readAheadBatchSize = 4;
-
- }
-
-
- // stats
- private final LongAdder recordsRead = new LongAdder();
- private final LongAdder bytesRead = new LongAdder();
-
- private final ServiceURI serviceURI;
- private final Flags flags;
- private final Recorder recorder = new Recorder(
- TimeUnit.SECONDS.toMillis(120000), 5
- );
- private final Recorder cumulativeRecorder = new Recorder(
- TimeUnit.SECONDS.toMillis(120000), 5
- );
- private final AtomicBoolean isDone = new AtomicBoolean(false);
+public class PerfReader extends PerfReaderBase {
PerfReader(ServiceURI serviceURI, Flags flags) {
- this.serviceURI = serviceURI;
- this.flags = flags;
+ super(serviceURI, flags);
}
@Override
- public void run() {
- try {
- execute();
- } catch (Exception e) {
- log.error("Encountered exception at running dlog perf writer", e);
- }
- }
-
- void execute() throws Exception {
- ObjectMapper m = new ObjectMapper();
- ObjectWriter w = m.writerWithDefaultPrettyPrinter();
- log.info("Starting dlog perf reader with config : {}", w.writeValueAsString(flags));
-
- DistributedLogConfiguration conf = newDlogConf(flags);
- try (Namespace namespace = NamespaceBuilder.newBuilder()
- .conf(conf)
- .uri(serviceURI.getUri())
- .build()) {
- execute(namespace);
- }
- }
-
- void execute(Namespace namespace) throws Exception {
+ protected void execute(Namespace namespace) throws Exception {
List<Pair<Integer, DistributedLogManager>> managers = new ArrayList<>(flags.numLogs);
for (int i = 0; i < flags.numLogs; i++) {
String logName = String.format(flags.logName, i);
@@ -218,73 +125,8 @@ public class PerfReader implements Runnable {
}
}
- void reportStats() {
- // Print report stats
- long oldTime = System.nanoTime();
- Histogram reportHistogram = null;
- while (true) {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- break;
- }
-
- if (isDone.get()) {
- break;
- }
-
- long now = System.nanoTime();
- double elapsed = (now - oldTime) / 1e9;
- double rate = recordsRead.sumThenReset() / elapsed;
- double throughput = bytesRead.sumThenReset() / elapsed / 1024 / 1024;
-
- reportHistogram = recorder.getIntervalHistogram(reportHistogram);
-
- log.info("Throughput read : {} records/s --- {} MB/s --- Latency: mean:"
- + " {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
- throughputFormat.format(rate), throughputFormat.format(throughput),
- dec.format(reportHistogram.getMean() / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
- dec.format(reportHistogram.getMaxValue() / 1000.0));
-
- reportHistogram.reset();
-
- oldTime = now;
- }
-
- }
-
- private static DistributedLogConfiguration newDlogConf(Flags flags) {
- return new DistributedLogConfiguration()
- .setReadAheadBatchSize(flags.readAheadBatchSize)
- .setReadAheadMaxRecords(flags.maxReadAheadRecords)
- .setReadAheadWaitTime(200);
- }
-
-
- private static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
- private static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
-
- private static void printAggregatedStats(Recorder recorder) {
- Histogram reportHistogram = recorder.getIntervalHistogram();
-
- log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {}"
- + " - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
- dec.format(reportHistogram.getMean() / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
- dec.format(reportHistogram.getValueAtPercentile(99.999) / 1000.0),
- dec.format(reportHistogram.getMaxValue() / 1000.0));
- }
}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReaderBase.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReaderBase.java
new file mode 100644
index 0000000..9520ef9
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReaderBase.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import com.beust.jcommander.Parameter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import java.text.DecimalFormat;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import lombok.extern.slf4j.Slf4j;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.Recorder;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+
+/**
+ * Base class of the dlog perf readers.
+ *
+ * <p>It holds the shared reader flags and counters, opens a {@link Namespace} from the service URI,
+ * and provides periodic ({@link #reportStats()}) and aggregated ({@link #printAggregatedStats(Recorder)})
+ * stats reporting. Subclasses implement {@link #execute(Namespace)} to drive the actual reads.
+ */
+@Slf4j
+abstract class PerfReaderBase implements Runnable {
+
+    /**
+     * Flags for the read commands.
+     */
+    public static class Flags extends CliFlags {
+
+        @Parameter(
+            names = {
+                "-ln", "--log-name"
+            },
+            description = "Log name or log name pattern if more than 1 log is specified at `--num-logs`")
+        public String logName = "test-log-%06d";
+
+        @Parameter(
+            names = {
+                "-l", "--num-logs"
+            },
+            description = "Number of log streams")
+        public int numLogs = 1;
+
+        @Parameter(
+            names = {
+                "-t", "--threads"
+            },
+            description = "Number of threads reading")
+        public int numThreads = 1;
+
+        @Parameter(
+            names = {
+                "-mr", "--max-readahead-records"
+            },
+            description = "Max readahead records")
+        public int maxReadAheadRecords = 1000000;
+
+        // Consumed by the segment reader when it breaks each log segment into entry-range splits.
+        @Parameter(
+            names = {
+                "-ns", "--num-splits-per-segment"
+            },
+            description = "Num splits per segment")
+        public int numSplitsPerSegment = 1;
+
+        @Parameter(
+            names = {
+                "-bs", "--readahead-batch-size"
+            },
+            description = "ReadAhead Batch Size, in entries"
+        )
+        public int readAheadBatchSize = 4;
+
+    }
+
+
+    // stats: incremented by reader threads, drained (sumThenReset) by reportStats()
+    protected final LongAdder recordsRead = new LongAdder();
+    protected final LongAdder bytesRead = new LongAdder();
+
+    protected final ServiceURI serviceURI;
+    protected final Flags flags;
+    // NOTE(review): reported values are divided by 1000 and labelled "ms", so recorded values appear
+    // to be microseconds; the highest trackable value passed here is 120000s expressed in ms — confirm.
+    protected final Recorder recorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    protected final Recorder cumulativeRecorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    // set (e.g. from a shutdown hook) to stop the reportStats() loop
+    protected final AtomicBoolean isDone = new AtomicBoolean(false);
+
+    PerfReaderBase(ServiceURI serviceURI, Flags flags) {
+        this.serviceURI = serviceURI;
+        this.flags = flags;
+    }
+
+    /**
+     * Logs the effective flags, opens a namespace for {@link #serviceURI} and delegates to
+     * {@link #execute(Namespace)}. The namespace is closed when that call returns or throws.
+     *
+     * @throws Exception if the namespace cannot be built or the subclass execution fails
+     */
+    protected void execute() throws Exception {
+        ObjectMapper m = new ObjectMapper();
+        ObjectWriter w = m.writerWithDefaultPrettyPrinter();
+        log.info("Starting dlog perf reader with config : {}", w.writeValueAsString(flags));
+
+        DistributedLogConfiguration conf = newDlogConf(flags);
+        try (Namespace namespace = NamespaceBuilder.newBuilder()
+            .conf(conf)
+            .uri(serviceURI.getUri())
+            .build()) {
+            execute(namespace);
+        }
+    }
+
+    /**
+     * Logs read throughput and interval latency percentiles every 10 seconds. Returns when the
+     * calling thread is interrupted or when {@link #isDone} becomes true.
+     */
+    protected void reportStats() {
+        // Print report stats
+        long oldTime = System.nanoTime();
+
+        Histogram reportHistogram = null;
+
+        while (true) {
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                // interruption is treated as a request to stop reporting
+                break;
+            }
+
+            if (isDone.get()) {
+                break;
+            }
+
+            long now = System.nanoTime();
+            double elapsed = (now - oldTime) / 1e9;
+
+            double rate = recordsRead.sumThenReset() / elapsed;
+            double throughput = bytesRead.sumThenReset() / elapsed / 1024 / 1024;
+
+            // reuse the previous histogram instance to avoid reallocating each interval
+            reportHistogram = recorder.getIntervalHistogram(reportHistogram);
+
+            log.info("Throughput read : {} records/s --- {} MB/s --- Latency: mean:"
+                    + " {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                THROUGHPUT_FORMAT.format(rate), THROUGHPUT_FORMAT.format(throughput),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getMean() / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+                PADDING_DECIMAL_FORMAT.format(reportHistogram.getMaxValue() / 1000.0));
+
+            reportHistogram.reset();
+
+            oldTime = now;
+        }
+
+    }
+
+    /**
+     * Runs the reader-specific workload against an already opened namespace.
+     *
+     * @param namespace the namespace opened by {@link #execute()}
+     * @throws Exception on any read failure
+     */
+    protected abstract void execute(Namespace namespace) throws Exception;
+
+    /**
+     * Entry point: runs {@link #execute()} and logs (rather than propagates) any failure.
+     */
+    @Override
+    public void run() {
+        try {
+            execute();
+        } catch (Exception e) {
+            log.error("Encountered exception at running dlog perf reader", e);
+        }
+    }
+
+    private static DistributedLogConfiguration newDlogConf(Flags flags) {
+        DistributedLogConfiguration conf = new DistributedLogConfiguration()
+            .setReadAheadBatchSize(flags.readAheadBatchSize)
+            .setReadAheadMaxRecords(flags.maxReadAheadRecords)
+            .setReadAheadWaitTime(200);
+        // "bkc."-prefixed properties presumably pass through to the underlying bookkeeper client;
+        // 8 channels per bookie is a fixed choice for read parallelism — TODO confirm / make a flag.
+        conf.setProperty("bkc.numChannelsPerBookie", 8);
+        return conf;
+    }
+
+    protected static final DecimalFormat THROUGHPUT_FORMAT = new PaddingDecimalFormat("0.0", 8);
+    protected static final DecimalFormat PADDING_DECIMAL_FORMAT = new PaddingDecimalFormat("0.000", 7);
+
+    /**
+     * Logs latency percentiles accumulated in {@code recorder} since its last interval read.
+     *
+     * @param recorder the recorder to drain
+     */
+    protected static void printAggregatedStats(Recorder recorder) {
+        Histogram reportHistogram = recorder.getIntervalHistogram();
+
+        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {}"
+                + " - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getMean() / 1000.0),
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getValueAtPercentile(99.999) / 1000.0),
+            PADDING_DECIMAL_FORMAT.format(reportHistogram.getMaxValue() / 1000.0));
+    }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfSegmentReader.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfSegmentReader.java
new file mode 100644
index 0000000..1d77b8d
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfSegmentReader.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.distributedlog.Entry;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.exceptions.EndOfLogSegmentException;
+import org.apache.distributedlog.logsegment.LogSegmentEntryReader;
+import org.apache.distributedlog.namespace.NamespaceDriver.Role;
+
+/**
+ * A perf reader to evaluate segment read performance.
+ *
+ * <p>Each log is broken down into its log segments, each segment into entry-range splits
+ * ({@code --num-splits-per-segment}), and the splits are read in parallel by
+ * {@code --threads} threads directly through {@link LogSegmentEntryReader}s.
+ *
+ * <p>NOTE(review): this reader only updates {@code recordsRead}/{@code bytesRead}; it never records
+ * into {@code recorder}/{@code cumulativeRecorder}, so reported latency figures stay empty.
+ */
+@Slf4j
+public class PerfSegmentReader extends PerfReaderBase {
+
+    /**
+     * A contiguous range of entries within one log segment.
+     * {@code endEntryId} is inclusive; a negative value means "read until the end of the segment".
+     */
+    @Data
+    static class Split {
+        final DistributedLogManager manager;
+        final LogSegmentMetadata segment;
+        final long startEntryId;
+        final long endEntryId;
+    }
+
+    PerfSegmentReader(ServiceURI serviceURI, Flags flags) {
+        super(serviceURI, flags);
+    }
+
+    @Override
+    protected void execute(Namespace namespace) throws Exception {
+        List<DistributedLogManager> managers = new ArrayList<>(flags.numLogs);
+        for (int i = 0; i < flags.numLogs; i++) {
+            String logName = String.format(flags.logName, i);
+            managers.add(namespace.openLog(logName));
+        }
+        log.info("Successfully open {} logs", managers.size());
+
+        // Get all the log segments
+        final List<Pair<DistributedLogManager, LogSegmentMetadata>> segments = managers.stream()
+            .flatMap(manager -> {
+                try {
+                    return manager.getLogSegments().stream().map(segment -> Pair.of(manager, segment));
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+            })
+            .collect(Collectors.toList());
+
+        final List<Split> splits = segments.stream()
+            .flatMap(entry -> getNumSplits(entry.getLeft(), entry.getRight()).stream())
+            .collect(Collectors.toList());
+
+        // register shutdown hook to aggregate stats
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            isDone.set(true);
+            printAggregatedStats(cumulativeRecorder);
+        }));
+
+        ExecutorService executor = Executors.newFixedThreadPool(flags.numThreads);
+        try {
+            for (List<Split> splitsThisThread : assignSplitsToThreads(splits, flags.numThreads)) {
+                executor.submit(() -> {
+                    try {
+                        read(splitsThisThread);
+                    } catch (Exception e) {
+                        log.error("Encountered error at reading records", e);
+                    }
+                });
+            }
+            log.info("Started {} read threads", flags.numThreads);
+            reportStats();
+        } finally {
+            executor.shutdown();
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
+            }
+            managers.forEach(manager -> manager.asyncClose());
+        }
+    }
+
+    /**
+     * Distributes splits round-robin by position: split {@code i} goes to thread {@code i % numThreads}.
+     * Uses the position rather than {@code indexOf}, which is O(n^2) and maps equal splits to one index.
+     */
+    private static List<List<Split>> assignSplitsToThreads(List<Split> splits, int numThreads) {
+        List<List<Split>> splitsPerThread = new ArrayList<>(numThreads);
+        for (int t = 0; t < numThreads; t++) {
+            splitsPerThread.add(new ArrayList<>());
+        }
+        for (int i = 0; i < splits.size(); i++) {
+            splitsPerThread.get(i % numThreads).add(splits.get(i));
+        }
+        return splitsPerThread;
+    }
+
+    /**
+     * Reads the given splits sequentially on the calling thread.
+     *
+     * @param splits the splits assigned to this thread
+     * @throws Exception the first failure encountered while reading a split
+     */
+    void read(List<Split> splits) throws Exception {
+        log.info("Read thread started with : splits = {}",
+            splits.stream()
+                .map(l -> "(log = " + l.manager.getStreamName() + ", segment = "
+                    + l.segment.getLogSegmentSequenceNumber() + " [" + l.startEntryId + ", " + l.endEntryId + "])")
+                .collect(Collectors.toList()));
+
+        for (Split split : splits) {
+            readSegmentSplit(split);
+        }
+    }
+
+    /**
+     * Reads entries {@code [startEntryId, endEntryId]} of one segment, counting records and payload bytes.
+     * A negative {@code endEntryId} reads until {@link EndOfLogSegmentException} is raised.
+     */
+    void readSegmentSplit(Split split) throws Exception {
+        LogSegmentEntryReader reader = result(split.manager.getNamespaceDriver().getLogSegmentEntryStore(Role.READER)
+            .openReader(split.segment, split.getStartEntryId()));
+        reader.start();
+
+        final long endEntryId = split.getEndEntryId();
+        final boolean bounded = endEntryId >= 0;
+        try {
+            MutableBoolean splitDone = new MutableBoolean(false);
+            while (!splitDone.booleanValue()) {
+                // 100 is just an indicator
+                List<Entry.Reader> entries = result(reader.readNext(100));
+                for (Entry.Reader entry : entries) {
+                    try {
+                        // a batch may run past the split boundary: skip (but still release) those entries
+                        // so they are not double counted with the neighbouring split
+                        if (splitDone.booleanValue() || (bounded && entry.getEntryId() > endEntryId)) {
+                            splitDone.setValue(true);
+                            continue;
+                        }
+                        LogRecordWithDLSN record;
+                        while ((record = entry.nextRecord()) != null) {
+                            recordsRead.increment();
+                            bytesRead.add(record.getPayloadBuf().readableBytes());
+                        }
+                        if (bounded && entry.getEntryId() >= endEntryId) {
+                            splitDone.setValue(true);
+                        }
+                    } finally {
+                        entry.release();
+                    }
+                }
+            }
+        } catch (EndOfLogSegmentException e) {
+            // we reached end of log segment
+            return;
+        } finally {
+            reader.asyncClose();
+        }
+
+    }
+
+    /**
+     * Breaks a segment into {@code --num-splits-per-segment} contiguous, non-overlapping entry ranges.
+     *
+     * <p>Returns a single open-ended split (end = -1) when splitting is disabled or the segment's
+     * last entry id is negative. The number of splits is capped at the number of entries so that
+     * no split is empty. The last split absorbs the remainder of the integer division.
+     */
+    List<Split> getNumSplits(DistributedLogManager manager, LogSegmentMetadata segment) {
+        long lastEntryId = segment.getLastEntryId();
+        if (flags.numSplitsPerSegment <= 1 || lastEntryId < 0) {
+            return Lists.newArrayList(
+                new Split(
+                    manager,
+                    segment,
+                    0L,
+                    -1L)
+            );
+        }
+        long numEntries = lastEntryId + 1;
+        int numSplits = (int) Math.min(flags.numSplitsPerSegment, numEntries);
+        long numEntriesPerSplit = numEntries / numSplits;
+        List<Split> splitsInSegment = new ArrayList<>(numSplits);
+        long nextEntryId = 0L;
+        for (int i = 0; i < numSplits; i++) {
+            long startEntryId = nextEntryId;
+            long endEntryId = (i == numSplits - 1) ? lastEntryId : startEntryId + numEntriesPerSplit - 1;
+            splitsInSegment.add(new Split(
+                manager,
+                segment,
+                startEntryId,
+                endEntryId
+            ));
+            nextEntryId = endEntryId + 1;
+        }
+        return splitsInSegment;
+    }
+
+}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
index e29dc64..946616f 100644
--- a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
@@ -111,6 +111,13 @@ public class PerfWriter implements Runnable {
@Parameter(
names = {
+ "-mlss", "--max-log-segment-size"
+ },
+ description = "Max log segment size")
+ public int maxLogSegmentSize = 64 * 1024 * 1024;
+
+ @Parameter(
+ names = {
"-b", "--num-bytes"
},
description = "Number of bytes to write in total. If 0, it will keep writing")
@@ -387,7 +394,8 @@ public class PerfWriter implements Runnable {
.setOutputBufferSize(512 * 1024)
.setPeriodicFlushFrequencyMilliSeconds(2)
.setWriteLockEnabled(false)
- .setMaxLogSegmentBytes(512 * 1024 * 1024) // 512MB
+ .setMaxLogSegmentBytes(flags.maxLogSegmentSize)
+ .setLogSegmentRollingIntervalMinutes(1)
.setExplicitTruncationByApplication(true);
}
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
index 001cacb..3ec8e9c 100644
--- a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
@@ -19,7 +19,7 @@ import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.bookkeeper.tools.common.BKCommand;
import org.apache.bookkeeper.tools.common.BKFlags;
import org.apache.bookkeeper.tools.framework.CliSpec;
-import org.apache.bookkeeper.tools.perf.dlog.PerfReader.Flags;
+import org.apache.bookkeeper.tools.perf.dlog.PerfReaderBase.Flags;
import org.apache.commons.configuration.CompositeConfiguration;
/**
diff --git a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/SegmentReadCommand.java
similarity index 80%
copy from tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
copy to tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/SegmentReadCommand.java
index 001cacb..f75955a 100644
--- a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/SegmentReadCommand.java
@@ -19,19 +19,19 @@ import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.bookkeeper.tools.common.BKCommand;
import org.apache.bookkeeper.tools.common.BKFlags;
import org.apache.bookkeeper.tools.framework.CliSpec;
-import org.apache.bookkeeper.tools.perf.dlog.PerfReader.Flags;
+import org.apache.bookkeeper.tools.perf.dlog.PerfReaderBase.Flags;
import org.apache.commons.configuration.CompositeConfiguration;
/**
- * Command to read log records to distributedlog streams.
+ * Command to read log records to bookkeeper segments.
*/
@Slf4j
-public class ReadCommand extends BKCommand<Flags> {
+public class SegmentReadCommand extends BKCommand<Flags> {
- private static final String NAME = "read";
- private static final String DESC = "Read log records from distributedlog streams";
+ private static final String NAME = "segread";
+ private static final String DESC = "Read log records from distributedlog streams by breaking it down to segments";
- public ReadCommand() {
+ public SegmentReadCommand() {
super(CliSpec.<Flags>newBuilder()
.withName(NAME)
.withDescription(DESC)
@@ -43,14 +43,12 @@ public class ReadCommand extends BKCommand<Flags> {
protected boolean apply(ServiceURI serviceURI,
CompositeConfiguration conf,
BKFlags globalFlags, Flags cmdFlags) {
-
-
if (serviceURI == null) {
log.warn("No service uri is provided. Use default 'distributedlog://localhost/distributedlog'.");
serviceURI = ServiceURI.create("distributedlog://localhost/distributedlog");
}
- PerfReader reader = new PerfReader(serviceURI, cmdFlags);
+ PerfSegmentReader reader = new PerfSegmentReader(serviceURI, cmdFlags);
reader.run();
return true;
}