You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:53 UTC
[48/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Worker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Worker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Worker.java
new file mode 100644
index 0000000..a948092
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Worker.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark;
+
+import java.io.Closeable;
+
+/**
+ * Worker to run benchmark.
+ *
+ * <p>A worker is both a {@link Runnable} (its {@code run()} method carries out the benchmark work)
+ * and a {@link Closeable} (its {@code close()} method releases whatever the worker holds).
+ * The interface itself declares no additional methods.
+ */
+public interface Worker extends Closeable, Runnable {
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
new file mode 100644
index 0000000..9e96765
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.twitter.common.zookeeper.ServerSet;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.benchmark.utils.ShiftableRateLimiter;
+import org.apache.distributedlog.client.DistributedLogMultiStreamWriter;
+import org.apache.distributedlog.client.serverset.DLZkServerSet;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.service.DistributedLogClient;
+import org.apache.distributedlog.service.DistributedLogClientBuilder;
+import org.apache.distributedlog.util.SchedulerUtils;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.stats.StatsReceiver;
+import com.twitter.finagle.thrift.ClientId;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Duration$;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Benchmark for distributedlog proxy client.
+ *
+ * <p>{@link #run()} submits {@code writeConcurrency} writer tasks to an internal thread pool.
+ * Every task builds its own {@link DistributedLogClient} and keeps writing generated messages
+ * to randomly chosen streams until {@link #close()} clears the {@code running} flag. The outcome
+ * of each write is recorded on a separate callback thread pool.
+ */
+public class WriterWorker implements Worker {
+
+    static final Logger LOG = LoggerFactory.getLogger(WriterWorker.class);
+
+    final String streamPrefix;
+    final int startStreamId;
+    final int endStreamId;
+    final int writeConcurrency;
+    final int messageSizeBytes;
+    final int hostConnectionCoreSize;
+    final int hostConnectionLimit;
+    // pool running the Writer / BulkWriter tasks
+    final ExecutorService executorService;
+    final ShiftableRateLimiter rateLimiter;
+    final URI dlUri;
+    final DLZkServerSet[] serverSets;
+    final List<String> finagleNames;
+    final Random random;
+    // names of the streams to write to: "<streamPrefix>_<id>" for id in [startStreamId, endStreamId)
+    final List<String> streamNames;
+    final int numStreams;
+    // a value greater than zero selects the bulk write path (see run())
+    final int batchSize;
+    final boolean thriftmux;
+    final boolean handshakeWithClientInfo;
+    final int sendBufferSize;
+    final int recvBufferSize;
+    final boolean enableBatching;
+    final int batchBufferSize;
+    final int batchFlushIntervalMicros;
+    private final String routingServiceFinagleName;
+
+    // cleared by close() to make the writer loops exit
+    volatile boolean running = true;
+
+    final StatsReceiver statsReceiver;
+    final StatsLogger statsLogger;
+    final OpStatsLogger requestStat;
+    final StatsLogger exceptionsLogger;
+    final StatsLogger dlErrorCodeLogger;
+
+    // callback thread
+    final ExecutorService executor;
+
+    /**
+     * Creates a writer worker.
+     *
+     * <p>The worker writes to the streams {@code <streamPrefix>_<id>} for every id in
+     * {@code [startStreamId, endStreamId)}.
+     *
+     * @throws IllegalArgumentException if {@code startStreamId > endStreamId}, or if both
+     *         {@code finagleNames} and {@code serverSetPaths} are empty
+     */
+    public WriterWorker(String streamPrefix,
+                        URI uri,
+                        int startStreamId,
+                        int endStreamId,
+                        ShiftableRateLimiter rateLimiter,
+                        int writeConcurrency,
+                        int messageSizeBytes,
+                        int batchSize,
+                        int hostConnectionCoreSize,
+                        int hostConnectionLimit,
+                        List<String> serverSetPaths,
+                        List<String> finagleNames,
+                        StatsReceiver statsReceiver,
+                        StatsLogger statsLogger,
+                        boolean thriftmux,
+                        boolean handshakeWithClientInfo,
+                        int sendBufferSize,
+                        int recvBufferSize,
+                        boolean enableBatching,
+                        int batchBufferSize,
+                        int batchFlushIntervalMicros,
+                        String routingServiceFinagleName) {
+        checkArgument(startStreamId <= endStreamId);
+        checkArgument(!finagleNames.isEmpty() || !serverSetPaths.isEmpty());
+        this.streamPrefix = streamPrefix;
+        this.dlUri = uri;
+        this.startStreamId = startStreamId;
+        this.endStreamId = endStreamId;
+        this.rateLimiter = rateLimiter;
+        this.writeConcurrency = writeConcurrency;
+        this.messageSizeBytes = messageSizeBytes;
+        this.statsReceiver = statsReceiver;
+        this.statsLogger = statsLogger;
+        this.requestStat = this.statsLogger.getOpStatsLogger("requests");
+        this.exceptionsLogger = statsLogger.scope("exceptions");
+        this.dlErrorCodeLogger = statsLogger.scope("dl_error_code");
+        this.executorService = Executors.newCachedThreadPool();
+        this.random = new Random(System.currentTimeMillis());
+        this.batchSize = batchSize;
+        this.hostConnectionCoreSize = hostConnectionCoreSize;
+        this.hostConnectionLimit = hostConnectionLimit;
+        this.thriftmux = thriftmux;
+        this.handshakeWithClientInfo = handshakeWithClientInfo;
+        this.sendBufferSize = sendBufferSize;
+        this.recvBufferSize = recvBufferSize;
+        this.enableBatching = enableBatching;
+        this.batchBufferSize = batchBufferSize;
+        this.batchFlushIntervalMicros = batchFlushIntervalMicros;
+        this.finagleNames = finagleNames;
+        this.serverSets = createServerSets(serverSetPaths);
+        this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+        this.routingServiceFinagleName = routingServiceFinagleName;
+
+        // Streams
+        streamNames = new ArrayList<String>(endStreamId - startStreamId);
+        for (int i = startStreamId; i < endStreamId; i++) {
+            streamNames.add(String.format("%s_%d", streamPrefix, i));
+        }
+        numStreams = streamNames.size();
+        LOG.info("Writing to {} streams : {}", numStreams, streamNames);
+    }
+
+    /**
+     * Creates one {@link DLZkServerSet} per server set path, in the same order as the paths.
+     *
+     * @param serverSetPaths server set paths; each one is parsed with {@link URI#create(String)}
+     * @return the server sets, one per path
+     */
+    protected DLZkServerSet[] createServerSets(List<String> serverSetPaths) {
+        DLZkServerSet[] serverSets = new DLZkServerSet[serverSetPaths.size()];
+        for (int i = 0; i < serverSets.length; i++) {
+            String serverSetPath = serverSetPaths.get(i);
+            serverSets[i] = DLZkServerSet.of(URI.create(serverSetPath), 60000);
+        }
+        return serverSets;
+    }
+
+    /**
+     * Stops the worker: signals the writer loops to exit, shuts down the writer pool and the
+     * callback pool, and closes the server sets.
+     */
+    @Override
+    public void close() throws IOException {
+        this.running = false;
+        SchedulerUtils.shutdownScheduler(this.executorService, 2, TimeUnit.MINUTES);
+        // The callback pool used to be left running, which leaked its threads. Shut it down
+        // after the writers so that callbacks of writes issued before close() can still run.
+        SchedulerUtils.shutdownScheduler(this.executor, 2, TimeUnit.MINUTES);
+        for (DLZkServerSet serverSet : serverSets) {
+            serverSet.close();
+        }
+    }
+
+    /**
+     * Hands a completed request over to the callback pool.
+     *
+     * <p>Completions that arrive after {@link #close()} has shut the pool down are dropped
+     * instead of throwing into the thread that completed the write.
+     */
+    private void submitCallback(Runnable callback) {
+        try {
+            executor.submit(callback);
+        } catch (java.util.concurrent.RejectedExecutionException ree) {
+            LOG.debug("Dropping request callback received after the worker was closed", ree);
+        }
+    }
+
+    /**
+     * Builds a new proxy client from the worker settings.
+     *
+     * <p>The client is routed by finagle names when any were given, otherwise by server sets,
+     * otherwise by the distributedlog uri.
+     */
+    private DistributedLogClient buildDlogClient() {
+        ClientBuilder clientBuilder = ClientBuilder.get()
+            .hostConnectionLimit(hostConnectionLimit)
+            .hostConnectionCoresize(hostConnectionCoreSize)
+            .tcpConnectTimeout(Duration$.MODULE$.fromMilliseconds(200))
+            .connectTimeout(Duration$.MODULE$.fromMilliseconds(200))
+            .requestTimeout(Duration$.MODULE$.fromSeconds(10))
+            .sendBufferSize(sendBufferSize)
+            .recvBufferSize(recvBufferSize);
+
+        ClientId clientId = ClientId$.MODULE$.apply("dlog_loadtest_writer");
+
+        DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
+            .clientId(clientId)
+            .clientBuilder(clientBuilder)
+            .thriftmux(thriftmux)
+            .redirectBackoffStartMs(100)
+            .redirectBackoffMaxMs(500)
+            .requestTimeoutMs(10000)
+            .statsReceiver(statsReceiver)
+            .streamNameRegex("^" + streamPrefix + "_[0-9]+$")
+            .handshakeWithClientInfo(handshakeWithClientInfo)
+            .periodicHandshakeIntervalMs(TimeUnit.SECONDS.toMillis(30))
+            .periodicOwnershipSyncIntervalMs(TimeUnit.MINUTES.toMillis(5))
+            .periodicDumpOwnershipCache(true)
+            .handshakeTracing(true)
+            .serverRoutingServiceFinagleNameStr(routingServiceFinagleName)
+            .name("writer");
+
+        if (!finagleNames.isEmpty()) {
+            // the first name is the local one, the rest are remotes
+            String local = finagleNames.get(0);
+            String[] remotes = new String[finagleNames.size() - 1];
+            finagleNames.subList(1, finagleNames.size()).toArray(remotes);
+
+            builder = builder.finagleNameStrs(local, remotes);
+        } else if (serverSets.length != 0) {
+            // the first server set is the local one, the rest are remotes
+            ServerSet local = serverSets[0].getServerSet();
+            ServerSet[] remotes = new ServerSet[serverSets.length - 1];
+            for (int i = 1; i < serverSets.length; i++) {
+                remotes[i - 1] = serverSets[i].getServerSet();
+            }
+            builder = builder.serverSets(local, remotes);
+        } else {
+            builder = builder.uri(dlUri);
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Generates one message payload.
+     *
+     * @return the payload wrapped in a buffer, or {@code null} if generating the message failed
+     */
+    ByteBuffer buildBuffer(long requestMillis, int messageSizeBytes) {
+        try {
+            return ByteBuffer.wrap(Utils.generateMessage(requestMillis, messageSizeBytes));
+        } catch (TException e) {
+            LOG.error("Error generating message : ", e);
+            return null;
+        }
+    }
+
+    /**
+     * Generates {@code batchSize} message payloads.
+     *
+     * @return the payloads, or {@code null} if generating any of them failed
+     */
+    List<ByteBuffer> buildBufferList(int batchSize, long requestMillis, int messageSizeBytes) {
+        List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>(batchSize);
+        for (int i = 0; i < batchSize; i++) {
+            ByteBuffer buf = buildBuffer(requestMillis, messageSizeBytes);
+            if (null == buf) {
+                return null;
+            }
+            bufferList.add(buf);
+        }
+        return bufferList;
+    }
+
+    /**
+     * Listener for a single write. It remembers the outcome and then re-submits itself to the
+     * callback pool, where {@link #run()} records the request latency and failure counters.
+     */
+    class TimedRequestHandler implements FutureEventListener<DLSN>, Runnable {
+        final String streamName;
+        final long requestMillis;
+        DLSN dlsn = null;
+        Throwable cause = null;
+
+        TimedRequestHandler(String streamName,
+                            long requestMillis) {
+            this.streamName = streamName;
+            this.requestMillis = requestMillis;
+        }
+
+        @Override
+        public void onSuccess(DLSN value) {
+            dlsn = value;
+            submitCallback(this);
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            this.cause = cause;
+            submitCallback(this);
+        }
+
+        @Override
+        public void run() {
+            if (null != dlsn) {
+                requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);
+            } else {
+                LOG.error("Failed to publish to {} : ", streamName, cause);
+                requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis);
+                exceptionsLogger.getCounter(cause.getClass().getName()).inc();
+                if (cause instanceof DLException) {
+                    DLException dle = (DLException) cause;
+                    dlErrorCodeLogger.getCounter(dle.getCode().toString()).inc();
+                }
+            }
+        }
+    }
+
+    /**
+     * Writer task issuing one record per request, either through a
+     * {@link DistributedLogMultiStreamWriter} (when batching is enabled) or directly through
+     * the client.
+     */
+    class Writer implements Runnable {
+
+        final int idx;
+        final DistributedLogClient dlc;
+        // only set when batching is enabled
+        DistributedLogMultiStreamWriter writer = null;
+        final ShiftableRateLimiter limiter;
+
+        Writer(int idx) {
+            this.idx = idx;
+            this.dlc = buildDlogClient();
+            if (enableBatching) {
+                writer = DistributedLogMultiStreamWriter.newBuilder()
+                        .client(this.dlc)
+                        .streams(streamNames)
+                        .compressionCodec(CompressionCodec.Type.NONE)
+                        .flushIntervalMicros(batchFlushIntervalMicros)
+                        .bufferSize(batchBufferSize)
+                        .firstSpeculativeTimeoutMs(9000)
+                        .maxSpeculativeTimeoutMs(9000)
+                        .requestTimeoutMs(10000)
+                        .speculativeBackoffMultiplier(2)
+                        .build();
+            }
+            this.limiter = rateLimiter.duplicate();
+        }
+
+        @Override
+        public void run() {
+            LOG.info("Started writer {}.", idx);
+            try {
+                while (running) {
+                    this.limiter.getLimiter().acquire();
+                    final String streamName = streamNames.get(random.nextInt(numStreams));
+                    final long requestMillis = System.currentTimeMillis();
+                    final ByteBuffer data = buildBuffer(requestMillis, messageSizeBytes);
+                    if (null == data) {
+                        break;
+                    }
+                    if (null != writer) {
+                        writer.write(data).addEventListener(
+                                new TimedRequestHandler(streamName, requestMillis));
+                    } else {
+                        dlc.write(streamName, data).addEventListener(
+                                new TimedRequestHandler(streamName, requestMillis));
+                    }
+                }
+            } finally {
+                // release the writer and the client even when the loop dies on an exception
+                try {
+                    if (null != writer) {
+                        writer.close();
+                    }
+                } finally {
+                    dlc.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Writer task issuing {@code batchSize} records per request through the client's bulk write.
+     */
+    class BulkWriter implements Runnable {
+
+        final int idx;
+        final DistributedLogClient dlc;
+
+        BulkWriter(int idx) {
+            this.idx = idx;
+            this.dlc = buildDlogClient();
+        }
+
+        @Override
+        public void run() {
+            LOG.info("Started writer {}.", idx);
+            try {
+                while (running) {
+                    rateLimiter.getLimiter().acquire(batchSize);
+                    String streamName = streamNames.get(random.nextInt(numStreams));
+                    final long requestMillis = System.currentTimeMillis();
+                    final List<ByteBuffer> data = buildBufferList(batchSize, requestMillis, messageSizeBytes);
+                    if (null == data) {
+                        break;
+                    }
+                    List<Future<DLSN>> results = dlc.writeBulk(streamName, data);
+                    for (Future<DLSN> result : results) {
+                        result.addEventListener(new TimedRequestHandler(streamName, requestMillis));
+                    }
+                }
+            } finally {
+                // release the client even when the loop dies on an exception
+                dlc.close();
+            }
+        }
+    }
+
+    /**
+     * Starts {@code writeConcurrency} writer tasks: bulk writers when {@code batchSize > 0},
+     * single-record writers otherwise. Returns once the tasks have been submitted.
+     */
+    @Override
+    public void run() {
+        LOG.info("Starting writer (concurrency = {}, prefix = {}, batchSize = {})",
+                 new Object[] { writeConcurrency, streamPrefix, batchSize });
+        if (numStreams <= 0) {
+            // Picking a random stream out of an empty list would make every writer task die
+            // with an exception that nobody observes; report the problem instead.
+            LOG.error("No streams to write to (startStreamId = {}, endStreamId = {}), writers are not started",
+                      startStreamId, endStreamId);
+            return;
+        }
+        try {
+            for (int i = 0; i < writeConcurrency; i++) {
+                Runnable writer = batchSize > 0 ? new BulkWriter(i) : new Writer(i);
+                executorService.submit(writer);
+            }
+        } catch (Throwable t) {
+            LOG.error("Unhandled exception caught", t);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/package-info.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/package-info.java
new file mode 100644
index 0000000..7e87644
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Benchmarks for distributedlog.
+ */
+package org.apache.distributedlog.benchmark;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AbstractReaderBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AbstractReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AbstractReaderBenchmark.java
new file mode 100644
index 0000000..a1f1f9f
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AbstractReaderBenchmark.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.commons.cli.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class of the reader benchmarks. It registers and parses the command line options that
+ * the readers share: read mode, start transaction id, rewind time and read batch size.
+ */
+abstract class AbstractReaderBenchmark extends StreamBenchmark {
+
+    // Log under this class; the logger used to be created for SyncReaderBenchmark, which
+    // attributed these messages to the wrong class.
+    private static final Logger logger = LoggerFactory.getLogger(AbstractReaderBenchmark.class);
+
+    protected ReadMode readMode = ReadMode.LATEST;
+    protected long fromTxId = DistributedLogConstants.INVALID_TXID;
+    protected long rewindMs = 0L;
+    protected int batchSize = 1;
+
+    /**
+     * Registers the reader options {@code -t/--tx-id}, {@code -r/--rewind}, {@code -m/--mode}
+     * and {@code -b/--batch-size}.
+     */
+    protected AbstractReaderBenchmark() {
+        options.addOption("t", "tx-id", true,
+            "Transaction ID to start read from when reading in mode 'position'");
+        options.addOption("r", "rewind", true,
+            "Time to rewind back to read from when reading in mode 'rewind' (in milliseconds)");
+        options.addOption("m", "mode", true,
+            "Read Mode : [oldest, latest, rewind, position]");
+        options.addOption("b", "batch-size", true, "Read batch size");
+    }
+
+    /**
+     * Reads the reader options from the parsed command line.
+     *
+     * <p>The read mode option is mandatory: when it is missing or does not name a
+     * {@link ReadMode} constant, the usage is printed and the JVM exits. The other options are
+     * optional and keep their defaults when absent.
+     *
+     * @param cmdline the parsed command line
+     * @throws NumberFormatException if the transaction id, rewind time or batch size is not a
+     *         valid number
+     */
+    @Override
+    protected void parseCommandLine(CommandLine cmdline) {
+        if (cmdline.hasOption("m")) {
+            String mode = cmdline.getOptionValue("m");
+            try {
+                readMode = ReadMode.valueOf(mode.toUpperCase());
+            } catch (IllegalArgumentException iae) {
+                logger.error("Invalid read mode {}.", mode);
+                printUsage();
+                System.exit(0);
+            }
+        } else {
+            printUsage();
+            System.exit(0);
+        }
+        if (cmdline.hasOption("t")) {
+            fromTxId = Long.parseLong(cmdline.getOptionValue("t"));
+        }
+        if (cmdline.hasOption("r")) {
+            rewindMs = Long.parseLong(cmdline.getOptionValue("r"));
+        }
+        if (cmdline.hasOption("b")) {
+            batchSize = Integer.parseInt(cmdline.getOptionValue("b"));
+        }
+        logger.info("Start reading from transaction id {}, rewind {} ms.", fromTxId, rewindMs);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
new file mode 100644
index 0000000..4930b8a
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.AsyncLogReader;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.util.FutureUtils;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Benchmark on {@link org.apache.distributedlog.AsyncLogReader} reading from a stream.
+ */
+public class AsyncReaderBenchmark extends AbstractReaderBenchmark {
+
+    private static final Logger logger = LoggerFactory.getLogger(AsyncReaderBenchmark.class);
+
+    /**
+     * Reads the stream in batches of {@code batchSize} records until the process is stopped.
+     *
+     * <p>The log is opened by the inherited {@code streamName}; the {@code logName} argument is
+     * not used. Opening the log and opening the reader are retried until they succeed, sleeping
+     * for the zookeeper session timeout between attempts. The initial read position depends on
+     * {@code readMode}. When a read fails, the reader is reopened from the position of the last
+     * record that was read. The method does not return normally.
+     *
+     * @param namespace namespace used to open the log
+     * @param logName not used by this benchmark
+     * @param statsLogger receives the {@code open_reader} and {@code blocking_read} latencies
+     *        (in microseconds) and the {@code reads} counter
+     */
+    @Override
+    protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) {
+        DistributedLogManager dlm = null;
+        while (null == dlm) {
+            try {
+                dlm = namespace.openLog(streamName);
+            } catch (IOException ioe) {
+                logger.warn("Failed to create dlm for stream {} : ", streamName, ioe);
+            }
+            if (null == dlm) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+                } catch (InterruptedException e) {
+                    logger.warn("Interrupted from sleep while creating dlm for stream {} : ",
+                        streamName, e);
+                }
+            }
+        }
+        logger.info("Created dlm for stream {}.", streamName);
+
+        // Stats
+        OpStatsLogger openReaderStats = statsLogger.getOpStatsLogger("open_reader");
+        OpStatsLogger blockingReadStats = statsLogger.getOpStatsLogger("blocking_read");
+        Counter readCounter = statsLogger.getCounter("reads");
+
+        AsyncLogReader reader = null;
+        DLSN lastDLSN = null;
+        Long lastTxId = null;
+        while (null == reader) {
+            // initialize the last txid
+            if (null == lastTxId) {
+                switch (readMode) {
+                    case OLDEST:
+                        lastTxId = 0L;
+                        lastDLSN = DLSN.InitialDLSN;
+                        break;
+                    case LATEST:
+                        lastTxId = Long.MAX_VALUE;
+                        try {
+                            lastDLSN = dlm.getLastDLSN();
+                        } catch (IOException ioe) {
+                            // NOTE(review): lastTxId stays set here, so the next iteration skips
+                            // this initialization and opens the reader by transaction id
+                            // Long.MAX_VALUE - confirm that this fallback is intended.
+                            continue;
+                        }
+                        break;
+                    case REWIND:
+                        lastTxId = System.currentTimeMillis() - rewindMs;
+                        lastDLSN = null;
+                        break;
+                    case POSITION:
+                        lastTxId = fromTxId;
+                        lastDLSN = null;
+                        break;
+                    default:
+                        logger.warn("Unsupported mode {}", readMode);
+                        printUsage();
+                        System.exit(0);
+                        break;
+                }
+                logger.info("Reading from transaction id = {}, dlsn = {}", lastTxId, lastDLSN);
+            }
+            // Open the reader
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            try {
+                // position by dlsn when one is known, by transaction id otherwise
+                if (null == lastDLSN) {
+                    reader = FutureUtils.result(dlm.openAsyncLogReader(lastTxId));
+                } else {
+                    reader = FutureUtils.result(dlm.openAsyncLogReader(lastDLSN));
+                }
+                long elapsedMicros = stopwatch.elapsed(TimeUnit.MICROSECONDS);
+                openReaderStats.registerSuccessfulEvent(elapsedMicros);
+                // The elapsed time used to be missing from the arguments, which shifted the
+                // logged values by one placeholder; it is measured in microseconds, so convert
+                // it to match the "ms" in the message.
+                logger.info("It took {} ms to position the reader to transaction id = {}, dlsn = {}",
+                        new Object[] { TimeUnit.MICROSECONDS.toMillis(elapsedMicros), lastTxId, lastDLSN });
+            } catch (IOException ioe) {
+                openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                // pass the exception as the trailing argument so that the cause is logged too
+                logger.warn("Failed to create reader for stream {} reading from tx id = {}, dlsn = {}.",
+                        new Object[] { streamName, lastTxId, lastDLSN, ioe });
+            }
+            if (null == reader) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+                } catch (InterruptedException e) {
+                    logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ",
+                        streamName, e);
+                }
+                continue;
+            }
+            List<LogRecordWithDLSN> records;
+            stopwatch = Stopwatch.createUnstarted();
+            while (true) {
+                try {
+                    stopwatch.start();
+                    records = FutureUtils.result(reader.readBulk(batchSize));
+                    long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
+                    blockingReadStats.registerSuccessfulEvent(elapsedMicros);
+                    if (!records.isEmpty()) {
+                        readCounter.add(records.size());
+                        // remember where to resume from if the reader has to be reopened
+                        LogRecordWithDLSN lastRecord = records.get(records.size() - 1);
+                        lastTxId = lastRecord.getTransactionId();
+                        lastDLSN = lastRecord.getDlsn();
+                    }
+                    stopwatch.reset();
+                } catch (IOException e) {
+                    logger.warn("Encountered reading record from stream {} : ", streamName, e);
+                    // clearing the reader makes the outer loop reopen it
+                    reader = null;
+                    break;
+                }
+            }
+            try {
+                TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+            } catch (InterruptedException e) {
+                logger.warn("Interrupted from sleep while creating reader for stream {} : ",
+                    streamName, e);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerBatchReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerBatchReader.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerBatchReader.java
new file mode 100644
index 0000000..b115192
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerBatchReader.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+import java.util.Enumeration;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Read ledgers in batches.
+ *
+ * <p>Reads every entry of a ledger from entry 0 up to the last add confirmed entry, at most
+ * {@code batchSize} entries per read, and passes each entry to a {@link ReadEntryListener}.
+ */
+public class LedgerBatchReader implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(LedgerBatchReader.class);
+
+    private final LedgerHandle lh;
+    private final ReadEntryListener readEntryListener;
+    private final int batchSize;
+
+    /**
+     * Creates a batch reader.
+     *
+     * @param lh handle of the ledger to read
+     * @param readEntryListener listener notified of every entry that was read
+     * @param batchSize maximum number of entries per read, must be positive
+     * @throws IllegalArgumentException if {@code batchSize} is not positive
+     */
+    public LedgerBatchReader(LedgerHandle lh,
+                             ReadEntryListener readEntryListener,
+                             int batchSize) {
+        // A non-positive batch size would make run() ask for a range that ends before it
+        // starts, and that request would then be retried without end.
+        if (batchSize <= 0) {
+            throw new IllegalArgumentException("batchSize must be positive : " + batchSize);
+        }
+        this.lh = lh;
+        this.batchSize = batchSize;
+        this.readEntryListener = readEntryListener;
+    }
+
+    /**
+     * Reads the ledger batch by batch. A batch that fails with a {@link BKException} is
+     * retried until it succeeds; an interrupt restores the interrupt flag and stops the reader.
+     */
+    @Override
+    public void run() {
+        long lac = lh.getLastAddConfirmed();
+
+        long entryId = 0L;
+
+        while (entryId <= lac) {
+            long startEntryId = entryId;
+            // the last batch is cut off at the last add confirmed entry
+            long endEntryId = Math.min(startEntryId + batchSize - 1, lac);
+
+            Enumeration<LedgerEntry> entries = null;
+            while (null == entries) {
+                try {
+                    entries = lh.readEntries(startEntryId, endEntryId);
+                } catch (BKException bke) {
+                    logger.error("Encountered exceptions on reading [ {} - {} ] ",
+                            new Object[] { startEntryId, endEntryId, bke });
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+            if (null == entries) {
+                // interrupted while reading : give up
+                break;
+            }
+
+            while (entries.hasMoreElements()) {
+                LedgerEntry entry = entries.nextElement();
+                readEntryListener.onEntryComplete(BKException.Code.OK, lh, entry, null);
+            }
+
+            entryId = endEntryId + 1;
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java
new file mode 100644
index 0000000..489e5af
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.BookKeeperClientBuilder;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.ZooKeeperClientBuilder;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Benchmark ledger reading.
+ *
+ * <p>Opens the stream, lists its log segments, then opens each segment's ledger directly through
+ * a BookKeeper client and reads all of its entries, counting them in the {@code reads} counter.
+ * The {@code ledger_stream_read} setting selects between {@link LedgerStreamReader} (default) and
+ * {@link LedgerBatchReader}; {@code ledger_read_concurrency} is passed to either reader as its
+ * concurrency / batch size.
+ */
+public class LedgerReadBenchmark extends AbstractReaderBenchmark {
+
+    private static final Logger logger = LoggerFactory.getLogger(LedgerReadBenchmark.class);
+
+    /**
+     * Read every log segment of the stream, ledger by ledger.
+     *
+     * <p>Opening the log and listing its segments are retried until they succeed, sleeping one
+     * ZooKeeper session timeout between attempts. An interrupt during that sleep restores the
+     * interrupt flag and aborts the benchmark.
+     *
+     * @param namespace namespace used to open the stream
+     * @param logName name of the log to benchmark
+     * @param statsLogger stats logger that receives the {@code reads} counter
+     */
+    @Override
+    protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) {
+        DistributedLogManager dlm = null;
+        while (null == dlm) {
+            try {
+                dlm = namespace.openLog(streamName);
+            } catch (IOException ioe) {
+                logger.warn("Failed to create dlm for stream {} : ", streamName, ioe);
+            }
+            if (null == dlm) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+                } catch (InterruptedException e) {
+                    logger.warn("Interrupted from sleep while creating dlm for stream {} : ",
+                            streamName, e);
+                    // stop retrying once interrupted rather than swallowing the interrupt
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+        logger.info("Created dlm for stream {}.", streamName);
+
+        List<LogSegmentMetadata> segments = null;
+        while (null == segments) {
+            try {
+                segments = dlm.getLogSegments();
+            } catch (IOException ioe) {
+                logger.warn("Failed to get log segments for stream {} : ", streamName, ioe);
+            }
+            if (null == segments) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+                } catch (InterruptedException e) {
+                    logger.warn("Interrupted from sleep while geting log segments for stream {} : ",
+                            streamName, e);
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+
+        final Counter readCounter = statsLogger.getCounter("reads");
+
+        logger.info("Reading from log segments : {}", segments);
+
+        // NOTE(review): zkc and the BookKeeper client built below are never closed in this
+        // method; confirm whether they should be released once reading finishes.
+        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
+                .uri(uri)
+                .name("benchmark-zkc")
+                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
+                .zkAclId(null)
+                .build();
+        BKDLConfig bkdlConfig;
+        try {
+            bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
+        } catch (IOException e) {
+            logger.error("Failed to resolve dl config from {} : ", uri, e);
+            return;
+        }
+
+        BookKeeper bk;
+        try {
+            bk = BookKeeperClientBuilder.newBuilder()
+                    .name("benchmark-bkc")
+                    .dlConfig(conf)
+                    .zkServers(bkdlConfig.getBkZkServersForReader())
+                    .ledgersPath(bkdlConfig.getBkLedgersPath())
+                    .build()
+                    .get();
+        } catch (IOException e) {
+            logger.error("Failed to create bookkeeper client for stream {} : ", streamName, e);
+            return;
+        }
+
+        final int readConcurrency = conf.getInt("ledger_read_concurrency", 1000);
+        boolean streamRead = conf.getBoolean("ledger_stream_read", true);
+        // Both reader flavours only need to count entries, so one listener serves every segment.
+        final BookkeeperInternalCallbacks.ReadEntryListener countingListener =
+                new BookkeeperInternalCallbacks.ReadEntryListener() {
+                    @Override
+                    public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, Object ctx) {
+                        readCounter.inc();
+                    }
+                };
+        try {
+            for (LogSegmentMetadata segment : segments) {
+                Stopwatch stopwatch = Stopwatch.createStarted();
+                long lid = segment.getLogSegmentId();
+                LedgerHandle lh = bk.openLedgerNoRecovery(
+                        lid, BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
+                try {
+                    logger.info("It took {} ms to open log segment {}",
+                            stopwatch.elapsed(TimeUnit.MILLISECONDS), segment);
+                    stopwatch.reset().start();
+                    Runnable reader;
+                    if (streamRead) {
+                        reader = new LedgerStreamReader(lh, countingListener, readConcurrency);
+                    } else {
+                        // batch mode: read sequentially, readConcurrency entries per request
+                        reader = new LedgerBatchReader(lh, countingListener, readConcurrency);
+                    }
+                    reader.run();
+                    logger.info("It took {} ms to complete reading {} entries from log segment {}",
+                            new Object[] { stopwatch.elapsed(TimeUnit.MILLISECONDS),
+                                    (lh.getLastAddConfirmed() + 1), segment });
+                } finally {
+                    // release the ledger handle before moving on to the next segment
+                    lh.close();
+                }
+            }
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            logger.error("Interrupted on reading bk ", ie);
+        } catch (Exception e) {
+            logger.error("Error on reading bk ", e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerStreamReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerStreamReader.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerStreamReader.java
new file mode 100644
index 0000000..11c3482
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerStreamReader.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+import java.util.Enumeration;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reading ledger in a streaming way.
+ *
+ * <p>Up to {@code concurrency} single-entry asynchronous reads are kept outstanding. Each time a
+ * read completes, the next unread entry (if any) is requested, and completed reads at the head
+ * of the pending queue are handed to the {@link ReadEntryListener} in queue order.
+ * {@link #run()} blocks until the pending queue has drained.
+ */
+public class LedgerStreamReader implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(LedgerStreamReader.class);
+
+    /**
+     * A single outstanding read of one entry; it is also the callback for that read.
+     */
+    class PendingReadRequest implements AsyncCallback.ReadCallback {
+
+        final long entryId;
+        // Written last in readComplete; volatile so that a thread observing isDone == true in
+        // triggerCallbacks also observes the rc and entry written before it.
+        volatile boolean isDone = false;
+        int rc;
+        LedgerEntry entry = null;
+
+        PendingReadRequest(long entryId) {
+            this.entryId = entryId;
+        }
+
+        /** Issue the asynchronous read for this entry. */
+        void read() {
+            lh.asyncReadEntries(entryId, entryId, this, null);
+        }
+
+        /** Deliver the read result (return code and entry, null on failure) to the listener. */
+        void complete(ReadEntryListener listener) {
+            listener.onEntryComplete(rc, lh, entry, null);
+        }
+
+        @Override
+        public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> enumeration, Object ctx) {
+            this.rc = rc;
+            if (BKException.Code.OK == rc && enumeration.hasMoreElements()) {
+                entry = enumeration.nextElement();
+            } else {
+                entry = null;
+            }
+            isDone = true;
+            // keep the pipeline full: request the next unread entry, if there is one
+            PendingReadRequest nextRead = enqueueNextRead();
+            if (null != nextRead) {
+                nextRead.read();
+            }
+            triggerCallbacks();
+        }
+    }
+
+    private final LedgerHandle lh;
+    private final long lac;
+    private final ReadEntryListener readEntryListener;
+    private final int concurrency;
+    private final AtomicLong nextReadEntry = new AtomicLong(0);
+    private final CountDownLatch done = new CountDownLatch(1);
+    private final ConcurrentLinkedQueue<PendingReadRequest> pendingReads =
+        new ConcurrentLinkedQueue<PendingReadRequest>();
+
+    /**
+     * Create the reader and immediately issue the first {@code concurrency} reads.
+     *
+     * @param lh ledger handle to read from
+     * @param readEntryListener listener notified once per entry
+     * @param concurrency number of single-entry reads issued up front
+     */
+    public LedgerStreamReader(LedgerHandle lh,
+                              ReadEntryListener readEntryListener,
+                              int concurrency) {
+        this.lh = lh;
+        this.lac = lh.getLastAddConfirmed();
+        this.readEntryListener = readEntryListener;
+        this.concurrency = concurrency;
+        for (int i = 0; i < concurrency; i++) {
+            PendingReadRequest request = enqueueNextRead();
+            if (null == request) {
+                break;
+            }
+            request.read();
+        }
+        // nothing to read (or everything already completed): release run() right away
+        if (pendingReads.isEmpty()) {
+            done.countDown();
+        }
+    }
+
+    /**
+     * Allocate the next entry id and append its request to the pending queue as one atomic step.
+     *
+     * <p>Holding the same monitor as {@link #triggerCallbacks()} keeps the queue in entry-id
+     * order and guarantees that an allocated entry is always visible in the queue, so the queue
+     * can only be observed empty after allocation has moved past the last entry.
+     *
+     * @return the request to issue, or {@code null} when all entries have been requested
+     */
+    private synchronized PendingReadRequest enqueueNextRead() {
+        long nextEntry = nextReadEntry.getAndIncrement();
+        if (nextEntry > lac) {
+            return null;
+        }
+        PendingReadRequest request = new PendingReadRequest(nextEntry);
+        pendingReads.add(request);
+        return request;
+    }
+
+    /**
+     * Deliver completed requests from the head of the queue, stopping at the first request that
+     * is still outstanding, and release {@link #run()} once the queue is empty.
+     */
+    synchronized void triggerCallbacks() {
+        PendingReadRequest request;
+        while ((request = pendingReads.peek()) != null) {
+            if (!request.isDone) {
+                break;
+            }
+            pendingReads.remove();
+            request.complete(readEntryListener);
+        }
+        if (pendingReads.isEmpty()) {
+            done.countDown();
+        }
+    }
+
+    /**
+     * Block until all pending reads have been delivered; restores the interrupt flag and returns
+     * early if the waiting thread is interrupted.
+     */
+    @Override
+    public void run() {
+        try {
+            done.await();
+        } catch (InterruptedException e) {
+            logger.info("Interrupted on stream reading ledger {} : ", lh.getId(), e);
+            Thread.currentThread().interrupt();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/ReadMode.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/ReadMode.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/ReadMode.java
new file mode 100644
index 0000000..ea5ed36
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/ReadMode.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+/**
+ * The read mode for streaming read benchmark.
+ *
+ * <p>The mode selects the transaction id a reader benchmark starts from; the per-constant notes
+ * describe how SyncReaderBenchmark interprets each value.
+ */
+public enum ReadMode {
+ /** Start from transaction id 0. */
+ OLDEST,
+ /** Start from the last transaction id currently in the log. */
+ LATEST,
+ /** Start from the current wall-clock time in milliseconds minus the configured rewind. */
+ REWIND,
+ /** Start from an explicitly configured transaction id. */
+ POSITION
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/StreamBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/StreamBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/StreamBenchmark.java
new file mode 100644
index 0000000..d3083ca
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/StreamBenchmark.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import java.io.File;
+import java.net.URI;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Benchmark Streams.
+ *
+ * <p>Base class for stream-level benchmarks. It parses the common command line options
+ * (configuration file, DistributedLog URI, stats provider, stream name), builds the namespace
+ * and delegates the actual work to {@link #benchmark}.
+ */
+public abstract class StreamBenchmark {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamBenchmark.class);
+
+    private static final String USAGE = "StreamBenchmark <benchmark-class> [options]";
+
+    protected final Options options = new Options();
+    protected URI uri;
+    protected DistributedLogConfiguration conf;
+    protected StatsProvider statsProvider;
+    protected String streamName;
+
+    /** Register the options shared by all stream benchmarks. */
+    protected StreamBenchmark() {
+        options.addOption("c", "conf", true, "Configuration File");
+        options.addOption("u", "uri", true, "DistributedLog URI");
+        options.addOption("p", "stats-provider", true, "Stats Provider");
+        options.addOption("s", "stream", true, "Stream Name");
+        options.addOption("h", "help", false, "Print usage.");
+    }
+
+    protected Options getOptions() {
+        return options;
+    }
+
+    protected void printUsage() {
+        HelpFormatter hf = new HelpFormatter();
+        hf.printHelp(USAGE, options);
+    }
+
+    /**
+     * Parse the common options, then hand the parsed command line to the subclass.
+     *
+     * <p>Prints the usage and exits the JVM when help is requested or when the required
+     * {@code -u} (uri) or {@code -s} (stream) option is missing.
+     *
+     * @param args raw command line arguments
+     * @throws Exception if the arguments cannot be parsed or the configuration cannot be loaded
+     */
+    protected void parseCommandLine(String[] args)
+            throws Exception {
+        BasicParser parser = new BasicParser();
+        CommandLine cmdline = parser.parse(options, args);
+        if (cmdline.hasOption("h")) {
+            printUsage();
+            System.exit(0);
+        }
+        if (cmdline.hasOption("u")) {
+            this.uri = URI.create(cmdline.getOptionValue("u"));
+        } else {
+            printUsage();
+            System.exit(0);
+        }
+        this.conf = new DistributedLogConfiguration();
+        if (cmdline.hasOption("c")) {
+            String configFile = cmdline.getOptionValue("c");
+            this.conf.loadConf(new File(configFile).toURI().toURL());
+        }
+        if (cmdline.hasOption("p")) {
+            statsProvider = ReflectionUtils.newInstance(cmdline.getOptionValue("p"), StatsProvider.class);
+        } else {
+            statsProvider = new NullStatsProvider();
+        }
+        if (cmdline.hasOption("s")) {
+            this.streamName = cmdline.getOptionValue("s");
+        } else {
+            printUsage();
+            System.exit(0);
+        }
+        parseCommandLine(cmdline);
+    }
+
+    /**
+     * Hook for subclasses to read their own options from the parsed command line.
+     *
+     * @param cmdline parsed command line
+     */
+    protected abstract void parseCommandLine(CommandLine cmdline);
+
+    /**
+     * Parse the arguments, start the stats provider, build the namespace and run the benchmark.
+     *
+     * <p>The namespace is closed and the stats provider is stopped when the benchmark returns or
+     * throws; the stats provider is stopped even if building or closing the namespace fails.
+     *
+     * @param args raw command line arguments
+     * @throws Exception on any failure while parsing, setting up or running the benchmark
+     */
+    protected void run(String[] args) throws Exception {
+        // Render the array explicitly: passing a String[] directly would be taken as the logger's
+        // argument array, so only the first argument would be printed.
+        logger.info("Parsing arguments for benchmark : {}", java.util.Arrays.toString(args));
+        // parse command line
+        parseCommandLine(args);
+        statsProvider.start(conf);
+        try {
+            // run the benchmark
+            StatsLogger statsLogger = statsProvider.getStatsLogger("dl");
+            DistributedLogNamespace namespace =
+                    DistributedLogNamespaceBuilder.newBuilder()
+                            .conf(conf)
+                            .uri(uri)
+                            .statsLogger(statsLogger)
+                            .build();
+            try {
+                benchmark(namespace, streamName, statsProvider.getStatsLogger("benchmark"));
+            } finally {
+                namespace.close();
+            }
+        } finally {
+            statsProvider.stop();
+        }
+    }
+
+    /**
+     * Run the benchmark against the given stream.
+     *
+     * @param namespace namespace to open the stream from
+     * @param logName name of the stream to benchmark
+     * @param statsLogger stats logger for benchmark metrics
+     */
+    protected abstract void benchmark(DistributedLogNamespace namespace,
+                                      String logName,
+                                      StatsLogger statsLogger);
+
+    /**
+     * Entry point: the first argument is the benchmark class name, which is instantiated
+     * reflectively and run with the full argument list.
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length <= 0) {
+            System.err.println(USAGE);
+            return;
+        }
+        String benchmarkClassName = args[0];
+        StreamBenchmark benchmark = ReflectionUtils.newInstance(
+                benchmarkClassName, StreamBenchmark.class);
+        benchmark.run(args);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
new file mode 100644
index 0000000..4abb317
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.stream;
+
+import com.google.common.base.Stopwatch;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.LogReader;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Benchmark on {@link org.apache.distributedlog.LogReader} reading from a stream.
+ *
+ * <p>The reader is positioned according to the configured read mode and then reads records in a
+ * loop: blocking reads until the first {@code null} record (the catch-up phase), non-blocking
+ * reads afterwards. When a read fails, the reader is re-opened from the last transaction id seen.
+ */
+public class SyncReaderBenchmark extends AbstractReaderBenchmark {
+
+    private static final Logger logger = LoggerFactory.getLogger(SyncReaderBenchmark.class);
+
+    public SyncReaderBenchmark() {}
+
+    /**
+     * Open the stream and read from it until the process is stopped.
+     *
+     * <p>Records the {@code open_reader}, {@code blocking_read} and {@code non_blocking_read}
+     * latencies (in microseconds) and counts {@code null_read}s.
+     *
+     * @param namespace namespace to open the stream from
+     * @param streamName name of the stream to read
+     * @param statsLogger stats logger receiving the benchmark metrics
+     */
+    @Override
+    protected void benchmark(DistributedLogNamespace namespace, String streamName, StatsLogger statsLogger) {
+        DistributedLogManager dlm = null;
+        while (null == dlm) {
+            try {
+                dlm = namespace.openLog(streamName);
+            } catch (IOException ioe) {
+                logger.warn("Failed to create dlm for stream {} : ", streamName, ioe);
+            }
+            if (null == dlm) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+                } catch (InterruptedException e) {
+                    logger.warn("Interrupted from sleep while creating dlm for stream {} : ",
+                        streamName, e);
+                }
+            }
+        }
+        OpStatsLogger openReaderStats = statsLogger.getOpStatsLogger("open_reader");
+        OpStatsLogger nonBlockingReadStats = statsLogger.getOpStatsLogger("non_blocking_read");
+        OpStatsLogger blockingReadStats = statsLogger.getOpStatsLogger("blocking_read");
+        Counter nullReadCounter = statsLogger.getCounter("null_read");
+
+        logger.info("Created dlm for stream {}.", streamName);
+        LogReader reader = null;
+        Long lastTxId = null;
+        while (null == reader) {
+            // initialize the last txid
+            if (null == lastTxId) {
+                switch (readMode) {
+                    case OLDEST:
+                        lastTxId = 0L;
+                        break;
+                    case LATEST:
+                        try {
+                            lastTxId = dlm.getLastTxId();
+                        } catch (IOException ioe) {
+                            continue;
+                        }
+                        break;
+                    case REWIND:
+                        lastTxId = System.currentTimeMillis() - rewindMs;
+                        break;
+                    case POSITION:
+                        lastTxId = fromTxId;
+                        break;
+                    default:
+                        logger.warn("Unsupported mode {}", readMode);
+                        printUsage();
+                        System.exit(0);
+                        break;
+                }
+                logger.info("Reading from transaction id {}", lastTxId);
+            }
+            // Open the reader
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            try {
+                reader = dlm.getInputStream(lastTxId);
+                // stats are recorded in microseconds, the log line reports milliseconds
+                long elapsedMicros = stopwatch.elapsed(TimeUnit.MICROSECONDS);
+                openReaderStats.registerSuccessfulEvent(elapsedMicros);
+                logger.info("It took {} ms to position the reader to transaction id {}",
+                        stopwatch.elapsed(TimeUnit.MILLISECONDS), lastTxId);
+            } catch (IOException ioe) {
+                openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                logger.warn("Failed to create reader for stream {} reading from {}.",
+                        new Object[] { streamName, lastTxId, ioe });
+            }
+            if (null == reader) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+                } catch (InterruptedException e) {
+                    logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ",
+                        streamName, e);
+                }
+                continue;
+            }
+
+            // read loop
+
+            LogRecord record;
+            boolean nonBlocking = false;
+            stopwatch = Stopwatch.createUnstarted();
+            long numCatchupReads = 0L;
+            long numCatchupBytes = 0L;
+            Stopwatch catchupStopwatch = Stopwatch.createStarted();
+            while (true) {
+                try {
+                    stopwatch.start();
+                    record = reader.readNext(nonBlocking);
+                    if (null != record) {
+                        long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
+                        if (nonBlocking) {
+                            nonBlockingReadStats.registerSuccessfulEvent(elapsedMicros);
+                        } else {
+                            numCatchupBytes += record.getPayload().length;
+                            ++numCatchupReads;
+                            blockingReadStats.registerSuccessfulEvent(elapsedMicros);
+                        }
+                        lastTxId = record.getTransactionId();
+                    } else {
+                        nullReadCounter.inc();
+                    }
+                    // first null record on a blocking read: caught up, switch to non-blocking
+                    if (null == record && !nonBlocking) {
+                        nonBlocking = true;
+                        catchupStopwatch.stop();
+                        // report the whole catch-up phase, not just the latest read
+                        logger.info("Catchup {} records (total {} bytes) in {} milliseconds",
+                                new Object[] { numCatchupReads, numCatchupBytes,
+                                    catchupStopwatch.elapsed(TimeUnit.MILLISECONDS) });
+                    }
+                    stopwatch.reset();
+                } catch (IOException e) {
+                    logger.warn("Encountered reading record from stream {} : ", streamName, e);
+                    // NOTE(review): the failed reader is dropped without being closed here;
+                    // confirm whether it should be closed before re-opening.
+                    reader = null;
+                    break;
+                }
+            }
+            try {
+                TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
+            } catch (InterruptedException e) {
+                logger.warn("Interrupted from sleep while creating reader for stream {} : ",
+                    streamName, e);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/package-info.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/package-info.java
new file mode 100644
index 0000000..b95a40f
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Stream level benchmarks.
+ */
+package org.apache.distributedlog.benchmark.stream;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/ShiftableRateLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/ShiftableRateLimiter.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/ShiftableRateLimiter.java
new file mode 100644
index 0000000..03c561c
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/ShiftableRateLimiter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.benchmark.utils;
+
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A wrapper over rate limiter.
+ */
+public class ShiftableRateLimiter implements Runnable {
+
+ private final RateLimiter rateLimiter;
+ private final ScheduledExecutorService executor;
+ private final double initialRate, maxRate, changeRate;
+ private final long changeInterval;
+ private final TimeUnit changeIntervalUnit;
+ private double nextRate;
+
+ public ShiftableRateLimiter(double initialRate,
+ double maxRate,
+ double changeRate,
+ long changeInterval,
+ TimeUnit changeIntervalUnit) {
+ this.initialRate = initialRate;
+ this.maxRate = maxRate;
+ this.changeRate = changeRate;
+ this.nextRate = initialRate;
+ this.changeInterval = changeInterval;
+ this.changeIntervalUnit = changeIntervalUnit;
+ this.rateLimiter = RateLimiter.create(initialRate);
+ this.executor = Executors.newSingleThreadScheduledExecutor();
+ this.executor.scheduleAtFixedRate(this, changeInterval, changeInterval, changeIntervalUnit);
+ }
+
+ public ShiftableRateLimiter duplicate() {
+ return new ShiftableRateLimiter(
+ initialRate,
+ maxRate,
+ changeRate,
+ changeInterval,
+ changeIntervalUnit);
+ }
+
+ @Override
+ public void run() {
+ this.nextRate = Math.min(nextRate + changeRate, maxRate);
+ this.rateLimiter.setRate(nextRate);
+ }
+
+ public RateLimiter getLimiter() {
+ return this.rateLimiter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/package-info.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/package-info.java
new file mode 100644
index 0000000..c650bab
--- /dev/null
+++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utils for benchmarking.
+ */
+package org.apache.distributedlog.benchmark.utils;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/resources/findbugsExclude.xml b/distributedlog-benchmark/src/main/resources/findbugsExclude.xml
index b7a1ecb..0ab2b6b 100644
--- a/distributedlog-benchmark/src/main/resources/findbugsExclude.xml
+++ b/distributedlog-benchmark/src/main/resources/findbugsExclude.xml
@@ -18,6 +18,6 @@
<FindBugsFilter>
<Match>
<!-- generated code, we can't be held responsible for findbugs in it //-->
- <Class name="~com\.twitter\.distributedlog\.benchmark\.thrift.*" />
+ <Class name="~org\.apache\.distributedlog\.benchmark\.thrift.*" />
</Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/thrift/loadtest.thrift
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/thrift/loadtest.thrift b/distributedlog-benchmark/src/main/thrift/loadtest.thrift
index 6d98cec..48c5d5a 100644
--- a/distributedlog-benchmark/src/main/thrift/loadtest.thrift
+++ b/distributedlog-benchmark/src/main/thrift/loadtest.thrift
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-namespace java com.twitter.distributedlog.benchmark.thrift
+namespace java org.apache.distributedlog.benchmark.thrift
struct Message {
1: i64 publishTime;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-client/pom.xml b/distributedlog-client/pom.xml
index aad5093..f09caf1 100644
--- a/distributedlog-client/pom.xml
+++ b/distributedlog-client/pom.xml
@@ -137,7 +137,7 @@
<properties>
<property>
<name>listener</name>
- <value>com.twitter.distributedlog.TimedOutTestsListener</value>
+ <value>org.apache.distributedlog.TimedOutTestsListener</value>
</property>
</properties>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java
deleted file mode 100644
index de74f5a..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Client Config.
- *
- * <p>Mutable holder for the client settings declared below. Every setter
- * returns {@code this} so that calls can be chained, and
- * {@link #newConfig(ClientConfig)} builds an independent copy of an
- * existing instance.
- */
-public class ClientConfig {
-    // Default values. The long "*Ms" fields are initialised through
-    // TimeUnit#toMillis, i.e. they hold milliseconds; the int "*Ms" fields
-    // follow the same naming.
-    // NOTE(review): the -1 defaults of maxRedirects and requestTimeoutMs
-    // presumably mean "no limit" / "no timeout" -- confirm against the code
-    // that consumes these values.
-    int redirectBackoffStartMs = 25;
-    int redirectBackoffMaxMs = 100;
-    int maxRedirects = -1;
-    int requestTimeoutMs = -1;
-    boolean thriftmux = false;
-    boolean streamFailfast = false;
-    String streamNameRegex = ".*";
-    boolean handshakeWithClientInfo = true;
-    long periodicHandshakeIntervalMs = TimeUnit.MINUTES.toMillis(5);
-    long periodicOwnershipSyncIntervalMs = TimeUnit.MINUTES.toMillis(5);
-    boolean periodicDumpOwnershipCacheEnabled = false;
-    long periodicDumpOwnershipCacheIntervalMs = TimeUnit.MINUTES.toMillis(10);
-    boolean enableHandshakeTracing = false;
-    boolean enableChecksum = true;
-
-    /**
-     * Set the max redirects setting (default {@code -1}).
-     *
-     * @param maxRedirects value to store
-     * @return this config, for chaining
-     */
-    public ClientConfig setMaxRedirects(int maxRedirects) {
-        this.maxRedirects = maxRedirects;
-        return this;
-    }
-
-    /**
-     * @return the stored max redirects value
-     */
-    public int getMaxRedirects() {
-        return this.maxRedirects;
-    }
-
-    /**
-     * Set the request timeout (default {@code -1}).
-     *
-     * @param timeoutInMillis timeout in milliseconds
-     * @return this config, for chaining
-     */
-    public ClientConfig setRequestTimeoutMs(int timeoutInMillis) {
-        this.requestTimeoutMs = timeoutInMillis;
-        return this;
-    }
-
-    /**
-     * @return the stored request timeout
-     */
-    public int getRequestTimeoutMs() {
-        return this.requestTimeoutMs;
-    }
-
-    /**
-     * Set the redirect backoff start value (default {@code 25}).
-     *
-     * @param ms value to store
-     * @return this config, for chaining
-     */
-    public ClientConfig setRedirectBackoffStartMs(int ms) {
-        this.redirectBackoffStartMs = ms;
-        return this;
-    }
-
-    /**
-     * @return the stored redirect backoff start value
-     */
-    public int getRedirectBackoffStartMs() {
-        return this.redirectBackoffStartMs;
-    }
-
-    /**
-     * Set the redirect backoff max value (default {@code 100}).
-     *
-     * @param ms value to store
-     * @return this config, for chaining
-     */
-    public ClientConfig setRedirectBackoffMaxMs(int ms) {
-        this.redirectBackoffMaxMs = ms;
-        return this;
-    }
-
-    /**
-     * @return the stored redirect backoff max value
-     */
-    public int getRedirectBackoffMaxMs() {
-        return this.redirectBackoffMaxMs;
-    }
-
-    /**
-     * Set the thriftmux flag (default {@code false}).
-     *
-     * @param enabled value to store
-     * @return this config, for chaining
-     */
-    public ClientConfig setThriftMux(boolean enabled) {
-        this.thriftmux = enabled;
-        return this;
-    }
-
-    /**
-     * @return the stored thriftmux flag
-     */
-    public boolean getThriftMux() {
-        return this.thriftmux;
-    }
-
-    /**
-     * Set the stream failfast flag (default {@code false}).
-     *
-     * @param enabled value to store
-     * @return this config, for chaining
-     */
-    public ClientConfig setStreamFailfast(boolean enabled) {
-        this.streamFailfast = enabled;
-        return this;
-    }
-
-    /**
-     * @return the stored stream failfast flag
-     */
-    public boolean getStreamFailfast() {
-        return this.streamFailfast;
-    }
-
-    /**
-     * Set the stream name regex (default {@code ".*"}).
-     *
-     * @param nameRegex regex to store, must not be null
-     * @return this config, for chaining
-     * @throws NullPointerException if {@code nameRegex} is null
-     */
-    public ClientConfig setStreamNameRegex(String nameRegex) {
-        checkNotNull(nameRegex);
-        this.streamNameRegex = nameRegex;
-        return this;
-    }
-
-    /**
-     * @return the stored stream name regex
-     */
-    public String getStreamNameRegex() {
-        return this.streamNameRegex;
-    }
-
-    /**
-     * Set the handshake-with-client-info flag (default {@code true}).
-     *
-     * @param enabled value to store
-     * @return this config, for chaining
-     */
-    public ClientConfig setHandshakeWithClientInfo(boolean enabled) {
-        this.handshakeWithClientInfo = enabled;
-        return this;
-    }
-
-    /**
-     * @return the stored handshake-with-client-info flag
-     */
-    public boolean getHandshakeWithClientInfo() {
-        return this.handshakeWithClientInfo;
-    }
-
-    /**
-     * Set the periodic handshake interval (default 5 minutes).
-     *
-     * @param intervalMs interval in milliseconds
-     * @return this config, for chaining
-     */
-    public ClientConfig setPeriodicHandshakeIntervalMs(long intervalMs) {
-        this.periodicHandshakeIntervalMs = intervalMs;
-        return this;
-    }
-
-    /**
-     * @return the stored periodic handshake interval, in milliseconds
-     */
-    public long getPeriodicHandshakeIntervalMs() {
-        return this.periodicHandshakeIntervalMs;
-    }
-
-    /**
-     * Set the periodic ownership sync interval (default 5 minutes).
-     *
-     * @param intervalMs interval in milliseconds
-     * @return this config, for chaining
-     */
-    public ClientConfig setPeriodicOwnershipSyncIntervalMs(long intervalMs) {
-        this.periodicOwnershipSyncIntervalMs = intervalMs;
-        return this;
-    }
-
-    /**
-     * @return the stored periodic ownership sync interval, in milliseconds
-     */
-    public long getPeriodicOwnershipSyncIntervalMs() {
-        return this.periodicOwnershipSyncIntervalMs;
-    }
-
-    /**
-     * Set the periodic-dump-ownership-cache flag (default {@code false}).
-     *
-     * @param enabled value to store
-     * @return this config, for chaining
-     */
-    public ClientConfig setPeriodicDumpOwnershipCacheEnabled(boolean enabled) {
-        this.periodicDumpOwnershipCacheEnabled = enabled;
-        return this;
-    }
-
-    /**
-     * @return the stored periodic-dump-ownership-cache flag
-     */
-    public boolean isPeriodicDumpOwnershipCacheEnabled() {
-        return this.periodicDumpOwnershipCacheEnabled;
-    }
-
-    /**
-     * Set the periodic dump ownership cache interval (default 10 minutes).
-     *
-     * @param intervalMs interval in milliseconds
-     * @return this config, for chaining
-     */
-    public ClientConfig setPeriodicDumpOwnershipCacheIntervalMs(long intervalMs) {
-        this.periodicDumpOwnershipCacheIntervalMs = intervalMs;
-        return this;
-    }
-
-    /**
-     * @return the stored periodic dump ownership cache interval, in milliseconds
-     */
-    public long getPeriodicDumpOwnershipCacheIntervalMs() {
-        return this.periodicDumpOwnershipCacheIntervalMs;
-    }
-
-    /**
-     * Set the handshake tracing flag (default {@code false}).
-     *
-     * @param enabled value to store
-     * @return this config, for chaining
-     */
-    public ClientConfig setHandshakeTracingEnabled(boolean enabled) {
-        this.enableHandshakeTracing = enabled;
-        return this;
-    }
-
-    /**
-     * @return the stored handshake tracing flag
-     */
-    public boolean isHandshakeTracingEnabled() {
-        return this.enableHandshakeTracing;
-    }
-
-    /**
-     * Set the checksum flag (default {@code true}).
-     *
-     * @param enabled value to store
-     * @return this config, for chaining
-     */
-    public ClientConfig setChecksumEnabled(boolean enabled) {
-        this.enableChecksum = enabled;
-        return this;
-    }
-
-    /**
-     * @return the stored checksum flag
-     */
-    public boolean isChecksumEnabled() {
-        return this.enableChecksum;
-    }
-
-    /**
-     * Create a new config carrying the same settings as {@code config}.
-     *
-     * @param config config to copy from, must not be null
-     * @return a new {@link ClientConfig} whose settings equal those of {@code config}
-     * @throws NullPointerException if {@code config} is null
-     */
-    public static ClientConfig newConfig(ClientConfig config) {
-        ClientConfig newConfig = new ClientConfig();
-        // Every field of this class has to appear in this chain: a setting that
-        // is left out silently falls back to its default value in the copy.
-        newConfig.setMaxRedirects(config.getMaxRedirects())
-                 .setRequestTimeoutMs(config.getRequestTimeoutMs())
-                 .setRedirectBackoffStartMs(config.getRedirectBackoffStartMs())
-                 .setRedirectBackoffMaxMs(config.getRedirectBackoffMaxMs())
-                 .setThriftMux(config.getThriftMux())
-                 .setStreamFailfast(config.getStreamFailfast())
-                 .setStreamNameRegex(config.getStreamNameRegex())
-                 .setHandshakeWithClientInfo(config.getHandshakeWithClientInfo())
-                 .setPeriodicHandshakeIntervalMs(config.getPeriodicHandshakeIntervalMs())
-                 .setPeriodicOwnershipSyncIntervalMs(config.getPeriodicOwnershipSyncIntervalMs())
-                 .setPeriodicDumpOwnershipCacheEnabled(config.isPeriodicDumpOwnershipCacheEnabled())
-                 .setPeriodicDumpOwnershipCacheIntervalMs(config.getPeriodicDumpOwnershipCacheIntervalMs())
-                 .setHandshakeTracingEnabled(config.isHandshakeTracingEnabled())
-                 .setChecksumEnabled(config.isChecksumEnabled());
-        return newConfig;
-    }
-}