You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:55 UTC

[50/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
deleted file mode 100644
index 5b04a05..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.benchmark.utils.ShiftableRateLimiter;
-import com.twitter.finagle.stats.OstrichStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.NullStatsProvider;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The launcher for benchmarks.
- */
-public class Benchmarker {
-
-    private static final Logger logger = LoggerFactory.getLogger(Benchmarker.class);
-
-    static final String USAGE = "Benchmarker [-u <uri>] [-c <conf>] [-s serverset] [-m (read|write|dlwrite)]";
-
-    final String[] args;
-    final Options options = new Options();
-
-    int rate = 100;
-    int maxRate = 1000;
-    int changeRate = 100;
-    int changeRateSeconds = 1800;
-    int concurrency = 10;
-    String streamPrefix = "dlog-loadtest";
-    int shardId = -1;
-    int numStreams = 10;
-    List<String> serversetPaths = new ArrayList<String>();
-    List<String> finagleNames = new ArrayList<String>();
-    int msgSize = 256;
-    String mode = null;
-    int durationMins = 60;
-    URI dlUri = null;
-    int batchSize = 0;
-    int readersPerStream = 1;
-    Integer maxStreamId = null;
-    int truncationInterval = 3600;
-    Integer startStreamId = null;
-    Integer endStreamId = null;
-    int hostConnectionCoreSize = 10;
-    int hostConnectionLimit = 10;
-    boolean thriftmux = false;
-    boolean handshakeWithClientInfo = false;
-    boolean readFromHead = false;
-    int sendBufferSize = 1024 * 1024;
-    int recvBufferSize = 1024 * 1024;
-    boolean enableBatching = false;
-    int batchBufferSize = 256 * 1024;
-    int batchFlushIntervalMicros = 2000;
-    String routingServiceFinagleNameString;
-
-    final DistributedLogConfiguration conf = new DistributedLogConfiguration();
-    final StatsReceiver statsReceiver = new OstrichStatsReceiver();
-    StatsProvider statsProvider = null;
-
-    Benchmarker(String[] args) {
-        this.args = args;
-        // prepare options
-        options.addOption("s", "serverset", true, "Proxy Server Set (separated by ',')");
-        options.addOption("fn", "finagle-name", true, "Write proxy finagle name (separated by ',')");
-        options.addOption("c", "conf", true, "DistributedLog Configuration File");
-        options.addOption("u", "uri", true, "DistributedLog URI");
-        options.addOption("i", "shard", true, "Shard Id");
-        options.addOption("p", "provider", true, "DistributedLog Stats Provider");
-        options.addOption("d", "duration", true, "Duration (minutes)");
-        options.addOption("sp", "streamprefix", true, "Stream Prefix");
-        options.addOption("sc", "streamcount", true, "Number of Streams");
-        options.addOption("ms", "messagesize", true, "Message Size (bytes)");
-        options.addOption("bs", "batchsize", true, "Batch Size");
-        options.addOption("r", "rate", true, "Rate limit (requests/second)");
-        options.addOption("mr", "max-rate", true, "Maximum Rate limit (requests/second)");
-        options.addOption("cr", "change-rate", true, "Rate to increase each change period (requests/second)");
-        options.addOption("ci", "change-interval", true, "Rate to increase period, seconds");
-        options.addOption("t", "concurrency", true, "Concurrency (number of threads)");
-        options.addOption("m", "mode", true, "Benchmark mode (read/write)");
-        options.addOption("rps", "readers-per-stream", true, "Number readers per stream");
-        options.addOption("msid", "max-stream-id", true, "Max Stream ID");
-        options.addOption("ti", "truncation-interval", true, "Truncation interval in seconds");
-        options.addOption("ssid", "start-stream-id", true, "Start Stream ID");
-        options.addOption("esid", "end-stream-id", true, "Start Stream ID");
-        options.addOption("hccs", "host-connection-core-size", true, "Finagle hostConnectionCoreSize");
-        options.addOption("hcl", "host-connection-limit", true, "Finagle hostConnectionLimit");
-        options.addOption("mx", "thriftmux", false, "Enable thriftmux (write mode only)");
-        options.addOption("hsci", "handshake-with-client-info", false, "Enable handshaking with client info");
-        options.addOption("rfh", "read-from-head", false, "Read from head of the stream");
-        options.addOption("sb", "send-buffer", true, "Channel send buffer size, in bytes");
-        options.addOption("rb", "recv-buffer", true, "Channel recv buffer size, in bytes");
-        options.addOption("bt", "enable-batch", false, "Enable batching on writers");
-        options.addOption("bbs", "batch-buffer-size", true, "The batch buffer size in bytes");
-        options.addOption("bfi", "batch-flush-interval", true, "The batch buffer flush interval in micros");
-        options.addOption("rs", "routing-service", true, "The routing service finagle name for server-side routing");
-        options.addOption("h", "help", false, "Print usage.");
-    }
-
-    void printUsage() {
-        HelpFormatter helpFormatter = new HelpFormatter();
-        helpFormatter.printHelp(USAGE, options);
-    }
-
-    void run() throws Exception {
-        logger.info("Running benchmark.");
-
-        BasicParser parser = new BasicParser();
-        CommandLine cmdline = parser.parse(options, args);
-        if (cmdline.hasOption("h")) {
-            printUsage();
-            System.exit(0);
-        }
-        if (cmdline.hasOption("s")) {
-            String serversetPathStr = cmdline.getOptionValue("s");
-            serversetPaths = Arrays.asList(StringUtils.split(serversetPathStr, ','));
-        }
-        if (cmdline.hasOption("fn")) {
-            String finagleNameStr = cmdline.getOptionValue("fn");
-            finagleNames = Arrays.asList(StringUtils.split(finagleNameStr, ','));
-        }
-        if (cmdline.hasOption("i")) {
-            shardId = Integer.parseInt(cmdline.getOptionValue("i"));
-        }
-        if (cmdline.hasOption("d")) {
-            durationMins = Integer.parseInt(cmdline.getOptionValue("d"));
-        }
-        if (cmdline.hasOption("sp")) {
-            streamPrefix = cmdline.getOptionValue("sp");
-        }
-        if (cmdline.hasOption("sc")) {
-            numStreams = Integer.parseInt(cmdline.getOptionValue("sc"));
-        }
-        if (cmdline.hasOption("ms")) {
-            msgSize = Integer.parseInt(cmdline.getOptionValue("ms"));
-        }
-        if (cmdline.hasOption("r")) {
-            rate = Integer.parseInt(cmdline.getOptionValue("r"));
-        }
-        if (cmdline.hasOption("mr")) {
-            maxRate = Integer.parseInt(cmdline.getOptionValue("mr"));
-        }
-        if (cmdline.hasOption("cr")) {
-            changeRate = Integer.parseInt(cmdline.getOptionValue("cr"));
-        }
-        if (cmdline.hasOption("ci")) {
-            changeRateSeconds = Integer.parseInt(cmdline.getOptionValue("ci"));
-        }
-        if (cmdline.hasOption("t")) {
-            concurrency = Integer.parseInt(cmdline.getOptionValue("t"));
-        }
-        if (cmdline.hasOption("m")) {
-            mode = cmdline.getOptionValue("m");
-        }
-        if (cmdline.hasOption("u")) {
-            dlUri = URI.create(cmdline.getOptionValue("u"));
-        }
-        if (cmdline.hasOption("bs")) {
-            batchSize = Integer.parseInt(cmdline.getOptionValue("bs"));
-            checkArgument("write" != mode, "batchSize supported only for mode=write");
-        }
-        if (cmdline.hasOption("c")) {
-            String configFile = cmdline.getOptionValue("c");
-            conf.loadConf(new File(configFile).toURI().toURL());
-        }
-        if (cmdline.hasOption("rps")) {
-            readersPerStream = Integer.parseInt(cmdline.getOptionValue("rps"));
-        }
-        if (cmdline.hasOption("msid")) {
-            maxStreamId = Integer.parseInt(cmdline.getOptionValue("msid"));
-        }
-        if (cmdline.hasOption("ti")) {
-            truncationInterval = Integer.parseInt(cmdline.getOptionValue("ti"));
-        }
-        if (cmdline.hasOption("ssid")) {
-            startStreamId = Integer.parseInt(cmdline.getOptionValue("ssid"));
-        }
-        if (cmdline.hasOption("esid")) {
-            endStreamId = Integer.parseInt(cmdline.getOptionValue("esid"));
-        }
-        if (cmdline.hasOption("hccs")) {
-            hostConnectionCoreSize = Integer.parseInt(cmdline.getOptionValue("hccs"));
-        }
-        if (cmdline.hasOption("hcl")) {
-            hostConnectionLimit = Integer.parseInt(cmdline.getOptionValue("hcl"));
-        }
-        if (cmdline.hasOption("sb")) {
-            sendBufferSize = Integer.parseInt(cmdline.getOptionValue("sb"));
-        }
-        if (cmdline.hasOption("rb")) {
-            recvBufferSize = Integer.parseInt(cmdline.getOptionValue("rb"));
-        }
-        if (cmdline.hasOption("rs")) {
-            routingServiceFinagleNameString = cmdline.getOptionValue("rs");
-        }
-        thriftmux = cmdline.hasOption("mx");
-        handshakeWithClientInfo = cmdline.hasOption("hsci");
-        readFromHead = cmdline.hasOption("rfh");
-        enableBatching = cmdline.hasOption("bt");
-        if (cmdline.hasOption("bbs")) {
-            batchBufferSize = Integer.parseInt(cmdline.getOptionValue("bbs"));
-        }
-        if (cmdline.hasOption("bfi")) {
-            batchFlushIntervalMicros = Integer.parseInt(cmdline.getOptionValue("bfi"));
-        }
-
-        checkArgument(shardId >= 0, "shardId must be >= 0");
-        checkArgument(numStreams > 0, "numStreams must be > 0");
-        checkArgument(durationMins > 0, "durationMins must be > 0");
-        checkArgument(streamPrefix != null, "streamPrefix must be defined");
-        checkArgument(hostConnectionCoreSize > 0, "host connection core size must be > 0");
-        checkArgument(hostConnectionLimit > 0, "host connection limit must be > 0");
-
-        if (cmdline.hasOption("p")) {
-            statsProvider = ReflectionUtils.newInstance(cmdline.getOptionValue("p"), StatsProvider.class);
-        } else {
-            statsProvider = new NullStatsProvider();
-        }
-
-        logger.info("Starting stats provider : {}.", statsProvider.getClass());
-        statsProvider.start(conf);
-
-        Worker w = null;
-        if (mode.startsWith("read")) {
-            w = runReader();
-        } else if (mode.startsWith("write")) {
-            w = runWriter();
-        } else if (mode.startsWith("dlwrite")) {
-            w = runDLWriter();
-        } else if (mode.startsWith("dlread")) {
-            w = runDLReader();
-        }
-
-        if (w == null) {
-            throw new IOException("Unknown mode " + mode + " to run the benchmark.");
-        }
-
-        Thread workerThread = new Thread(w, mode + "-benchmark-thread");
-        workerThread.start();
-
-        TimeUnit.MINUTES.sleep(durationMins);
-
-        logger.info("{} minutes passed, exiting...", durationMins);
-        w.close();
-
-        if (null != statsProvider) {
-            statsProvider.stop();
-        }
-
-        Runtime.getRuntime().exit(0);
-    }
-
-    Worker runWriter() {
-        checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri,
-                "either serverset paths, finagle-names or uri required");
-        checkArgument(msgSize > 0, "messagesize must be greater than 0");
-        checkArgument(rate > 0, "rate must be greater than 0");
-        checkArgument(maxRate >= rate, "max rate must be greater than rate");
-        checkArgument(changeRate >= 0, "change rate must be positive");
-        checkArgument(changeRateSeconds >= 0, "change rate must be positive");
-        checkArgument(concurrency > 0, "concurrency must be greater than 0");
-
-        ShiftableRateLimiter rateLimiter =
-                new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, TimeUnit.SECONDS);
-        return createWriteWorker(
-                streamPrefix,
-                dlUri,
-                null == startStreamId ? shardId * numStreams : startStreamId,
-                null == endStreamId ? (shardId + 1) * numStreams : endStreamId,
-                rateLimiter,
-                concurrency,
-                msgSize,
-                batchSize,
-                hostConnectionCoreSize,
-                hostConnectionLimit,
-                serversetPaths,
-                finagleNames,
-                statsReceiver.scope("write_client"),
-                statsProvider.getStatsLogger("write"),
-                thriftmux,
-                handshakeWithClientInfo,
-                sendBufferSize,
-                recvBufferSize,
-                enableBatching,
-                batchBufferSize,
-                batchFlushIntervalMicros,
-                routingServiceFinagleNameString);
-    }
-
-    protected WriterWorker createWriteWorker(
-            String streamPrefix,
-            URI uri,
-            int startStreamId,
-            int endStreamId,
-            ShiftableRateLimiter rateLimiter,
-            int writeConcurrency,
-            int messageSizeBytes,
-            int batchSize,
-            int hostConnectionCoreSize,
-            int hostConnectionLimit,
-            List<String> serverSetPaths,
-            List<String> finagleNames,
-            StatsReceiver statsReceiver,
-            StatsLogger statsLogger,
-            boolean thriftmux,
-            boolean handshakeWithClientInfo,
-            int sendBufferSize,
-            int recvBufferSize,
-            boolean enableBatching,
-            int batchBufferSize,
-            int batchFlushIntervalMicros,
-            String routingServiceFinagleNameString) {
-        return new WriterWorker(
-                streamPrefix,
-                uri,
-                startStreamId,
-                endStreamId,
-                rateLimiter,
-                writeConcurrency,
-                messageSizeBytes,
-                batchSize,
-                hostConnectionCoreSize,
-                hostConnectionLimit,
-                serverSetPaths,
-                finagleNames,
-                statsReceiver,
-                statsLogger,
-                thriftmux,
-                handshakeWithClientInfo,
-                sendBufferSize,
-                recvBufferSize,
-                enableBatching,
-                batchBufferSize,
-                batchFlushIntervalMicros,
-                routingServiceFinagleNameString);
-    }
-
-    Worker runDLWriter() throws IOException {
-        checkNotNull(dlUri, "dlUri must be defined");
-        checkArgument(rate > 0, "rate must be greater than 0");
-        checkArgument(maxRate >= rate, "max rate must be greater than rate");
-        checkArgument(changeRate >= 0, "change rate must be positive");
-        checkArgument(changeRateSeconds >= 0, "change rate must be positive");
-        checkArgument(concurrency > 0, "concurrency must be greater than 0");
-
-        ShiftableRateLimiter rateLimiter =
-                new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, TimeUnit.SECONDS);
-
-        return new DLWriterWorker(conf,
-                dlUri,
-                streamPrefix,
-                shardId * numStreams,
-                (shardId + 1) * numStreams,
-                rateLimiter,
-                concurrency,
-                msgSize,
-                statsProvider.getStatsLogger("dlwrite"));
-    }
-
-    Worker runReader() throws IOException {
-        checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri,
-                "either serverset paths, finagle-names or dlUri required");
-        checkArgument(concurrency > 0, "concurrency must be greater than 0");
-        checkArgument(truncationInterval > 0, "truncation interval should be greater than 0");
-        return runReaderInternal(serversetPaths, finagleNames, truncationInterval);
-    }
-
-    Worker runDLReader() throws IOException {
-        return runReaderInternal(new ArrayList<String>(), new ArrayList<String>(), 0);
-    }
-
-    private Worker runReaderInternal(List<String> serversetPaths,
-                                     List<String> finagleNames,
-                                     int truncationInterval) throws IOException {
-        checkNotNull(dlUri);
-
-        int ssid = null == startStreamId ? shardId * numStreams : startStreamId;
-        int esid = null == endStreamId ? (shardId + readersPerStream) * numStreams : endStreamId;
-        if (null != maxStreamId) {
-            esid = Math.min(esid, maxStreamId);
-        }
-
-        return createReaderWorker(
-                conf,
-                dlUri,
-                streamPrefix,
-                ssid,
-                esid,
-                concurrency,
-                serversetPaths,
-                finagleNames,
-                truncationInterval,
-                readFromHead,
-                statsReceiver,
-                statsProvider.getStatsLogger("dlreader"));
-    }
-
-    protected ReaderWorker createReaderWorker(
-            DistributedLogConfiguration conf,
-            URI uri,
-            String streamPrefix,
-            int startStreamId,
-            int endStreamId,
-            int readThreadPoolSize,
-            List<String> serverSetPaths,
-            List<String> finagleNames,
-            int truncationIntervalInSeconds,
-            boolean readFromHead, /* read from the earliest data of log */
-            StatsReceiver statsReceiver,
-            StatsLogger statsLogger) throws IOException {
-        return new ReaderWorker(
-                conf,
-                uri,
-                streamPrefix,
-                startStreamId,
-                endStreamId,
-                readThreadPoolSize,
-                serverSetPaths,
-                finagleNames,
-                truncationIntervalInSeconds,
-                readFromHead,
-                statsReceiver,
-                statsLogger);
-    }
-
-    public static void main(String[] args) {
-        Benchmarker benchmarker = new Benchmarker(args);
-        try {
-            benchmarker.run();
-        } catch (Exception e) {
-            logger.info("Benchmark quit due to : ", e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/DLWriterWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/DLWriterWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/DLWriterWorker.java
deleted file mode 100644
index 152cd32..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/DLWriterWorker.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.twitter.distributedlog.AsyncLogWriter;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.LogRecord;
-import com.twitter.distributedlog.benchmark.utils.ShiftableRateLimiter;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.SchedulerUtils;
-import com.twitter.util.FutureEventListener;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The benchmark for core library writer.
- */
-public class DLWriterWorker implements Worker {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DLWriterWorker.class);
-
-    static final int BACKOFF_MS = 200;
-
-    final String streamPrefix;
-    final int startStreamId;
-    final int endStreamId;
-    final int writeConcurrency;
-    final int messageSizeBytes;
-    final ExecutorService executorService;
-    final ScheduledExecutorService rescueService;
-    final ShiftableRateLimiter rateLimiter;
-    final Random random;
-    final DistributedLogNamespace namespace;
-    final List<DistributedLogManager> dlms;
-    final List<AsyncLogWriter> streamWriters;
-    final int numStreams;
-
-    volatile boolean running = true;
-
-    final StatsLogger statsLogger;
-    final OpStatsLogger requestStat;
-
-    public DLWriterWorker(DistributedLogConfiguration conf,
-                          URI uri,
-                          String streamPrefix,
-                          int startStreamId,
-                          int endStreamId,
-                          ShiftableRateLimiter rateLimiter,
-                          int writeConcurrency,
-                          int messageSizeBytes,
-                          StatsLogger statsLogger) throws IOException {
-        checkArgument(startStreamId <= endStreamId);
-        this.streamPrefix = streamPrefix;
-        this.startStreamId = startStreamId;
-        this.endStreamId = endStreamId;
-        this.rateLimiter = rateLimiter;
-        this.writeConcurrency = writeConcurrency;
-        this.messageSizeBytes = messageSizeBytes;
-        this.statsLogger = statsLogger;
-        this.requestStat = this.statsLogger.getOpStatsLogger("requests");
-        this.executorService = Executors.newCachedThreadPool();
-        this.rescueService = Executors.newSingleThreadScheduledExecutor();
-        this.random = new Random(System.currentTimeMillis());
-
-        this.namespace = DistributedLogNamespaceBuilder.newBuilder()
-                .conf(conf)
-                .uri(uri)
-                .statsLogger(statsLogger.scope("dl"))
-                .build();
-        this.numStreams = endStreamId - startStreamId;
-        dlms = new ArrayList<DistributedLogManager>(numStreams);
-        streamWriters = new ArrayList<AsyncLogWriter>(numStreams);
-        final ConcurrentMap<String, AsyncLogWriter> writers = new ConcurrentHashMap<String, AsyncLogWriter>();
-        final CountDownLatch latch = new CountDownLatch(this.numStreams);
-        for (int i = startStreamId; i < endStreamId; i++) {
-            final String streamName = String.format("%s_%d", streamPrefix, i);
-            final DistributedLogManager dlm = namespace.openLog(streamName);
-            executorService.submit(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
-                        if (null != writers.putIfAbsent(streamName, writer)) {
-                            FutureUtils.result(writer.asyncClose());
-                        }
-                        latch.countDown();
-                    } catch (IOException e) {
-                        LOG.error("Failed to intialize writer for stream : {}", streamName, e);
-                    }
-
-                }
-            });
-            dlms.add(dlm);
-        }
-        try {
-            latch.await();
-        } catch (InterruptedException e) {
-            throw new IOException("Interrupted on initializing writers for streams.", e);
-        }
-        for (int i = startStreamId; i < endStreamId; i++) {
-            final String streamName = String.format("%s_%d", streamPrefix, i);
-            AsyncLogWriter writer = writers.get(streamName);
-            if (null == writer) {
-                throw new IOException("Writer for " + streamName + " never initialized.");
-            }
-            streamWriters.add(writer);
-        }
-        LOG.info("Writing to {} streams.", numStreams);
-    }
-
-    void rescueWriter(int idx, AsyncLogWriter writer) {
-        if (streamWriters.get(idx) == writer) {
-            try {
-                FutureUtils.result(writer.asyncClose());
-            } catch (IOException e) {
-                LOG.error("Failed to close writer for stream {}.", idx);
-            }
-            AsyncLogWriter newWriter = null;
-            try {
-                newWriter = dlms.get(idx).startAsyncLogSegmentNonPartitioned();
-            } catch (IOException e) {
-                LOG.error("Failed to create new writer for stream {}, backoff for {} ms.",
-                          idx, BACKOFF_MS);
-                scheduleRescue(idx, writer, BACKOFF_MS);
-            }
-            streamWriters.set(idx, newWriter);
-        } else {
-            LOG.warn("AsyncLogWriter for stream {} was already rescued.", idx);
-        }
-    }
-
-    void scheduleRescue(final int idx, final AsyncLogWriter writer, int delayMs) {
-        Runnable r = new Runnable() {
-            @Override
-            public void run() {
-                rescueWriter(idx, writer);
-            }
-        };
-        if (delayMs > 0) {
-            rescueService.schedule(r, delayMs, TimeUnit.MILLISECONDS);
-        } else {
-            rescueService.submit(r);
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        this.running = false;
-        SchedulerUtils.shutdownScheduler(this.executorService, 2, TimeUnit.MINUTES);
-        SchedulerUtils.shutdownScheduler(this.rescueService, 2, TimeUnit.MINUTES);
-        for (AsyncLogWriter writer : streamWriters) {
-            FutureUtils.result(writer.asyncClose());
-        }
-        for (DistributedLogManager dlm : dlms) {
-            dlm.close();
-        }
-        namespace.close();
-    }
-
-    @Override
-    public void run() {
-        LOG.info("Starting dlwriter (concurrency = {}, prefix = {}, numStreams = {})",
-                 new Object[] { writeConcurrency, streamPrefix, numStreams });
-        for (int i = 0; i < writeConcurrency; i++) {
-            executorService.submit(new Writer(i));
-        }
-    }
-
-    class Writer implements Runnable {
-
-        final int idx;
-
-        Writer(int idx) {
-            this.idx = idx;
-        }
-
-        @Override
-        public void run() {
-            LOG.info("Started writer {}.", idx);
-            while (running) {
-                final int streamIdx = random.nextInt(numStreams);
-                final AsyncLogWriter writer = streamWriters.get(streamIdx);
-                rateLimiter.getLimiter().acquire();
-                final long requestMillis = System.currentTimeMillis();
-                final byte[] data;
-                try {
-                    data = Utils.generateMessage(requestMillis, messageSizeBytes);
-                } catch (TException e) {
-                    LOG.error("Error on generating message : ", e);
-                    break;
-                }
-                writer.write(new LogRecord(requestMillis, data)).addEventListener(new FutureEventListener<DLSN>() {
-                    @Override
-                    public void onSuccess(DLSN value) {
-                        requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis);
-                        LOG.error("Failed to publish, rescue it : ", cause);
-                        scheduleRescue(streamIdx, writer, 0);
-                    }
-                });
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
deleted file mode 100644
index adbdeda..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.distributedlog.AsyncLogReader;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.LogRecordSet;
-import com.twitter.distributedlog.LogRecordWithDLSN;
-import com.twitter.distributedlog.benchmark.thrift.Message;
-import com.twitter.distributedlog.client.serverset.DLZkServerSet;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import com.twitter.distributedlog.service.DistributedLogClient;
-import com.twitter.distributedlog.service.DistributedLogClientBuilder;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.SchedulerUtils;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Duration$;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The benchmark for core library reader.
- */
-public class ReaderWorker implements Worker {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ReaderWorker.class);
-
-    static final int BACKOFF_MS = 200;
-
-    final String streamPrefix;
-    final int startStreamId;
-    final int endStreamId;
-    final ScheduledExecutorService executorService;
-    final ExecutorService callbackExecutor;
-    final DistributedLogNamespace namespace;
-    final DistributedLogManager[] dlms;
-    final AsyncLogReader[] logReaders;
-    final StreamReader[] streamReaders;
-    final int numStreams;
-    final boolean readFromHead;
-
-    final int truncationIntervalInSeconds;
-    // DL Client Related Variables
-    final DLZkServerSet[] serverSets;
-    final List<String> finagleNames;
-    final DistributedLogClient dlc;
-
-    volatile boolean running = true;
-
-    final StatsReceiver statsReceiver;
-    final StatsLogger statsLogger;
-    final OpStatsLogger e2eStat;
-    final OpStatsLogger deliveryStat;
-    final OpStatsLogger negativeE2EStat;
-    final OpStatsLogger negativeDeliveryStat;
-    final OpStatsLogger truncationStat;
-    final Counter invalidRecordsCounter;
-    final Counter outOfOrderSequenceIdCounter;
-
-    class StreamReader implements FutureEventListener<List<LogRecordWithDLSN>>, Runnable, Gauge<Number> {
-
-        final int streamIdx;
-        final String streamName;
-        DLSN prevDLSN = null;
-        long prevSequenceId = Long.MIN_VALUE;
-        private static final String gaugeLabel = "sequence_id";
-
-        StreamReader(int idx, StatsLogger statsLogger) {
-            this.streamIdx = idx;
-            int streamId = startStreamId + streamIdx;
-            streamName = String.format("%s_%d", streamPrefix, streamId);
-            statsLogger.scope(streamName).registerGauge(gaugeLabel, this);
-        }
-
-        @Override
-        public void onSuccess(final List<LogRecordWithDLSN> records) {
-            for (final LogRecordWithDLSN record : records) {
-                if (record.isRecordSet()) {
-                    try {
-                        processRecordSet(record);
-                    } catch (IOException e) {
-                        onFailure(e);
-                    }
-                } else {
-                    processRecord(record);
-                }
-            }
-            readLoop();
-        }
-
-        public void processRecordSet(final LogRecordWithDLSN record) throws IOException {
-            LogRecordSet.Reader reader = LogRecordSet.of(record);
-            LogRecordWithDLSN nextRecord = reader.nextRecord();
-            while (null != nextRecord) {
-                processRecord(nextRecord);
-                nextRecord = reader.nextRecord();
-            }
-        }
-
-        public void processRecord(final LogRecordWithDLSN record) {
-            Message msg;
-            try {
-                msg = Utils.parseMessage(record.getPayload());
-            } catch (TException e) {
-                invalidRecordsCounter.inc();
-                LOG.warn("Failed to parse record {} for stream {} : size = {} , ",
-                         new Object[] { record, streamIdx, record.getPayload().length, e });
-                return;
-            }
-            long curTimeMillis = System.currentTimeMillis();
-            long e2eLatency = curTimeMillis - msg.getPublishTime();
-            long deliveryLatency = curTimeMillis - record.getTransactionId();
-            if (e2eLatency >= 0) {
-                e2eStat.registerSuccessfulEvent(e2eLatency);
-            } else {
-                negativeE2EStat.registerSuccessfulEvent(-e2eLatency);
-            }
-            if (deliveryLatency >= 0) {
-                deliveryStat.registerSuccessfulEvent(deliveryLatency);
-            } else {
-                negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency);
-            }
-
-            prevDLSN = record.getDlsn();
-        }
-
-        @Override
-        public void onFailure(Throwable cause) {
-            scheduleReinitStream(streamIdx).map(new Function<Void, Void>() {
-                @Override
-                public Void apply(Void value) {
-                    prevDLSN = null;
-                    prevSequenceId = Long.MIN_VALUE;
-                    readLoop();
-                    return null;
-                }
-            });
-        }
-
-        void readLoop() {
-            if (!running) {
-                return;
-            }
-            logReaders[streamIdx].readBulk(10).addEventListener(this);
-        }
-
-        @Override
-        public void run() {
-            final DLSN dlsnToTruncate = prevDLSN;
-            if (null == dlsnToTruncate) {
-                return;
-            }
-            final Stopwatch stopwatch = Stopwatch.createStarted();
-            dlc.truncate(streamName, dlsnToTruncate).addEventListener(
-                    new FutureEventListener<Boolean>() {
-                        @Override
-                        public void onSuccess(Boolean value) {
-                            truncationStat.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
-                        }
-
-                        @Override
-                        public void onFailure(Throwable cause) {
-                            truncationStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
-                            LOG.error("Failed to truncate stream {} to {} : ",
-                                    new Object[]{streamName, dlsnToTruncate, cause});
-                        }
-                    });
-        }
-
-        @Override
-        public Number getDefaultValue() {
-            return Long.MIN_VALUE;
-        }
-
-        @Override
-        public synchronized Number getSample() {
-            return prevSequenceId;
-        }
-
-        void unregisterGauge() {
-            statsLogger.scope(streamName).unregisterGauge(gaugeLabel, this);
-        }
-    }
-
-    public ReaderWorker(DistributedLogConfiguration conf,
-                        URI uri,
-                        String streamPrefix,
-                        int startStreamId,
-                        int endStreamId,
-                        int readThreadPoolSize,
-                        List<String> serverSetPaths,
-                        List<String> finagleNames,
-                        int truncationIntervalInSeconds,
-                        boolean readFromHead, /* read from the earliest data of log */
-                        StatsReceiver statsReceiver,
-                        StatsLogger statsLogger) throws IOException {
-        checkArgument(startStreamId <= endStreamId);
-        this.streamPrefix = streamPrefix;
-        this.startStreamId = startStreamId;
-        this.endStreamId = endStreamId;
-        this.truncationIntervalInSeconds = truncationIntervalInSeconds;
-        this.readFromHead = readFromHead;
-        this.statsReceiver = statsReceiver;
-        this.statsLogger = statsLogger;
-        this.e2eStat = this.statsLogger.getOpStatsLogger("e2e");
-        this.negativeE2EStat = this.statsLogger.getOpStatsLogger("e2eNegative");
-        this.deliveryStat = this.statsLogger.getOpStatsLogger("delivery");
-        this.negativeDeliveryStat = this.statsLogger.getOpStatsLogger("deliveryNegative");
-        this.truncationStat = this.statsLogger.getOpStatsLogger("truncation");
-        this.invalidRecordsCounter = this.statsLogger.getCounter("invalid_records");
-        this.outOfOrderSequenceIdCounter = this.statsLogger.getCounter("out_of_order_seq_id");
-        this.executorService = Executors.newScheduledThreadPool(
-                readThreadPoolSize, new ThreadFactoryBuilder().setNameFormat("benchmark.reader-%d").build());
-        this.callbackExecutor = Executors.newFixedThreadPool(
-                Runtime.getRuntime().availableProcessors(),
-                new ThreadFactoryBuilder().setNameFormat("benchmark.reader-callback-%d").build());
-        this.finagleNames = finagleNames;
-        this.serverSets = createServerSets(serverSetPaths);
-
-        conf.setDeserializeRecordSetOnReads(false);
-
-        if (truncationIntervalInSeconds > 0 && (!finagleNames.isEmpty() || !serverSetPaths.isEmpty())) {
-            // Construct client for truncation
-            DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
-                    .clientId(ClientId$.MODULE$.apply("dlog_loadtest_reader"))
-                    .clientBuilder(ClientBuilder.get()
-                        .hostConnectionLimit(10)
-                        .hostConnectionCoresize(10)
-                        .tcpConnectTimeout(Duration$.MODULE$.fromSeconds(1))
-                        .requestTimeout(Duration$.MODULE$.fromSeconds(2)))
-                    .redirectBackoffStartMs(100)
-                    .redirectBackoffMaxMs(500)
-                    .requestTimeoutMs(2000)
-                    .statsReceiver(statsReceiver)
-                    .thriftmux(true)
-                    .name("reader");
-
-            if (serverSetPaths.isEmpty()) {
-                // Prepare finagle names
-                String local = finagleNames.get(0);
-                String[] remotes = new String[finagleNames.size() - 1];
-                finagleNames.subList(1, finagleNames.size()).toArray(remotes);
-
-                builder = builder.finagleNameStrs(local, remotes);
-                LOG.info("Initialized distributedlog client for truncation @ {}.", finagleNames);
-            } else if (serverSets.length != 0){
-                ServerSet local = this.serverSets[0].getServerSet();
-                ServerSet[] remotes = new ServerSet[this.serverSets.length - 1];
-                for (int i = 1; i < serverSets.length; i++) {
-                    remotes[i - 1] = serverSets[i].getServerSet();
-                }
-
-                builder = builder.serverSets(local, remotes);
-                LOG.info("Initialized distributedlog client for truncation @ {}.", serverSetPaths);
-            } else {
-                builder = builder.uri(uri);
-                LOG.info("Initialized distributedlog client for namespace {}", uri);
-            }
-            dlc = builder.build();
-        } else {
-            dlc = null;
-        }
-
-        // construct the factory
-        this.namespace = DistributedLogNamespaceBuilder.newBuilder()
-                .conf(conf)
-                .uri(uri)
-                .statsLogger(statsLogger.scope("dl"))
-                .build();
-        this.numStreams = endStreamId - startStreamId;
-        this.dlms = new DistributedLogManager[numStreams];
-        this.logReaders = new AsyncLogReader[numStreams];
-        final CountDownLatch latch = new CountDownLatch(numStreams);
-        for (int i = 0; i < numStreams; i++) {
-            final int idx = i;
-            executorService.submit(new Runnable() {
-                @Override
-                public void run() {
-                    reinitStream(idx).map(new Function<Void, Void>() {
-                        @Override
-                        public Void apply(Void value) {
-                            LOG.info("Initialized stream reader {}.", idx);
-                            latch.countDown();
-                            return null;
-                        }
-                    });
-                }
-            });
-        }
-        try {
-            latch.await();
-        } catch (InterruptedException e) {
-            throw new DLInterruptedException("Failed to intialize benchmark readers : ", e);
-        }
-        this.streamReaders = new StreamReader[numStreams];
-        for (int i = 0; i < numStreams; i++) {
-            streamReaders[i] = new StreamReader(i, statsLogger.scope("perstream"));
-            if (truncationIntervalInSeconds > 0) {
-                executorService.scheduleWithFixedDelay(streamReaders[i],
-                        truncationIntervalInSeconds, truncationIntervalInSeconds, TimeUnit.SECONDS);
-            }
-        }
-        LOG.info("Initialized benchmark reader on {} streams {} : [{} - {})",
-                 new Object[] { numStreams, streamPrefix, startStreamId, endStreamId });
-    }
-
-    protected DLZkServerSet[] createServerSets(List<String> serverSetPaths) {
-        DLZkServerSet[] serverSets = new DLZkServerSet[serverSetPaths.size()];
-        for (int i = 0; i < serverSets.length; i++) {
-            String serverSetPath = serverSetPaths.get(i);
-            serverSets[i] = DLZkServerSet.of(URI.create(serverSetPath), 60000);
-        }
-        return serverSets;
-    }
-
-    private Future<Void> reinitStream(int idx) {
-        Promise<Void> promise = new Promise<Void>();
-        reinitStream(idx, promise);
-        return promise;
-    }
-
-    private void reinitStream(int idx, Promise<Void> promise) {
-        int streamId = startStreamId + idx;
-        String streamName = String.format("%s_%d", streamPrefix, streamId);
-
-        if (logReaders[idx] != null) {
-            try {
-                FutureUtils.result(logReaders[idx].asyncClose());
-            } catch (IOException e) {
-                LOG.warn("Failed on closing stream reader {} : ", streamName, e);
-            }
-            logReaders[idx] = null;
-        }
-        if (dlms[idx] != null) {
-            try {
-                dlms[idx].close();
-            } catch (IOException e) {
-                LOG.warn("Failed on closing dlm {} : ", streamName, e);
-            }
-            dlms[idx] = null;
-        }
-
-        try {
-            dlms[idx] = namespace.openLog(streamName);
-        } catch (IOException ioe) {
-            LOG.error("Failed on creating dlm {} : ", streamName, ioe);
-            scheduleReinitStream(idx, promise);
-            return;
-        }
-        DLSN lastDLSN;
-        if (readFromHead) {
-            lastDLSN = DLSN.InitialDLSN;
-        } else {
-            try {
-                lastDLSN = dlms[idx].getLastDLSN();
-            } catch (IOException ioe) {
-                LOG.error("Failed on getting last dlsn from stream {} : ", streamName, ioe);
-                scheduleReinitStream(idx, promise);
-                return;
-            }
-        }
-        try {
-            logReaders[idx] = dlms[idx].getAsyncLogReader(lastDLSN);
-        } catch (IOException ioe) {
-            LOG.error("Failed on opening reader for stream {} starting from {} : ",
-                      new Object[] { streamName, lastDLSN, ioe });
-            scheduleReinitStream(idx, promise);
-            return;
-        }
-        LOG.info("Opened reader for stream {}, starting from {}.", streamName, lastDLSN);
-        promise.setValue(null);
-    }
-
-    Future<Void> scheduleReinitStream(int idx) {
-        Promise<Void> promise = new Promise<Void>();
-        scheduleReinitStream(idx, promise);
-        return promise;
-    }
-
-    void scheduleReinitStream(final int idx, final Promise<Void> promise) {
-        executorService.schedule(new Runnable() {
-            @Override
-            public void run() {
-                reinitStream(idx, promise);
-            }
-        }, BACKOFF_MS, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public void close() throws IOException {
-        this.running = false;
-        for (AsyncLogReader reader : logReaders) {
-            if (null != reader) {
-                FutureUtils.result(reader.asyncClose());
-            }
-        }
-        for (DistributedLogManager dlm : dlms) {
-            if (null != dlm) {
-                dlm.close();
-            }
-        }
-        namespace.close();
-        SchedulerUtils.shutdownScheduler(executorService, 2, TimeUnit.MINUTES);
-        SchedulerUtils.shutdownScheduler(callbackExecutor, 2, TimeUnit.MINUTES);
-        if (this.dlc != null) {
-            this.dlc.close();
-        }
-        for (DLZkServerSet serverSet: serverSets) {
-            serverSet.close();
-        }
-        // Unregister gauges to prevent GC spirals
-        for (StreamReader sr : streamReaders) {
-            sr.unregisterGauge();
-        }
-    }
-
-    @Override
-    public void run() {
-        LOG.info("Starting reader (prefix = {}, numStreams = {}).",
-                 streamPrefix, numStreams);
-        for (StreamReader sr : streamReaders) {
-            sr.readLoop();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Utils.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Utils.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Utils.java
deleted file mode 100644
index f5c32db..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Utils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark;
-
-import com.twitter.distributedlog.benchmark.thrift.Message;
-import java.nio.ByteBuffer;
-import java.util.Random;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TMemoryInputTransport;
-
-/**
- * Utils for generating and parsing messages.
- */
-public class Utils {
-
-    static final Random RAND = new Random(System.currentTimeMillis());
-    static final ThreadLocal<TSerializer> MSG_SERIALIZER =
-            new ThreadLocal<TSerializer>() {
-                @Override
-                public TSerializer initialValue() {
-                    return new TSerializer(new TBinaryProtocol.Factory());
-                }
-            };
-
-    public static byte[] generateMessage(long requestMillis, int payLoadSize) throws TException {
-        byte[] payload = new byte[payLoadSize];
-        RAND.nextBytes(payload);
-        Message msg = new Message(requestMillis, ByteBuffer.wrap(payload));
-        return MSG_SERIALIZER.get().serialize(msg);
-    }
-
-    public static Message parseMessage(byte[] data) throws TException {
-        Message msg = new Message();
-        TMemoryInputTransport transport = new TMemoryInputTransport(data);
-        TBinaryProtocol protocol = new TBinaryProtocol(transport);
-        msg.read(protocol);
-        return msg;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Worker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Worker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Worker.java
deleted file mode 100644
index 6c60034..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Worker.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark;
-
-import java.io.Closeable;
-
-/**
- * Worker to run benchmark.
- */
-public interface Worker extends Closeable, Runnable {
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
deleted file mode 100644
index dc5a6e2..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.benchmark.utils.ShiftableRateLimiter;
-import com.twitter.distributedlog.client.DistributedLogMultiStreamWriter;
-import com.twitter.distributedlog.client.serverset.DLZkServerSet;
-import com.twitter.distributedlog.exceptions.DLException;
-import com.twitter.distributedlog.io.CompressionCodec;
-import com.twitter.distributedlog.service.DistributedLogClient;
-import com.twitter.distributedlog.service.DistributedLogClientBuilder;
-import com.twitter.distributedlog.util.SchedulerUtils;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.finagle.thrift.ClientId;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Duration$;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Benchmark for distributedlog proxy client.
- */
-public class WriterWorker implements Worker {
-
-    static final Logger LOG = LoggerFactory.getLogger(WriterWorker.class);
-
-    final String streamPrefix;
-    final int startStreamId;
-    final int endStreamId;
-    final int writeConcurrency;
-    final int messageSizeBytes;
-    final int hostConnectionCoreSize;
-    final int hostConnectionLimit;
-    final ExecutorService executorService;
-    final ShiftableRateLimiter rateLimiter;
-    final URI dlUri;
-    final DLZkServerSet[] serverSets;
-    final List<String> finagleNames;
-    final Random random;
-    final List<String> streamNames;
-    final int numStreams;
-    final int batchSize;
-    final boolean thriftmux;
-    final boolean handshakeWithClientInfo;
-    final int sendBufferSize;
-    final int recvBufferSize;
-    final boolean enableBatching;
-    final int batchBufferSize;
-    final int batchFlushIntervalMicros;
-    private final String routingServiceFinagleName;
-
-    volatile boolean running = true;
-
-    final StatsReceiver statsReceiver;
-    final StatsLogger statsLogger;
-    final OpStatsLogger requestStat;
-    final StatsLogger exceptionsLogger;
-    final StatsLogger dlErrorCodeLogger;
-
-    // callback thread
-    final ExecutorService executor;
-
-    public WriterWorker(String streamPrefix,
-                        URI uri,
-                        int startStreamId,
-                        int endStreamId,
-                        ShiftableRateLimiter rateLimiter,
-                        int writeConcurrency,
-                        int messageSizeBytes,
-                        int batchSize,
-                        int hostConnectionCoreSize,
-                        int hostConnectionLimit,
-                        List<String> serverSetPaths,
-                        List<String> finagleNames,
-                        StatsReceiver statsReceiver,
-                        StatsLogger statsLogger,
-                        boolean thriftmux,
-                        boolean handshakeWithClientInfo,
-                        int sendBufferSize,
-                        int recvBufferSize,
-                        boolean enableBatching,
-                        int batchBufferSize,
-                        int batchFlushIntervalMicros,
-                        String routingServiceFinagleName) {
-        checkArgument(startStreamId <= endStreamId);
-        checkArgument(!finagleNames.isEmpty() || !serverSetPaths.isEmpty());
-        this.streamPrefix = streamPrefix;
-        this.dlUri = uri;
-        this.startStreamId = startStreamId;
-        this.endStreamId = endStreamId;
-        this.rateLimiter = rateLimiter;
-        this.writeConcurrency = writeConcurrency;
-        this.messageSizeBytes = messageSizeBytes;
-        this.statsReceiver = statsReceiver;
-        this.statsLogger = statsLogger;
-        this.requestStat = this.statsLogger.getOpStatsLogger("requests");
-        this.exceptionsLogger = statsLogger.scope("exceptions");
-        this.dlErrorCodeLogger = statsLogger.scope("dl_error_code");
-        this.executorService = Executors.newCachedThreadPool();
-        this.random = new Random(System.currentTimeMillis());
-        this.batchSize = batchSize;
-        this.hostConnectionCoreSize = hostConnectionCoreSize;
-        this.hostConnectionLimit = hostConnectionLimit;
-        this.thriftmux = thriftmux;
-        this.handshakeWithClientInfo = handshakeWithClientInfo;
-        this.sendBufferSize = sendBufferSize;
-        this.recvBufferSize = recvBufferSize;
-        this.enableBatching = enableBatching;
-        this.batchBufferSize = batchBufferSize;
-        this.batchFlushIntervalMicros = batchFlushIntervalMicros;
-        this.finagleNames = finagleNames;
-        this.serverSets = createServerSets(serverSetPaths);
-        this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
-        this.routingServiceFinagleName = routingServiceFinagleName;
-
-        // Streams
-        streamNames = new ArrayList<String>(endStreamId - startStreamId);
-        for (int i = startStreamId; i < endStreamId; i++) {
-            streamNames.add(String.format("%s_%d", streamPrefix, i));
-        }
-        numStreams = streamNames.size();
-        LOG.info("Writing to {} streams : {}", numStreams, streamNames);
-    }
-
-    protected DLZkServerSet[] createServerSets(List<String> serverSetPaths) {
-        DLZkServerSet[] serverSets = new DLZkServerSet[serverSetPaths.size()];
-        for (int i = 0; i < serverSets.length; i++) {
-            String serverSetPath = serverSetPaths.get(i);
-            serverSets[i] = DLZkServerSet.of(URI.create(serverSetPath), 60000);
-        }
-        return serverSets;
-    }
-
-    @Override
-    public void close() throws IOException {
-        this.running = false;
-        SchedulerUtils.shutdownScheduler(this.executorService, 2, TimeUnit.MINUTES);
-        for (DLZkServerSet serverSet: serverSets) {
-            serverSet.close();
-        }
-    }
-
-    private DistributedLogClient buildDlogClient() {
-        ClientBuilder clientBuilder = ClientBuilder.get()
-            .hostConnectionLimit(hostConnectionLimit)
-            .hostConnectionCoresize(hostConnectionCoreSize)
-            .tcpConnectTimeout(Duration$.MODULE$.fromMilliseconds(200))
-            .connectTimeout(Duration$.MODULE$.fromMilliseconds(200))
-            .requestTimeout(Duration$.MODULE$.fromSeconds(10))
-            .sendBufferSize(sendBufferSize)
-            .recvBufferSize(recvBufferSize);
-
-        ClientId clientId = ClientId$.MODULE$.apply("dlog_loadtest_writer");
-
-        DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
-            .clientId(clientId)
-            .clientBuilder(clientBuilder)
-            .thriftmux(thriftmux)
-            .redirectBackoffStartMs(100)
-            .redirectBackoffMaxMs(500)
-            .requestTimeoutMs(10000)
-            .statsReceiver(statsReceiver)
-            .streamNameRegex("^" + streamPrefix + "_[0-9]+$")
-            .handshakeWithClientInfo(handshakeWithClientInfo)
-            .periodicHandshakeIntervalMs(TimeUnit.SECONDS.toMillis(30))
-            .periodicOwnershipSyncIntervalMs(TimeUnit.MINUTES.toMillis(5))
-            .periodicDumpOwnershipCache(true)
-            .handshakeTracing(true)
-            .serverRoutingServiceFinagleNameStr(routingServiceFinagleName)
-            .name("writer");
-
-        if (!finagleNames.isEmpty()) {
-            String local = finagleNames.get(0);
-            String[] remotes = new String[finagleNames.size() - 1];
-            finagleNames.subList(1, finagleNames.size()).toArray(remotes);
-
-            builder = builder.finagleNameStrs(local, remotes);
-        } else if (serverSets.length != 0){
-            ServerSet local = serverSets[0].getServerSet();
-            ServerSet[] remotes = new ServerSet[serverSets.length - 1];
-            for (int i = 1; i < serverSets.length; i++) {
-                remotes[i - 1] = serverSets[i].getServerSet();
-            }
-            builder = builder.serverSets(local, remotes);
-        } else {
-            builder = builder.uri(dlUri);
-        }
-
-        return builder.build();
-    }
-
-    ByteBuffer buildBuffer(long requestMillis, int messageSizeBytes) {
-        ByteBuffer data;
-        try {
-            data = ByteBuffer.wrap(Utils.generateMessage(requestMillis, messageSizeBytes));
-            return data;
-        } catch (TException e) {
-            LOG.error("Error generating message : ", e);
-            return null;
-        }
-    }
-
-    List<ByteBuffer> buildBufferList(int batchSize, long requestMillis, int messageSizeBytes) {
-        ArrayList<ByteBuffer> bufferList = new ArrayList<ByteBuffer>(batchSize);
-        for (int i = 0; i < batchSize; i++) {
-            ByteBuffer buf = buildBuffer(requestMillis, messageSizeBytes);
-            if (null == buf) {
-                return null;
-            }
-            bufferList.add(buf);
-        }
-        return bufferList;
-    }
-
-    class TimedRequestHandler implements FutureEventListener<DLSN>, Runnable {
-        final String streamName;
-        final long requestMillis;
-        DLSN dlsn = null;
-        Throwable cause = null;
-
-        TimedRequestHandler(String streamName,
-                            long requestMillis) {
-            this.streamName = streamName;
-            this.requestMillis = requestMillis;
-        }
-        @Override
-        public void onSuccess(DLSN value) {
-            dlsn = value;
-            executor.submit(this);
-        }
-        @Override
-        public void onFailure(Throwable cause) {
-            this.cause = cause;
-            executor.submit(this);
-        }
-
-        @Override
-        public void run() {
-            if (null != dlsn) {
-                requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);
-            } else {
-                LOG.error("Failed to publish to {} : ", streamName, cause);
-                requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis);
-                exceptionsLogger.getCounter(cause.getClass().getName()).inc();
-                if (cause instanceof DLException) {
-                    DLException dle = (DLException) cause;
-                    dlErrorCodeLogger.getCounter(dle.getCode().toString()).inc();
-                }
-            }
-        }
-    }
-
-    /**
-     * Writer task that publishes one record per request to a randomly chosen stream.
-     *
-     * <p>When batching is enabled the records go through a
-     * {@link DistributedLogMultiStreamWriter}; otherwise they are written directly with the
-     * client. Each task owns its own client and its own copy of the rate limiter.
-     */
-    class Writer implements Runnable {
-
-        final int idx;
-        final DistributedLogClient dlc;
-        // Only created when batching is enabled; null means "write through dlc directly".
-        DistributedLogMultiStreamWriter writer = null;
-        final ShiftableRateLimiter limiter;
-
-        /**
-         * @param idx index of this writer, used only for logging
-         */
-        Writer(int idx) {
-            this.idx = idx;
-            this.dlc = buildDlogClient();
-            if (enableBatching) {
-                writer = DistributedLogMultiStreamWriter.newBuilder()
-                        .client(this.dlc)
-                        .streams(streamNames)
-                        .compressionCodec(CompressionCodec.Type.NONE)
-                        .flushIntervalMicros(batchFlushIntervalMicros)
-                        .bufferSize(batchBufferSize)
-                        .firstSpeculativeTimeoutMs(9000)
-                        .maxSpeculativeTimeoutMs(9000)
-                        .requestTimeoutMs(10000)
-                        .speculativeBackoffMultiplier(2)
-                        .build();
-            }
-            this.limiter = rateLimiter.duplicate();
-        }
-
-        /**
-         * Issues rate-limited writes until {@code running} is cleared or a message cannot be
-         * generated, then closes the batching writer (if any) and the client.
-         */
-        @Override
-        public void run() {
-            LOG.info("Started writer {}.", idx);
-            try {
-                while (running) {
-                    this.limiter.getLimiter().acquire();
-                    final String streamName = streamNames.get(random.nextInt(numStreams));
-                    final long requestMillis = System.currentTimeMillis();
-                    final ByteBuffer data = buildBuffer(requestMillis, messageSizeBytes);
-                    if (null == data) {
-                        // Message generation failed; stop this writer.
-                        break;
-                    }
-                    if (null != writer) {
-                        writer.write(data).addEventListener(
-                                new TimedRequestHandler(streamName, requestMillis));
-                    } else {
-                        dlc.write(streamName, data).addEventListener(
-                                new TimedRequestHandler(streamName, requestMillis));
-                    }
-                }
-            } finally {
-                // Release both resources even if the loop exits with an exception, and
-                // close the client even if closing the batching writer fails.
-                try {
-                    if (null != writer) {
-                        writer.close();
-                    }
-                } finally {
-                    dlc.close();
-                }
-            }
-        }
-    }
-
-    /**
-     * Writer task that publishes {@code batchSize} records per request with a bulk write
-     * to a randomly chosen stream. Each task owns its own client.
-     */
-    class BulkWriter implements Runnable {
-
-        final int idx;
-        final DistributedLogClient dlc;
-
-        /**
-         * @param idx index of this writer, used only for logging
-         */
-        BulkWriter(int idx) {
-            this.idx = idx;
-            this.dlc = buildDlogClient();
-        }
-
-        /**
-         * Issues rate-limited bulk writes until {@code running} is cleared or a batch cannot
-         * be generated, then closes the client.
-         */
-        @Override
-        public void run() {
-            LOG.info("Started writer {}.", idx);
-            try {
-                while (running) {
-                    // One permit per record in the batch.
-                    rateLimiter.getLimiter().acquire(batchSize);
-                    String streamName = streamNames.get(random.nextInt(numStreams));
-                    final long requestMillis = System.currentTimeMillis();
-                    final List<ByteBuffer> data = buildBufferList(batchSize, requestMillis, messageSizeBytes);
-                    if (null == data) {
-                        // Batch generation failed; stop this writer.
-                        break;
-                    }
-                    List<Future<DLSN>> results = dlc.writeBulk(streamName, data);
-                    // Every record of the batch is tracked as its own request.
-                    for (Future<DLSN> result : results) {
-                        result.addEventListener(new TimedRequestHandler(streamName, requestMillis));
-                    }
-                }
-            } finally {
-                // Close the client even if the loop exits with an exception.
-                dlc.close();
-            }
-        }
-    }
-
-    @Override
-    public void run() {
-        LOG.info("Starting writer (concurrency = {}, prefix = {}, batchSize = {})",
-                 new Object[] { writeConcurrency, streamPrefix, batchSize });
-        try {
-            for (int i = 0; i < writeConcurrency; i++) {
-                Runnable writer = null;
-                if (batchSize > 0) {
-                    writer = new BulkWriter(i);
-                } else {
-                    writer = new Writer(i);
-                }
-                executorService.submit(writer);
-            }
-        } catch (Throwable t) {
-            LOG.error("Unhandled exception caught", t);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/package-info.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/package-info.java
deleted file mode 100644
index 052a661..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Benchmarks for distributedlog.
- */
-package com.twitter.distributedlog.benchmark;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AbstractReaderBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AbstractReaderBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AbstractReaderBenchmark.java
deleted file mode 100644
index 4d436ee..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AbstractReaderBenchmark.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark.stream;
-
-import com.twitter.distributedlog.DistributedLogConstants;
-import org.apache.commons.cli.CommandLine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for reader benchmarks. It registers and parses the command line options
- * that select where reading starts (mode, transaction id, rewind interval) and how many
- * records are read per batch.
- */
-abstract class AbstractReaderBenchmark extends StreamBenchmark {
-
-    // Bound to this class; it used to be bound to SyncReaderBenchmark, which attributed
-    // these log lines to the wrong logger.
-    private static final Logger logger = LoggerFactory.getLogger(AbstractReaderBenchmark.class);
-
-    protected ReadMode readMode = ReadMode.LATEST;
-    protected long fromTxId = DistributedLogConstants.INVALID_TXID;
-    protected long rewindMs = 0L;
-    protected int batchSize = 1;
-
-    /** Registers the reader-specific command line options. */
-    protected AbstractReaderBenchmark() {
-        options.addOption("t", "tx-id", true,
-            "Transaction ID to start read from when reading in mode 'position'");
-        options.addOption("r", "rewind", true,
-            "Time to rewind back to read from when reading in mode 'rewind' (in milliseconds)");
-        options.addOption("m", "mode", true,
-            "Read Mode : [oldest, latest, rewind, position]");
-        options.addOption("b", "batch-size", true, "Read batch size");
-    }
-
-    /**
-     * Reads the reader options from the parsed command line.
-     *
-     * <p>The mode option ({@code -m}) is mandatory: if it is missing or does not name a
-     * {@link ReadMode}, the usage is printed and the JVM exits. The remaining options keep
-     * their defaults when absent.
-     *
-     * @param cmdline the parsed command line
-     * @throws NumberFormatException if a numeric option value cannot be parsed
-     */
-    @Override
-    protected void parseCommandLine(CommandLine cmdline) {
-        if (cmdline.hasOption("m")) {
-            String mode = cmdline.getOptionValue("m");
-            try {
-                // Upper-case with a fixed locale so the lookup does not depend on the
-                // default locale (e.g. the Turkish dotted capital I).
-                readMode = ReadMode.valueOf(mode.toUpperCase(java.util.Locale.ROOT));
-            } catch (IllegalArgumentException iae) {
-                logger.error("Invalid read mode {}.", mode);
-                printUsage();
-                System.exit(0);
-            }
-        } else {
-            printUsage();
-            System.exit(0);
-        }
-        if (cmdline.hasOption("t")) {
-            fromTxId = Long.parseLong(cmdline.getOptionValue("t"));
-        }
-        if (cmdline.hasOption("r")) {
-            rewindMs = Long.parseLong(cmdline.getOptionValue("r"));
-        }
-        if (cmdline.hasOption("b")) {
-            batchSize = Integer.parseInt(cmdline.getOptionValue("b"));
-        }
-        logger.info("Start reading from transaction id {}, rewind {} ms.", fromTxId, rewindMs);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
deleted file mode 100644
index 86acdb6..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark.stream;
-
-import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.AsyncLogReader;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.LogRecordWithDLSN;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.util.FutureUtils;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Benchmark on {@link com.twitter.distributedlog.AsyncLogReader} reading from a stream.
- */
-public class AsyncReaderBenchmark extends AbstractReaderBenchmark {
-
-    private static final Logger logger = LoggerFactory.getLogger(AsyncReaderBenchmark.class);
-
-    @Override
-    protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) {
-        DistributedLogManager dlm = null;
-        while (null == dlm) {
-            try {
-                dlm = namespace.openLog(streamName);
-            } catch (IOException ioe) {
-                logger.warn("Failed to create dlm for stream {} : ", streamName, ioe);
-            }
-            if (null == dlm) {
-                try {
-                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
-                } catch (InterruptedException e) {
-                    logger.warn("Interrupted from sleep while creating dlm for stream {} : ",
-                        streamName, e);
-                }
-            }
-        }
-        logger.info("Created dlm for stream {}.", streamName);
-
-        // Stats
-        OpStatsLogger openReaderStats = statsLogger.getOpStatsLogger("open_reader");
-        OpStatsLogger blockingReadStats = statsLogger.getOpStatsLogger("blocking_read");
-        Counter readCounter = statsLogger.getCounter("reads");
-
-        AsyncLogReader reader = null;
-        DLSN lastDLSN = null;
-        Long lastTxId = null;
-        while (null == reader) {
-            // initialize the last txid
-            if (null == lastTxId) {
-                switch (readMode) {
-                    case OLDEST:
-                        lastTxId = 0L;
-                        lastDLSN = DLSN.InitialDLSN;
-                        break;
-                    case LATEST:
-                        lastTxId = Long.MAX_VALUE;
-                        try {
-                            lastDLSN = dlm.getLastDLSN();
-                        } catch (IOException ioe) {
-                            continue;
-                        }
-                        break;
-                    case REWIND:
-                        lastTxId = System.currentTimeMillis() - rewindMs;
-                        lastDLSN = null;
-                        break;
-                    case POSITION:
-                        lastTxId = fromTxId;
-                        lastDLSN = null;
-                        break;
-                    default:
-                        logger.warn("Unsupported mode {}", readMode);
-                        printUsage();
-                        System.exit(0);
-                        break;
-                }
-                logger.info("Reading from transaction id = {}, dlsn = {}", lastTxId, lastDLSN);
-            }
-            // Open the reader
-            Stopwatch stopwatch = Stopwatch.createStarted();
-            try {
-                if (null == lastDLSN) {
-                    reader = FutureUtils.result(dlm.openAsyncLogReader(lastTxId));
-                } else {
-                    reader = FutureUtils.result(dlm.openAsyncLogReader(lastDLSN));
-                }
-                long elapsedMs = stopwatch.elapsed(TimeUnit.MICROSECONDS);
-                openReaderStats.registerSuccessfulEvent(elapsedMs);
-                logger.info("It took {} ms to position the reader to transaction id = {}, dlsn = {}",
-                        lastTxId, lastDLSN);
-            } catch (IOException ioe) {
-                openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-                logger.warn("Failed to create reader for stream {} reading from tx id = {}, dlsn = {}.",
-                        new Object[] { streamName, lastTxId, lastDLSN });
-            }
-            if (null == reader) {
-                try {
-                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
-                } catch (InterruptedException e) {
-                    logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ",
-                        streamName, e);
-                }
-                continue;
-            }
-            List<LogRecordWithDLSN> records;
-            stopwatch = Stopwatch.createUnstarted();
-            while (true) {
-                try {
-                    stopwatch.start();
-                    records = FutureUtils.result(reader.readBulk(batchSize));
-                    long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
-                    blockingReadStats.registerSuccessfulEvent(elapsedMicros);
-                    if (!records.isEmpty()) {
-                        readCounter.add(records.size());
-                        LogRecordWithDLSN lastRecord = records.get(records.size() - 1);
-                        lastTxId = lastRecord.getTransactionId();
-                        lastDLSN = lastRecord.getDlsn();
-                    }
-                    stopwatch.reset();
-                } catch (IOException e) {
-                    logger.warn("Encountered reading record from stream {} : ", streamName, e);
-                    reader = null;
-                    break;
-                }
-            }
-            try {
-                TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
-            } catch (InterruptedException e) {
-                logger.warn("Interrupted from sleep while creating reader for stream {} : ",
-                    streamName, e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerBatchReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerBatchReader.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerBatchReader.java
deleted file mode 100644
index 6a11469..0000000
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerBatchReader.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.benchmark.stream;
-
-import java.util.Enumeration;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Read ledgers in batches.
- *
- * <p>Reads entries {@code 0} through the ledger's last-add-confirmed entry in ranges of at
- * most {@code batchSize} entries and hands every entry to the supplied listener.
- */
-public class LedgerBatchReader implements Runnable {
-
-    private static final Logger logger = LoggerFactory.getLogger(LedgerBatchReader.class);
-
-    private final LedgerHandle lh;
-    private final ReadEntryListener readEntryListener;
-    private final int batchSize;
-
-    /**
-     * @param lh handle of the ledger to read
-     * @param readEntryListener callback invoked once for every entry read
-     * @param batchSize maximum number of entries requested per read; values below 1 are
-     *        treated as 1
-     */
-    public LedgerBatchReader(LedgerHandle lh,
-                             ReadEntryListener readEntryListener,
-                             int batchSize) {
-        this.lh = lh;
-        this.batchSize = batchSize;
-        this.readEntryListener = readEntryListener;
-    }
-
-    /**
-     * Reads the ledger from the first entry up to the last-add-confirmed entry observed
-     * when the method starts.
-     *
-     * <p>A range that fails with a {@link BKException} is logged and retried immediately,
-     * without limit. An interrupt restores the thread's interrupt flag and ends the run.
-     */
-    @Override
-    public void run() {
-        long lac = lh.getLastAddConfirmed();
-
-        // With a non-positive batch size the end of each range would fall before its
-        // start, the position would never advance and this loop would never finish.
-        // Always read at least one entry per batch.
-        final int step = Math.max(1, batchSize);
-
-        long entryId = 0L;
-
-        while (entryId <= lac) {
-            long startEntryId = entryId;
-            long endEntryId = Math.min(startEntryId + step - 1, lac);
-
-            Enumeration<LedgerEntry> entries = null;
-            while (null == entries) {
-                try {
-                    entries = lh.readEntries(startEntryId, endEntryId);
-                } catch (BKException bke) {
-                    // Logged with the exception as the trailing argument; the same range
-                    // is then retried.
-                    logger.error("Encountered exceptions on reading [ {} - {} ] ",
-                            new Object[] { startEntryId, endEntryId, bke });
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                    break;
-                }
-            }
-            if (null == entries) {
-                // Only reachable after an interrupt: stop reading.
-                break;
-            }
-
-            while (entries.hasMoreElements()) {
-                LedgerEntry entry = entries.nextElement();
-                readEntryListener.onEntryComplete(BKException.Code.OK, lh, entry, null);
-            }
-
-            entryId = endEntryId + 1;
-        }
-
-    }
-}