You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:46 UTC
[41/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
new file mode 100644
index 0000000..f1da33c
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.speculative;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of {@link SpeculativeRequestExecutionPolicy}.
+ */
+public class DefaultSpeculativeRequestExecutionPolicy implements SpeculativeRequestExecutionPolicy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultSpeculativeRequestExecutionPolicy.class);
+ final int firstSpeculativeRequestTimeout;
+ final int maxSpeculativeRequestTimeout;
+ final float backoffMultiplier;
+ int nextSpeculativeRequestTimeout;
+
+ public DefaultSpeculativeRequestExecutionPolicy(int firstSpeculativeRequestTimeout,
+ int maxSpeculativeRequestTimeout,
+ float backoffMultiplier) {
+ this.firstSpeculativeRequestTimeout = firstSpeculativeRequestTimeout;
+ this.maxSpeculativeRequestTimeout = maxSpeculativeRequestTimeout;
+ this.backoffMultiplier = backoffMultiplier;
+ this.nextSpeculativeRequestTimeout = firstSpeculativeRequestTimeout;
+
+ if (backoffMultiplier <= 0) {
+ throw new IllegalArgumentException("Invalid value provided for backoffMultiplier");
+ }
+
+ // Prevent potential over flow
+ if (Math.round((double) maxSpeculativeRequestTimeout * (double) backoffMultiplier) > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException("Invalid values for maxSpeculativeRequestTimeout and backoffMultiplier");
+ }
+ }
+
+ @VisibleForTesting
+ int getNextSpeculativeRequestTimeout() {
+ return nextSpeculativeRequestTimeout;
+ }
+
+ /**
+ * Initialize the speculative request execution policy.
+ *
+ * @param scheduler The scheduler service to issue the speculative request
+ * @param requestExecutor The executor is used to issue the actual speculative requests
+ */
+ @Override
+ public void initiateSpeculativeRequest(final ScheduledExecutorService scheduler,
+ final SpeculativeRequestExecutor requestExecutor) {
+ issueSpeculativeRequest(scheduler, requestExecutor);
+ }
+
+ private void issueSpeculativeRequest(final ScheduledExecutorService scheduler,
+ final SpeculativeRequestExecutor requestExecutor) {
+ Future<Boolean> issueNextRequest = requestExecutor.issueSpeculativeRequest();
+ issueNextRequest.addEventListener(new FutureEventListener<Boolean>() {
+ // we want this handler to run immediately after we push the big red button!
+ @Override
+ public void onSuccess(Boolean issueNextRequest) {
+ if (issueNextRequest) {
+ scheduleSpeculativeRequest(scheduler, requestExecutor, nextSpeculativeRequestTimeout);
+ nextSpeculativeRequestTimeout = Math.min(maxSpeculativeRequestTimeout,
+ (int) (nextSpeculativeRequestTimeout * backoffMultiplier));
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Stopped issuing speculative requests for {}, "
+ + "speculativeReadTimeout = {}", requestExecutor, nextSpeculativeRequestTimeout);
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable thrown) {
+ LOG.warn("Failed to issue speculative request for {}, speculativeReadTimeout = {} : ",
+ new Object[] { requestExecutor, nextSpeculativeRequestTimeout, thrown });
+ }
+ });
+ }
+
+ private void scheduleSpeculativeRequest(final ScheduledExecutorService scheduler,
+ final SpeculativeRequestExecutor requestExecutor,
+ final int speculativeRequestTimeout) {
+ try {
+ scheduler.schedule(new Runnable() {
+ @Override
+ public void run() {
+ issueSpeculativeRequest(scheduler, requestExecutor);
+ }
+ }, speculativeRequestTimeout, TimeUnit.MILLISECONDS);
+ } catch (RejectedExecutionException re) {
+ if (!scheduler.isShutdown()) {
+ LOG.warn("Failed to schedule speculative request for {}, speculativeReadTimeout = {} : ",
+ new Object[]{requestExecutor, speculativeRequestTimeout, re});
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java
new file mode 100644
index 0000000..faf45c2
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.speculative;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Speculative request execution policy.
+ */
+public interface SpeculativeRequestExecutionPolicy {
+ /**
+ * Initialize the speculative request execution policy and initiate requests.
+ *
+ * @param scheduler The scheduler service to issue the speculative request
+ * @param requestExecutor The executor is used to issue the actual speculative requests
+ */
+ void initiateSpeculativeRequest(ScheduledExecutorService scheduler,
+ SpeculativeRequestExecutor requestExecutor);
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java
new file mode 100644
index 0000000..68fe8b0
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.speculative;
+
+import com.twitter.util.Future;
+
+/**
+ * Executor to execute speculative requests.
+ */
+public interface SpeculativeRequestExecutor {
+
+ /**
+ * Issues a speculative request and indicates if more speculative requests should be issued.
+ *
+ * @return whether more speculative requests should be issued.
+ */
+ Future<Boolean> issueSpeculativeRequest();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java
new file mode 100644
index 0000000..4bdd4b1
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Speculative Mechanism.
+ */
+package org.apache.distributedlog.client.speculative;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java
new file mode 100644
index 0000000..c2dcddd
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.stats;
+
+import org.apache.distributedlog.client.resolver.RegionResolver;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Client Stats.
+ */
+public class ClientStats {
+
+ // Region Resolver
+ private final RegionResolver regionResolver;
+
+ // Stats
+ private final StatsReceiver statsReceiver;
+ private final ClientStatsLogger clientStatsLogger;
+ private final boolean enableRegionStats;
+ private final ConcurrentMap<String, ClientStatsLogger> regionClientStatsLoggers;
+ private final ConcurrentMap<String, OpStats> opStatsMap;
+
+ public ClientStats(StatsReceiver statsReceiver,
+ boolean enableRegionStats,
+ RegionResolver regionResolver) {
+ this.statsReceiver = statsReceiver;
+ this.clientStatsLogger = new ClientStatsLogger(statsReceiver);
+ this.enableRegionStats = enableRegionStats;
+ this.regionClientStatsLoggers = new ConcurrentHashMap<String, ClientStatsLogger>();
+ this.regionResolver = regionResolver;
+ this.opStatsMap = new ConcurrentHashMap<String, OpStats>();
+ }
+
+ public OpStats getOpStats(String op) {
+ OpStats opStats = opStatsMap.get(op);
+ if (null != opStats) {
+ return opStats;
+ }
+ OpStats newStats = new OpStats(statsReceiver.scope(op),
+ enableRegionStats, regionResolver);
+ OpStats oldStats = opStatsMap.putIfAbsent(op, newStats);
+ if (null == oldStats) {
+ return newStats;
+ } else {
+ return oldStats;
+ }
+ }
+
+ private ClientStatsLogger getRegionClientStatsLogger(SocketAddress address) {
+ String region = regionResolver.resolveRegion(address);
+ return getRegionClientStatsLogger(region);
+ }
+
+ private ClientStatsLogger getRegionClientStatsLogger(String region) {
+ ClientStatsLogger statsLogger = regionClientStatsLoggers.get(region);
+ if (null == statsLogger) {
+ ClientStatsLogger newStatsLogger = new ClientStatsLogger(statsReceiver.scope(region));
+ ClientStatsLogger oldStatsLogger = regionClientStatsLoggers.putIfAbsent(region, newStatsLogger);
+ if (null == oldStatsLogger) {
+ statsLogger = newStatsLogger;
+ } else {
+ statsLogger = oldStatsLogger;
+ }
+ }
+ return statsLogger;
+ }
+
+ public StatsReceiver getFinagleStatsReceiver(SocketAddress addr) {
+ if (enableRegionStats && null != addr) {
+ return getRegionClientStatsLogger(addr).getStatsReceiver();
+ } else {
+ return clientStatsLogger.getStatsReceiver();
+ }
+ }
+
+ public void completeProxyRequest(SocketAddress addr, StatusCode code, long startTimeNanos) {
+ clientStatsLogger.completeProxyRequest(code, startTimeNanos);
+ if (enableRegionStats && null != addr) {
+ getRegionClientStatsLogger(addr).completeProxyRequest(code, startTimeNanos);
+ }
+ }
+
+ public void failProxyRequest(SocketAddress addr, Throwable cause, long startTimeNanos) {
+ clientStatsLogger.failProxyRequest(cause, startTimeNanos);
+ if (enableRegionStats && null != addr) {
+ getRegionClientStatsLogger(addr).failProxyRequest(cause, startTimeNanos);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java
new file mode 100644
index 0000000..530c632
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.stats;
+
+import org.apache.distributedlog.thrift.service.StatusCode;
+import com.twitter.finagle.stats.Counter;
+import com.twitter.finagle.stats.Stat;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Stats Logger to collect client stats.
+ */
+public class ClientStatsLogger {
+
+ // Stats
+ private final StatsReceiver statsReceiver;
+ private final StatsReceiver responseStatsReceiver;
+ private final ConcurrentMap<StatusCode, Counter> responseStats =
+ new ConcurrentHashMap<StatusCode, Counter>();
+ private final StatsReceiver exceptionStatsReceiver;
+ private final ConcurrentMap<Class<?>, Counter> exceptionStats =
+ new ConcurrentHashMap<Class<?>, Counter>();
+
+ private final Stat proxySuccessLatencyStat;
+ private final Stat proxyFailureLatencyStat;
+
+ public ClientStatsLogger(StatsReceiver statsReceiver) {
+ this.statsReceiver = statsReceiver;
+ responseStatsReceiver = statsReceiver.scope("responses");
+ exceptionStatsReceiver = statsReceiver.scope("exceptions");
+ StatsReceiver proxyLatencyStatReceiver = statsReceiver.scope("proxy_request_latency");
+ proxySuccessLatencyStat = proxyLatencyStatReceiver.stat0("success");
+ proxyFailureLatencyStat = proxyLatencyStatReceiver.stat0("failure");
+ }
+
+ public StatsReceiver getStatsReceiver() {
+ return statsReceiver;
+ }
+
+ private Counter getResponseCounter(StatusCode code) {
+ Counter counter = responseStats.get(code);
+ if (null == counter) {
+ Counter newCounter = responseStatsReceiver.counter0(code.name());
+ Counter oldCounter = responseStats.putIfAbsent(code, newCounter);
+ counter = null != oldCounter ? oldCounter : newCounter;
+ }
+ return counter;
+ }
+
+ private Counter getExceptionCounter(Class<?> cls) {
+ Counter counter = exceptionStats.get(cls);
+ if (null == counter) {
+ Counter newCounter = exceptionStatsReceiver.counter0(cls.getName());
+ Counter oldCounter = exceptionStats.putIfAbsent(cls, newCounter);
+ counter = null != oldCounter ? oldCounter : newCounter;
+ }
+ return counter;
+ }
+
+ public void completeProxyRequest(StatusCode code, long startTimeNanos) {
+ getResponseCounter(code).incr();
+ proxySuccessLatencyStat.add(elapsedMicroSec(startTimeNanos));
+ }
+
+ public void failProxyRequest(Throwable cause, long startTimeNanos) {
+ getExceptionCounter(cause.getClass()).incr();
+ proxyFailureLatencyStat.add(elapsedMicroSec(startTimeNanos));
+ }
+
+ static long elapsedMicroSec(long startNanoTime) {
+ return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanoTime);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java
new file mode 100644
index 0000000..7a49faa
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.stats;
+
+import org.apache.distributedlog.client.resolver.RegionResolver;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Op Stats.
+ */
+public class OpStats {
+
+ // Region Resolver
+ private final RegionResolver regionResolver;
+
+ // Stats
+ private final StatsReceiver statsReceiver;
+ private final OpStatsLogger opStatsLogger;
+ private final boolean enableRegionStats;
+ private final ConcurrentMap<String, OpStatsLogger> regionOpStatsLoggers;
+
+ public OpStats(StatsReceiver statsReceiver,
+ boolean enableRegionStats,
+ RegionResolver regionResolver) {
+ this.statsReceiver = statsReceiver;
+ this.opStatsLogger = new OpStatsLogger(statsReceiver);
+ this.enableRegionStats = enableRegionStats;
+ this.regionOpStatsLoggers = new ConcurrentHashMap<String, OpStatsLogger>();
+ this.regionResolver = regionResolver;
+ }
+
+ private OpStatsLogger getRegionOpStatsLogger(SocketAddress address) {
+ String region = regionResolver.resolveRegion(address);
+ return getRegionOpStatsLogger(region);
+ }
+
+ private OpStatsLogger getRegionOpStatsLogger(String region) {
+ OpStatsLogger statsLogger = regionOpStatsLoggers.get(region);
+ if (null == statsLogger) {
+ OpStatsLogger newStatsLogger = new OpStatsLogger(statsReceiver.scope(region));
+ OpStatsLogger oldStatsLogger = regionOpStatsLoggers.putIfAbsent(region, newStatsLogger);
+ if (null == oldStatsLogger) {
+ statsLogger = newStatsLogger;
+ } else {
+ statsLogger = oldStatsLogger;
+ }
+ }
+ return statsLogger;
+ }
+
+ public void completeRequest(SocketAddress addr, long micros, int numTries) {
+ opStatsLogger.completeRequest(micros, numTries);
+ if (enableRegionStats && null != addr) {
+ getRegionOpStatsLogger(addr).completeRequest(micros, numTries);
+ }
+ }
+
+ public void failRequest(SocketAddress addr, long micros, int numTries) {
+ opStatsLogger.failRequest(micros, numTries);
+ if (enableRegionStats && null != addr) {
+ getRegionOpStatsLogger(addr).failRequest(micros, numTries);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java
new file mode 100644
index 0000000..b94b4be
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.stats;
+
+import com.twitter.finagle.stats.Stat;
+import com.twitter.finagle.stats.StatsReceiver;
+
+/**
+ * Stats Logger per operation type.
+ */
+public class OpStatsLogger {
+
+ private final Stat successLatencyStat;
+ private final Stat failureLatencyStat;
+ private final Stat redirectStat;
+
+ public OpStatsLogger(StatsReceiver statsReceiver) {
+ StatsReceiver latencyStatReceiver = statsReceiver.scope("latency");
+ successLatencyStat = latencyStatReceiver.stat0("success");
+ failureLatencyStat = latencyStatReceiver.stat0("failure");
+ StatsReceiver redirectStatReceiver = statsReceiver.scope("redirects");
+ redirectStat = redirectStatReceiver.stat0("times");
+ }
+
+ public void completeRequest(long micros, int numTries) {
+ successLatencyStat.add(micros);
+ redirectStat.add(numTries);
+ }
+
+ public void failRequest(long micros, int numTries) {
+ failureLatencyStat.add(micros);
+ redirectStat.add(numTries);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java
new file mode 100644
index 0000000..110e99a
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.stats;
+
+import com.twitter.finagle.stats.Counter;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Stats Logger for ownerships.
+ */
+public class OwnershipStatsLogger {
+
+ /**
+ * Ownership related stats.
+ */
+ public static class OwnershipStat {
+ private final Counter hits;
+ private final Counter misses;
+ private final Counter removes;
+ private final Counter redirects;
+ private final Counter adds;
+
+ OwnershipStat(StatsReceiver ownershipStats) {
+ hits = ownershipStats.counter0("hits");
+ misses = ownershipStats.counter0("misses");
+ adds = ownershipStats.counter0("adds");
+ removes = ownershipStats.counter0("removes");
+ redirects = ownershipStats.counter0("redirects");
+ }
+
+ public void onHit() {
+ hits.incr();
+ }
+
+ public void onMiss() {
+ misses.incr();
+ }
+
+ public void onAdd() {
+ adds.incr();
+ }
+
+ public void onRemove() {
+ removes.incr();
+ }
+
+ public void onRedirect() {
+ redirects.incr();
+ }
+
+ }
+
+ private final OwnershipStat ownershipStat;
+ private final StatsReceiver ownershipStatsReceiver;
+ private final ConcurrentMap<String, OwnershipStat> ownershipStats =
+ new ConcurrentHashMap<String, OwnershipStat>();
+
+ public OwnershipStatsLogger(StatsReceiver statsReceiver,
+ StatsReceiver streamStatsReceiver) {
+ this.ownershipStat = new OwnershipStat(statsReceiver.scope("ownership"));
+ this.ownershipStatsReceiver = streamStatsReceiver.scope("perstream_ownership");
+ }
+
+ private OwnershipStat getOwnershipStat(String stream) {
+ OwnershipStat stat = ownershipStats.get(stream);
+ if (null == stat) {
+ OwnershipStat newStat = new OwnershipStat(ownershipStatsReceiver.scope(stream));
+ OwnershipStat oldStat = ownershipStats.putIfAbsent(stream, newStat);
+ stat = null != oldStat ? oldStat : newStat;
+ }
+ return stat;
+ }
+
+ public void onMiss(String stream) {
+ ownershipStat.onMiss();
+ getOwnershipStat(stream).onMiss();
+ }
+
+ public void onHit(String stream) {
+ ownershipStat.onHit();
+ getOwnershipStat(stream).onHit();
+ }
+
+ public void onRedirect(String stream) {
+ ownershipStat.onRedirect();
+ getOwnershipStat(stream).onRedirect();
+ }
+
+ public void onRemove(String stream) {
+ ownershipStat.onRemove();
+ getOwnershipStat(stream).onRemove();
+ }
+
+ public void onAdd(String stream) {
+ ownershipStat.onAdd();
+ getOwnershipStat(stream).onAdd();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java
new file mode 100644
index 0000000..106d3fc
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Client side stats utils.
+ */
+package org.apache.distributedlog.client.stats;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java
new file mode 100644
index 0000000..68e6825
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Socket Address identifier for a DL proxy.
+ */
+public class DLSocketAddress {
+
+ private static final int VERSION = 1;
+
+ private static final String COLON = ":";
+ private static final String SEP = ";";
+
+ private final int shard;
+ private final InetSocketAddress socketAddress;
+
+ public DLSocketAddress(int shard, InetSocketAddress socketAddress) {
+ this.shard = shard;
+ this.socketAddress = socketAddress;
+ }
+
+ /**
+ * Shard id for dl write proxy.
+ *
+ * @return shard id for dl write proxy.
+ */
+ public int getShard() {
+ return shard;
+ }
+
+ /**
+ * Socket address for dl write proxy.
+ *
+ * @return socket address for dl write proxy
+ */
+ public InetSocketAddress getSocketAddress() {
+ return socketAddress;
+ }
+
+ /**
+ * Serialize the write proxy identifier to string.
+ *
+ * @return serialized write proxy identifier.
+ */
+ public String serialize() {
+ return toLockId(socketAddress, shard);
+ }
+
+ @Override
+ public int hashCode() {
+ return socketAddress.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof DLSocketAddress)) {
+ return false;
+ }
+ DLSocketAddress other = (DLSocketAddress) obj;
+ return shard == other.shard && socketAddress.equals(other.socketAddress);
+ }
+
+ @Override
+ public String toString() {
+ return toLockId(socketAddress, shard);
+ }
+
+ /**
+ * Deserialize proxy address from a string representation.
+ *
+ * @param lockId
+ * string representation of the proxy address.
+ * @return proxy address.
+ * @throws IOException
+ */
+ public static DLSocketAddress deserialize(String lockId) throws IOException {
+ String parts[] = lockId.split(SEP);
+ if (3 != parts.length) {
+ throw new IOException("Invalid dl socket address " + lockId);
+ }
+ int version;
+ try {
+ version = Integer.parseInt(parts[0]);
+ } catch (NumberFormatException nfe) {
+ throw new IOException("Invalid version found in " + lockId, nfe);
+ }
+ if (VERSION != version) {
+ throw new IOException("Invalid version " + version + " found in " + lockId + ", expected " + VERSION);
+ }
+ int shardId;
+ try {
+ shardId = Integer.parseInt(parts[1]);
+ } catch (NumberFormatException nfe) {
+ throw new IOException("Invalid shard id found in " + lockId, nfe);
+ }
+ InetSocketAddress address = parseSocketAddress(parts[2]);
+ return new DLSocketAddress(shardId, address);
+ }
+
+ /**
+ * Parse the inet socket address from the string representation.
+ *
+ * @param addr
+ * string representation
+ * @return inet socket address
+ */
+ public static InetSocketAddress parseSocketAddress(String addr) {
+ String[] parts = addr.split(COLON);
+ checkArgument(parts.length == 2);
+ String hostname = parts[0];
+ int port = Integer.parseInt(parts[1]);
+ return new InetSocketAddress(hostname, port);
+ }
+
+ public static InetSocketAddress getSocketAddress(int port) throws UnknownHostException {
+ return new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), port);
+ }
+
+ /**
+ * Convert inet socket address to the string representation.
+ *
+ * @param address
+ * inet socket address.
+ * @return string representation of inet socket address.
+ */
+ public static String toString(InetSocketAddress address) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(address.getHostName()).append(COLON).append(address.getPort());
+ return sb.toString();
+ }
+
+ public static String toLockId(InetSocketAddress address, int shard) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(VERSION).append(SEP).append(shard).append(SEP).append(toString(address));
+ return sb.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java
new file mode 100644
index 0000000..9f30815
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecordSetBuffer;
+import com.twitter.util.Future;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Interface for distributedlog client.
+ */
+public interface DistributedLogClient {
+ /**
+ * Write <i>data</i> to a given <i>stream</i>.
+ *
+ * @param stream
+ * Stream Name.
+ * @param data
+ * Data to write.
+ * @return a future representing a sequence id returned for this write.
+ */
+ Future<DLSN> write(String stream, ByteBuffer data);
+
+ /**
+ * Write record set to a given <i>stream</i>.
+ *
+ * <p>The record set is built from {@link org.apache.distributedlog.LogRecordSet.Writer}
+ *
+ * @param stream stream to write to
+ * @param recordSet record set
+ */
+ Future<DLSN> writeRecordSet(String stream, LogRecordSetBuffer recordSet);
+
+ /**
+ * Write <i>data</i> in bulk to a given <i>stream</i>.
+ *
+ * <p>Return a list of Future dlsns, one for each submitted buffer. In the event of a partial
+ * failure--ex. some specific buffer write fails, all subsequent writes
+ * will also fail.
+ *
+ * @param stream
+ * Stream Name.
+ * @param data
+ * Data to write.
+ * @return a list of futures, one for each submitted buffer.
+ */
+ List<Future<DLSN>> writeBulk(String stream, List<ByteBuffer> data);
+
+ /**
+ * Truncate the stream to a given <i>dlsn</i>.
+ *
+ * @param stream
+ * Stream Name.
+ * @param dlsn
+ * DLSN to truncate until.
+ * @return a future representing the truncation.
+ */
+ Future<Boolean> truncate(String stream, DLSN dlsn);
+
+ /**
+ * Release the ownership of a stream <i>stream</i>.
+ *
+ * @param stream
+ * Stream Name to release.
+ * @return a future representing the release operation.
+ */
+ Future<Void> release(String stream);
+
+ /**
+ * Delete a given stream <i>stream</i>.
+ *
+ * @param stream
+ * Stream Name to delete.
+ * @return a future representing the delete operation.
+ */
+ Future<Void> delete(String stream);
+
+ /**
+ * Create a stream with name <i>stream</i>.
+ *
+ * @param stream
+ * Stream Name to create.
+ * @return a future representing the create operation.
+ */
+ Future<Void> create(String stream);
+
+ /**
+ * Close the client.
+ */
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
new file mode 100644
index 0000000..0e2a152
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
@@ -0,0 +1,608 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.twitter.common.zookeeper.ServerSet;
+import org.apache.distributedlog.client.ClientConfig;
+import org.apache.distributedlog.client.DistributedLogClientImpl;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.client.proxy.ClusterClient;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.client.resolver.RegionResolver;
+import org.apache.distributedlog.client.routing.RegionsRoutingService;
+import org.apache.distributedlog.client.routing.RoutingService;
+import org.apache.distributedlog.client.routing.RoutingUtils;
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import com.twitter.finagle.Name;
+import com.twitter.finagle.Resolver$;
+import com.twitter.finagle.Service;
+import com.twitter.finagle.ThriftMux;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import com.twitter.finagle.stats.StatsReceiver;
+import com.twitter.finagle.thrift.ClientId;
+import com.twitter.finagle.thrift.ThriftClientFramedCodec;
+import com.twitter.finagle.thrift.ThriftClientRequest;
+import com.twitter.util.Duration;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.Random;
+import org.apache.commons.lang.StringUtils;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+/**
+ * Builder to build {@link DistributedLogClient}.
+ */
+public final class DistributedLogClientBuilder {
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientBuilder.class);
+
+ private static final Random random = new Random(System.currentTimeMillis());
+
+ private String name = null;
+ private ClientId clientId = null;
+ private RoutingService.Builder routingServiceBuilder = null;
+ private ClientBuilder clientBuilder = null;
+ private String serverRoutingServiceFinagleName = null;
+ private StatsReceiver statsReceiver = new NullStatsReceiver();
+ private StatsReceiver streamStatsReceiver = new NullStatsReceiver();
+ private ClientConfig clientConfig = new ClientConfig();
+ private boolean enableRegionStats = false;
+ private final RegionResolver regionResolver = new DefaultRegionResolver();
+
+ /**
+ * Create a client builder.
+ *
+ * @return client builder
+ */
+ public static DistributedLogClientBuilder newBuilder() {
+ return new DistributedLogClientBuilder();
+ }
+
+ /**
+ * Create a new client builder from an existing {@code builder}.
+ *
+ * @param builder the existing builder.
+ * @return a new client builder.
+ */
+ public static DistributedLogClientBuilder newBuilder(DistributedLogClientBuilder builder) {
+ DistributedLogClientBuilder newBuilder = new DistributedLogClientBuilder();
+ newBuilder.name = builder.name;
+ newBuilder.clientId = builder.clientId;
+ newBuilder.clientBuilder = builder.clientBuilder;
+ newBuilder.routingServiceBuilder = builder.routingServiceBuilder;
+ newBuilder.statsReceiver = builder.statsReceiver;
+ newBuilder.streamStatsReceiver = builder.streamStatsReceiver;
+ newBuilder.enableRegionStats = builder.enableRegionStats;
+ newBuilder.serverRoutingServiceFinagleName = builder.serverRoutingServiceFinagleName;
+ newBuilder.clientConfig = ClientConfig.newConfig(builder.clientConfig);
+ return newBuilder;
+ }
+
+ // private constructor
+ private DistributedLogClientBuilder() {}
+
+ /**
+ * Client Name.
+ *
+ * @param name
+ * client name
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder name(String name) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.name = name;
+ return newBuilder;
+ }
+
+ /**
+ * Client ID.
+ *
+ * @param clientId
+ * client id
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder clientId(ClientId clientId) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientId = clientId;
+ return newBuilder;
+ }
+
+ /**
+ * Serverset to access proxy services.
+ *
+ * @param serverSet
+ * server set.
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder serverSet(ServerSet serverSet) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(serverSet);
+ newBuilder.enableRegionStats = false;
+ return newBuilder;
+ }
+
+ /**
+ * Server Sets to access proxy services.
+ *
+ * <p>The <i>local</i> server set will be tried first then <i>remotes</i>.
+ *
+ * @param local local server set.
+ * @param remotes remote server sets.
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder serverSets(ServerSet local, ServerSet...remotes) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ RoutingService.Builder[] builders = new RoutingService.Builder[remotes.length + 1];
+ builders[0] = RoutingUtils.buildRoutingService(local);
+ for (int i = 1; i < builders.length; i++) {
+ builders[i] = RoutingUtils.buildRoutingService(remotes[i - 1]);
+ }
+ newBuilder.routingServiceBuilder = RegionsRoutingService.newBuilder()
+ .resolver(regionResolver)
+ .routingServiceBuilders(builders);
+ newBuilder.enableRegionStats = remotes.length > 0;
+ return newBuilder;
+ }
+
+ /**
+ * Name to access proxy services.
+ *
+ * @param finagleNameStr
+ * finagle name string.
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder finagleNameStr(String finagleNameStr) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr);
+ newBuilder.enableRegionStats = false;
+ return newBuilder;
+ }
+
+ /**
+ * Finagle name strs to access proxy services.
+ *
+ * <p>The <i>local</i> finalge name str will be tried first, then <i>remotes</i>.
+ *
+ * @param local local server set.
+ * @param remotes remote server sets.
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder finagleNameStrs(String local, String...remotes) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ RoutingService.Builder[] builders = new RoutingService.Builder[remotes.length + 1];
+ builders[0] = RoutingUtils.buildRoutingService(local);
+ for (int i = 1; i < builders.length; i++) {
+ builders[i] = RoutingUtils.buildRoutingService(remotes[i - 1]);
+ }
+ newBuilder.routingServiceBuilder = RegionsRoutingService.newBuilder()
+ .routingServiceBuilders(builders)
+ .resolver(regionResolver);
+ newBuilder.enableRegionStats = remotes.length > 0;
+ return newBuilder;
+ }
+
+ /**
+ * URI to access proxy services.
+ *
+ * <p>Assuming the write proxies are announced under `.write_proxy` of the provided namespace uri.
+ * The builder will convert the dl uri (e.g. distributedlog://{zkserver}/path/to/namespace) to
+ * zookeeper serverset based finagle name str (`zk!{zkserver}!/path/to/namespace/.write_proxy`)
+ *
+ * @param uri namespace uri to access the serverset of write proxies
+ * @return distributedlog builder
+ */
+ public DistributedLogClientBuilder uri(URI uri) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ String zkServers = uri.getAuthority().replace(";", ",");
+ String[] zkServerList = StringUtils.split(zkServers, ',');
+ String finagleNameStr = String.format(
+ "zk!%s!%s/.write_proxy",
+ zkServerList[random.nextInt(zkServerList.length)], // zk server
+ uri.getPath());
+ newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr);
+ newBuilder.enableRegionStats = false;
+ return newBuilder;
+ }
+
+ /**
+ * Address of write proxy to connect.
+ *
+ * @param address
+ * write proxy address.
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder host(SocketAddress address) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(address);
+ newBuilder.enableRegionStats = false;
+ return newBuilder;
+ }
+
+ private DistributedLogClientBuilder routingServiceBuilder(RoutingService.Builder builder) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.routingServiceBuilder = builder;
+ newBuilder.enableRegionStats = false;
+ return newBuilder;
+ }
+
+ /**
+ * Routing Service to access proxy services.
+ *
+ * @param routingService
+ * routing service
+ * @return client builder.
+ */
+ @VisibleForTesting
+ public DistributedLogClientBuilder routingService(RoutingService routingService) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(routingService);
+ newBuilder.enableRegionStats = false;
+ return newBuilder;
+ }
+
+ /**
+ * Stats receiver to expose client stats.
+ *
+ * @param statsReceiver
+ * stats receiver.
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder statsReceiver(StatsReceiver statsReceiver) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.statsReceiver = statsReceiver;
+ return newBuilder;
+ }
+
+ /**
+ * Stream Stats Receiver to expose per stream stats.
+ *
+ * @param streamStatsReceiver
+ * stream stats receiver
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder streamStatsReceiver(StatsReceiver streamStatsReceiver) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.streamStatsReceiver = streamStatsReceiver;
+ return newBuilder;
+ }
+
+ /**
+ * Set underlying finagle client builder.
+ *
+ * @param builder
+ * finagle client builder.
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder clientBuilder(ClientBuilder builder) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientBuilder = builder;
+ return newBuilder;
+ }
+
+ /**
+ * Backoff time when redirecting to an already retried host.
+ *
+ * @param ms
+ * backoff time.
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder redirectBackoffStartMs(int ms) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientConfig.setRedirectBackoffStartMs(ms);
+ return newBuilder;
+ }
+
+ /**
+ * Max backoff time when redirecting to an already retried host.
+ *
+ * @param ms
+ * backoff time.
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder redirectBackoffMaxMs(int ms) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientConfig.setRedirectBackoffMaxMs(ms);
+ return newBuilder;
+ }
+
+ /**
+ * Max redirects that is allowed per request.
+ *
+ * <p>If <i>redirects</i> are exhausted, fail the request immediately.
+ *
+ * @param redirects
+ * max redirects allowed before failing a request.
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder maxRedirects(int redirects) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientConfig.setMaxRedirects(redirects);
+ return newBuilder;
+ }
+
+ /**
+ * Timeout per request in millis.
+ *
+ * @param timeoutMs
+ * timeout per request in millis.
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder requestTimeoutMs(int timeoutMs) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientConfig.setRequestTimeoutMs(timeoutMs);
+ return newBuilder;
+ }
+
+ /**
+ * Set thriftmux enabled.
+ *
+ * @param enabled
+ * is thriftmux enabled
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder thriftmux(boolean enabled) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientConfig.setThriftMux(enabled);
+ return newBuilder;
+ }
+
+ /**
+ * Set failfast stream exception handling enabled.
+ *
+ * @param enabled
+ * is failfast exception handling enabled
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder streamFailfast(boolean enabled) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientConfig.setStreamFailfast(enabled);
+ return newBuilder;
+ }
+
+ /**
+ * Set the regex to match stream names that the client cares about.
+ *
+ * @param nameRegex
+ * stream name regex
+ * @return client builder
+ */
+ public DistributedLogClientBuilder streamNameRegex(String nameRegex) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientConfig.setStreamNameRegex(nameRegex);
+ return newBuilder;
+ }
+
+ /**
+ * Whether to use the new handshake endpoint to exchange ownership cache.
+ *
+ * <p>Enable this when the servers are updated to support handshaking with client info.
+ *
+ * @param enabled
+ * new handshake endpoint is enabled.
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder handshakeWithClientInfo(boolean enabled) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientConfig.setHandshakeWithClientInfo(enabled);
+ return newBuilder;
+ }
+
+ /**
+ * Set the periodic handshake interval in milliseconds.
+ *
+ * <p>Every <code>intervalMs</code>, the DL client will handshake with existing proxies again.
+ * If the interval is less than ownership sync interval, the handshake won't sync ownerships. Otherwise, it will.
+ *
+ * @see #periodicOwnershipSyncIntervalMs(long)
+ * @param intervalMs
+ * handshake interval
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder periodicHandshakeIntervalMs(long intervalMs) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientConfig.setPeriodicHandshakeIntervalMs(intervalMs);
+ return newBuilder;
+ }
+
+ /**
+ * Set the periodic ownership sync interval in milliseconds.
+ *
+ * <p>If periodic handshake is enabled, the handshake will sync ownership if the elapsed time is larger than
+ * sync interval.
+ *
+ * @see #periodicHandshakeIntervalMs(long)
+ * @param intervalMs
+ * interval that handshake should sync ownerships.
+ * @return client builder
+ */
+ public DistributedLogClientBuilder periodicOwnershipSyncIntervalMs(long intervalMs) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientConfig.setPeriodicOwnershipSyncIntervalMs(intervalMs);
+ return newBuilder;
+ }
+
+ /**
+ * Enable/Disable periodic dumping ownership cache.
+ *
+ * @param enabled
+ * flag to enable/disable periodic dumping ownership cache
+ * @return client builder.
+ */
+ public DistributedLogClientBuilder periodicDumpOwnershipCache(boolean enabled) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientConfig.setPeriodicDumpOwnershipCacheEnabled(enabled);
+ return newBuilder;
+ }
+
+ /**
+ * Set periodic dumping ownership cache interval.
+ *
+ * @param intervalMs
+ * interval on dumping ownership cache, in millis.
+ * @return client builder
+ */
+ public DistributedLogClientBuilder periodicDumpOwnershipCacheIntervalMs(long intervalMs) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientConfig.setPeriodicDumpOwnershipCacheIntervalMs(intervalMs);
+ return newBuilder;
+ }
+
+ /**
+ * Enable handshake tracing.
+ *
+ * @param enabled
+ * flag to enable/disable handshake tracing
+ * @return client builder
+ */
+ public DistributedLogClientBuilder handshakeTracing(boolean enabled) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientConfig.setHandshakeTracingEnabled(enabled);
+ return newBuilder;
+ }
+
+ /**
+ * Enable checksum on requests to the proxy.
+ *
+ * @param enabled
+ * flag to enable/disable checksum
+ * @return client builder
+ */
+ public DistributedLogClientBuilder checksum(boolean enabled) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientConfig.setChecksumEnabled(enabled);
+ return newBuilder;
+ }
+
+ /**
+ * Configure the finagle name string for the server-side routing service.
+ *
+ * @param nameStr name string of the server-side routing service
+ * @return client builder
+ */
+ public DistributedLogClientBuilder serverRoutingServiceFinagleNameStr(String nameStr) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.serverRoutingServiceFinagleName = nameStr;
+ return newBuilder;
+ }
+
+ DistributedLogClientBuilder clientConfig(ClientConfig clientConfig) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ newBuilder.clientConfig = ClientConfig.newConfig(clientConfig);
+ return newBuilder;
+ }
+
+ /**
+ * Build distributedlog client.
+ *
+ * @return distributedlog client.
+ */
+ public DistributedLogClient build() {
+ return buildClient();
+ }
+
+ /**
+ * Build monitor service client.
+ *
+ * @return monitor service client.
+ */
+ public MonitorServiceClient buildMonitorClient() {
+
+ return buildClient();
+ }
+
+ @SuppressWarnings("unchecked")
+ ClusterClient buildServerRoutingServiceClient(String serverRoutingServiceFinagleName) {
+ ClientBuilder builder = this.clientBuilder;
+ if (null == builder) {
+ builder = ClientBuilder.get()
+ .tcpConnectTimeout(Duration.fromMilliseconds(200))
+ .connectTimeout(Duration.fromMilliseconds(200))
+ .requestTimeout(Duration.fromSeconds(1))
+ .retries(20);
+ if (!clientConfig.getThriftMux()) {
+ builder = builder.hostConnectionLimit(1);
+ }
+ }
+ if (clientConfig.getThriftMux()) {
+ builder = builder.stack(ThriftMux.client().withClientId(clientId));
+ } else {
+ builder = builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId)));
+ }
+
+ Name name;
+ try {
+ name = Resolver$.MODULE$.eval(serverRoutingServiceFinagleName);
+ } catch (Exception exc) {
+ logger.error("Exception in Resolver.eval for name {}", serverRoutingServiceFinagleName, exc);
+ throw new RuntimeException(exc);
+ }
+
+ // builder the client
+ Service<ThriftClientRequest, byte[]> client =
+ ClientBuilder.safeBuildFactory(
+ builder.dest(name).reportTo(statsReceiver.scope("routing"))
+ ).toService();
+ DistributedLogService.ServiceIface service =
+ new DistributedLogService.ServiceToClient(client, new TBinaryProtocol.Factory());
+ return new ClusterClient(client, service);
+ }
+
+ DistributedLogClientImpl buildClient() {
+ checkNotNull(name, "No name provided.");
+ checkNotNull(clientId, "No client id provided.");
+ checkNotNull(routingServiceBuilder, "No routing service builder provided.");
+ checkNotNull(statsReceiver, "No stats receiver provided.");
+ if (null == streamStatsReceiver) {
+ streamStatsReceiver = new NullStatsReceiver();
+ }
+
+ Optional<ClusterClient> serverRoutingServiceClient = Optional.absent();
+ if (null != serverRoutingServiceFinagleName) {
+ serverRoutingServiceClient = Optional.of(
+ buildServerRoutingServiceClient(serverRoutingServiceFinagleName));
+ }
+
+ RoutingService routingService = routingServiceBuilder
+ .statsReceiver(statsReceiver.scope("routing"))
+ .build();
+ DistributedLogClientImpl clientImpl =
+ new DistributedLogClientImpl(
+ name,
+ clientId,
+ routingService,
+ clientBuilder,
+ clientConfig,
+ serverRoutingServiceClient,
+ statsReceiver,
+ streamStatsReceiver,
+ regionResolver,
+ enableRegionStats);
+ routingService.startService();
+ clientImpl.handshake();
+ return clientImpl;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java
new file mode 100644
index 0000000..033882f
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * DistributedLog Service Client.
+ */
+package org.apache.distributedlog.service;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/resources/findbugsExclude.xml b/distributedlog-client/src/main/resources/findbugsExclude.xml
index 29e1a16..05ee085 100644
--- a/distributedlog-client/src/main/resources/findbugsExclude.xml
+++ b/distributedlog-client/src/main/resources/findbugsExclude.xml
@@ -18,6 +18,6 @@
<FindBugsFilter>
<Match>
<!-- generated code, we can't be held responsible for findbugs in it //-->
- <Class name="~com\.twitter\.distributedlog\.thrift.*" />
+ <Class name="~org\.apache\.distributedlog\.thrift.*" />
</Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java
deleted file mode 100644
index b302439..0000000
--- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.LogRecord;
-import com.twitter.distributedlog.LogRecordSet;
-import com.twitter.distributedlog.LogRecordSetBuffer;
-import com.twitter.distributedlog.exceptions.LogRecordTooLongException;
-import com.twitter.distributedlog.io.CompressionCodec;
-import com.twitter.distributedlog.service.DistributedLogClient;
-import com.twitter.finagle.IndividualRequestTimeoutException;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Test {@link DistributedLogMultiStreamWriter}.
- */
-public class TestDistributedLogMultiStreamWriter {
-
- @Test(timeout = 20000, expected = IllegalArgumentException.class)
- public void testBuildWithNullStreams() throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .build();
- }
-
- @Test(timeout = 20000, expected = IllegalArgumentException.class)
- public void testBuildWithEmptyStreamList() throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.<String>newArrayList())
- .build();
- }
-
- @Test(timeout = 20000, expected = NullPointerException.class)
- public void testBuildWithNullClient() throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .build();
- }
-
- @Test(timeout = 20000, expected = NullPointerException.class)
- public void testBuildWithNullCodec() throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(null)
- .build();
- }
-
- @Test(timeout = 20000, expected = IllegalArgumentException.class)
- public void testBuildWithInvalidSpeculativeSettings1()
- throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(-1)
- .build();
- }
-
- @Test(timeout = 20000, expected = IllegalArgumentException.class)
- public void testBuildWithInvalidSpeculativeSettings2()
- throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(5)
- .build();
- }
-
- @Test(timeout = 20000, expected = IllegalArgumentException.class)
- public void testBuildWithInvalidSpeculativeSettings3()
- throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(-1)
- .build();
- }
-
- @Test(timeout = 20000, expected = IllegalArgumentException.class)
- public void testBuildWithInvalidSpeculativeSettings4()
- throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(10)
- .build();
- }
-
- @Test(timeout = 20000)
- public void testBuildMultiStreamWriter()
- throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(50)
- .build();
- assertTrue(true);
- }
-
- @Test(timeout = 20000)
- public void testBuildWithPeriodicalFlushEnabled() throws Exception {
- ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(50)
- .flushIntervalMs(1000)
- .scheduler(executorService)
- .build();
- verify(executorService, times(1)).scheduleAtFixedRate(writer, 1000000, 1000000, TimeUnit.MICROSECONDS);
- }
-
- @Test(timeout = 20000)
- public void testBuildWithPeriodicalFlushDisabled() throws Exception {
- ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(50)
- .flushIntervalMs(0)
- .scheduler(executorService)
- .build();
- verify(executorService, times(0)).scheduleAtFixedRate(writer, 1000, 1000, TimeUnit.MILLISECONDS);
- writer.close();
- }
-
- @Test(timeout = 20000)
- public void testFlushWhenBufferIsFull() throws Exception {
- DistributedLogClient client = mock(DistributedLogClient.class);
- when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
- .thenReturn(Future.value(new DLSN(1L, 1L, 999L)));
-
- ScheduledExecutorService executorService =
- Executors.newSingleThreadScheduledExecutor();
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(client)
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(100000)
- .maxSpeculativeTimeoutMs(200000)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(500000)
- .flushIntervalMs(0)
- .bufferSize(0)
- .scheduler(executorService)
- .build();
-
- ByteBuffer buffer = ByteBuffer.wrap("test".getBytes(UTF_8));
- writer.write(buffer);
-
- verify(client, times(1)).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
-
- writer.close();
- }
-
- @Test(timeout = 20000)
- public void testFlushWhenExceedMaxLogRecordSetSize()
- throws Exception {
- DistributedLogClient client = mock(DistributedLogClient.class);
- when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
- .thenReturn(Future.value(new DLSN(1L, 1L, 999L)));
- ScheduledExecutorService executorService =
- Executors.newSingleThreadScheduledExecutor();
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(client)
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(100000)
- .maxSpeculativeTimeoutMs(200000)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(500000)
- .flushIntervalMs(0)
- .bufferSize(Integer.MAX_VALUE)
- .scheduler(executorService)
- .build();
-
- byte[] data = new byte[LogRecord.MAX_LOGRECORD_SIZE - 3 * 100];
- ByteBuffer buffer1 = ByteBuffer.wrap(data);
- writer.write(buffer1);
- verify(client, times(0)).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
- LogRecordSet.Writer recordSetWriter1 = writer.getLogRecordSetWriter();
- assertEquals(1, recordSetWriter1.getNumRecords());
- assertEquals(LogRecordSet.HEADER_LEN + 4 + data.length, recordSetWriter1.getNumBytes());
-
- ByteBuffer buffer2 = ByteBuffer.wrap(data);
- writer.write(buffer2);
- verify(client, times(1)).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
- LogRecordSet.Writer recordSetWriter2 = writer.getLogRecordSetWriter();
- assertEquals(1, recordSetWriter2.getNumRecords());
- assertEquals(LogRecordSet.HEADER_LEN + 4 + data.length, recordSetWriter2.getNumBytes());
- assertTrue(recordSetWriter1 != recordSetWriter2);
-
- writer.close();
- }
-
- @Test(timeout = 20000)
- public void testWriteTooLargeRecord() throws Exception {
- DistributedLogClient client = mock(DistributedLogClient.class);
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(client)
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(100000)
- .maxSpeculativeTimeoutMs(200000)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(5000000)
- .flushIntervalMs(0)
- .bufferSize(0)
- .build();
-
- byte[] data = new byte[LogRecord.MAX_LOGRECORD_SIZE + 10];
- ByteBuffer buffer = ByteBuffer.wrap(data);
- Future<DLSN> writeFuture = writer.write(buffer);
- assertTrue(writeFuture.isDefined());
- try {
- Await.result(writeFuture);
- fail("Should fail on writing too long record");
- } catch (LogRecordTooLongException lrtle) {
- // expected
- }
- writer.close();
- }
-
- @Test(timeout = 20000)
- public void testSpeculativeWrite() throws Exception {
- DistributedLogClient client = mock(DistributedLogClient.class);
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(client)
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(5000000)
- .flushIntervalMs(0)
- .bufferSize(0)
- .build();
-
- final String secondStream = writer.getStream(1);
-
- final DLSN dlsn = new DLSN(99L, 88L, 0L);
-
- Mockito.doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- Object[] arguments = invocation.getArguments();
- String stream = (String) arguments[0];
- if (stream.equals(secondStream)) {
- return Future.value(dlsn);
- } else {
- return new Promise<DLSN>();
- }
- }
- }).when(client).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
-
- byte[] data = "test-test".getBytes(UTF_8);
- ByteBuffer buffer = ByteBuffer.wrap(data);
- Future<DLSN> writeFuture = writer.write(buffer);
- DLSN writeDLSN = Await.result(writeFuture);
- assertEquals(dlsn, writeDLSN);
- writer.close();
- }
-
- @Test(timeout = 20000)
- public void testPeriodicalFlush() throws Exception {
- DistributedLogClient client = mock(DistributedLogClient.class);
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(client)
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(5000000)
- .flushIntervalMs(10)
- .bufferSize(Integer.MAX_VALUE)
- .build();
-
- final DLSN dlsn = new DLSN(99L, 88L, 0L);
-
- Mockito.doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- return Future.value(dlsn);
- }
- }).when(client).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
-
- byte[] data = "test-test".getBytes(UTF_8);
- ByteBuffer buffer = ByteBuffer.wrap(data);
- Future<DLSN> writeFuture = writer.write(buffer);
- DLSN writeDLSN = Await.result(writeFuture);
- assertEquals(dlsn, writeDLSN);
- writer.close();
- }
-
- @Test(timeout = 20000)
- public void testFailRequestAfterRetriedAllStreams() throws Exception {
- DistributedLogClient client = mock(DistributedLogClient.class);
- when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
- .thenReturn(new Promise<DLSN>());
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(client)
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(5000000)
- .flushIntervalMs(10)
- .bufferSize(Integer.MAX_VALUE)
- .build();
-
- byte[] data = "test-test".getBytes(UTF_8);
- ByteBuffer buffer = ByteBuffer.wrap(data);
- Future<DLSN> writeFuture = writer.write(buffer);
- try {
- Await.result(writeFuture);
- fail("Should fail the request after retries all streams");
- } catch (IndividualRequestTimeoutException e) {
- long timeoutMs = e.timeout().inMilliseconds();
- assertTrue(timeoutMs >= (10 + 20) && timeoutMs < 5000000);
- }
- writer.close();
- }
-}