You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:55 UTC
[50/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
deleted file mode 100644
index 5b04a05..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.benchmark.utils.ShiftableRateLimiter;
-import com.twitter.finagle.stats.OstrichStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.NullStatsProvider;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The launcher for benchmarks.
- */
-public class Benchmarker {
-
- private static final Logger logger = LoggerFactory.getLogger(Benchmarker.class);
-
- static final String USAGE = "Benchmarker [-u <uri>] [-c <conf>] [-s serverset] [-m (read|write|dlwrite)]";
-
- final String[] args;
- final Options options = new Options();
-
- int rate = 100;
- int maxRate = 1000;
- int changeRate = 100;
- int changeRateSeconds = 1800;
- int concurrency = 10;
- String streamPrefix = "dlog-loadtest";
- int shardId = -1;
- int numStreams = 10;
- List<String> serversetPaths = new ArrayList<String>();
- List<String> finagleNames = new ArrayList<String>();
- int msgSize = 256;
- String mode = null;
- int durationMins = 60;
- URI dlUri = null;
- int batchSize = 0;
- int readersPerStream = 1;
- Integer maxStreamId = null;
- int truncationInterval = 3600;
- Integer startStreamId = null;
- Integer endStreamId = null;
- int hostConnectionCoreSize = 10;
- int hostConnectionLimit = 10;
- boolean thriftmux = false;
- boolean handshakeWithClientInfo = false;
- boolean readFromHead = false;
- int sendBufferSize = 1024 * 1024;
- int recvBufferSize = 1024 * 1024;
- boolean enableBatching = false;
- int batchBufferSize = 256 * 1024;
- int batchFlushIntervalMicros = 2000;
- String routingServiceFinagleNameString;
-
- final DistributedLogConfiguration conf = new DistributedLogConfiguration();
- final StatsReceiver statsReceiver = new OstrichStatsReceiver();
- StatsProvider statsProvider = null;
-
- Benchmarker(String[] args) {
- this.args = args;
- // prepare options
- options.addOption("s", "serverset", true, "Proxy Server Set (separated by ',')");
- options.addOption("fn", "finagle-name", true, "Write proxy finagle name (separated by ',')");
- options.addOption("c", "conf", true, "DistributedLog Configuration File");
- options.addOption("u", "uri", true, "DistributedLog URI");
- options.addOption("i", "shard", true, "Shard Id");
- options.addOption("p", "provider", true, "DistributedLog Stats Provider");
- options.addOption("d", "duration", true, "Duration (minutes)");
- options.addOption("sp", "streamprefix", true, "Stream Prefix");
- options.addOption("sc", "streamcount", true, "Number of Streams");
- options.addOption("ms", "messagesize", true, "Message Size (bytes)");
- options.addOption("bs", "batchsize", true, "Batch Size");
- options.addOption("r", "rate", true, "Rate limit (requests/second)");
- options.addOption("mr", "max-rate", true, "Maximum Rate limit (requests/second)");
- options.addOption("cr", "change-rate", true, "Rate to increase each change period (requests/second)");
- options.addOption("ci", "change-interval", true, "Rate to increase period, seconds");
- options.addOption("t", "concurrency", true, "Concurrency (number of threads)");
- options.addOption("m", "mode", true, "Benchmark mode (read/write)");
- options.addOption("rps", "readers-per-stream", true, "Number readers per stream");
- options.addOption("msid", "max-stream-id", true, "Max Stream ID");
- options.addOption("ti", "truncation-interval", true, "Truncation interval in seconds");
- options.addOption("ssid", "start-stream-id", true, "Start Stream ID");
- options.addOption("esid", "end-stream-id", true, "Start Stream ID");
- options.addOption("hccs", "host-connection-core-size", true, "Finagle hostConnectionCoreSize");
- options.addOption("hcl", "host-connection-limit", true, "Finagle hostConnectionLimit");
- options.addOption("mx", "thriftmux", false, "Enable thriftmux (write mode only)");
- options.addOption("hsci", "handshake-with-client-info", false, "Enable handshaking with client info");
- options.addOption("rfh", "read-from-head", false, "Read from head of the stream");
- options.addOption("sb", "send-buffer", true, "Channel send buffer size, in bytes");
- options.addOption("rb", "recv-buffer", true, "Channel recv buffer size, in bytes");
- options.addOption("bt", "enable-batch", false, "Enable batching on writers");
- options.addOption("bbs", "batch-buffer-size", true, "The batch buffer size in bytes");
- options.addOption("bfi", "batch-flush-interval", true, "The batch buffer flush interval in micros");
- options.addOption("rs", "routing-service", true, "The routing service finagle name for server-side routing");
- options.addOption("h", "help", false, "Print usage.");
- }
-
- void printUsage() {
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.printHelp(USAGE, options);
- }
-
- void run() throws Exception {
- logger.info("Running benchmark.");
-
- BasicParser parser = new BasicParser();
- CommandLine cmdline = parser.parse(options, args);
- if (cmdline.hasOption("h")) {
- printUsage();
- System.exit(0);
- }
- if (cmdline.hasOption("s")) {
- String serversetPathStr = cmdline.getOptionValue("s");
- serversetPaths = Arrays.asList(StringUtils.split(serversetPathStr, ','));
- }
- if (cmdline.hasOption("fn")) {
- String finagleNameStr = cmdline.getOptionValue("fn");
- finagleNames = Arrays.asList(StringUtils.split(finagleNameStr, ','));
- }
- if (cmdline.hasOption("i")) {
- shardId = Integer.parseInt(cmdline.getOptionValue("i"));
- }
- if (cmdline.hasOption("d")) {
- durationMins = Integer.parseInt(cmdline.getOptionValue("d"));
- }
- if (cmdline.hasOption("sp")) {
- streamPrefix = cmdline.getOptionValue("sp");
- }
- if (cmdline.hasOption("sc")) {
- numStreams = Integer.parseInt(cmdline.getOptionValue("sc"));
- }
- if (cmdline.hasOption("ms")) {
- msgSize = Integer.parseInt(cmdline.getOptionValue("ms"));
- }
- if (cmdline.hasOption("r")) {
- rate = Integer.parseInt(cmdline.getOptionValue("r"));
- }
- if (cmdline.hasOption("mr")) {
- maxRate = Integer.parseInt(cmdline.getOptionValue("mr"));
- }
- if (cmdline.hasOption("cr")) {
- changeRate = Integer.parseInt(cmdline.getOptionValue("cr"));
- }
- if (cmdline.hasOption("ci")) {
- changeRateSeconds = Integer.parseInt(cmdline.getOptionValue("ci"));
- }
- if (cmdline.hasOption("t")) {
- concurrency = Integer.parseInt(cmdline.getOptionValue("t"));
- }
- if (cmdline.hasOption("m")) {
- mode = cmdline.getOptionValue("m");
- }
- if (cmdline.hasOption("u")) {
- dlUri = URI.create(cmdline.getOptionValue("u"));
- }
- if (cmdline.hasOption("bs")) {
- batchSize = Integer.parseInt(cmdline.getOptionValue("bs"));
- checkArgument("write" != mode, "batchSize supported only for mode=write");
- }
- if (cmdline.hasOption("c")) {
- String configFile = cmdline.getOptionValue("c");
- conf.loadConf(new File(configFile).toURI().toURL());
- }
- if (cmdline.hasOption("rps")) {
- readersPerStream = Integer.parseInt(cmdline.getOptionValue("rps"));
- }
- if (cmdline.hasOption("msid")) {
- maxStreamId = Integer.parseInt(cmdline.getOptionValue("msid"));
- }
- if (cmdline.hasOption("ti")) {
- truncationInterval = Integer.parseInt(cmdline.getOptionValue("ti"));
- }
- if (cmdline.hasOption("ssid")) {
- startStreamId = Integer.parseInt(cmdline.getOptionValue("ssid"));
- }
- if (cmdline.hasOption("esid")) {
- endStreamId = Integer.parseInt(cmdline.getOptionValue("esid"));
- }
- if (cmdline.hasOption("hccs")) {
- hostConnectionCoreSize = Integer.parseInt(cmdline.getOptionValue("hccs"));
- }
- if (cmdline.hasOption("hcl")) {
- hostConnectionLimit = Integer.parseInt(cmdline.getOptionValue("hcl"));
- }
- if (cmdline.hasOption("sb")) {
- sendBufferSize = Integer.parseInt(cmdline.getOptionValue("sb"));
- }
- if (cmdline.hasOption("rb")) {
- recvBufferSize = Integer.parseInt(cmdline.getOptionValue("rb"));
- }
- if (cmdline.hasOption("rs")) {
- routingServiceFinagleNameString = cmdline.getOptionValue("rs");
- }
- thriftmux = cmdline.hasOption("mx");
- handshakeWithClientInfo = cmdline.hasOption("hsci");
- readFromHead = cmdline.hasOption("rfh");
- enableBatching = cmdline.hasOption("bt");
- if (cmdline.hasOption("bbs")) {
- batchBufferSize = Integer.parseInt(cmdline.getOptionValue("bbs"));
- }
- if (cmdline.hasOption("bfi")) {
- batchFlushIntervalMicros = Integer.parseInt(cmdline.getOptionValue("bfi"));
- }
-
- checkArgument(shardId >= 0, "shardId must be >= 0");
- checkArgument(numStreams > 0, "numStreams must be > 0");
- checkArgument(durationMins > 0, "durationMins must be > 0");
- checkArgument(streamPrefix != null, "streamPrefix must be defined");
- checkArgument(hostConnectionCoreSize > 0, "host connection core size must be > 0");
- checkArgument(hostConnectionLimit > 0, "host connection limit must be > 0");
-
- if (cmdline.hasOption("p")) {
- statsProvider = ReflectionUtils.newInstance(cmdline.getOptionValue("p"), StatsProvider.class);
- } else {
- statsProvider = new NullStatsProvider();
- }
-
- logger.info("Starting stats provider : {}.", statsProvider.getClass());
- statsProvider.start(conf);
-
- Worker w = null;
- if (mode.startsWith("read")) {
- w = runReader();
- } else if (mode.startsWith("write")) {
- w = runWriter();
- } else if (mode.startsWith("dlwrite")) {
- w = runDLWriter();
- } else if (mode.startsWith("dlread")) {
- w = runDLReader();
- }
-
- if (w == null) {
- throw new IOException("Unknown mode " + mode + " to run the benchmark.");
- }
-
- Thread workerThread = new Thread(w, mode + "-benchmark-thread");
- workerThread.start();
-
- TimeUnit.MINUTES.sleep(durationMins);
-
- logger.info("{} minutes passed, exiting...", durationMins);
- w.close();
-
- if (null != statsProvider) {
- statsProvider.stop();
- }
-
- Runtime.getRuntime().exit(0);
- }
-
- Worker runWriter() {
- checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri,
- "either serverset paths, finagle-names or uri required");
- checkArgument(msgSize > 0, "messagesize must be greater than 0");
- checkArgument(rate > 0, "rate must be greater than 0");
- checkArgument(maxRate >= rate, "max rate must be greater than rate");
- checkArgument(changeRate >= 0, "change rate must be positive");
- checkArgument(changeRateSeconds >= 0, "change rate must be positive");
- checkArgument(concurrency > 0, "concurrency must be greater than 0");
-
- ShiftableRateLimiter rateLimiter =
- new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, TimeUnit.SECONDS);
- return createWriteWorker(
- streamPrefix,
- dlUri,
- null == startStreamId ? shardId * numStreams : startStreamId,
- null == endStreamId ? (shardId + 1) * numStreams : endStreamId,
- rateLimiter,
- concurrency,
- msgSize,
- batchSize,
- hostConnectionCoreSize,
- hostConnectionLimit,
- serversetPaths,
- finagleNames,
- statsReceiver.scope("write_client"),
- statsProvider.getStatsLogger("write"),
- thriftmux,
- handshakeWithClientInfo,
- sendBufferSize,
- recvBufferSize,
- enableBatching,
- batchBufferSize,
- batchFlushIntervalMicros,
- routingServiceFinagleNameString);
- }
-
- protected WriterWorker createWriteWorker(
- String streamPrefix,
- URI uri,
- int startStreamId,
- int endStreamId,
- ShiftableRateLimiter rateLimiter,
- int writeConcurrency,
- int messageSizeBytes,
- int batchSize,
- int hostConnectionCoreSize,
- int hostConnectionLimit,
- List<String> serverSetPaths,
- List<String> finagleNames,
- StatsReceiver statsReceiver,
- StatsLogger statsLogger,
- boolean thriftmux,
- boolean handshakeWithClientInfo,
- int sendBufferSize,
- int recvBufferSize,
- boolean enableBatching,
- int batchBufferSize,
- int batchFlushIntervalMicros,
- String routingServiceFinagleNameString) {
- return new WriterWorker(
- streamPrefix,
- uri,
- startStreamId,
- endStreamId,
- rateLimiter,
- writeConcurrency,
- messageSizeBytes,
- batchSize,
- hostConnectionCoreSize,
- hostConnectionLimit,
- serverSetPaths,
- finagleNames,
- statsReceiver,
- statsLogger,
- thriftmux,
- handshakeWithClientInfo,
- sendBufferSize,
- recvBufferSize,
- enableBatching,
- batchBufferSize,
- batchFlushIntervalMicros,
- routingServiceFinagleNameString);
- }
-
- Worker runDLWriter() throws IOException {
- checkNotNull(dlUri, "dlUri must be defined");
- checkArgument(rate > 0, "rate must be greater than 0");
- checkArgument(maxRate >= rate, "max rate must be greater than rate");
- checkArgument(changeRate >= 0, "change rate must be positive");
- checkArgument(changeRateSeconds >= 0, "change rate must be positive");
- checkArgument(concurrency > 0, "concurrency must be greater than 0");
-
- ShiftableRateLimiter rateLimiter =
- new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, TimeUnit.SECONDS);
-
- return new DLWriterWorker(conf,
- dlUri,
- streamPrefix,
- shardId * numStreams,
- (shardId + 1) * numStreams,
- rateLimiter,
- concurrency,
- msgSize,
- statsProvider.getStatsLogger("dlwrite"));
- }
-
- Worker runReader() throws IOException {
- checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri,
- "either serverset paths, finagle-names or dlUri required");
- checkArgument(concurrency > 0, "concurrency must be greater than 0");
- checkArgument(truncationInterval > 0, "truncation interval should be greater than 0");
- return runReaderInternal(serversetPaths, finagleNames, truncationInterval);
- }
-
- Worker runDLReader() throws IOException {
- return runReaderInternal(new ArrayList<String>(), new ArrayList<String>(), 0);
- }
-
- private Worker runReaderInternal(List<String> serversetPaths,
- List<String> finagleNames,
- int truncationInterval) throws IOException {
- checkNotNull(dlUri);
-
- int ssid = null == startStreamId ? shardId * numStreams : startStreamId;
- int esid = null == endStreamId ? (shardId + readersPerStream) * numStreams : endStreamId;
- if (null != maxStreamId) {
- esid = Math.min(esid, maxStreamId);
- }
-
- return createReaderWorker(
- conf,
- dlUri,
- streamPrefix,
- ssid,
- esid,
- concurrency,
- serversetPaths,
- finagleNames,
- truncationInterval,
- readFromHead,
- statsReceiver,
- statsProvider.getStatsLogger("dlreader"));
- }
-
- protected ReaderWorker createReaderWorker(
- DistributedLogConfiguration conf,
- URI uri,
- String streamPrefix,
- int startStreamId,
- int endStreamId,
- int readThreadPoolSize,
- List<String> serverSetPaths,
- List<String> finagleNames,
- int truncationIntervalInSeconds,
- boolean readFromHead, /* read from the earliest data of log */
- StatsReceiver statsReceiver,
- StatsLogger statsLogger) throws IOException {
- return new ReaderWorker(
- conf,
- uri,
- streamPrefix,
- startStreamId,
- endStreamId,
- readThreadPoolSize,
- serverSetPaths,
- finagleNames,
- truncationIntervalInSeconds,
- readFromHead,
- statsReceiver,
- statsLogger);
- }
-
- public static void main(String[] args) {
- Benchmarker benchmarker = new Benchmarker(args);
- try {
- benchmarker.run();
- } catch (Exception e) {
- logger.info("Benchmark quit due to : ", e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/DLWriterWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/DLWriterWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/DLWriterWorker.java
deleted file mode 100644
index 152cd32..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/DLWriterWorker.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.twitter.distributedlog.AsyncLogWriter;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.LogRecord;
-import com.twitter.distributedlog.benchmark.utils.ShiftableRateLimiter;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.SchedulerUtils;
-import com.twitter.util.FutureEventListener;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The benchmark for core library writer.
- */
-public class DLWriterWorker implements Worker {
-
- private static final Logger LOG = LoggerFactory.getLogger(DLWriterWorker.class);
-
- static final int BACKOFF_MS = 200;
-
- final String streamPrefix;
- final int startStreamId;
- final int endStreamId;
- final int writeConcurrency;
- final int messageSizeBytes;
- final ExecutorService executorService;
- final ScheduledExecutorService rescueService;
- final ShiftableRateLimiter rateLimiter;
- final Random random;
- final DistributedLogNamespace namespace;
- final List<DistributedLogManager> dlms;
- final List<AsyncLogWriter> streamWriters;
- final int numStreams;
-
- volatile boolean running = true;
-
- final StatsLogger statsLogger;
- final OpStatsLogger requestStat;
-
- public DLWriterWorker(DistributedLogConfiguration conf,
- URI uri,
- String streamPrefix,
- int startStreamId,
- int endStreamId,
- ShiftableRateLimiter rateLimiter,
- int writeConcurrency,
- int messageSizeBytes,
- StatsLogger statsLogger) throws IOException {
- checkArgument(startStreamId <= endStreamId);
- this.streamPrefix = streamPrefix;
- this.startStreamId = startStreamId;
- this.endStreamId = endStreamId;
- this.rateLimiter = rateLimiter;
- this.writeConcurrency = writeConcurrency;
- this.messageSizeBytes = messageSizeBytes;
- this.statsLogger = statsLogger;
- this.requestStat = this.statsLogger.getOpStatsLogger("requests");
- this.executorService = Executors.newCachedThreadPool();
- this.rescueService = Executors.newSingleThreadScheduledExecutor();
- this.random = new Random(System.currentTimeMillis());
-
- this.namespace = DistributedLogNamespaceBuilder.newBuilder()
- .conf(conf)
- .uri(uri)
- .statsLogger(statsLogger.scope("dl"))
- .build();
- this.numStreams = endStreamId - startStreamId;
- dlms = new ArrayList<DistributedLogManager>(numStreams);
- streamWriters = new ArrayList<AsyncLogWriter>(numStreams);
- final ConcurrentMap<String, AsyncLogWriter> writers = new ConcurrentHashMap<String, AsyncLogWriter>();
- final CountDownLatch latch = new CountDownLatch(this.numStreams);
- for (int i = startStreamId; i < endStreamId; i++) {
- final String streamName = String.format("%s_%d", streamPrefix, i);
- final DistributedLogManager dlm = namespace.openLog(streamName);
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
- if (null != writers.putIfAbsent(streamName, writer)) {
- FutureUtils.result(writer.asyncClose());
- }
- latch.countDown();
- } catch (IOException e) {
- LOG.error("Failed to intialize writer for stream : {}", streamName, e);
- }
-
- }
- });
- dlms.add(dlm);
- }
- try {
- latch.await();
- } catch (InterruptedException e) {
- throw new IOException("Interrupted on initializing writers for streams.", e);
- }
- for (int i = startStreamId; i < endStreamId; i++) {
- final String streamName = String.format("%s_%d", streamPrefix, i);
- AsyncLogWriter writer = writers.get(streamName);
- if (null == writer) {
- throw new IOException("Writer for " + streamName + " never initialized.");
- }
- streamWriters.add(writer);
- }
- LOG.info("Writing to {} streams.", numStreams);
- }
-
- void rescueWriter(int idx, AsyncLogWriter writer) {
- if (streamWriters.get(idx) == writer) {
- try {
- FutureUtils.result(writer.asyncClose());
- } catch (IOException e) {
- LOG.error("Failed to close writer for stream {}.", idx);
- }
- AsyncLogWriter newWriter = null;
- try {
- newWriter = dlms.get(idx).startAsyncLogSegmentNonPartitioned();
- } catch (IOException e) {
- LOG.error("Failed to create new writer for stream {}, backoff for {} ms.",
- idx, BACKOFF_MS);
- scheduleRescue(idx, writer, BACKOFF_MS);
- }
- streamWriters.set(idx, newWriter);
- } else {
- LOG.warn("AsyncLogWriter for stream {} was already rescued.", idx);
- }
- }
-
- void scheduleRescue(final int idx, final AsyncLogWriter writer, int delayMs) {
- Runnable r = new Runnable() {
- @Override
- public void run() {
- rescueWriter(idx, writer);
- }
- };
- if (delayMs > 0) {
- rescueService.schedule(r, delayMs, TimeUnit.MILLISECONDS);
- } else {
- rescueService.submit(r);
- }
- }
-
- @Override
- public void close() throws IOException {
- this.running = false;
- SchedulerUtils.shutdownScheduler(this.executorService, 2, TimeUnit.MINUTES);
- SchedulerUtils.shutdownScheduler(this.rescueService, 2, TimeUnit.MINUTES);
- for (AsyncLogWriter writer : streamWriters) {
- FutureUtils.result(writer.asyncClose());
- }
- for (DistributedLogManager dlm : dlms) {
- dlm.close();
- }
- namespace.close();
- }
-
- @Override
- public void run() {
- LOG.info("Starting dlwriter (concurrency = {}, prefix = {}, numStreams = {})",
- new Object[] { writeConcurrency, streamPrefix, numStreams });
- for (int i = 0; i < writeConcurrency; i++) {
- executorService.submit(new Writer(i));
- }
- }
-
- class Writer implements Runnable {
-
- final int idx;
-
- Writer(int idx) {
- this.idx = idx;
- }
-
- @Override
- public void run() {
- LOG.info("Started writer {}.", idx);
- while (running) {
- final int streamIdx = random.nextInt(numStreams);
- final AsyncLogWriter writer = streamWriters.get(streamIdx);
- rateLimiter.getLimiter().acquire();
- final long requestMillis = System.currentTimeMillis();
- final byte[] data;
- try {
- data = Utils.generateMessage(requestMillis, messageSizeBytes);
- } catch (TException e) {
- LOG.error("Error on generating message : ", e);
- break;
- }
- writer.write(new LogRecord(requestMillis, data)).addEventListener(new FutureEventListener<DLSN>() {
- @Override
- public void onSuccess(DLSN value) {
- requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);
- }
-
- @Override
- public void onFailure(Throwable cause) {
- requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis);
- LOG.error("Failed to publish, rescue it : ", cause);
- scheduleRescue(streamIdx, writer, 0);
- }
- });
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
deleted file mode 100644
index adbdeda..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.distributedlog.AsyncLogReader;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.LogRecordSet;
-import com.twitter.distributedlog.LogRecordWithDLSN;
-import com.twitter.distributedlog.benchmark.thrift.Message;
-import com.twitter.distributedlog.client.serverset.DLZkServerSet;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import com.twitter.distributedlog.service.DistributedLogClient;
-import com.twitter.distributedlog.service.DistributedLogClientBuilder;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.SchedulerUtils;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Duration$;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The benchmark for core library reader.
- */
-public class ReaderWorker implements Worker {
-
- private static final Logger LOG = LoggerFactory.getLogger(ReaderWorker.class);
-
- static final int BACKOFF_MS = 200;
-
- final String streamPrefix;
- final int startStreamId;
- final int endStreamId;
- final ScheduledExecutorService executorService;
- final ExecutorService callbackExecutor;
- final DistributedLogNamespace namespace;
- final DistributedLogManager[] dlms;
- final AsyncLogReader[] logReaders;
- final StreamReader[] streamReaders;
- final int numStreams;
- final boolean readFromHead;
-
- final int truncationIntervalInSeconds;
- // DL Client Related Variables
- final DLZkServerSet[] serverSets;
- final List<String> finagleNames;
- final DistributedLogClient dlc;
-
- volatile boolean running = true;
-
- final StatsReceiver statsReceiver;
- final StatsLogger statsLogger;
- final OpStatsLogger e2eStat;
- final OpStatsLogger deliveryStat;
- final OpStatsLogger negativeE2EStat;
- final OpStatsLogger negativeDeliveryStat;
- final OpStatsLogger truncationStat;
- final Counter invalidRecordsCounter;
- final Counter outOfOrderSequenceIdCounter;
-
- class StreamReader implements FutureEventListener<List<LogRecordWithDLSN>>, Runnable, Gauge<Number> {
-
- final int streamIdx;
- final String streamName;
- DLSN prevDLSN = null;
- long prevSequenceId = Long.MIN_VALUE;
- private static final String gaugeLabel = "sequence_id";
-
- StreamReader(int idx, StatsLogger statsLogger) {
- this.streamIdx = idx;
- int streamId = startStreamId + streamIdx;
- streamName = String.format("%s_%d", streamPrefix, streamId);
- statsLogger.scope(streamName).registerGauge(gaugeLabel, this);
- }
-
- @Override
- public void onSuccess(final List<LogRecordWithDLSN> records) {
- for (final LogRecordWithDLSN record : records) {
- if (record.isRecordSet()) {
- try {
- processRecordSet(record);
- } catch (IOException e) {
- onFailure(e);
- }
- } else {
- processRecord(record);
- }
- }
- readLoop();
- }
-
- public void processRecordSet(final LogRecordWithDLSN record) throws IOException {
- LogRecordSet.Reader reader = LogRecordSet.of(record);
- LogRecordWithDLSN nextRecord = reader.nextRecord();
- while (null != nextRecord) {
- processRecord(nextRecord);
- nextRecord = reader.nextRecord();
- }
- }
-
- public void processRecord(final LogRecordWithDLSN record) {
- Message msg;
- try {
- msg = Utils.parseMessage(record.getPayload());
- } catch (TException e) {
- invalidRecordsCounter.inc();
- LOG.warn("Failed to parse record {} for stream {} : size = {} , ",
- new Object[] { record, streamIdx, record.getPayload().length, e });
- return;
- }
- long curTimeMillis = System.currentTimeMillis();
- long e2eLatency = curTimeMillis - msg.getPublishTime();
- long deliveryLatency = curTimeMillis - record.getTransactionId();
- if (e2eLatency >= 0) {
- e2eStat.registerSuccessfulEvent(e2eLatency);
- } else {
- negativeE2EStat.registerSuccessfulEvent(-e2eLatency);
- }
- if (deliveryLatency >= 0) {
- deliveryStat.registerSuccessfulEvent(deliveryLatency);
- } else {
- negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency);
- }
-
- prevDLSN = record.getDlsn();
- }
-
- @Override
- public void onFailure(Throwable cause) {
- scheduleReinitStream(streamIdx).map(new Function<Void, Void>() {
- @Override
- public Void apply(Void value) {
- prevDLSN = null;
- prevSequenceId = Long.MIN_VALUE;
- readLoop();
- return null;
- }
- });
- }
-
- void readLoop() {
- if (!running) {
- return;
- }
- logReaders[streamIdx].readBulk(10).addEventListener(this);
- }
-
- @Override
- public void run() {
- final DLSN dlsnToTruncate = prevDLSN;
- if (null == dlsnToTruncate) {
- return;
- }
- final Stopwatch stopwatch = Stopwatch.createStarted();
- dlc.truncate(streamName, dlsnToTruncate).addEventListener(
- new FutureEventListener<Boolean>() {
- @Override
- public void onSuccess(Boolean value) {
- truncationStat.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
- }
-
- @Override
- public void onFailure(Throwable cause) {
- truncationStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
- LOG.error("Failed to truncate stream {} to {} : ",
- new Object[]{streamName, dlsnToTruncate, cause});
- }
- });
- }
-
- @Override
- public Number getDefaultValue() {
- return Long.MIN_VALUE;
- }
-
- @Override
- public synchronized Number getSample() {
- return prevSequenceId;
- }
-
- void unregisterGauge() {
- statsLogger.scope(streamName).unregisterGauge(gaugeLabel, this);
- }
- }
-
- public ReaderWorker(DistributedLogConfiguration conf,
- URI uri,
- String streamPrefix,
- int startStreamId,
- int endStreamId,
- int readThreadPoolSize,
- List<String> serverSetPaths,
- List<String> finagleNames,
- int truncationIntervalInSeconds,
- boolean readFromHead, /* read from the earliest data of log */
- StatsReceiver statsReceiver,
- StatsLogger statsLogger) throws IOException {
- checkArgument(startStreamId <= endStreamId);
- this.streamPrefix = streamPrefix;
- this.startStreamId = startStreamId;
- this.endStreamId = endStreamId;
- this.truncationIntervalInSeconds = truncationIntervalInSeconds;
- this.readFromHead = readFromHead;
- this.statsReceiver = statsReceiver;
- this.statsLogger = statsLogger;
- this.e2eStat = this.statsLogger.getOpStatsLogger("e2e");
- this.negativeE2EStat = this.statsLogger.getOpStatsLogger("e2eNegative");
- this.deliveryStat = this.statsLogger.getOpStatsLogger("delivery");
- this.negativeDeliveryStat = this.statsLogger.getOpStatsLogger("deliveryNegative");
- this.truncationStat = this.statsLogger.getOpStatsLogger("truncation");
- this.invalidRecordsCounter = this.statsLogger.getCounter("invalid_records");
- this.outOfOrderSequenceIdCounter = this.statsLogger.getCounter("out_of_order_seq_id");
- this.executorService = Executors.newScheduledThreadPool(
- readThreadPoolSize, new ThreadFactoryBuilder().setNameFormat("benchmark.reader-%d").build());
- this.callbackExecutor = Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors(),
- new ThreadFactoryBuilder().setNameFormat("benchmark.reader-callback-%d").build());
- this.finagleNames = finagleNames;
- this.serverSets = createServerSets(serverSetPaths);
-
- conf.setDeserializeRecordSetOnReads(false);
-
- if (truncationIntervalInSeconds > 0 && (!finagleNames.isEmpty() || !serverSetPaths.isEmpty())) {
- // Construct client for truncation
- DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
- .clientId(ClientId$.MODULE$.apply("dlog_loadtest_reader"))
- .clientBuilder(ClientBuilder.get()
- .hostConnectionLimit(10)
- .hostConnectionCoresize(10)
- .tcpConnectTimeout(Duration$.MODULE$.fromSeconds(1))
- .requestTimeout(Duration$.MODULE$.fromSeconds(2)))
- .redirectBackoffStartMs(100)
- .redirectBackoffMaxMs(500)
- .requestTimeoutMs(2000)
- .statsReceiver(statsReceiver)
- .thriftmux(true)
- .name("reader");
-
- if (serverSetPaths.isEmpty()) {
- // Prepare finagle names
- String local = finagleNames.get(0);
- String[] remotes = new String[finagleNames.size() - 1];
- finagleNames.subList(1, finagleNames.size()).toArray(remotes);
-
- builder = builder.finagleNameStrs(local, remotes);
- LOG.info("Initialized distributedlog client for truncation @ {}.", finagleNames);
- } else if (serverSets.length != 0){
- ServerSet local = this.serverSets[0].getServerSet();
- ServerSet[] remotes = new ServerSet[this.serverSets.length - 1];
- for (int i = 1; i < serverSets.length; i++) {
- remotes[i - 1] = serverSets[i].getServerSet();
- }
-
- builder = builder.serverSets(local, remotes);
- LOG.info("Initialized distributedlog client for truncation @ {}.", serverSetPaths);
- } else {
- builder = builder.uri(uri);
- LOG.info("Initialized distributedlog client for namespace {}", uri);
- }
- dlc = builder.build();
- } else {
- dlc = null;
- }
-
- // construct the factory
- this.namespace = DistributedLogNamespaceBuilder.newBuilder()
- .conf(conf)
- .uri(uri)
- .statsLogger(statsLogger.scope("dl"))
- .build();
- this.numStreams = endStreamId - startStreamId;
- this.dlms = new DistributedLogManager[numStreams];
- this.logReaders = new AsyncLogReader[numStreams];
- final CountDownLatch latch = new CountDownLatch(numStreams);
- for (int i = 0; i < numStreams; i++) {
- final int idx = i;
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- reinitStream(idx).map(new Function<Void, Void>() {
- @Override
- public Void apply(Void value) {
- LOG.info("Initialized stream reader {}.", idx);
- latch.countDown();
- return null;
- }
- });
- }
- });
- }
- try {
- latch.await();
- } catch (InterruptedException e) {
- throw new DLInterruptedException("Failed to intialize benchmark readers : ", e);
- }
- this.streamReaders = new StreamReader[numStreams];
- for (int i = 0; i < numStreams; i++) {
- streamReaders[i] = new StreamReader(i, statsLogger.scope("perstream"));
- if (truncationIntervalInSeconds > 0) {
- executorService.scheduleWithFixedDelay(streamReaders[i],
- truncationIntervalInSeconds, truncationIntervalInSeconds, TimeUnit.SECONDS);
- }
- }
- LOG.info("Initialized benchmark reader on {} streams {} : [{} - {})",
- new Object[] { numStreams, streamPrefix, startStreamId, endStreamId });
- }
-
- protected DLZkServerSet[] createServerSets(List<String> serverSetPaths) {
- DLZkServerSet[] serverSets = new DLZkServerSet[serverSetPaths.size()];
- for (int i = 0; i < serverSets.length; i++) {
- String serverSetPath = serverSetPaths.get(i);
- serverSets[i] = DLZkServerSet.of(URI.create(serverSetPath), 60000);
- }
- return serverSets;
- }
-
- private Future<Void> reinitStream(int idx) {
- Promise<Void> promise = new Promise<Void>();
- reinitStream(idx, promise);
- return promise;
- }
-
- private void reinitStream(int idx, Promise<Void> promise) {
- int streamId = startStreamId + idx;
- String streamName = String.format("%s_%d", streamPrefix, streamId);
-
- if (logReaders[idx] != null) {
- try {
- FutureUtils.result(logReaders[idx].asyncClose());
- } catch (IOException e) {
- LOG.warn("Failed on closing stream reader {} : ", streamName, e);
- }
- logReaders[idx] = null;
- }
- if (dlms[idx] != null) {
- try {
- dlms[idx].close();
- } catch (IOException e) {
- LOG.warn("Failed on closing dlm {} : ", streamName, e);
- }
- dlms[idx] = null;
- }
-
- try {
- dlms[idx] = namespace.openLog(streamName);
- } catch (IOException ioe) {
- LOG.error("Failed on creating dlm {} : ", streamName, ioe);
- scheduleReinitStream(idx, promise);
- return;
- }
- DLSN lastDLSN;
- if (readFromHead) {
- lastDLSN = DLSN.InitialDLSN;
- } else {
- try {
- lastDLSN = dlms[idx].getLastDLSN();
- } catch (IOException ioe) {
- LOG.error("Failed on getting last dlsn from stream {} : ", streamName, ioe);
- scheduleReinitStream(idx, promise);
- return;
- }
- }
- try {
- logReaders[idx] = dlms[idx].getAsyncLogReader(lastDLSN);
- } catch (IOException ioe) {
- LOG.error("Failed on opening reader for stream {} starting from {} : ",
- new Object[] { streamName, lastDLSN, ioe });
- scheduleReinitStream(idx, promise);
- return;
- }
- LOG.info("Opened reader for stream {}, starting from {}.", streamName, lastDLSN);
- promise.setValue(null);
- }
-
- Future<Void> scheduleReinitStream(int idx) {
- Promise<Void> promise = new Promise<Void>();
- scheduleReinitStream(idx, promise);
- return promise;
- }
-
- void scheduleReinitStream(final int idx, final Promise<Void> promise) {
- executorService.schedule(new Runnable() {
- @Override
- public void run() {
- reinitStream(idx, promise);
- }
- }, BACKOFF_MS, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void close() throws IOException {
- this.running = false;
- for (AsyncLogReader reader : logReaders) {
- if (null != reader) {
- FutureUtils.result(reader.asyncClose());
- }
- }
- for (DistributedLogManager dlm : dlms) {
- if (null != dlm) {
- dlm.close();
- }
- }
- namespace.close();
- SchedulerUtils.shutdownScheduler(executorService, 2, TimeUnit.MINUTES);
- SchedulerUtils.shutdownScheduler(callbackExecutor, 2, TimeUnit.MINUTES);
- if (this.dlc != null) {
- this.dlc.close();
- }
- for (DLZkServerSet serverSet: serverSets) {
- serverSet.close();
- }
- // Unregister gauges to prevent GC spirals
- for (StreamReader sr : streamReaders) {
- sr.unregisterGauge();
- }
- }
-
- @Override
- public void run() {
- LOG.info("Starting reader (prefix = {}, numStreams = {}).",
- streamPrefix, numStreams);
- for (StreamReader sr : streamReaders) {
- sr.readLoop();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Utils.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Utils.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Utils.java
deleted file mode 100644
index f5c32db..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Utils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark;
-
-import com.twitter.distributedlog.benchmark.thrift.Message;
-import java.nio.ByteBuffer;
-import java.util.Random;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TMemoryInputTransport;
-
-/**
- * Utils for generating and parsing messages.
- */
-public class Utils {
-
- static final Random RAND = new Random(System.currentTimeMillis());
- static final ThreadLocal<TSerializer> MSG_SERIALIZER =
- new ThreadLocal<TSerializer>() {
- @Override
- public TSerializer initialValue() {
- return new TSerializer(new TBinaryProtocol.Factory());
- }
- };
-
- public static byte[] generateMessage(long requestMillis, int payLoadSize) throws TException {
- byte[] payload = new byte[payLoadSize];
- RAND.nextBytes(payload);
- Message msg = new Message(requestMillis, ByteBuffer.wrap(payload));
- return MSG_SERIALIZER.get().serialize(msg);
- }
-
- public static Message parseMessage(byte[] data) throws TException {
- Message msg = new Message();
- TMemoryInputTransport transport = new TMemoryInputTransport(data);
- TBinaryProtocol protocol = new TBinaryProtocol(transport);
- msg.read(protocol);
- return msg;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Worker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Worker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Worker.java
deleted file mode 100644
index 6c60034..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Worker.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark;
-
-import java.io.Closeable;
-
-/**
- * Worker to run benchmark.
- */
-public interface Worker extends Closeable, Runnable {
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
deleted file mode 100644
index dc5a6e2..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.benchmark.utils.ShiftableRateLimiter;
-import com.twitter.distributedlog.client.DistributedLogMultiStreamWriter;
-import com.twitter.distributedlog.client.serverset.DLZkServerSet;
-import com.twitter.distributedlog.exceptions.DLException;
-import com.twitter.distributedlog.io.CompressionCodec;
-import com.twitter.distributedlog.service.DistributedLogClient;
-import com.twitter.distributedlog.service.DistributedLogClientBuilder;
-import com.twitter.distributedlog.util.SchedulerUtils;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.finagle.thrift.ClientId;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Duration$;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Benchmark for distributedlog proxy client.
- */
-public class WriterWorker implements Worker {
-
- static final Logger LOG = LoggerFactory.getLogger(WriterWorker.class);
-
- final String streamPrefix;
- final int startStreamId;
- final int endStreamId;
- final int writeConcurrency;
- final int messageSizeBytes;
- final int hostConnectionCoreSize;
- final int hostConnectionLimit;
- final ExecutorService executorService;
- final ShiftableRateLimiter rateLimiter;
- final URI dlUri;
- final DLZkServerSet[] serverSets;
- final List<String> finagleNames;
- final Random random;
- final List<String> streamNames;
- final int numStreams;
- final int batchSize;
- final boolean thriftmux;
- final boolean handshakeWithClientInfo;
- final int sendBufferSize;
- final int recvBufferSize;
- final boolean enableBatching;
- final int batchBufferSize;
- final int batchFlushIntervalMicros;
- private final String routingServiceFinagleName;
-
- volatile boolean running = true;
-
- final StatsReceiver statsReceiver;
- final StatsLogger statsLogger;
- final OpStatsLogger requestStat;
- final StatsLogger exceptionsLogger;
- final StatsLogger dlErrorCodeLogger;
-
- // callback thread
- final ExecutorService executor;
-
- public WriterWorker(String streamPrefix,
- URI uri,
- int startStreamId,
- int endStreamId,
- ShiftableRateLimiter rateLimiter,
- int writeConcurrency,
- int messageSizeBytes,
- int batchSize,
- int hostConnectionCoreSize,
- int hostConnectionLimit,
- List<String> serverSetPaths,
- List<String> finagleNames,
- StatsReceiver statsReceiver,
- StatsLogger statsLogger,
- boolean thriftmux,
- boolean handshakeWithClientInfo,
- int sendBufferSize,
- int recvBufferSize,
- boolean enableBatching,
- int batchBufferSize,
- int batchFlushIntervalMicros,
- String routingServiceFinagleName) {
- checkArgument(startStreamId <= endStreamId);
- checkArgument(!finagleNames.isEmpty() || !serverSetPaths.isEmpty());
- this.streamPrefix = streamPrefix;
- this.dlUri = uri;
- this.startStreamId = startStreamId;
- this.endStreamId = endStreamId;
- this.rateLimiter = rateLimiter;
- this.writeConcurrency = writeConcurrency;
- this.messageSizeBytes = messageSizeBytes;
- this.statsReceiver = statsReceiver;
- this.statsLogger = statsLogger;
- this.requestStat = this.statsLogger.getOpStatsLogger("requests");
- this.exceptionsLogger = statsLogger.scope("exceptions");
- this.dlErrorCodeLogger = statsLogger.scope("dl_error_code");
- this.executorService = Executors.newCachedThreadPool();
- this.random = new Random(System.currentTimeMillis());
- this.batchSize = batchSize;
- this.hostConnectionCoreSize = hostConnectionCoreSize;
- this.hostConnectionLimit = hostConnectionLimit;
- this.thriftmux = thriftmux;
- this.handshakeWithClientInfo = handshakeWithClientInfo;
- this.sendBufferSize = sendBufferSize;
- this.recvBufferSize = recvBufferSize;
- this.enableBatching = enableBatching;
- this.batchBufferSize = batchBufferSize;
- this.batchFlushIntervalMicros = batchFlushIntervalMicros;
- this.finagleNames = finagleNames;
- this.serverSets = createServerSets(serverSetPaths);
- this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
- this.routingServiceFinagleName = routingServiceFinagleName;
-
- // Streams
- streamNames = new ArrayList<String>(endStreamId - startStreamId);
- for (int i = startStreamId; i < endStreamId; i++) {
- streamNames.add(String.format("%s_%d", streamPrefix, i));
- }
- numStreams = streamNames.size();
- LOG.info("Writing to {} streams : {}", numStreams, streamNames);
- }
-
- protected DLZkServerSet[] createServerSets(List<String> serverSetPaths) {
- DLZkServerSet[] serverSets = new DLZkServerSet[serverSetPaths.size()];
- for (int i = 0; i < serverSets.length; i++) {
- String serverSetPath = serverSetPaths.get(i);
- serverSets[i] = DLZkServerSet.of(URI.create(serverSetPath), 60000);
- }
- return serverSets;
- }
-
- @Override
- public void close() throws IOException {
- this.running = false;
- SchedulerUtils.shutdownScheduler(this.executorService, 2, TimeUnit.MINUTES);
- for (DLZkServerSet serverSet: serverSets) {
- serverSet.close();
- }
- }
-
- private DistributedLogClient buildDlogClient() {
- ClientBuilder clientBuilder = ClientBuilder.get()
- .hostConnectionLimit(hostConnectionLimit)
- .hostConnectionCoresize(hostConnectionCoreSize)
- .tcpConnectTimeout(Duration$.MODULE$.fromMilliseconds(200))
- .connectTimeout(Duration$.MODULE$.fromMilliseconds(200))
- .requestTimeout(Duration$.MODULE$.fromSeconds(10))
- .sendBufferSize(sendBufferSize)
- .recvBufferSize(recvBufferSize);
-
- ClientId clientId = ClientId$.MODULE$.apply("dlog_loadtest_writer");
-
- DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
- .clientId(clientId)
- .clientBuilder(clientBuilder)
- .thriftmux(thriftmux)
- .redirectBackoffStartMs(100)
- .redirectBackoffMaxMs(500)
- .requestTimeoutMs(10000)
- .statsReceiver(statsReceiver)
- .streamNameRegex("^" + streamPrefix + "_[0-9]+$")
- .handshakeWithClientInfo(handshakeWithClientInfo)
- .periodicHandshakeIntervalMs(TimeUnit.SECONDS.toMillis(30))
- .periodicOwnershipSyncIntervalMs(TimeUnit.MINUTES.toMillis(5))
- .periodicDumpOwnershipCache(true)
- .handshakeTracing(true)
- .serverRoutingServiceFinagleNameStr(routingServiceFinagleName)
- .name("writer");
-
- if (!finagleNames.isEmpty()) {
- String local = finagleNames.get(0);
- String[] remotes = new String[finagleNames.size() - 1];
- finagleNames.subList(1, finagleNames.size()).toArray(remotes);
-
- builder = builder.finagleNameStrs(local, remotes);
- } else if (serverSets.length != 0){
- ServerSet local = serverSets[0].getServerSet();
- ServerSet[] remotes = new ServerSet[serverSets.length - 1];
- for (int i = 1; i < serverSets.length; i++) {
- remotes[i - 1] = serverSets[i].getServerSet();
- }
- builder = builder.serverSets(local, remotes);
- } else {
- builder = builder.uri(dlUri);
- }
-
- return builder.build();
- }
-
- ByteBuffer buildBuffer(long requestMillis, int messageSizeBytes) {
- ByteBuffer data;
- try {
- data = ByteBuffer.wrap(Utils.generateMessage(requestMillis, messageSizeBytes));
- return data;
- } catch (TException e) {
- LOG.error("Error generating message : ", e);
- return null;
- }
- }
-
- List<ByteBuffer> buildBufferList(int batchSize, long requestMillis, int messageSizeBytes) {
- ArrayList<ByteBuffer> bufferList = new ArrayList<ByteBuffer>(batchSize);
- for (int i = 0; i < batchSize; i++) {
- ByteBuffer buf = buildBuffer(requestMillis, messageSizeBytes);
- if (null == buf) {
- return null;
- }
- bufferList.add(buf);
- }
- return bufferList;
- }
-
- class TimedRequestHandler implements FutureEventListener<DLSN>, Runnable {
- final String streamName;
- final long requestMillis;
- DLSN dlsn = null;
- Throwable cause = null;
-
- TimedRequestHandler(String streamName,
- long requestMillis) {
- this.streamName = streamName;
- this.requestMillis = requestMillis;
- }
- @Override
- public void onSuccess(DLSN value) {
- dlsn = value;
- executor.submit(this);
- }
- @Override
- public void onFailure(Throwable cause) {
- this.cause = cause;
- executor.submit(this);
- }
-
- @Override
- public void run() {
- if (null != dlsn) {
- requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);
- } else {
- LOG.error("Failed to publish to {} : ", streamName, cause);
- requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis);
- exceptionsLogger.getCounter(cause.getClass().getName()).inc();
- if (cause instanceof DLException) {
- DLException dle = (DLException) cause;
- dlErrorCodeLogger.getCounter(dle.getCode().toString()).inc();
- }
- }
- }
- }
-
- class Writer implements Runnable {
-
- final int idx;
- final DistributedLogClient dlc;
- DistributedLogMultiStreamWriter writer = null;
- final ShiftableRateLimiter limiter;
-
- Writer(int idx) {
- this.idx = idx;
- this.dlc = buildDlogClient();
- if (enableBatching) {
- writer = DistributedLogMultiStreamWriter.newBuilder()
- .client(this.dlc)
- .streams(streamNames)
- .compressionCodec(CompressionCodec.Type.NONE)
- .flushIntervalMicros(batchFlushIntervalMicros)
- .bufferSize(batchBufferSize)
- .firstSpeculativeTimeoutMs(9000)
- .maxSpeculativeTimeoutMs(9000)
- .requestTimeoutMs(10000)
- .speculativeBackoffMultiplier(2)
- .build();
- }
- this.limiter = rateLimiter.duplicate();
- }
-
- @Override
- public void run() {
- LOG.info("Started writer {}.", idx);
- while (running) {
- this.limiter.getLimiter().acquire();
- final String streamName = streamNames.get(random.nextInt(numStreams));
- final long requestMillis = System.currentTimeMillis();
- final ByteBuffer data = buildBuffer(requestMillis, messageSizeBytes);
- if (null == data) {
- break;
- }
- if (null != writer) {
- writer.write(data).addEventListener(
- new TimedRequestHandler(streamName, requestMillis));
- } else {
- dlc.write(streamName, data).addEventListener(
- new TimedRequestHandler(streamName, requestMillis));
- }
- }
- if (null != writer) {
- writer.close();
- }
- dlc.close();
- }
- }
-
- class BulkWriter implements Runnable {
-
- final int idx;
- final DistributedLogClient dlc;
-
- BulkWriter(int idx) {
- this.idx = idx;
- this.dlc = buildDlogClient();
- }
-
- @Override
- public void run() {
- LOG.info("Started writer {}.", idx);
- while (running) {
- rateLimiter.getLimiter().acquire(batchSize);
- String streamName = streamNames.get(random.nextInt(numStreams));
- final long requestMillis = System.currentTimeMillis();
- final List<ByteBuffer> data = buildBufferList(batchSize, requestMillis, messageSizeBytes);
- if (null == data) {
- break;
- }
- List<Future<DLSN>> results = dlc.writeBulk(streamName, data);
- for (Future<DLSN> result : results) {
- result.addEventListener(new TimedRequestHandler(streamName, requestMillis));
- }
- }
- dlc.close();
- }
- }
-
- @Override
- public void run() {
- LOG.info("Starting writer (concurrency = {}, prefix = {}, batchSize = {})",
- new Object[] { writeConcurrency, streamPrefix, batchSize });
- try {
- for (int i = 0; i < writeConcurrency; i++) {
- Runnable writer = null;
- if (batchSize > 0) {
- writer = new BulkWriter(i);
- } else {
- writer = new Writer(i);
- }
- executorService.submit(writer);
- }
- } catch (Throwable t) {
- LOG.error("Unhandled exception caught", t);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/package-info.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/package-info.java
deleted file mode 100644
index 052a661..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Benchmarks for distributedlog.
- */
-package com.twitter.distributedlog.benchmark;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AbstractReaderBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AbstractReaderBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AbstractReaderBenchmark.java
deleted file mode 100644
index 4d436ee..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AbstractReaderBenchmark.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark.stream;
-
-import com.twitter.distributedlog.DistributedLogConstants;
-import org.apache.commons.cli.CommandLine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-abstract class AbstractReaderBenchmark extends StreamBenchmark {
-
- private static final Logger logger = LoggerFactory.getLogger(SyncReaderBenchmark.class);
-
- protected ReadMode readMode = ReadMode.LATEST;
- protected long fromTxId = DistributedLogConstants.INVALID_TXID;
- protected long rewindMs = 0L;
- protected int batchSize = 1;
-
- protected AbstractReaderBenchmark() {
- options.addOption("t", "tx-id", true,
- "Transaction ID to start read from when reading in mode 'position'");
- options.addOption("r", "rewind", true,
- "Time to rewind back to read from when reading in mode 'rewind' (in milliseconds)");
- options.addOption("m", "mode", true,
- "Read Mode : [oldest, latest, rewind, position]");
- options.addOption("b", "batch-size", true, "Read batch size");
- }
-
- @Override
- protected void parseCommandLine(CommandLine cmdline) {
- if (cmdline.hasOption("m")) {
- String mode = cmdline.getOptionValue("m");
- try {
- readMode = ReadMode.valueOf(mode.toUpperCase());
- } catch (IllegalArgumentException iae) {
- logger.error("Invalid read mode {}.", mode);
- printUsage();
- System.exit(0);
- }
- } else {
- printUsage();
- System.exit(0);
- }
- if (cmdline.hasOption("t")) {
- fromTxId = Long.parseLong(cmdline.getOptionValue("t"));
- }
- if (cmdline.hasOption("r")) {
- rewindMs = Long.parseLong(cmdline.getOptionValue("r"));
- }
- if (cmdline.hasOption("b")) {
- batchSize = Integer.parseInt(cmdline.getOptionValue("b"));
- }
- logger.info("Start reading from transaction id {}, rewind {} ms.", fromTxId, rewindMs);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
deleted file mode 100644
index 86acdb6..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark.stream;
-
-import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.AsyncLogReader;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.LogRecordWithDLSN;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.util.FutureUtils;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Benchmark on {@link com.twitter.distributedlog.AsyncLogReader} reading from a stream.
- */
-public class AsyncReaderBenchmark extends AbstractReaderBenchmark {
-
- private static final Logger logger = LoggerFactory.getLogger(AsyncReaderBenchmark.class);
-
- @Override
- protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) {
- DistributedLogManager dlm = null;
- while (null == dlm) {
- try {
- dlm = namespace.openLog(streamName);
- } catch (IOException ioe) {
- logger.warn("Failed to create dlm for stream {} : ", streamName, ioe);
- }
- if (null == dlm) {
- try {
- TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
- } catch (InterruptedException e) {
- logger.warn("Interrupted from sleep while creating dlm for stream {} : ",
- streamName, e);
- }
- }
- }
- logger.info("Created dlm for stream {}.", streamName);
-
- // Stats
- OpStatsLogger openReaderStats = statsLogger.getOpStatsLogger("open_reader");
- OpStatsLogger blockingReadStats = statsLogger.getOpStatsLogger("blocking_read");
- Counter readCounter = statsLogger.getCounter("reads");
-
- AsyncLogReader reader = null;
- DLSN lastDLSN = null;
- Long lastTxId = null;
- while (null == reader) {
- // initialize the last txid
- if (null == lastTxId) {
- switch (readMode) {
- case OLDEST:
- lastTxId = 0L;
- lastDLSN = DLSN.InitialDLSN;
- break;
- case LATEST:
- lastTxId = Long.MAX_VALUE;
- try {
- lastDLSN = dlm.getLastDLSN();
- } catch (IOException ioe) {
- continue;
- }
- break;
- case REWIND:
- lastTxId = System.currentTimeMillis() - rewindMs;
- lastDLSN = null;
- break;
- case POSITION:
- lastTxId = fromTxId;
- lastDLSN = null;
- break;
- default:
- logger.warn("Unsupported mode {}", readMode);
- printUsage();
- System.exit(0);
- break;
- }
- logger.info("Reading from transaction id = {}, dlsn = {}", lastTxId, lastDLSN);
- }
- // Open the reader
- Stopwatch stopwatch = Stopwatch.createStarted();
- try {
- if (null == lastDLSN) {
- reader = FutureUtils.result(dlm.openAsyncLogReader(lastTxId));
- } else {
- reader = FutureUtils.result(dlm.openAsyncLogReader(lastDLSN));
- }
- long elapsedMs = stopwatch.elapsed(TimeUnit.MICROSECONDS);
- openReaderStats.registerSuccessfulEvent(elapsedMs);
- logger.info("It took {} ms to position the reader to transaction id = {}, dlsn = {}",
- lastTxId, lastDLSN);
- } catch (IOException ioe) {
- openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
- logger.warn("Failed to create reader for stream {} reading from tx id = {}, dlsn = {}.",
- new Object[] { streamName, lastTxId, lastDLSN });
- }
- if (null == reader) {
- try {
- TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
- } catch (InterruptedException e) {
- logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ",
- streamName, e);
- }
- continue;
- }
- List<LogRecordWithDLSN> records;
- stopwatch = Stopwatch.createUnstarted();
- while (true) {
- try {
- stopwatch.start();
- records = FutureUtils.result(reader.readBulk(batchSize));
- long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
- blockingReadStats.registerSuccessfulEvent(elapsedMicros);
- if (!records.isEmpty()) {
- readCounter.add(records.size());
- LogRecordWithDLSN lastRecord = records.get(records.size() - 1);
- lastTxId = lastRecord.getTransactionId();
- lastDLSN = lastRecord.getDlsn();
- }
- stopwatch.reset();
- } catch (IOException e) {
- logger.warn("Encountered reading record from stream {} : ", streamName, e);
- reader = null;
- break;
- }
- }
- try {
- TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
- } catch (InterruptedException e) {
- logger.warn("Interrupted from sleep while creating reader for stream {} : ",
- streamName, e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerBatchReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerBatchReader.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerBatchReader.java
deleted file mode 100644
index 6a11469..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerBatchReader.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark.stream;
-
-import java.util.Enumeration;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Read ledgers in batches.
- */
-public class LedgerBatchReader implements Runnable {
-
- private static final Logger logger = LoggerFactory.getLogger(LedgerBatchReader.class);
-
- private final LedgerHandle lh;
- private final ReadEntryListener readEntryListener;
- private final int batchSize;
-
- public LedgerBatchReader(LedgerHandle lh,
- ReadEntryListener readEntryListener,
- int batchSize) {
- this.lh = lh;
- this.batchSize = batchSize;
- this.readEntryListener = readEntryListener;
- }
-
- @Override
- public void run() {
- long lac = lh.getLastAddConfirmed();
-
- long entryId = 0L;
-
- while (entryId <= lac) {
- long startEntryId = entryId;
- long endEntryId = Math.min(startEntryId + batchSize - 1, lac);
-
- Enumeration<LedgerEntry> entries = null;
- while (null == entries) {
- try {
- entries = lh.readEntries(startEntryId, endEntryId);
- } catch (BKException bke) {
- logger.error("Encountered exceptions on reading [ {} - {} ] ",
- new Object[] { startEntryId, endEntryId, bke });
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- break;
- }
- }
- if (null == entries) {
- break;
- }
-
- while (entries.hasMoreElements()) {
- LedgerEntry entry = entries.nextElement();
- readEntryListener.onEntryComplete(BKException.Code.OK, lh, entry, null);
- }
-
- entryId = endEntryId + 1;
- }
-
- }
-}