You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:53 UTC

[48/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Worker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Worker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Worker.java
new file mode 100644
index 0000000..a948092
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Worker.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark;
+
+import java.io.Closeable;
+
+/**
+ * Worker to run benchmark.
+ *
+ * <p>A worker is a {@link Runnable} task that is also {@link Closeable}, so the
+ * same object can be run and later closed by whoever owns it.
+ */
+public interface Worker extends Closeable, Runnable {
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
new file mode 100644
index 0000000..9e96765
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.twitter.common.zookeeper.ServerSet;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.benchmark.utils.ShiftableRateLimiter;
+import org.apache.distributedlog.client.DistributedLogMultiStreamWriter;
+import org.apache.distributedlog.client.serverset.DLZkServerSet;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.service.DistributedLogClient;
+import org.apache.distributedlog.service.DistributedLogClientBuilder;
+import org.apache.distributedlog.util.SchedulerUtils;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.stats.StatsReceiver;
+import com.twitter.finagle.thrift.ClientId;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Duration$;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Benchmark for distributedlog proxy client.
+ */
+public class WriterWorker implements Worker {
+
+    static final Logger LOG = LoggerFactory.getLogger(WriterWorker.class);
+
+    final String streamPrefix;
+    final int startStreamId;
+    final int endStreamId;
+    final int writeConcurrency;
+    final int messageSizeBytes;
+    final int hostConnectionCoreSize;
+    final int hostConnectionLimit;
+    final ExecutorService executorService;
+    final ShiftableRateLimiter rateLimiter;
+    final URI dlUri;
+    final DLZkServerSet[] serverSets;
+    final List<String> finagleNames;
+    final Random random;
+    final List<String> streamNames;
+    final int numStreams;
+    final int batchSize;
+    final boolean thriftmux;
+    final boolean handshakeWithClientInfo;
+    final int sendBufferSize;
+    final int recvBufferSize;
+    final boolean enableBatching;
+    final int batchBufferSize;
+    final int batchFlushIntervalMicros;
+    private final String routingServiceFinagleName;
+
+    volatile boolean running = true;
+
+    final StatsReceiver statsReceiver;
+    final StatsLogger statsLogger;
+    final OpStatsLogger requestStat;
+    final StatsLogger exceptionsLogger;
+    final StatsLogger dlErrorCodeLogger;
+
+    // callback thread
+    final ExecutorService executor;
+
+    public WriterWorker(String streamPrefix,
+                        URI uri,
+                        int startStreamId,
+                        int endStreamId,
+                        ShiftableRateLimiter rateLimiter,
+                        int writeConcurrency,
+                        int messageSizeBytes,
+                        int batchSize,
+                        int hostConnectionCoreSize,
+                        int hostConnectionLimit,
+                        List<String> serverSetPaths,
+                        List<String> finagleNames,
+                        StatsReceiver statsReceiver,
+                        StatsLogger statsLogger,
+                        boolean thriftmux,
+                        boolean handshakeWithClientInfo,
+                        int sendBufferSize,
+                        int recvBufferSize,
+                        boolean enableBatching,
+                        int batchBufferSize,
+                        int batchFlushIntervalMicros,
+                        String routingServiceFinagleName) {
+        checkArgument(startStreamId <= endStreamId);
+        checkArgument(!finagleNames.isEmpty() || !serverSetPaths.isEmpty());
+        this.streamPrefix = streamPrefix;
+        this.dlUri = uri;
+        this.startStreamId = startStreamId;
+        this.endStreamId = endStreamId;
+        this.rateLimiter = rateLimiter;
+        this.writeConcurrency = writeConcurrency;
+        this.messageSizeBytes = messageSizeBytes;
+        this.statsReceiver = statsReceiver;
+        this.statsLogger = statsLogger;
+        this.requestStat = this.statsLogger.getOpStatsLogger("requests");
+        this.exceptionsLogger = statsLogger.scope("exceptions");
+        this.dlErrorCodeLogger = statsLogger.scope("dl_error_code");
+        this.executorService = Executors.newCachedThreadPool();
+        this.random = new Random(System.currentTimeMillis());
+        this.batchSize = batchSize;
+        this.hostConnectionCoreSize = hostConnectionCoreSize;
+        this.hostConnectionLimit = hostConnectionLimit;
+        this.thriftmux = thriftmux;
+        this.handshakeWithClientInfo = handshakeWithClientInfo;
+        this.sendBufferSize = sendBufferSize;
+        this.recvBufferSize = recvBufferSize;
+        this.enableBatching = enableBatching;
+        this.batchBufferSize = batchBufferSize;
+        this.batchFlushIntervalMicros = batchFlushIntervalMicros;
+        this.finagleNames = finagleNames;
+        this.serverSets = createServerSets(serverSetPaths);
+        this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+        this.routingServiceFinagleName = routingServiceFinagleName;
+
+        // Streams
+        streamNames = new ArrayList<String>(endStreamId - startStreamId);
+        for (int i = startStreamId; i < endStreamId; i++) {
+            streamNames.add(String.format("%s_%d", streamPrefix, i));
+        }
+        numStreams = streamNames.size();
+        LOG.info("Writing to {} streams : {}", numStreams, streamNames);
+    }
+
+    protected DLZkServerSet[] createServerSets(List<String> serverSetPaths) {
+        DLZkServerSet[] serverSets = new DLZkServerSet[serverSetPaths.size()];
+        for (int i = 0; i < serverSets.length; i++) {
+            String serverSetPath = serverSetPaths.get(i);
+            serverSets[i] = DLZkServerSet.of(URI.create(serverSetPath), 60000);
+        }
+        return serverSets;
+    }
+
+    /**
+     * Stops the writers and releases the resources held by this worker.
+     *
+     * <p>Clears the {@code running} flag so the writer loops exit, shuts down the writer
+     * pool and then the callback pool, and finally closes every server set.
+     *
+     * @throws IOException as declared by {@link java.io.Closeable#close()}
+     */
+    @Override
+    public void close() throws IOException {
+        this.running = false;
+        SchedulerUtils.shutdownScheduler(this.executorService, 2, TimeUnit.MINUTES);
+        // The callback pool is created in the constructor and was never shut down,
+        // which leaked its threads. Shut it down after the writer pool.
+        SchedulerUtils.shutdownScheduler(this.executor, 2, TimeUnit.MINUTES);
+        for (DLZkServerSet serverSet : serverSets) {
+            serverSet.close();
+        }
+    }
+
+    private DistributedLogClient buildDlogClient() {
+        ClientBuilder clientBuilder = ClientBuilder.get()
+            .hostConnectionLimit(hostConnectionLimit)
+            .hostConnectionCoresize(hostConnectionCoreSize)
+            .tcpConnectTimeout(Duration$.MODULE$.fromMilliseconds(200))
+            .connectTimeout(Duration$.MODULE$.fromMilliseconds(200))
+            .requestTimeout(Duration$.MODULE$.fromSeconds(10))
+            .sendBufferSize(sendBufferSize)
+            .recvBufferSize(recvBufferSize);
+
+        ClientId clientId = ClientId$.MODULE$.apply("dlog_loadtest_writer");
+
+        DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
+            .clientId(clientId)
+            .clientBuilder(clientBuilder)
+            .thriftmux(thriftmux)
+            .redirectBackoffStartMs(100)
+            .redirectBackoffMaxMs(500)
+            .requestTimeoutMs(10000)
+            .statsReceiver(statsReceiver)
+            .streamNameRegex("^" + streamPrefix + "_[0-9]+$")
+            .handshakeWithClientInfo(handshakeWithClientInfo)
+            .periodicHandshakeIntervalMs(TimeUnit.SECONDS.toMillis(30))
+            .periodicOwnershipSyncIntervalMs(TimeUnit.MINUTES.toMillis(5))
+            .periodicDumpOwnershipCache(true)
+            .handshakeTracing(true)
+            .serverRoutingServiceFinagleNameStr(routingServiceFinagleName)
+            .name("writer");
+
+        if (!finagleNames.isEmpty()) {
+            String local = finagleNames.get(0);
+            String[] remotes = new String[finagleNames.size() - 1];
+            finagleNames.subList(1, finagleNames.size()).toArray(remotes);
+
+            builder = builder.finagleNameStrs(local, remotes);
+        } else if (serverSets.length != 0){
+            ServerSet local = serverSets[0].getServerSet();
+            ServerSet[] remotes = new ServerSet[serverSets.length - 1];
+            for (int i = 1; i < serverSets.length; i++) {
+                remotes[i - 1] = serverSets[i].getServerSet();
+            }
+            builder = builder.serverSets(local, remotes);
+        } else {
+            builder = builder.uri(dlUri);
+        }
+
+        return builder.build();
+    }
+
+    ByteBuffer buildBuffer(long requestMillis, int messageSizeBytes) {
+        ByteBuffer data;
+        try {
+            data = ByteBuffer.wrap(Utils.generateMessage(requestMillis, messageSizeBytes));
+            return data;
+        } catch (TException e) {
+            LOG.error("Error generating message : ", e);
+            return null;
+        }
+    }
+
+    List<ByteBuffer> buildBufferList(int batchSize, long requestMillis, int messageSizeBytes) {
+        ArrayList<ByteBuffer> bufferList = new ArrayList<ByteBuffer>(batchSize);
+        for (int i = 0; i < batchSize; i++) {
+            ByteBuffer buf = buildBuffer(requestMillis, messageSizeBytes);
+            if (null == buf) {
+                return null;
+            }
+            bufferList.add(buf);
+        }
+        return bufferList;
+    }
+
+    /**
+     * Listener recording the outcome of one write request.
+     *
+     * <p>The future callbacks only store the result and hand this object to the callback
+     * executor; the stats are updated in {@link #run()}.
+     */
+    class TimedRequestHandler implements FutureEventListener<DLSN>, Runnable {
+        final String streamName;
+        final long requestMillis;
+        DLSN dlsn = null;
+        Throwable cause = null;
+
+        TimedRequestHandler(String streamName,
+                            long requestMillis) {
+            this.streamName = streamName;
+            this.requestMillis = requestMillis;
+        }
+
+        @Override
+        public void onSuccess(DLSN value) {
+            this.dlsn = value;
+            executor.submit(this);
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            this.cause = cause;
+            executor.submit(this);
+        }
+
+        /** Milliseconds elapsed between the request timestamp and now. */
+        private long elapsedSinceRequest() {
+            return System.currentTimeMillis() - requestMillis;
+        }
+
+        @Override
+        public void run() {
+            if (dlsn != null) {
+                requestStat.registerSuccessfulEvent(elapsedSinceRequest());
+                return;
+            }
+            // No dlsn was recorded: treat the request as failed.
+            LOG.error("Failed to publish to {} : ", streamName, cause);
+            requestStat.registerFailedEvent(elapsedSinceRequest());
+            exceptionsLogger.getCounter(cause.getClass().getName()).inc();
+            if (!(cause instanceof DLException)) {
+                return;
+            }
+            final DLException dlFailure = (DLException) cause;
+            dlErrorCodeLogger.getCounter(dlFailure.getCode().toString()).inc();
+        }
+    }
+
+    /**
+     * Writer loop issuing single-record writes, either through a
+     * {@link DistributedLogMultiStreamWriter} (when batching is enabled) or directly
+     * through its own {@link DistributedLogClient}.
+     */
+    class Writer implements Runnable {
+
+        final int idx;
+        final DistributedLogClient dlc;
+        DistributedLogMultiStreamWriter writer = null;
+        final ShiftableRateLimiter limiter;
+
+        /**
+         * @param idx index of this writer, used in log messages
+         */
+        Writer(int idx) {
+            this.idx = idx;
+            this.dlc = buildDlogClient();
+            if (enableBatching) {
+                writer = DistributedLogMultiStreamWriter.newBuilder()
+                        .client(this.dlc)
+                        .streams(streamNames)
+                        .compressionCodec(CompressionCodec.Type.NONE)
+                        .flushIntervalMicros(batchFlushIntervalMicros)
+                        .bufferSize(batchBufferSize)
+                        .firstSpeculativeTimeoutMs(9000)
+                        .maxSpeculativeTimeoutMs(9000)
+                        .requestTimeoutMs(10000)
+                        .speculativeBackoffMultiplier(2)
+                        .build();
+            }
+            this.limiter = rateLimiter.duplicate();
+        }
+
+        /**
+         * Writes records until the worker is stopped or a message cannot be generated.
+         *
+         * <p>The multi stream writer (if any) and the client are always closed on exit,
+         * including when the loop dies with a runtime exception.
+         */
+        @Override
+        public void run() {
+            LOG.info("Started writer {}.", idx);
+            try {
+                while (running) {
+                    this.limiter.getLimiter().acquire();
+                    final String streamName = streamNames.get(random.nextInt(numStreams));
+                    final long requestMillis = System.currentTimeMillis();
+                    final ByteBuffer data = buildBuffer(requestMillis, messageSizeBytes);
+                    if (null == data) {
+                        break;
+                    }
+                    if (null != writer) {
+                        writer.write(data).addEventListener(
+                                new TimedRequestHandler(streamName, requestMillis));
+                    } else {
+                        dlc.write(streamName, data).addEventListener(
+                                new TimedRequestHandler(streamName, requestMillis));
+                    }
+                }
+            } catch (RuntimeException re) {
+                // The task is started with submit(), which would otherwise hide the failure.
+                LOG.error("Writer {} terminated unexpectedly : ", idx, re);
+                throw re;
+            } finally {
+                closeResources();
+            }
+        }
+
+        /** Closes the multi stream writer (if any) and then the client. */
+        private void closeResources() {
+            try {
+                if (null != writer) {
+                    writer.close();
+                }
+            } finally {
+                dlc.close();
+            }
+        }
+    }
+
+    /**
+     * Writer loop issuing bulk writes of {@code batchSize} records through its own
+     * {@link DistributedLogClient}.
+     */
+    class BulkWriter implements Runnable {
+
+        final int idx;
+        final DistributedLogClient dlc;
+
+        /**
+         * @param idx index of this writer, used in log messages
+         */
+        BulkWriter(int idx) {
+            this.idx = idx;
+            this.dlc = buildDlogClient();
+        }
+
+        /**
+         * Writes batches until the worker is stopped or a batch cannot be generated.
+         *
+         * <p>The client is always closed on exit, including when the loop dies with a
+         * runtime exception.
+         */
+        @Override
+        public void run() {
+            LOG.info("Started writer {}.", idx);
+            try {
+                while (running) {
+                    // One permit per record in the batch.
+                    rateLimiter.getLimiter().acquire(batchSize);
+                    String streamName = streamNames.get(random.nextInt(numStreams));
+                    final long requestMillis = System.currentTimeMillis();
+                    final List<ByteBuffer> data = buildBufferList(batchSize, requestMillis, messageSizeBytes);
+                    if (null == data) {
+                        break;
+                    }
+                    List<Future<DLSN>> results = dlc.writeBulk(streamName, data);
+                    for (Future<DLSN> result : results) {
+                        result.addEventListener(new TimedRequestHandler(streamName, requestMillis));
+                    }
+                }
+            } catch (RuntimeException re) {
+                // The task is started with submit(), which would otherwise hide the failure.
+                LOG.error("Writer {} terminated unexpectedly : ", idx, re);
+                throw re;
+            } finally {
+                dlc.close();
+            }
+        }
+    }
+
+    @Override
+    public void run() {
+        LOG.info("Starting writer (concurrency = {}, prefix = {}, batchSize = {})",
+                 new Object[] { writeConcurrency, streamPrefix, batchSize });
+        try {
+            for (int i = 0; i < writeConcurrency; i++) {
+                Runnable writer = null;
+                if (batchSize > 0) {
+                    writer = new BulkWriter(i);
+                } else {
+                    writer = new Writer(i);
+                }
+                executorService.submit(writer);
+            }
+        } catch (Throwable t) {
+            LOG.error("Unhandled exception caught", t);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/package-info.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/package-info.java
new file mode 100644
index 0000000..7e87644
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Benchmarks for distributedlog.
+ */
+package org.apache.distributedlog.benchmark;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AbstractReaderBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AbstractReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AbstractReaderBenchmark.java
new file mode 100644
index 0000000..a1f1f9f
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AbstractReaderBenchmark.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.commons.cli.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class of the reader benchmarks: registers and parses the command line options
+ * selecting where to start reading (mode, transaction id, rewind time) and the read batch size.
+ */
+abstract class AbstractReaderBenchmark extends StreamBenchmark {
+
+    // Bound to this class (it was bound to SyncReaderBenchmark, mislabelling every log line).
+    private static final Logger logger = LoggerFactory.getLogger(AbstractReaderBenchmark.class);
+
+    protected ReadMode readMode = ReadMode.LATEST;
+    protected long fromTxId = DistributedLogConstants.INVALID_TXID;
+    protected long rewindMs = 0L;
+    protected int batchSize = 1;
+
+    protected AbstractReaderBenchmark() {
+        options.addOption("t", "tx-id", true,
+            "Transaction ID to start read from when reading in mode 'position'");
+        options.addOption("r", "rewind", true,
+            "Time to rewind back to read from when reading in mode 'rewind' (in milliseconds)");
+        options.addOption("m", "mode", true,
+            "Read Mode : [oldest, latest, rewind, position]");
+        options.addOption("b", "batch-size", true, "Read batch size");
+    }
+
+    /**
+     * Reads the reader options from the parsed command line.
+     *
+     * <p>The mode option is mandatory: when it is missing or not a valid {@link ReadMode},
+     * the usage is printed and the JVM exits. The other options keep their defaults when absent.
+     *
+     * @param cmdline parsed command line
+     * @throws NumberFormatException if a numeric option value cannot be parsed
+     */
+    @Override
+    protected void parseCommandLine(CommandLine cmdline) {
+        if (cmdline.hasOption("m")) {
+            String mode = cmdline.getOptionValue("m");
+            try {
+                // Upper-case with a fixed locale: under e.g. a Turkish default locale
+                // "position" would not map to POSITION.
+                readMode = ReadMode.valueOf(mode.toUpperCase(java.util.Locale.ROOT));
+            } catch (IllegalArgumentException iae) {
+                logger.error("Invalid read mode {}.", mode);
+                printUsage();
+                System.exit(0);
+            }
+        } else {
+            printUsage();
+            System.exit(0);
+        }
+        if (cmdline.hasOption("t")) {
+            fromTxId = Long.parseLong(cmdline.getOptionValue("t"));
+        }
+        if (cmdline.hasOption("r")) {
+            rewindMs = Long.parseLong(cmdline.getOptionValue("r"));
+        }
+        if (cmdline.hasOption("b")) {
+            batchSize = Integer.parseInt(cmdline.getOptionValue("b"));
+        }
+        logger.info("Start reading from transaction id {}, rewind {} ms.", fromTxId, rewindMs);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
new file mode 100644
index 0000000..4930b8a
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.AsyncLogReader;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.util.FutureUtils;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Benchmark on {@link org.apache.distributedlog.AsyncLogReader} reading from a stream.
+ */
+public class AsyncReaderBenchmark extends AbstractReaderBenchmark {
+
+    private static final Logger logger = LoggerFactory.getLogger(AsyncReaderBenchmark.class);
+
+    @Override
+    protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) {
+        DistributedLogManager dlm = null;
+        while (null == dlm) {
+            try {
+                dlm = namespace.openLog(streamName);
+            } catch (IOException ioe) {
+                logger.warn("Failed to create dlm for stream {} : ", streamName, ioe);
+            }
+            if (null == dlm) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+                } catch (InterruptedException e) {
+                    logger.warn("Interrupted from sleep while creating dlm for stream {} : ",
+                        streamName, e);
+                }
+            }
+        }
+        logger.info("Created dlm for stream {}.", streamName);
+
+        // Stats
+        OpStatsLogger openReaderStats = statsLogger.getOpStatsLogger("open_reader");
+        OpStatsLogger blockingReadStats = statsLogger.getOpStatsLogger("blocking_read");
+        Counter readCounter = statsLogger.getCounter("reads");
+
+        AsyncLogReader reader = null;
+        DLSN lastDLSN = null;
+        Long lastTxId = null;
+        while (null == reader) {
+            // initialize the last txid
+            if (null == lastTxId) {
+                switch (readMode) {
+                    case OLDEST:
+                        lastTxId = 0L;
+                        lastDLSN = DLSN.InitialDLSN;
+                        break;
+                    case LATEST:
+                        lastTxId = Long.MAX_VALUE;
+                        try {
+                            lastDLSN = dlm.getLastDLSN();
+                        } catch (IOException ioe) {
+                            continue;
+                        }
+                        break;
+                    case REWIND:
+                        lastTxId = System.currentTimeMillis() - rewindMs;
+                        lastDLSN = null;
+                        break;
+                    case POSITION:
+                        lastTxId = fromTxId;
+                        lastDLSN = null;
+                        break;
+                    default:
+                        logger.warn("Unsupported mode {}", readMode);
+                        printUsage();
+                        System.exit(0);
+                        break;
+                }
+                logger.info("Reading from transaction id = {}, dlsn = {}", lastTxId, lastDLSN);
+            }
+            // Open the reader
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            try {
+                if (null == lastDLSN) {
+                    reader = FutureUtils.result(dlm.openAsyncLogReader(lastTxId));
+                } else {
+                    reader = FutureUtils.result(dlm.openAsyncLogReader(lastDLSN));
+                }
+                long elapsedMs = stopwatch.elapsed(TimeUnit.MICROSECONDS);
+                openReaderStats.registerSuccessfulEvent(elapsedMs);
+                logger.info("It took {} ms to position the reader to transaction id = {}, dlsn = {}",
+                        lastTxId, lastDLSN);
+            } catch (IOException ioe) {
+                openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                logger.warn("Failed to create reader for stream {} reading from tx id = {}, dlsn = {}.",
+                        new Object[] { streamName, lastTxId, lastDLSN });
+            }
+            if (null == reader) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+                } catch (InterruptedException e) {
+                    logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ",
+                        streamName, e);
+                }
+                continue;
+            }
+            List<LogRecordWithDLSN> records;
+            stopwatch = Stopwatch.createUnstarted();
+            while (true) {
+                try {
+                    stopwatch.start();
+                    records = FutureUtils.result(reader.readBulk(batchSize));
+                    long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
+                    blockingReadStats.registerSuccessfulEvent(elapsedMicros);
+                    if (!records.isEmpty()) {
+                        readCounter.add(records.size());
+                        LogRecordWithDLSN lastRecord = records.get(records.size() - 1);
+                        lastTxId = lastRecord.getTransactionId();
+                        lastDLSN = lastRecord.getDlsn();
+                    }
+                    stopwatch.reset();
+                } catch (IOException e) {
+                    logger.warn("Encountered reading record from stream {} : ", streamName, e);
+                    reader = null;
+                    break;
+                }
+            }
+            try {
+                TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+            } catch (InterruptedException e) {
+                logger.warn("Interrupted from sleep while creating reader for stream {} : ",
+                    streamName, e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerBatchReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerBatchReader.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerBatchReader.java
new file mode 100644
index 0000000..b115192
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerBatchReader.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+import java.util.Enumeration;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reads every entry of a ledger, from entry 0 up to the last add confirmed entry,
+ * using synchronous range reads of at most {@code batchSize} entries each.
+ *
+ * <p>Each entry that is read is handed to the supplied {@link ReadEntryListener}
+ * with a result code of {@link BKException.Code#OK}.
+ */
+public class LedgerBatchReader implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(LedgerBatchReader.class);
+
+    private final LedgerHandle lh;
+    private final ReadEntryListener readEntryListener;
+    private final int batchSize;
+
+    /**
+     * @param lh ledger handle to read from
+     * @param readEntryListener listener notified once per entry read
+     * @param batchSize maximum number of entries requested by a single range read
+     */
+    public LedgerBatchReader(LedgerHandle lh,
+                             ReadEntryListener readEntryListener,
+                             int batchSize) {
+        this.lh = lh;
+        this.batchSize = batchSize;
+        this.readEntryListener = readEntryListener;
+    }
+
+    /**
+     * Walks the ledger batch by batch until the last add confirmed entry (sampled once,
+     * on entry to this method) has been delivered, or until the thread is interrupted.
+     */
+    @Override
+    public void run() {
+        final long lastConfirmed = lh.getLastAddConfirmed();
+
+        long firstInBatch = 0L;
+        while (firstInBatch <= lastConfirmed) {
+            final long lastInBatch = Math.min(firstInBatch + batchSize - 1, lastConfirmed);
+
+            final Enumeration<LedgerEntry> batch = readBatch(firstInBatch, lastInBatch);
+            if (null == batch) {
+                // interrupted while reading: give up on the remaining entries
+                return;
+            }
+
+            while (batch.hasMoreElements()) {
+                LedgerEntry entry = batch.nextElement();
+                readEntryListener.onEntryComplete(BKException.Code.OK, lh, entry, null);
+            }
+
+            firstInBatch = lastInBatch + 1;
+        }
+    }
+
+    /**
+     * Reads the inclusive entry range, retrying for as long as BookKeeper reports an error.
+     *
+     * @return the entries read, or {@code null} if the thread was interrupted (the
+     *         interrupt flag is restored before returning)
+     */
+    private Enumeration<LedgerEntry> readBatch(long firstInBatch, long lastInBatch) {
+        while (true) {
+            try {
+                Enumeration<LedgerEntry> batch = lh.readEntries(firstInBatch, lastInBatch);
+                if (null != batch) {
+                    return batch;
+                }
+            } catch (BKException bke) {
+                logger.error("Encountered exceptions on reading [ {} - {} ] ",
+                        new Object[] { firstInBatch, lastInBatch, bke });
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                return null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java
new file mode 100644
index 0000000..489e5af
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.BookKeeperClientBuilder;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.ZooKeeperClientBuilder;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Benchmark ledger reading.
+ *
+ * <p>Opens the log named by {@code streamName}, lists its log segments and then reads the
+ * ledger behind each segment directly through a BookKeeper client, counting every entry
+ * read in the {@code reads} counter.
+ *
+ * <p>Two settings are read from the configuration: {@code ledger_read_concurrency}
+ * (default 1000) and {@code ledger_stream_read} (default {@code true}). When the latter is
+ * {@code true} a {@link LedgerStreamReader} is used, otherwise a {@link LedgerBatchReader}.
+ */
+public class LedgerReadBenchmark extends AbstractReaderBenchmark {
+
+    private static final Logger logger = LoggerFactory.getLogger(LedgerReadBenchmark.class);
+
+    /**
+     * Runs the ledger read benchmark.
+     *
+     * @param namespace namespace used to open the log
+     * @param logName name of the log; not used here, the {@code streamName} field is read instead
+     * @param statsLogger stats logger that provides the {@code reads} counter
+     */
+    @Override
+    protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) {
+        // Keep retrying until the log can be opened, pausing one zk session timeout between attempts.
+        DistributedLogManager dlm = null;
+        while (null == dlm) {
+            try {
+                dlm = namespace.openLog(streamName);
+            } catch (IOException ioe) {
+                logger.warn("Failed to create dlm for stream {} : ", streamName, ioe);
+            }
+            if (null == dlm) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+                } catch (InterruptedException e) {
+                    logger.warn("Interrupted from sleep while creating dlm for stream {} : ",
+                        streamName, e);
+                }
+            }
+        }
+        logger.info("Created dlm for stream {}.", streamName);
+
+        // Same retry policy for fetching the list of log segments.
+        List<LogSegmentMetadata> segments = null;
+        while (null == segments) {
+            try {
+                segments = dlm.getLogSegments();
+            } catch (IOException ioe) {
+                logger.warn("Failed to get log segments for stream {} : ", streamName, ioe);
+            }
+            if (null == segments) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+                } catch (InterruptedException e) {
+                    logger.warn("Interrupted from sleep while geting log segments for stream {} : ",
+                        streamName, e);
+                }
+            }
+        }
+
+        final Counter readCounter = statsLogger.getCounter("reads");
+
+        logger.info("Reading from log segments : {}", segments);
+
+        // NOTE(review): dlm, zkc and the BookKeeper client are never closed by this method;
+        // confirm whether the benchmark is expected to release them before returning.
+        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
+                .uri(uri)
+                .name("benchmark-zkc")
+                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
+                .zkAclId(null)
+                .build();
+        BKDLConfig bkdlConfig;
+        try {
+            bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
+        } catch (IOException e) {
+            // Previously swallowed silently, which made the benchmark exit with no trace.
+            logger.error("Failed to resolve dl config from {} : ", uri, e);
+            return;
+        }
+
+        BookKeeper bk;
+        try {
+            bk = BookKeeperClientBuilder.newBuilder()
+                    .name("benchmark-bkc")
+                    .dlConfig(conf)
+                    .zkServers(bkdlConfig.getBkZkServersForReader())
+                    .ledgersPath(bkdlConfig.getBkLedgersPath())
+                    .build()
+                    .get();
+        } catch (IOException e) {
+            // Previously swallowed silently as well.
+            logger.error("Failed to build bookkeeper client for stream {} : ", streamName, e);
+            return;
+        }
+
+        final int readConcurrency = conf.getInt("ledger_read_concurrency", 1000);
+        boolean streamRead = conf.getBoolean("ledger_stream_read", true);
+
+        // The listener only counts entries, so a single instance serves every segment and reader.
+        final BookkeeperInternalCallbacks.ReadEntryListener countingListener =
+                new BookkeeperInternalCallbacks.ReadEntryListener() {
+                    @Override
+                    public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, Object ctx) {
+                        readCounter.inc();
+                    }
+                };
+
+        try {
+            for (LogSegmentMetadata segment : segments) {
+                Stopwatch stopwatch = Stopwatch.createStarted();
+                long lid = segment.getLogSegmentId();
+                LedgerHandle lh = bk.openLedgerNoRecovery(
+                        lid, BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
+                try {
+                    // The original message had two placeholders for three arguments, so the
+                    // entry count was printed as the segment and the segment was dropped.
+                    logger.info("It took {} ms to open log segment {} ({} entries)",
+                        new Object[] { stopwatch.elapsed(TimeUnit.MILLISECONDS), segment,
+                            (lh.getLastAddConfirmed() + 1) });
+                    stopwatch.reset().start();
+                    Runnable reader;
+                    if (streamRead) {
+                        reader = new LedgerStreamReader(lh, countingListener, readConcurrency);
+                    } else {
+                        // The non-streaming mode previously built a LedgerStreamReader too,
+                        // which made the "ledger_stream_read" switch a no-op.
+                        reader = new LedgerBatchReader(lh, countingListener, readConcurrency);
+                    }
+                    reader.run();
+                    logger.info("It took {} ms to complete reading {} entries from log segment {}",
+                        new Object[] { stopwatch.elapsed(TimeUnit.MILLISECONDS),
+                            (lh.getLastAddConfirmed() + 1), segment });
+                } finally {
+                    // Release the ledger handle before moving on to the next segment.
+                    lh.close();
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Error on reading bk ", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerStreamReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerStreamReader.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerStreamReader.java
new file mode 100644
index 0000000..11c3482
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerStreamReader.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+import java.util.Enumeration;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reading ledger in a streaming way.
+ *
+ * <p>The constructor issues up to {@code concurrency} single-entry asynchronous reads. Every
+ * completed read issues a read for the next unrequested entry until the last add confirmed
+ * entry (sampled once, at construction time) has been requested. Completed reads are handed
+ * to the listener in the order their requests were queued. {@link #run()} blocks until every
+ * entry in {@code [0, lac]} has been delivered to the listener.
+ */
+public class LedgerStreamReader implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(LedgerStreamReader.class);
+
+    /**
+     * A single outstanding read of one entry; also the callback that receives its result.
+     */
+    class PendingReadRequest implements AsyncCallback.ReadCallback {
+
+        final long entryId;
+        // volatile: written by the read callback thread after rc/entry are set and read by
+        // whichever thread runs triggerCallbacks(), so it also publishes rc and entry.
+        volatile boolean isDone = false;
+        int rc;
+        LedgerEntry entry = null;
+
+        PendingReadRequest(long entryId) {
+            this.entryId = entryId;
+        }
+
+        /** Issues the asynchronous read for this request's entry. */
+        void read() {
+            lh.asyncReadEntries(entryId, entryId, this, null);
+        }
+
+        /** Hands the recorded result (entry is {@code null} on failure) to the listener. */
+        void complete(ReadEntryListener listener) {
+            listener.onEntryComplete(rc, lh, entry, null);
+        }
+
+        @Override
+        public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> enumeration, Object ctx) {
+            this.rc = rc;
+            if (BKException.Code.OK == rc && enumeration.hasMoreElements()) {
+                entry = enumeration.nextElement();
+            } else {
+                entry = null;
+            }
+            isDone = true;
+            // construct a new read request
+            long nextEntry = nextReadEntry.getAndIncrement();
+            if (nextEntry <= lac) {
+                PendingReadRequest nextRead =
+                        new PendingReadRequest(nextEntry);
+                pendingReads.add(nextRead);
+                nextRead.read();
+            }
+            triggerCallbacks();
+        }
+    }
+
+    private final LedgerHandle lh;
+    private final long lac;
+    private final ReadEntryListener readEntryListener;
+    private final int concurrency;
+    private final AtomicLong nextReadEntry = new AtomicLong(0);
+    private final CountDownLatch done = new CountDownLatch(1);
+    private final ConcurrentLinkedQueue<PendingReadRequest> pendingReads =
+            new ConcurrentLinkedQueue<PendingReadRequest>();
+    // Number of requests handed to the listener so far; guarded by "this".
+    private long numDelivered = 0L;
+
+    /**
+     * Creates the reader and immediately starts issuing reads.
+     *
+     * @param lh ledger handle to read from
+     * @param readEntryListener listener notified once per requested entry
+     * @param concurrency number of reads issued up front
+     */
+    public LedgerStreamReader(LedgerHandle lh,
+                              ReadEntryListener readEntryListener,
+                              int concurrency) {
+        this.lh = lh;
+        this.lac = lh.getLastAddConfirmed();
+        this.readEntryListener = readEntryListener;
+        this.concurrency = concurrency;
+        for (int i = 0; i < concurrency; i++) {
+            long entryId = nextReadEntry.getAndIncrement();
+            if (entryId > lac) {
+                break;
+            }
+            PendingReadRequest request = new PendingReadRequest(entryId);
+            pendingReads.add(request);
+            request.read();
+        }
+        // Nothing will ever be delivered when the ledger is empty or no read was issued.
+        if (lac < 0 || concurrency <= 0) {
+            done.countDown();
+        }
+    }
+
+    /**
+     * Delivers every completed request at the head of the queue to the listener and
+     * releases {@link #run()} once all entries up to {@code lac} have been delivered.
+     *
+     * <p>Completion is decided by the number of delivered entries rather than by the queue
+     * being empty: the queue can be momentarily empty while the constructor still holds an
+     * entry id it has allocated but not yet queued, which used to release {@link #run()}
+     * before that entry was read.
+     */
+    synchronized void triggerCallbacks() {
+        PendingReadRequest request;
+        while ((request = pendingReads.peek()) != null) {
+            if (!request.isDone) {
+                break;
+            }
+            pendingReads.remove();
+            request.complete(readEntryListener);
+            ++numDelivered;
+        }
+        if (numDelivered > lac) {
+            done.countDown();
+        }
+    }
+
+    /**
+     * Blocks until all entries have been delivered or the calling thread is interrupted.
+     */
+    @Override
+    public void run() {
+        try {
+            done.await();
+        } catch (InterruptedException e) {
+            // Restore the flag so callers can observe the interruption.
+            Thread.currentThread().interrupt();
+            logger.info("Interrupted on stream reading ledger {} : ", lh.getId(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/ReadMode.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/ReadMode.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/ReadMode.java
new file mode 100644
index 0000000..ea5ed36
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/ReadMode.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+/**
+ * The read mode for streaming read benchmark.
+ *
+ * <p>The per-constant notes below describe how {@code SyncReaderBenchmark} picks its
+ * starting transaction id for each mode.
+ */
+public enum ReadMode {
+    /** Start at transaction id 0. */
+    OLDEST,
+    /** Start at the last transaction id reported by the log manager. */
+    LATEST,
+    /** Start at the current wall-clock time in milliseconds minus {@code rewindMs}. */
+    REWIND,
+    /** Start at the explicitly supplied {@code fromTxId}. */
+    POSITION
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/StreamBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/StreamBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/StreamBenchmark.java
new file mode 100644
index 0000000..d3083ca
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/StreamBenchmark.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import java.io.File;
+import java.net.URI;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Benchmark Streams.
+ *
+ * <p>Base class for stream level benchmarks. It parses the common command line options
+ * (configuration file, DistributedLog URI, stats provider, stream name), builds the
+ * namespace and then hands control to {@link #benchmark}. {@link #main} instantiates the
+ * concrete benchmark class named by the first command line argument.
+ */
+public abstract class StreamBenchmark {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamBenchmark.class);
+
+    private static final String USAGE = "StreamBenchmark <benchmark-class> [options]";
+
+    protected final Options options = new Options();
+    protected URI uri;
+    protected DistributedLogConfiguration conf;
+    protected StatsProvider statsProvider;
+    protected String streamName;
+
+    /** Registers the options shared by every stream benchmark. */
+    protected StreamBenchmark() {
+        options.addOption("c", "conf", true, "Configuration File");
+        options.addOption("u", "uri", true, "DistributedLog URI");
+        options.addOption("p", "stats-provider", true, "Stats Provider");
+        options.addOption("s", "stream", true, "Stream Name");
+        options.addOption("h", "help", false, "Print usage.");
+    }
+
+    /** @return the option set, to which subclasses may add their own options */
+    protected Options getOptions() {
+        return options;
+    }
+
+    /** Prints the usage string together with all registered options. */
+    protected void printUsage() {
+        HelpFormatter hf = new HelpFormatter();
+        hf.printHelp(USAGE, options);
+    }
+
+    /**
+     * Parses the common options and then delegates to {@link #parseCommandLine(CommandLine)}.
+     *
+     * <p>The uri ({@code -u}) and stream name ({@code -s}) are mandatory: when either is
+     * missing, or when {@code -h} is given, usage is printed and the JVM exits.
+     * NOTE(review): the exit status is 0 even when a mandatory option is missing; confirm
+     * whether callers rely on that before changing it.
+     *
+     * @param args raw command line arguments
+     * @throws Exception if the arguments cannot be parsed or the configuration cannot be loaded
+     */
+    protected void parseCommandLine(String[] args)
+            throws Exception {
+        BasicParser parser = new BasicParser();
+        CommandLine cmdline = parser.parse(options, args);
+        if (cmdline.hasOption("h")) {
+            printUsage();
+            System.exit(0);
+        }
+        if (cmdline.hasOption("u")) {
+            this.uri = URI.create(cmdline.getOptionValue("u"));
+        } else {
+            printUsage();
+            System.exit(0);
+        }
+        this.conf = new DistributedLogConfiguration();
+        if (cmdline.hasOption("c")) {
+            String configFile = cmdline.getOptionValue("c");
+            this.conf.loadConf(new File(configFile).toURI().toURL());
+        }
+        if (cmdline.hasOption("p")) {
+            statsProvider = ReflectionUtils.newInstance(cmdline.getOptionValue("p"), StatsProvider.class);
+        } else {
+            statsProvider = new NullStatsProvider();
+        }
+        if (cmdline.hasOption("s")) {
+            this.streamName = cmdline.getOptionValue("s");
+        } else {
+            printUsage();
+            System.exit(0);
+        }
+        parseCommandLine(cmdline);
+    }
+
+    /**
+     * Hook for subclasses to read their own options from the parsed command line.
+     *
+     * @param cmdline the parsed command line
+     */
+    protected abstract void parseCommandLine(CommandLine cmdline);
+
+    /**
+     * Parses the arguments, starts the stats provider, builds the namespace and runs the
+     * benchmark. The namespace is closed and the stats provider stopped on the way out.
+     *
+     * @param args raw command line arguments
+     * @throws Exception if parsing, namespace creation or cleanup fails
+     */
+    protected void run(String[] args) throws Exception {
+        // Cast to Object so the array is logged as a single argument; passed as-is it is
+        // spread over the varargs parameter and only args[0] fills the placeholder.
+        logger.info("Parsing arguments for benchmark : {}", (Object) args);
+        // parse command line
+        parseCommandLine(args);
+        statsProvider.start(conf);
+        try {
+            // run the benchmark
+            StatsLogger statsLogger = statsProvider.getStatsLogger("dl");
+            DistributedLogNamespace namespace =
+                    DistributedLogNamespaceBuilder.newBuilder()
+                            .conf(conf)
+                            .uri(uri)
+                            .statsLogger(statsLogger)
+                            .build();
+            try {
+                benchmark(namespace, streamName, statsProvider.getStatsLogger("benchmark"));
+            } finally {
+                namespace.close();
+            }
+        } finally {
+            // Stop the provider even when the namespace could not be built or closed.
+            statsProvider.stop();
+        }
+    }
+
+    /**
+     * Runs the actual benchmark.
+     *
+     * @param namespace namespace built from the parsed uri and configuration
+     * @param logName name of the stream to benchmark
+     * @param statsLogger stats logger scoped to {@code benchmark}
+     */
+    protected abstract void benchmark(DistributedLogNamespace namespace,
+                                      String logName,
+                                      StatsLogger statsLogger);
+
+    /**
+     * Entry point: {@code args[0]} names the {@link StreamBenchmark} subclass to run; the
+     * full argument array is then passed to that benchmark.
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length <= 0) {
+            System.err.println(USAGE);
+            return;
+        }
+        String benchmarkClassName = args[0];
+        StreamBenchmark benchmark = ReflectionUtils.newInstance(
+                benchmarkClassName, StreamBenchmark.class);
+        benchmark.run(args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
new file mode 100644
index 0000000..4abb317
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.LogReader;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Benchmark on {@link org.apache.distributedlog.LogReader} reading from a stream.
+ *
+ * <p>The benchmark opens the log, positions a reader according to {@code readMode} and then
+ * reads records in a loop. Reads are blocking until the first {@code null} record is
+ * returned (the "catch up" phase) and non-blocking afterwards. Whenever a read fails the
+ * reader is closed and re-opened from the last transaction id that was read.
+ */
+public class SyncReaderBenchmark extends AbstractReaderBenchmark {
+
+    private static final Logger logger = LoggerFactory.getLogger(SyncReaderBenchmark.class);
+
+    public SyncReaderBenchmark() {}
+
+    /**
+     * Runs the synchronous reader benchmark.
+     *
+     * @param namespace namespace used to open the log
+     * @param streamName name of the stream to read
+     * @param statsLogger stats logger that provides the {@code open_reader},
+     *                    {@code non_blocking_read}, {@code blocking_read} and
+     *                    {@code null_read} stats
+     */
+    @Override
+    protected void benchmark(DistributedLogNamespace namespace, String streamName, StatsLogger statsLogger) {
+        // Keep retrying until the log can be opened, pausing one zk session timeout between attempts.
+        DistributedLogManager dlm = null;
+        while (null == dlm) {
+            try {
+                dlm = namespace.openLog(streamName);
+            } catch (IOException ioe) {
+                logger.warn("Failed to create dlm for stream {} : ", streamName, ioe);
+            }
+            if (null == dlm) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+                } catch (InterruptedException e) {
+                    logger.warn("Interrupted from sleep while creating dlm for stream {} : ",
+                        streamName, e);
+                }
+            }
+        }
+        OpStatsLogger openReaderStats = statsLogger.getOpStatsLogger("open_reader");
+        OpStatsLogger nonBlockingReadStats = statsLogger.getOpStatsLogger("non_blocking_read");
+        OpStatsLogger blockingReadStats = statsLogger.getOpStatsLogger("blocking_read");
+        Counter nullReadCounter = statsLogger.getCounter("null_read");
+
+        logger.info("Created dlm for stream {}.", streamName);
+        LogReader reader = null;
+        Long lastTxId = null;
+        while (null == reader) {
+            // initialize the last txid
+            if (null == lastTxId) {
+                switch (readMode) {
+                    case OLDEST:
+                        lastTxId = 0L;
+                        break;
+                    case LATEST:
+                        try {
+                            lastTxId = dlm.getLastTxId();
+                        } catch (IOException ioe) {
+                            // Log and back off like the other retry loops instead of
+                            // spinning silently on a persistent failure.
+                            logger.warn("Failed to get last txid for stream {} : ", streamName, ioe);
+                            try {
+                                TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+                            } catch (InterruptedException e) {
+                                logger.warn("Interrupted from sleep while getting last txid for stream {} : ",
+                                    streamName, e);
+                            }
+                            continue;
+                        }
+                        break;
+                    case REWIND:
+                        lastTxId = System.currentTimeMillis() - rewindMs;
+                        break;
+                    case POSITION:
+                        lastTxId = fromTxId;
+                        break;
+                    default:
+                        logger.warn("Unsupported mode {}", readMode);
+                        printUsage();
+                        System.exit(0);
+                        break;
+                }
+                logger.info("Reading from transaction id {}", lastTxId);
+            }
+            // Open the reader
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            try {
+                reader = dlm.getInputStream(lastTxId);
+                openReaderStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                // The message has two placeholders; previously only the txid was passed, so
+                // it was printed as the elapsed time.
+                logger.info("It took {} ms to position the reader to transaction id {}",
+                    stopwatch.elapsed(TimeUnit.MILLISECONDS), lastTxId);
+            } catch (IOException ioe) {
+                openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                // Pass the exception as the trailing argument so its stack trace is logged.
+                logger.warn("Failed to create reader for stream {} reading from {}.",
+                    new Object[] { streamName, lastTxId, ioe });
+            }
+            if (null == reader) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+                } catch (InterruptedException e) {
+                    logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ",
+                        streamName, e);
+                }
+                continue;
+            }
+
+            // read loop
+
+            LogRecord record;
+            boolean nonBlocking = false;
+            stopwatch = Stopwatch.createUnstarted();
+            long numCatchupReads = 0L;
+            long numCatchupBytes = 0L;
+            Stopwatch catchupStopwatch = Stopwatch.createStarted();
+            while (true) {
+                try {
+                    stopwatch.start();
+                    record = reader.readNext(nonBlocking);
+                    if (null != record) {
+                        long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
+                        if (nonBlocking) {
+                            nonBlockingReadStats.registerSuccessfulEvent(elapsedMicros);
+                        } else {
+                            numCatchupBytes += record.getPayload().length;
+                            ++numCatchupReads;
+                            blockingReadStats.registerSuccessfulEvent(elapsedMicros);
+                        }
+                        lastTxId = record.getTransactionId();
+                    } else {
+                        nullReadCounter.inc();
+                    }
+                    if (null == record && !nonBlocking) {
+                        // First empty blocking read: the reader has caught up, switch modes.
+                        nonBlocking = true;
+                        catchupStopwatch.stop();
+                        // Report the catch-up stopwatch, not the per-read one, which only
+                        // covers the last read.
+                        logger.info("Catchup {} records (total {} bytes) in {} milliseconds",
+                                new Object[] { numCatchupReads, numCatchupBytes,
+                                    catchupStopwatch.elapsed(TimeUnit.MILLISECONDS) });
+                    }
+                    stopwatch.reset();
+                } catch (IOException e) {
+                    logger.warn("Encountered reading record from stream {} : ", streamName, e);
+                    // Release the failed reader before a new one is opened.
+                    try {
+                        reader.close();
+                    } catch (IOException closeException) {
+                        logger.warn("Failed to close reader for stream {} : ", streamName, closeException);
+                    }
+                    reader = null;
+                    break;
+                }
+            }
+            try {
+                TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+            } catch (InterruptedException e) {
+                logger.warn("Interrupted from sleep while creating reader for stream {} : ",
+                    streamName, e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/package-info.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/package-info.java
new file mode 100644
index 0000000..b95a40f
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Stream level benchmarks.
+ */
+package org.apache.distributedlog.benchmark.stream;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/ShiftableRateLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/ShiftableRateLimiter.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/ShiftableRateLimiter.java
new file mode 100644
index 0000000..03c561c
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/ShiftableRateLimiter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.utils;
+
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A wrapper over a Guava {@link RateLimiter} whose rate is shifted step by step.
+ *
+ * <p>The limiter starts at {@code initialRate}. The constructor creates a single-threaded
+ * scheduled executor that invokes {@link #run()} every {@code changeInterval}
+ * ({@code changeIntervalUnit}); each invocation adds {@code changeRate} to the current rate
+ * and caps the result at {@code maxRate}.
+ *
+ * <p>Every instance owns its own executor. Call {@link #close()} once the limiter is no
+ * longer needed, otherwise the scheduler thread is never released.
+ */
+public class ShiftableRateLimiter implements Runnable, AutoCloseable {
+
+    private final RateLimiter rateLimiter;
+    private final ScheduledExecutorService executor;
+    private final double initialRate, maxRate, changeRate;
+    private final long changeInterval;
+    private final TimeUnit changeIntervalUnit;
+    // Rate that the most recent adjustment applied; starts at initialRate.
+    private double nextRate;
+
+    /**
+     * Creates the limiter and schedules the periodic rate adjustment.
+     *
+     * @param initialRate rate the wrapped limiter is created with
+     * @param maxRate upper bound applied to the rate on every adjustment
+     * @param changeRate amount added to the rate on every adjustment
+     * @param changeInterval delay before the first adjustment and period between adjustments
+     * @param changeIntervalUnit time unit of {@code changeInterval}
+     */
+    public ShiftableRateLimiter(double initialRate,
+                                double maxRate,
+                                double changeRate,
+                                long changeInterval,
+                                TimeUnit changeIntervalUnit) {
+        this.initialRate = initialRate;
+        this.maxRate = maxRate;
+        this.changeRate = changeRate;
+        this.nextRate = initialRate;
+        this.changeInterval = changeInterval;
+        this.changeIntervalUnit = changeIntervalUnit;
+        this.rateLimiter = RateLimiter.create(initialRate);
+        this.executor = Executors.newSingleThreadScheduledExecutor();
+        // Scheduled last so that every field is assigned before run() can observe this instance.
+        this.executor.scheduleAtFixedRate(this, changeInterval, changeInterval, changeIntervalUnit);
+    }
+
+    /**
+     * Creates a fresh limiter from the constructor arguments of this one.
+     *
+     * <p>The copy starts again at {@code initialRate} and owns a new executor, so it has to be
+     * closed independently of this instance.
+     *
+     * @return a new, independent limiter with the same configuration
+     */
+    public ShiftableRateLimiter duplicate() {
+        return new ShiftableRateLimiter(
+                initialRate,
+                maxRate,
+                changeRate,
+                changeInterval,
+                changeIntervalUnit);
+    }
+
+    /**
+     * Applies one adjustment: adds {@code changeRate} to the rate, caps it at {@code maxRate}
+     * and installs the result on the wrapped limiter.
+     */
+    @Override
+    public void run() {
+        this.nextRate = Math.min(nextRate + changeRate, maxRate);
+        this.rateLimiter.setRate(nextRate);
+    }
+
+    /**
+     * Returns the wrapped limiter.
+     *
+     * @return the limiter whose rate this instance adjusts
+     */
+    public RateLimiter getLimiter() {
+        return this.rateLimiter;
+    }
+
+    /**
+     * Stops the periodic rate adjustment by shutting down the executor owned by this instance.
+     *
+     * <p>The wrapped limiter is left untouched and keeps the last rate that was set on it.
+     * Calling this method again has no further effect.
+     */
+    @Override
+    public void close() {
+        this.executor.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/package-info.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/package-info.java
new file mode 100644
index 0000000..c650bab
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utils for benchmarking.
+ */
+package org.apache.distributedlog.benchmark.utils;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/resources/findbugsExclude.xml b/distributedlog-benchmark/src/main/resources/findbugsExclude.xml
index b7a1ecb..0ab2b6b 100644
--- a/distributedlog-benchmark/src/main/resources/findbugsExclude.xml
+++ b/distributedlog-benchmark/src/main/resources/findbugsExclude.xml
@@ -18,6 +18,6 @@
 <FindBugsFilter>
   <Match>
     <!-- generated code, we can't be held responsible for findbugs in it //-->
-    <Class name="~com\.twitter\.distributedlog\.benchmark\.thrift.*" />
+    <Class name="~org\.apache\.distributedlog\.benchmark\.thrift.*" />
   </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/thrift/loadtest.thrift
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/thrift/loadtest.thrift b/distributedlog-benchmark/src/main/thrift/loadtest.thrift
index 6d98cec..48c5d5a 100644
--- a/distributedlog-benchmark/src/main/thrift/loadtest.thrift
+++ b/distributedlog-benchmark/src/main/thrift/loadtest.thrift
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-namespace java com.twitter.distributedlog.benchmark.thrift
+namespace java org.apache.distributedlog.benchmark.thrift
 
 struct Message {
     1: i64 publishTime;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-client/pom.xml b/distributedlog-client/pom.xml
index aad5093..f09caf1 100644
--- a/distributedlog-client/pom.xml
+++ b/distributedlog-client/pom.xml
@@ -137,7 +137,7 @@
           <properties>
             <property>
               <name>listener</name>
-              <value>com.twitter.distributedlog.TimedOutTestsListener</value>
+              <value>org.apache.distributedlog.TimedOutTestsListener</value>
             </property>
           </properties>
         </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java
deleted file mode 100644
index de74f5a..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Client Config.
- *
- * <p>A mutable holder of client settings. Every setter returns {@code this} so that calls can
- * be chained, and {@link #newConfig(ClientConfig)} produces an independent copy of all settings.
- */
-public class ClientConfig {
-    // Defaults are the initializers below. What a value of -1 means is decided by the code
-    // that consumes this config; this class only stores it.
-    int redirectBackoffStartMs = 25;
-    int redirectBackoffMaxMs = 100;
-    int maxRedirects = -1;
-    int requestTimeoutMs = -1;
-    boolean thriftmux = false;
-    boolean streamFailfast = false;
-    String streamNameRegex = ".*";
-    boolean handshakeWithClientInfo = true;
-    long periodicHandshakeIntervalMs = TimeUnit.MINUTES.toMillis(5);
-    long periodicOwnershipSyncIntervalMs = TimeUnit.MINUTES.toMillis(5);
-    boolean periodicDumpOwnershipCacheEnabled = false;
-    long periodicDumpOwnershipCacheIntervalMs = TimeUnit.MINUTES.toMillis(10);
-    boolean enableHandshakeTracing = false;
-    boolean enableChecksum = true;
-
-    /** Sets the maximum number of redirects (default {@code -1}) and returns this config. */
-    public ClientConfig setMaxRedirects(int maxRedirects) {
-        this.maxRedirects = maxRedirects;
-        return this;
-    }
-
-    /** Returns the maximum number of redirects. */
-    public int getMaxRedirects() {
-        return this.maxRedirects;
-    }
-
-    /** Sets the request timeout in milliseconds (default {@code -1}) and returns this config. */
-    public ClientConfig setRequestTimeoutMs(int timeoutInMillis) {
-        this.requestTimeoutMs = timeoutInMillis;
-        return this;
-    }
-
-    /** Returns the request timeout in milliseconds. */
-    public int getRequestTimeoutMs() {
-        return this.requestTimeoutMs;
-    }
-
-    /** Sets the redirect backoff start value (default {@code 25}) and returns this config. */
-    public ClientConfig setRedirectBackoffStartMs(int ms) {
-        this.redirectBackoffStartMs = ms;
-        return this;
-    }
-
-    /** Returns the redirect backoff start value. */
-    public int getRedirectBackoffStartMs() {
-        return this.redirectBackoffStartMs;
-    }
-
-    /** Sets the redirect backoff maximum value (default {@code 100}) and returns this config. */
-    public ClientConfig setRedirectBackoffMaxMs(int ms) {
-        this.redirectBackoffMaxMs = ms;
-        return this;
-    }
-
-    /** Returns the redirect backoff maximum value. */
-    public int getRedirectBackoffMaxMs() {
-        return this.redirectBackoffMaxMs;
-    }
-
-    /** Sets the thriftmux flag (default {@code false}) and returns this config. */
-    public ClientConfig setThriftMux(boolean enabled) {
-        this.thriftmux = enabled;
-        return this;
-    }
-
-    /** Returns the thriftmux flag. */
-    public boolean getThriftMux() {
-        return this.thriftmux;
-    }
-
-    /** Sets the stream failfast flag (default {@code false}) and returns this config. */
-    public ClientConfig setStreamFailfast(boolean enabled) {
-        this.streamFailfast = enabled;
-        return this;
-    }
-
-    /** Returns the stream failfast flag. */
-    public boolean getStreamFailfast() {
-        return this.streamFailfast;
-    }
-
-    /**
-     * Sets the stream name regex (default {@code ".*"}).
-     *
-     * @param nameRegex the regex to store; must not be {@code null}
-     * @return this config
-     * @throws NullPointerException if {@code nameRegex} is {@code null}
-     */
-    public ClientConfig setStreamNameRegex(String nameRegex) {
-        checkNotNull(nameRegex);
-        this.streamNameRegex = nameRegex;
-        return this;
-    }
-
-    /** Returns the stream name regex. */
-    public String getStreamNameRegex() {
-        return this.streamNameRegex;
-    }
-
-    /** Sets the handshake-with-client-info flag (default {@code true}) and returns this config. */
-    public ClientConfig setHandshakeWithClientInfo(boolean enabled) {
-        this.handshakeWithClientInfo = enabled;
-        return this;
-    }
-
-    /** Returns the handshake-with-client-info flag. */
-    public boolean getHandshakeWithClientInfo() {
-        return this.handshakeWithClientInfo;
-    }
-
-    /** Sets the periodic handshake interval in milliseconds (default 5 minutes). */
-    public ClientConfig setPeriodicHandshakeIntervalMs(long intervalMs) {
-        this.periodicHandshakeIntervalMs = intervalMs;
-        return this;
-    }
-
-    /** Returns the periodic handshake interval in milliseconds. */
-    public long getPeriodicHandshakeIntervalMs() {
-        return this.periodicHandshakeIntervalMs;
-    }
-
-    /** Sets the periodic ownership sync interval in milliseconds (default 5 minutes). */
-    public ClientConfig setPeriodicOwnershipSyncIntervalMs(long intervalMs) {
-        this.periodicOwnershipSyncIntervalMs = intervalMs;
-        return this;
-    }
-
-    /** Returns the periodic ownership sync interval in milliseconds. */
-    public long getPeriodicOwnershipSyncIntervalMs() {
-        return this.periodicOwnershipSyncIntervalMs;
-    }
-
-    /** Sets whether the ownership cache is dumped periodically (default {@code false}). */
-    public ClientConfig setPeriodicDumpOwnershipCacheEnabled(boolean enabled) {
-        this.periodicDumpOwnershipCacheEnabled = enabled;
-        return this;
-    }
-
-    /** Returns whether the ownership cache is dumped periodically. */
-    public boolean isPeriodicDumpOwnershipCacheEnabled() {
-        return this.periodicDumpOwnershipCacheEnabled;
-    }
-
-    /** Sets the ownership cache dump interval in milliseconds (default 10 minutes). */
-    public ClientConfig setPeriodicDumpOwnershipCacheIntervalMs(long intervalMs) {
-        this.periodicDumpOwnershipCacheIntervalMs = intervalMs;
-        return this;
-    }
-
-    /** Returns the ownership cache dump interval in milliseconds. */
-    public long getPeriodicDumpOwnershipCacheIntervalMs() {
-        return this.periodicDumpOwnershipCacheIntervalMs;
-    }
-
-    /** Sets the handshake tracing flag (default {@code false}) and returns this config. */
-    public ClientConfig setHandshakeTracingEnabled(boolean enabled) {
-        this.enableHandshakeTracing = enabled;
-        return this;
-    }
-
-    /** Returns the handshake tracing flag. */
-    public boolean isHandshakeTracingEnabled() {
-        return this.enableHandshakeTracing;
-    }
-
-    /** Sets the checksum flag (default {@code true}) and returns this config. */
-    public ClientConfig setChecksumEnabled(boolean enabled) {
-        this.enableChecksum = enabled;
-        return this;
-    }
-
-    /** Returns the checksum flag. */
-    public boolean isChecksumEnabled() {
-        return this.enableChecksum;
-    }
-
-    /**
-     * Creates a new config that carries the same value as {@code config} for every setting.
-     *
-     * @param config the config to copy from; must not be {@code null}
-     * @return a new, independent {@code ClientConfig}
-     */
-    public static ClientConfig newConfig(ClientConfig config) {
-        ClientConfig newConfig = new ClientConfig();
-        newConfig.setMaxRedirects(config.getMaxRedirects())
-                 .setRequestTimeoutMs(config.getRequestTimeoutMs())
-                 .setRedirectBackoffStartMs(config.getRedirectBackoffStartMs())
-                 .setRedirectBackoffMaxMs(config.getRedirectBackoffMaxMs())
-                 .setThriftMux(config.getThriftMux())
-                 .setStreamFailfast(config.getStreamFailfast())
-                 .setStreamNameRegex(config.getStreamNameRegex())
-                 .setHandshakeWithClientInfo(config.getHandshakeWithClientInfo())
-                 .setPeriodicHandshakeIntervalMs(config.getPeriodicHandshakeIntervalMs())
-                 // Previously left out, which reset the copy to the 5-minute default.
-                 .setPeriodicOwnershipSyncIntervalMs(config.getPeriodicOwnershipSyncIntervalMs())
-                 .setPeriodicDumpOwnershipCacheEnabled(config.isPeriodicDumpOwnershipCacheEnabled())
-                 .setPeriodicDumpOwnershipCacheIntervalMs(config.getPeriodicDumpOwnershipCacheIntervalMs())
-                 .setHandshakeTracingEnabled(config.isHandshakeTracingEnabled())
-                 .setChecksumEnabled(config.isChecksumEnabled());
-        return newConfig;
-    }
-}