You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:48 UTC
[43/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java
new file mode 100644
index 0000000..b3f3368
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecordSet;
+import org.apache.distributedlog.LogRecordSetBuffer;
+import org.apache.distributedlog.client.speculative.DefaultSpeculativeRequestExecutionPolicy;
+import org.apache.distributedlog.client.speculative.SpeculativeRequestExecutionPolicy;
+import org.apache.distributedlog.client.speculative.SpeculativeRequestExecutor;
+import org.apache.distributedlog.exceptions.LogRecordTooLongException;
+import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.service.DistributedLogClient;
+import com.twitter.finagle.IndividualRequestTimeoutException;
+import com.twitter.util.Duration;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Write to multiple streams.
+ */
+public class DistributedLogMultiStreamWriter implements Runnable {
+
+ /**
+ * Create a new {@link Builder}, pre-populated with its default settings, that is
+ * used to configure and build a multi stream writer.
+ *
+ * @return a new builder to create a multi stream writer.
+ */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+    /**
+     * Builder for the multi stream writer.
+     *
+     * <p>A client and a non-empty list of streams must be provided. Every other setting
+     * has a default: 16KB output buffer, 2ms flush interval, no compression, 500ms request
+     * timeout, speculative timeouts starting at 50ms and capped at 200ms with a backoff
+     * multiplier of 2, and the system ticker.
+     */
+    public static class Builder {
+
+        private DistributedLogClient client = null;
+        private List<String> streams = null;
+        private int bufferSize = 16 * 1024; // 16k
+        private long flushIntervalMicros = 2000; // 2ms
+        private CompressionCodec.Type codec = CompressionCodec.Type.NONE;
+        private ScheduledExecutorService executorService = null;
+        private long requestTimeoutMs = 500; // 500ms
+        private int firstSpeculativeTimeoutMs = 50; // 50ms
+        private int maxSpeculativeTimeoutMs = 200; // 200ms
+        private float speculativeBackoffMultiplier = 2;
+        private Ticker ticker = Ticker.systemTicker();
+
+        private Builder() {}
+
+        /**
+         * Set the distributedlog client used by the multi stream writer.
+         *
+         * @param client
+         *          distributedlog client
+         * @return builder
+         */
+        public Builder client(DistributedLogClient client) {
+            this.client = client;
+            return this;
+        }
+
+        /**
+         * Set the list of streams to write to.
+         *
+         * @param streams
+         *          list of streams to write
+         * @return builder
+         */
+        public Builder streams(List<String> streams) {
+            this.streams = streams;
+            return this;
+        }
+
+        /**
+         * Set the output buffer size.
+         *
+         * <p>If the output buffer size is 0, writes are transmitted to the
+         * wire immediately.
+         *
+         * @param bufferSize
+         *          output buffer size
+         * @return builder
+         */
+        public Builder bufferSize(int bufferSize) {
+            this.bufferSize = bufferSize;
+            return this;
+        }
+
+        /**
+         * Set the flush interval in milliseconds.
+         *
+         * @param flushIntervalMs
+         *          flush interval in milliseconds.
+         * @return builder
+         */
+        public Builder flushIntervalMs(int flushIntervalMs) {
+            this.flushIntervalMicros = TimeUnit.MILLISECONDS.toMicros(flushIntervalMs);
+            return this;
+        }
+
+        /**
+         * Set the flush interval in microseconds.
+         *
+         * @param flushIntervalMicros
+         *          flush interval in microseconds.
+         * @return builder
+         */
+        public Builder flushIntervalMicros(int flushIntervalMicros) {
+            this.flushIntervalMicros = flushIntervalMicros;
+            return this;
+        }
+
+        /**
+         * Set the compression codec.
+         *
+         * @param codec compression codec.
+         * @return builder
+         */
+        public Builder compressionCodec(CompressionCodec.Type codec) {
+            this.codec = codec;
+            return this;
+        }
+
+        /**
+         * Set the scheduler used to flush output buffers.
+         *
+         * <p>If no scheduler is set, the writer creates its own single-threaded scheduler.
+         *
+         * @param executorService
+         *          executor service to flush output buffers.
+         * @return builder
+         */
+        public Builder scheduler(ScheduledExecutorService executorService) {
+            this.executorService = executorService;
+            return this;
+        }
+
+        /**
+         * Set the request timeout in milliseconds.
+         *
+         * @param requestTimeoutMs
+         *          request timeout in milliseconds.
+         * @return builder
+         */
+        public Builder requestTimeoutMs(long requestTimeoutMs) {
+            this.requestTimeoutMs = requestTimeoutMs;
+            return this;
+        }
+
+        /**
+         * Set the first speculative timeout in milliseconds.
+         *
+         * <p>The multi-streams writer does speculative writes on streams.
+         * The writer issues the first write request to a stream; if that request
+         * doesn't respond within the speculative timeout, it issues the next write
+         * request to a different stream. It keeps doing such speculative retries until
+         * it receives a success or hits the request timeout ({@link #requestTimeoutMs(long)}).
+         *
+         * <p>This setting configures the first speculative timeout, in milliseconds.
+         *
+         * @param timeoutMs
+         *          timeout in milliseconds
+         * @return builder
+         */
+        public Builder firstSpeculativeTimeoutMs(int timeoutMs) {
+            this.firstSpeculativeTimeoutMs = timeoutMs;
+            return this;
+        }
+
+        /**
+         * Set the max speculative timeout in milliseconds.
+         *
+         * <p>See {@link #firstSpeculativeTimeoutMs(int)} for how speculative writes work.
+         * This setting configures the upper bound of the speculative timeout, in milliseconds.
+         *
+         * @param timeoutMs
+         *          timeout in milliseconds
+         * @return builder
+         */
+        public Builder maxSpeculativeTimeoutMs(int timeoutMs) {
+            this.maxSpeculativeTimeoutMs = timeoutMs;
+            return this;
+        }
+
+        /**
+         * Set the speculative timeout backoff multiplier.
+         *
+         * <p>See {@link #firstSpeculativeTimeoutMs(int)} for how speculative writes work.
+         * This setting configures the multiplier applied to the speculative timeout.
+         *
+         * @param multiplier
+         *          backoff multiplier
+         * @return builder
+         */
+        public Builder speculativeBackoffMultiplier(float multiplier) {
+            this.speculativeBackoffMultiplier = multiplier;
+            return this;
+        }
+
+        /**
+         * Ticker for timing.
+         *
+         * @param ticker
+         *          ticker
+         * @return builder
+         * @see Ticker
+         */
+        public Builder clockTicker(Ticker ticker) {
+            this.ticker = ticker;
+            return this;
+        }
+
+        /**
+         * Build the multi stream writer.
+         *
+         * <p>The configured buffer size is capped at {@code MAX_LOGRECORDSET_SIZE}.
+         *
+         * @return the multi stream writer.
+         * @throws IllegalArgumentException if no streams were provided, or the speculative
+         *          timeout settings are inconsistent (first timeout must be positive and not
+         *          larger than the max timeout, the multiplier must be positive, and the max
+         *          timeout must be smaller than the request timeout).
+         * @throws NullPointerException if the client, compression codec or ticker is null.
+         */
+        public DistributedLogMultiStreamWriter build() {
+            checkArgument((null != streams && !streams.isEmpty()),
+                    "No streams provided");
+            checkNotNull(client,
+                    "No distributedlog client provided");
+            checkNotNull(codec,
+                    "No compression codec provided");
+            // Fail fast: a null ticker would otherwise only surface as a NullPointerException
+            // when the first pending write request starts its stopwatch.
+            checkNotNull(ticker,
+                    "No clock ticker provided");
+            checkArgument(firstSpeculativeTimeoutMs > 0
+                    && firstSpeculativeTimeoutMs <= maxSpeculativeTimeoutMs
+                    && speculativeBackoffMultiplier > 0
+                    && maxSpeculativeTimeoutMs < requestTimeoutMs,
+                    "Invalid speculative timeout settings");
+            return new DistributedLogMultiStreamWriter(
+                    streams,
+                    client,
+                    Math.min(bufferSize, MAX_LOGRECORDSET_SIZE),
+                    flushIntervalMicros,
+                    requestTimeoutMs,
+                    firstSpeculativeTimeoutMs,
+                    maxSpeculativeTimeoutMs,
+                    speculativeBackoffMultiplier,
+                    codec,
+                    ticker,
+                    executorService);
+        }
+    }
+
+    /**
+     * Pending Write Request.
+     *
+     * <p>Tracks one record set while it is being written. The request starts at a stream
+     * picked from the writer's shared counter and moves on to the next stream each time a
+     * write fails or a speculative request is issued. It completes exactly once: either with
+     * the DLSN of the first successful write, or with a failure once the request timeout has
+     * elapsed or every stream has been tried.
+     */
+    class PendingWriteRequest implements FutureEventListener<DLSN>,
+            SpeculativeRequestExecutor {
+
+        private final LogRecordSetBuffer recordSet;
+        private final AtomicBoolean complete = new AtomicBoolean(false);
+        private final Stopwatch stopwatch = Stopwatch.createStarted(clockTicker);
+        private int nextStream;
+        private int numTriedStreams = 0;
+
+        PendingWriteRequest(LogRecordSetBuffer recordSet) {
+            this.recordSet = recordSet;
+            // Mask off the sign bit rather than using Math.abs(): Math.abs(Integer.MIN_VALUE)
+            // is still negative, which would produce a negative stream index once the shared
+            // counter wraps around.
+            this.nextStream = (nextStreamId.incrementAndGet() & Integer.MAX_VALUE) % numStreams;
+        }
+
+        /**
+         * Send the record set to the next stream.
+         *
+         * @return the stream the write was sent to, or null if nothing was sent because the
+         *         request is already complete, timed out, or has tried all streams.
+         */
+        synchronized String sendNextWrite() {
+            if (complete.get()) {
+                // The outcome is already decided; any further response would be ignored,
+                // so don't put another copy of the record set on the wire.
+                return null;
+            }
+            long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+            if (elapsedMs > requestTimeoutMs || numTriedStreams >= numStreams) {
+                fail(new IndividualRequestTimeoutException(Duration.fromMilliseconds(elapsedMs)));
+                return null;
+            }
+            // Advance the retry state before sending: if the write future is already failed
+            // and its listener runs inline, onFailure() re-enters this method and must see
+            // the next stream rather than retrying the same one.
+            int streamId = nextStream;
+            nextStream = (nextStream + 1) % numStreams;
+            ++numTriedStreams;
+            return sendWriteToStream(streamId);
+        }
+
+        /**
+         * Write the record set to the stream at the given index and listen for the result.
+         *
+         * @param streamId index of the stream in the writer's stream list
+         * @return name of the stream written to
+         */
+        synchronized String sendWriteToStream(int streamId) {
+            String stream = getStream(streamId);
+            client.writeRecordSet(stream, recordSet)
+                    .addEventListener(this);
+            return stream;
+        }
+
+        @Override
+        public void onSuccess(DLSN dlsn) {
+            // only the first outcome wins
+            if (!complete.compareAndSet(false, true)) {
+                return;
+            }
+            recordSet.completeTransmit(
+                    dlsn.getLogSegmentSequenceNo(),
+                    dlsn.getEntryId(),
+                    dlsn.getSlotId());
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            // a failed write is retried on the next stream
+            sendNextWrite();
+        }
+
+        private void fail(Throwable cause) {
+            // only the first outcome wins
+            if (!complete.compareAndSet(false, true)) {
+                return;
+            }
+            recordSet.abortTransmit(cause);
+        }
+
+        @Override
+        public Future<Boolean> issueSpeculativeRequest() {
+            return Future.value(!complete.get() && null != sendNextWrite());
+        }
+    }
+
+ private final int numStreams;
+ private final List<String> streams;
+ private final DistributedLogClient client;
+ private final int bufferSize;
+ private final long requestTimeoutMs;
+ private final SpeculativeRequestExecutionPolicy speculativePolicy;
+ private final Ticker clockTicker;
+ private final CompressionCodec.Type codec;
+ private final ScheduledExecutorService scheduler;
+ private final boolean ownScheduler;
+ private final AtomicInteger nextStreamId;
+ private LogRecordSet.Writer recordSetWriter;
+
+    /**
+     * Create the writer from the settings collected by the {@link Builder}.
+     *
+     * <p>The stream list is copied and shuffled. When no scheduler is supplied, a
+     * single-threaded daemon scheduler is created and owned by this writer. When the flush
+     * interval is positive, this writer is registered with the scheduler as a fixed-rate
+     * flush task.
+     */
+    private DistributedLogMultiStreamWriter(List<String> streams,
+                                            DistributedLogClient client,
+                                            int bufferSize,
+                                            long flushIntervalMicros,
+                                            long requestTimeoutMs,
+                                            int firstSpeculativeTimeoutMs,
+                                            int maxSpeculativeTimeoutMs,
+                                            float speculativeBackoffMultiplier,
+                                            CompressionCodec.Type codec,
+                                            Ticker clockTicker,
+                                            ScheduledExecutorService scheduler) {
+        // private copy of the stream list, so it can be shuffled below
+        this.streams = Lists.newArrayList(streams);
+        this.numStreams = this.streams.size();
+
+        this.client = client;
+        this.codec = codec;
+        this.clockTicker = clockTicker;
+        this.bufferSize = bufferSize;
+        this.requestTimeoutMs = requestTimeoutMs;
+
+        // fall back to an internally owned scheduler when the caller did not provide one
+        this.ownScheduler = (null == scheduler);
+        this.scheduler = this.ownScheduler
+                ? Executors.newSingleThreadScheduledExecutor(
+                        new ThreadFactoryBuilder()
+                                .setDaemon(true)
+                                .setNameFormat("MultiStreamWriterFlushThread-%d")
+                                .build())
+                : scheduler;
+
+        this.speculativePolicy = new DefaultSpeculativeRequestExecutionPolicy(
+                firstSpeculativeTimeoutMs,
+                maxSpeculativeTimeoutMs,
+                speculativeBackoffMultiplier);
+
+        // shuffle the streams
+        Collections.shuffle(this.streams);
+        this.nextStreamId = new AtomicInteger(0);
+        this.recordSetWriter = newRecordSetWriter();
+
+        // periodic flushing is only enabled for a positive interval
+        if (flushIntervalMicros > 0) {
+            this.scheduler.scheduleAtFixedRate(
+                    this,
+                    flushIntervalMicros,
+                    flushIntervalMicros,
+                    TimeUnit.MICROSECONDS);
+        }
+    }
+
+ /**
+ * Get the name of the stream at the given position in this writer's
+ * (shuffled) stream list.
+ *
+ * @param streamId index into the stream list
+ * @return stream name
+ */
+ String getStream(int streamId) {
+ return streams.get(streamId);
+ }
+
+ /**
+ * Get the record set writer that is currently accumulating records.
+ *
+ * @return current record set writer
+ */
+ synchronized LogRecordSet.Writer getLogRecordSetWriter() {
+ return recordSetWriter;
+ }
+
+ /**
+ * Create a new record set writer using this writer's buffer size and compression codec.
+ *
+ * @return a new record set writer
+ */
+ private LogRecordSet.Writer newRecordSetWriter() {
+ return LogRecordSet.newWriter(
+ bufferSize,
+ codec);
+ }
+
+    /**
+     * Buffer a record for writing.
+     *
+     * <p>The record is appended to the current record set. The current record set is flushed
+     * first if appending the record would push it past {@code MAX_LOGRECORDSET_SIZE}, and
+     * flushed afterwards once it has reached the configured buffer size.
+     *
+     * @param buffer record payload; its remaining bytes make up the record
+     * @return a future for the DLSN of the record; it is already failed when the record is
+     *         larger than {@code MAX_LOGRECORD_SIZE} or could not be added to the record set
+     */
+    public synchronized Future<DLSN> write(ByteBuffer buffer) {
+        final int recordSize = buffer.remaining();
+        if (recordSize > MAX_LOGRECORD_SIZE) {
+            return Future.exception(new LogRecordTooLongException(
+                    "Log record of size " + logRecordSizeText(recordSize) + " written when only "
+                            + MAX_LOGRECORD_SIZE + " is allowed"));
+        }
+
+        // make room first if this record would exceed the max number of bytes
+        final boolean exceedsRecordSetLimit =
+                (recordSetWriter.getNumBytes() + recordSize) > MAX_LOGRECORDSET_SIZE;
+        if (exceedsRecordSetLimit) {
+            flush();
+        }
+
+        final Promise<DLSN> result = new Promise<DLSN>();
+        try {
+            recordSetWriter.writeRecord(buffer, result);
+        } catch (LogRecordTooLongException tooLong) {
+            return Future.exception(tooLong);
+        } catch (WriteException writeFailure) {
+            // the current record set is unusable: abort it and start a fresh one
+            recordSetWriter.abortTransmit(writeFailure);
+            recordSetWriter = newRecordSetWriter();
+            return Future.exception(writeFailure);
+        }
+
+        if (bufferSize <= recordSetWriter.getNumBytes()) {
+            flush();
+        }
+        return result;
+    }
+
+    /** Render a record size for the "record too long" message. */
+    private static String logRecordSizeText(int recordSize) {
+        return String.valueOf(recordSize);
+    }
+
+ /**
+ * Periodic flush task: flushes the current record set. The constructor registers this
+ * writer with the scheduler when the flush interval is positive.
+ */
+ @Override
+ public void run() {
+ flush();
+ }
+
+ private void flush() {
+ LogRecordSet.Writer recordSetToFlush;
+ synchronized (this) {
+ if (recordSetWriter.getNumRecords() == 0) {
+ return;
+ }
+ recordSetToFlush = recordSetWriter;
+ recordSetWriter = newRecordSetWriter();
+ }
+ transmit(recordSetToFlush);
+ }
+
+    /**
+     * Hand a record set to the speculative policy, which drives the write requests.
+     *
+     * @param recordSetToFlush record set to transmit
+     */
+    private void transmit(LogRecordSet.Writer recordSetToFlush) {
+        this.speculativePolicy.initiateSpeculativeRequest(
+                scheduler,
+                new PendingWriteRequest(recordSetToFlush));
+    }
+
+ /**
+ * Close the writer.
+ *
+ * <p>Shuts down the flush scheduler only when this writer created it; a scheduler
+ * supplied by the caller is left running.
+ *
+ * <p>NOTE(review): this method neither flushes records still buffered in the current
+ * record set nor cancels the fixed-rate flush task registered in the constructor, so
+ * with a caller-supplied scheduler the task keeps running after close. Confirm callers
+ * do not rely on either.
+ */
+ public void close() {
+ if (ownScheduler) {
+ this.scheduler.shutdown();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/MonitorServiceClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/MonitorServiceClient.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/MonitorServiceClient.java
new file mode 100644
index 0000000..ed6269b
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/MonitorServiceClient.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.monitor;
+
+import com.twitter.util.Future;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Interface for distributedlog monitor service.
+ */
+public interface MonitorServiceClient {
+
+ /**
+ * Check a given stream.
+ *
+ * @param stream
+ * stream.
+ * @return future representing the result of the check.
+ */
+ Future<Void> check(String stream);
+
+ /**
+ * Send heartbeat to the stream and its readers.
+ *
+ * @param stream
+ * stream.
+ * @return future representing the result of the heartbeat.
+ */
+ Future<Void> heartbeat(String stream);
+
+ /**
+ * Get current ownership distribution from current monitor service view.
+ *
+ * @return current ownership distribution, as a map from proxy address to stream names
+ */
+ Map<SocketAddress, Set<String>> getStreamOwnershipDistribution();
+
+ /**
+ * Enable/Disable accepting new stream on a given proxy.
+ *
+ * @param enabled
+ * flag to enable/disable accepting new streams on a given proxy
+ * @return future representing the result of the request.
+ */
+ Future<Void> setAcceptNewStream(boolean enabled);
+
+ /**
+ * Close the client.
+ */
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/package-info.java
new file mode 100644
index 0000000..d7e2c94
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * DistributedLog Monitor Client.
+ */
+package org.apache.distributedlog.client.monitor;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/OwnershipCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/OwnershipCache.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/OwnershipCache.java
new file mode 100644
index 0000000..f3c24ca
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/OwnershipCache.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.ownership;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.distributedlog.client.ClientConfig;
+import org.apache.distributedlog.client.stats.OwnershipStatsLogger;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.net.SocketAddress;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client Side Ownership Cache.
+ */
+public class OwnershipCache implements TimerTask {
+
+ private static final Logger logger = LoggerFactory.getLogger(OwnershipCache.class);
+
+ private final ConcurrentHashMap<String, SocketAddress> stream2Addresses =
+ new ConcurrentHashMap<String, SocketAddress>();
+ private final ConcurrentHashMap<SocketAddress, Set<String>> address2Streams =
+ new ConcurrentHashMap<SocketAddress, Set<String>>();
+ private final ClientConfig clientConfig;
+ private final HashedWheelTimer timer;
+
+ // Stats
+ private final OwnershipStatsLogger ownershipStatsLogger;
+
+ /**
+ * Create the ownership cache and, if enabled in the client config, schedule the
+ * periodic dump of its content to the log.
+ *
+ * @param clientConfig
+ * client config; read for the periodic dump settings
+ * @param timer
+ * timer used to schedule the periodic dump
+ * @param statsReceiver
+ * stats receiver passed to the ownership stats logger
+ * @param streamStatsReceiver
+ * stream stats receiver passed to the ownership stats logger
+ */
+ public OwnershipCache(ClientConfig clientConfig,
+ HashedWheelTimer timer,
+ StatsReceiver statsReceiver,
+ StatsReceiver streamStatsReceiver) {
+ this.clientConfig = clientConfig;
+ this.timer = timer;
+ this.ownershipStatsLogger = new OwnershipStatsLogger(statsReceiver, streamStatsReceiver);
+ scheduleDumpOwnershipCache();
+ }
+
+ private void scheduleDumpOwnershipCache() {
+ if (clientConfig.isPeriodicDumpOwnershipCacheEnabled()
+ && clientConfig.getPeriodicDumpOwnershipCacheIntervalMs() > 0) {
+ timer.newTimeout(this, clientConfig.getPeriodicDumpOwnershipCacheIntervalMs(),
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+    /**
+     * Timer callback: log the content of the cache and schedule the next dump.
+     * A cancelled timeout is ignored and does not reschedule.
+     */
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        if (!timeout.isCancelled()) {
+            logger.info("Ownership cache : {} streams cached, {} hosts cached",
+                    stream2Addresses.size(), address2Streams.size());
+            logger.info("Cached streams : {}", stream2Addresses);
+            scheduleDumpOwnershipCache();
+        }
+    }
+
+ /**
+ * Get the stats logger this cache reports ownership events to.
+ *
+ * @return ownership stats logger
+ */
+ public OwnershipStatsLogger getOwnershipStatsLogger() {
+ return ownershipStatsLogger;
+ }
+
+ /**
+ * Update ownership of <i>stream</i> to <i>addr</i>.
+ *
+ * <p>Both the stream-to-owner mapping and the owner-to-streams mapping are updated.
+ * The update is done without a global lock: the stream-to-owner mapping is changed with
+ * atomic map operations, and the per-host stream set is modified while holding that
+ * set's monitor.
+ *
+ * @param stream
+ * Stream Name.
+ * @param addr
+ * Owner Address.
+ * @return false if the cached owner was changed concurrently so that the replacement
+ * of the old owner failed; true otherwise (including when <i>addr</i> was already
+ * the cached owner)
+ */
+ public boolean updateOwner(String stream, SocketAddress addr) {
+ // update ownership
+ SocketAddress oldAddr = stream2Addresses.putIfAbsent(stream, addr);
+ // the cache already records addr as the owner: nothing to do
+ if (null != oldAddr && oldAddr.equals(addr)) {
+ return true;
+ }
+ if (null != oldAddr) {
+ // a different owner is cached: swap it only if it is still the one we observed
+ if (stream2Addresses.replace(stream, oldAddr, addr)) {
+ // Store the relevant mappings for this topic and host combination
+ logger.info("Storing ownership for stream : {}, old host : {}, new host : {}.",
+ new Object[] { stream, oldAddr, addr });
+ StringBuilder sb = new StringBuilder();
+ sb.append("Ownership changed '")
+ .append(oldAddr).append("' -> '").append(addr).append("'");
+ // drop the stream from the old host's stream set
+ removeOwnerFromStream(stream, oldAddr, sb.toString());
+
+ // update stats
+ // NOTE(review): removeOwnerFromStream also calls onRemove when it removes the stream
+ // from the old host's set, so onRemove may be reported twice here - confirm intended.
+ ownershipStatsLogger.onRemove(stream);
+ ownershipStatsLogger.onAdd(stream);
+ } else {
+ // lost the race against a concurrent ownership change
+ logger.warn("Ownership of stream : {} has been changed from {} to {} when storing host : {}.",
+ new Object[] { stream, oldAddr, stream2Addresses.get(stream), addr });
+ return false;
+ }
+ } else {
+ logger.info("Storing ownership for stream : {}, host : {}.", stream, addr);
+ // update stats
+ ownershipStatsLogger.onAdd(stream);
+ }
+
+ // find or create the stream set of the new owner
+ Set<String> streamsForHost = address2Streams.get(addr);
+ if (null == streamsForHost) {
+ Set<String> newStreamsForHost = new HashSet<String>();
+ streamsForHost = address2Streams.putIfAbsent(addr, newStreamsForHost);
+ if (null == streamsForHost) {
+ streamsForHost = newStreamsForHost;
+ }
+ }
+ synchronized (streamsForHost) {
+ // check whether the ownership changed, since it might have happened after the replace succeeded
+ if (addr.equals(stream2Addresses.get(stream))) {
+ streamsForHost.add(stream);
+ }
+ }
+ return true;
+ }
+
+    /**
+     * Look up the cached owner of <code>stream</code>, recording a cache hit or miss
+     * in the ownership stats.
+     *
+     * @param stream
+     *          stream to lookup ownership
+     * @return owner's address, or null if no owner is cached for the stream
+     */
+    public SocketAddress getOwner(String stream) {
+        final SocketAddress owner = stream2Addresses.get(stream);
+        if (null != owner) {
+            ownershipStatsLogger.onHit(stream);
+        } else {
+            ownershipStatsLogger.onMiss(stream);
+        }
+        return owner;
+    }
+
+    /**
+     * Remove the owner <code>addr</code> from <code>stream</code> for a given <code>reason</code>.
+     *
+     * <p>The stream-to-owner mapping is removed only if it still points at <code>addr</code>.
+     * The stream is also dropped from the host's stream set; a host whose set becomes empty
+     * is removed from the host mapping.
+     *
+     * @param stream stream name
+     * @param addr owner address
+     * @param reason reason to remove ownership
+     */
+    public void removeOwnerFromStream(String stream, SocketAddress addr, String reason) {
+        final boolean mappingRemoved = stream2Addresses.remove(stream, addr);
+        if (mappingRemoved) {
+            logger.info("Removed stream to host mapping for (stream: {} -> host: {}) : reason = '{}'.",
+                    new Object[] { stream, addr, reason });
+        }
+
+        final Set<String> hostStreams = address2Streams.get(addr);
+        if (null == hostStreams) {
+            return;
+        }
+        synchronized (hostStreams) {
+            if (!hostStreams.remove(stream)) {
+                return;
+            }
+            logger.info("Removed stream ({}) from host {} : reason = '{}'.",
+                    new Object[] { stream, addr, reason });
+            if (hostStreams.isEmpty()) {
+                address2Streams.remove(addr, hostStreams);
+            }
+            ownershipStatsLogger.onRemove(stream);
+        }
+    }
+
+    /**
+     * Remove all streams from host <code>addr</code>.
+     *
+     * <p>Every stream in the host's stream set whose cached owner is still <code>addr</code>
+     * is removed from the stream-to-owner mapping, then the host itself is removed from
+     * the host mapping.
+     *
+     * @param addr
+     *          host to remove ownerships
+     */
+    public void removeAllStreamsFromOwner(SocketAddress addr) {
+        logger.info("Remove streams mapping for host {}", addr);
+        final Set<String> hostStreams = address2Streams.get(addr);
+        if (null == hostStreams) {
+            return;
+        }
+        synchronized (hostStreams) {
+            for (String streamName : hostStreams) {
+                final boolean removed = stream2Addresses.remove(streamName, addr);
+                if (removed) {
+                    logger.info("Removing mapping for stream : {} from host : {}", streamName, addr);
+                    ownershipStatsLogger.onRemove(streamName);
+                }
+            }
+            address2Streams.remove(addr, hostStreams);
+        }
+    }
+
+ /**
+ * Get the number of cached streams, i.e. the number of streams that currently
+ * have an owner recorded in this cache.
+ *
+ * @return number of cached streams.
+ */
+ public int getNumCachedStreams() {
+ return stream2Addresses.size();
+ }
+
+ /**
+ * Get the stream ownership distribution across proxies.
+ *
+ * @return stream ownership distribution
+ */
+ public Map<SocketAddress, Set<String>> getStreamOwnershipDistribution() {
+ return ImmutableMap.copyOf(address2Streams);
+ }
+
+ /**
+ * Get the stream ownership mapping.
+ *
+ * <p>The returned map is the cache's own stream-to-owner map, not a copy: it reflects
+ * later updates to the cache.
+ * NOTE(review): callers are presumably expected to treat it as read-only - confirm.
+ *
+ * @return stream ownership mapping.
+ */
+ public Map<String, SocketAddress> getStreamOwnerMapping() {
+ return stream2Addresses;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/package-info.java
new file mode 100644
index 0000000..486bd6f
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utils for managing ownership at client side.
+ */
+package org.apache.distributedlog.client.ownership;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/package-info.java
new file mode 100644
index 0000000..d22b0da
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * DistributedLog Client.
+ */
+package org.apache.distributedlog.client;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ClusterClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ClusterClient.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ClusterClient.java
new file mode 100644
index 0000000..9b5c7f6
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ClusterClient.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import com.twitter.finagle.Service;
+import com.twitter.finagle.thrift.ThriftClientRequest;
+import com.twitter.util.Future;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Cluster client.
+ *
+ * <p>Holds a finagle {@link Service} together with a
+ * {@link DistributedLogService.ServiceIface} thrift service interface.
+ */
+public class ClusterClient {
+
+    private final Service<ThriftClientRequest, byte[]> client;
+    private final DistributedLogService.ServiceIface service;
+
+    /**
+     * Create a cluster client.
+     *
+     * @param client
+     *          finagle service
+     * @param service
+     *          thrift service interface
+     */
+    public ClusterClient(Service<ThriftClientRequest, byte[]> client,
+                         DistributedLogService.ServiceIface service) {
+        this.service = service;
+        this.client = client;
+    }
+
+    /**
+     * Return the finagle service held by this cluster client.
+     *
+     * @return finagle service
+     */
+    public Service<ThriftClientRequest, byte[]> getClient() {
+        return this.client;
+    }
+
+    /**
+     * Return the thrift service interface held by this cluster client.
+     *
+     * @return thrift service interface
+     */
+    public DistributedLogService.ServiceIface getService() {
+        return this.service;
+    }
+
+    /**
+     * Close the finagle service held by this cluster client.
+     *
+     * @return future representing the close of the finagle service
+     */
+    public Future<BoxedUnit> close() {
+        final Future<BoxedUnit> closeFuture = this.client.close();
+        return closeFuture;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/HostProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/HostProvider.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/HostProvider.java
new file mode 100644
index 0000000..769cca8
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/HostProvider.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import java.net.SocketAddress;
+import java.util.Set;
+
+/**
+ * Provider that supplies the set of hosts to handshake with.
+ */
+public interface HostProvider {
+
+ /**
+ * Get the set of hosts for handshaking.
+ *
+ * @return set of hosts for handshaking.
+ */
+ Set<SocketAddress> getHosts();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java
new file mode 100644
index 0000000..6ef1d8e
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import org.apache.distributedlog.client.ClientConfig;
+import org.apache.distributedlog.client.stats.ClientStats;
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import com.twitter.finagle.Service;
+import com.twitter.finagle.ThriftMux;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId;
+import com.twitter.finagle.thrift.ThriftClientFramedCodec;
+import com.twitter.finagle.thrift.ThriftClientRequest;
+import com.twitter.util.Duration;
+import com.twitter.util.Future;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import scala.Option;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Client talks to a single proxy.
+ *
+ * <p>Bundles the proxy address with the finagle service and the thrift service
+ * interface used to talk to that proxy.
+ */
+public class ProxyClient {
+
+    /**
+     * Builder to build a proxy client talking to given host <code>address</code>.
+     */
+    public interface Builder {
+        /**
+         * Build a proxy client to <code>address</code>.
+         *
+         * @param address
+         *          proxy address
+         * @return proxy client
+         */
+        ProxyClient build(SocketAddress address);
+    }
+
+    /**
+     * Create a {@link DefaultBuilder} for proxy clients.
+     *
+     * @param clientName
+     *          name set on the finagle client builder
+     * @param clientId
+     *          client id passed to the thrift mux client or the framed codec
+     * @param clientBuilder
+     *          finagle client builder to start from; a default one is used when it is null
+     * @param clientConfig
+     *          client config (decides between thrift mux and the framed codec)
+     * @param clientStats
+     *          client stats providing the per-host finagle stats receiver
+     * @return builder that creates proxy clients
+     */
+    public static Builder newBuilder(String clientName,
+                                     ClientId clientId,
+                                     ClientBuilder clientBuilder,
+                                     ClientConfig clientConfig,
+                                     ClientStats clientStats) {
+        return new DefaultBuilder(clientName, clientId, clientBuilder, clientConfig, clientStats);
+    }
+
+    /**
+     * Default Builder for {@link ProxyClient}.
+     */
+    public static class DefaultBuilder implements Builder {
+
+        private final String clientName;
+        private final ClientId clientId;
+        private final ClientBuilder clientBuilder;
+        private final ClientStats clientStats;
+
+        private DefaultBuilder(String clientName,
+                               ClientId clientId,
+                               ClientBuilder clientBuilder,
+                               ClientConfig clientConfig,
+                               ClientStats clientStats) {
+            // clientName must be assigned first: setDefaultSettings() reads the field.
+            this.clientName = clientName;
+            this.clientId = clientId;
+            this.clientStats = clientStats;
+            // start from the caller's builder, or from the defaults when none is given
+            ClientBuilder baseBuilder;
+            if (null != clientBuilder) {
+                baseBuilder = clientBuilder;
+            } else {
+                baseBuilder = getDefaultClientBuilder(clientConfig);
+            }
+            ClientBuilder withDefaults = setDefaultSettings(baseBuilder);
+            this.clientBuilder = configureThriftMux(withDefaults, clientId, clientConfig);
+        }
+
+        /**
+         * Select the wire protocol: thrift mux when enabled in the config, the framed thrift codec otherwise.
+         */
+        @SuppressWarnings("unchecked")
+        private ClientBuilder configureThriftMux(ClientBuilder builder,
+                                                 ClientId clientId,
+                                                 ClientConfig clientConfig) {
+            if (!clientConfig.getThriftMux()) {
+                return builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId)));
+            }
+            return builder.stack(ThriftMux.client().withClientId(clientId));
+        }
+
+        /**
+         * Build the client builder used when the caller does not supply one.
+         */
+        private ClientBuilder getDefaultClientBuilder(ClientConfig clientConfig) {
+            ClientBuilder defaults = ClientBuilder.get()
+                .tcpConnectTimeout(Duration.fromMilliseconds(200))
+                .connectTimeout(Duration.fromMilliseconds(200))
+                .requestTimeout(Duration.fromSeconds(1));
+            if (clientConfig.getThriftMux()) {
+                return defaults;
+            }
+            // the host connection limit is only applied when thrift mux is not used
+            return defaults.hostConnectionLimit(1);
+        }
+
+        /**
+         * Apply the settings every proxy client uses, regardless of where the builder came from.
+         */
+        @SuppressWarnings("unchecked")
+        private ClientBuilder setDefaultSettings(ClientBuilder builder) {
+            ClientBuilder named = builder.name(clientName);
+            return named
+                .failFast(false)
+                .noFailureAccrual()
+                // Retries are disabled on the finagle client builder: each finagle client has a single
+                // host, so the first failure should surface immediately and let the DL client detect
+                // it quickly and retry other proxies.
+                .retries(1)
+                .keepAlive(true);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public ProxyClient build(SocketAddress address) {
+            ClientBuilder hostBuilder = clientBuilder
+                .hosts((InetSocketAddress) address)
+                .reportTo(clientStats.getFinagleStatsReceiver(address));
+            Service<ThriftClientRequest, byte[]> client =
+                ClientBuilder.safeBuildFactory(hostBuilder).toService();
+            DistributedLogService.ServiceIface service =
+                new DistributedLogService.ServiceToClient(client, new TBinaryProtocol.Factory());
+            return new ProxyClient(address, client, service);
+        }
+
+    }
+
+    private final SocketAddress address;
+    private final Service<ThriftClientRequest, byte[]> client;
+    private final DistributedLogService.ServiceIface service;
+
+    /**
+     * Create a proxy client.
+     *
+     * @param address
+     *          proxy address
+     * @param client
+     *          finagle service
+     * @param service
+     *          thrift service interface
+     */
+    protected ProxyClient(SocketAddress address,
+                          Service<ThriftClientRequest, byte[]> client,
+                          DistributedLogService.ServiceIface service) {
+        this.address = address;
+        this.client = client;
+        this.service = service;
+    }
+
+    /**
+     * Return the address of the proxy.
+     *
+     * @return proxy address
+     */
+    public SocketAddress getAddress() {
+        return this.address;
+    }
+
+    /**
+     * Return the finagle service held by this proxy client.
+     *
+     * @return finagle service
+     */
+    public Service<ThriftClientRequest, byte[]> getClient() {
+        return this.client;
+    }
+
+    /**
+     * Return the thrift service interface held by this proxy client.
+     *
+     * @return thrift service interface
+     */
+    public DistributedLogService.ServiceIface getService() {
+        return this.service;
+    }
+
+    /**
+     * Close the finagle service held by this proxy client.
+     *
+     * @return future representing the close of the finagle service
+     */
+    public Future<BoxedUnit> close() {
+        final Future<BoxedUnit> closeFuture = this.client.close();
+        return closeFuture;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java
new file mode 100644
index 0000000..17b70be
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
+import org.apache.distributedlog.client.ClientConfig;
+import org.apache.distributedlog.client.stats.ClientStats;
+import org.apache.distributedlog.client.stats.OpStats;
+import org.apache.distributedlog.thrift.service.ClientInfo;
+import org.apache.distributedlog.thrift.service.ServerInfo;
+import com.twitter.util.FutureEventListener;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manager manages clients (channels) to proxies.
+ *
+ * <p>The manager caches one {@link ProxyClient} per proxy address and handshakes with every client
+ * it creates. When the configured periodic handshake interval is positive, it also re-handshakes
+ * with all hosts returned by the {@link HostProvider} on a timer. Handshake outcomes are reported
+ * to the registered {@link ProxyListener}s and recorded in the "handshake" op stats.
+ */
+public class ProxyClientManager implements TimerTask {
+
+    private static final Logger logger = LoggerFactory.getLogger(ProxyClientManager.class);
+
+    private final ClientConfig clientConfig;
+    private final ProxyClient.Builder clientBuilder;
+    private final HashedWheelTimer timer;
+    private final HostProvider hostProvider;
+    private volatile Timeout periodicHandshakeTask;
+    private final ConcurrentHashMap<SocketAddress, ProxyClient> address2Services =
+        new ConcurrentHashMap<SocketAddress, ProxyClient>();
+    private final CopyOnWriteArraySet<ProxyListener> proxyListeners =
+        new CopyOnWriteArraySet<ProxyListener>();
+    private volatile boolean closed = false;
+    private volatile boolean periodicHandshakeEnabled = true;
+    private final Stopwatch lastOwnershipSyncStopwatch;
+
+    private final OpStats handshakeStats;
+
+    /**
+     * Create a proxy client manager and schedule the first periodic handshake.
+     *
+     * @param clientConfig
+     *          client config
+     * @param clientBuilder
+     *          builder used to create a client for each proxy address
+     * @param timer
+     *          timer used to schedule periodic handshakes
+     * @param hostProvider
+     *          provider of the hosts to handshake with
+     * @param clientStats
+     *          client stats, used to obtain the "handshake" op stats
+     */
+    public ProxyClientManager(ClientConfig clientConfig,
+                              ProxyClient.Builder clientBuilder,
+                              HashedWheelTimer timer,
+                              HostProvider hostProvider,
+                              ClientStats clientStats) {
+        this.clientConfig = clientConfig;
+        this.clientBuilder = clientBuilder;
+        this.timer = timer;
+        this.hostProvider = hostProvider;
+        this.handshakeStats = clientStats.getOpStats("handshake");
+        // Initialize the stopwatch before scheduling: scheduling hands this instance to the timer,
+        // and the timer task reads the stopwatch.
+        this.lastOwnershipSyncStopwatch = Stopwatch.createStarted();
+        scheduleHandshake();
+    }
+
+    /**
+     * Schedule the next periodic handshake, if the configured interval is positive.
+     */
+    private void scheduleHandshake() {
+        if (clientConfig.getPeriodicHandshakeIntervalMs() > 0) {
+            periodicHandshakeTask = timer.newTimeout(this,
+                    clientConfig.getPeriodicHandshakeIntervalMs(), TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Enable or disable the handshaking done by the periodic task. The task keeps being
+     * rescheduled either way.
+     *
+     * @param enabled
+     *          whether the periodic task should handshake with the hosts
+     */
+    void setPeriodicHandshakeEnabled(boolean enabled) {
+        this.periodicHandshakeEnabled = enabled;
+    }
+
+    /**
+     * Periodic handshake task: handshake with every host returned by the host provider, then
+     * schedule the next run.
+     *
+     * <p>Ownerships are requested only when at least the configured ownership sync interval has
+     * elapsed since the last ownership sync.
+     *
+     * @param timeout
+     *          timeout handle of this run; nothing is done if it is cancelled or the manager is closed
+     * @throws Exception
+     *          whatever is thrown while taking the hosts snapshot or obtaining clients; the next
+     *          run is still scheduled in that case
+     */
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        if (timeout.isCancelled() || closed) {
+            return;
+        }
+        try {
+            if (periodicHandshakeEnabled) {
+                final boolean syncOwnerships = lastOwnershipSyncStopwatch.elapsed(TimeUnit.MILLISECONDS)
+                    >= clientConfig.getPeriodicOwnershipSyncIntervalMs();
+
+                final Set<SocketAddress> hostsSnapshot = hostProvider.getHosts();
+                // counts down as handshakes complete; the summary is logged when it reaches zero
+                final AtomicInteger numHosts = new AtomicInteger(hostsSnapshot.size());
+                final AtomicInteger numStreams = new AtomicInteger(0);
+                final AtomicInteger numSuccesses = new AtomicInteger(0);
+                final AtomicInteger numFailures = new AtomicInteger(0);
+                final ConcurrentMap<SocketAddress, Integer> streamDistributions =
+                    new ConcurrentHashMap<SocketAddress, Integer>();
+                final Stopwatch stopwatch = Stopwatch.createStarted();
+                for (SocketAddress host : hostsSnapshot) {
+                    final SocketAddress address = host;
+                    final ProxyClient client = getClient(address);
+                    handshake(address, client, new FutureEventListener<ServerInfo>() {
+                        @Override
+                        public void onSuccess(ServerInfo serverInfo) {
+                            numStreams.addAndGet(serverInfo.getOwnershipsSize());
+                            numSuccesses.incrementAndGet();
+                            notifyHandshakeSuccess(address, client, serverInfo, false, stopwatch);
+                            if (clientConfig.isHandshakeTracingEnabled()) {
+                                streamDistributions.putIfAbsent(address, serverInfo.getOwnershipsSize());
+                            }
+                            complete();
+                        }
+
+                        @Override
+                        public void onFailure(Throwable cause) {
+                            numFailures.incrementAndGet();
+                            notifyHandshakeFailure(address, client, cause, stopwatch);
+                            complete();
+                        }
+
+                        private void complete() {
+                            if (0 == numHosts.decrementAndGet()) {
+                                if (syncOwnerships) {
+                                    logger.info("Periodic handshaked with {} hosts : {} streams returned,"
+                                        + " {} hosts succeeded, {} hosts failed",
+                                        new Object[] {
+                                            hostsSnapshot.size(),
+                                            numStreams.get(),
+                                            numSuccesses.get(),
+                                            numFailures.get()});
+                                    if (clientConfig.isHandshakeTracingEnabled()) {
+                                        logger.info("Periodic handshaked stream distribution : {}",
+                                            streamDistributions);
+                                    }
+                                }
+                            }
+                        }
+                    }, false, syncOwnerships);
+                }
+
+                if (syncOwnerships) {
+                    lastOwnershipSyncStopwatch.reset().start();
+                }
+            }
+        } finally {
+            // Always re-arm the timer, so an exception thrown above does not permanently stop
+            // periodic handshaking.
+            scheduleHandshake();
+        }
+    }
+
+    /**
+     * Register a proxy <code>listener</code> on proxy related changes.
+     *
+     * @param listener
+     *          proxy listener
+     */
+    public void registerProxyListener(ProxyListener listener) {
+        proxyListeners.add(listener);
+    }
+
+    /**
+     * Record a successful handshake in the stats and notify all registered listeners.
+     *
+     * @param address
+     *          proxy address
+     * @param client
+     *          proxy client that executed the handshake
+     * @param serverInfo
+     *          server info returned by the proxy
+     * @param logging
+     *          whether to log the handshake result
+     * @param stopwatch
+     *          stopwatch whose elapsed time (in microseconds) is recorded as the handshake latency
+     */
+    private void notifyHandshakeSuccess(SocketAddress address,
+                                        ProxyClient client,
+                                        ServerInfo serverInfo,
+                                        boolean logging,
+                                        Stopwatch stopwatch) {
+        if (logging) {
+            if (null != serverInfo && serverInfo.isSetOwnerships()) {
+                logger.info("Handshaked with {} : {} ownerships returned.",
+                        address, serverInfo.getOwnerships().size());
+            } else {
+                logger.info("Handshaked with {} : no ownerships returned", address);
+            }
+        }
+        handshakeStats.completeRequest(address, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1);
+        for (ProxyListener listener : proxyListeners) {
+            listener.onHandshakeSuccess(address, client, serverInfo);
+        }
+    }
+
+    /**
+     * Record a failed handshake in the stats and notify all registered listeners.
+     *
+     * @param address
+     *          proxy address
+     * @param client
+     *          proxy client that executed the handshake
+     * @param cause
+     *          failure reason
+     * @param stopwatch
+     *          stopwatch whose elapsed time (in microseconds) is recorded as the handshake latency
+     */
+    private void notifyHandshakeFailure(SocketAddress address,
+                                        ProxyClient client,
+                                        Throwable cause,
+                                        Stopwatch stopwatch) {
+        handshakeStats.failRequest(address, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1);
+        for (ProxyListener listener : proxyListeners) {
+            listener.onHandshakeFailure(address, client, cause);
+        }
+    }
+
+    /**
+     * Retrieve a client to proxy <code>address</code>, creating one if none is cached.
+     *
+     * @param address
+     *          proxy address
+     * @return proxy client
+     */
+    public ProxyClient getClient(final SocketAddress address) {
+        ProxyClient sc = address2Services.get(address);
+        if (null != sc) {
+            return sc;
+        }
+        return createClient(address);
+    }
+
+    /**
+     * Remove the client to proxy <code>address</code> and close it.
+     *
+     * @param address
+     *          proxy address
+     */
+    public void removeClient(SocketAddress address) {
+        ProxyClient sc = address2Services.remove(address);
+        if (null != sc) {
+            logger.info("Removed host {}.", address);
+            sc.close();
+        }
+    }
+
+    /**
+     * Remove the client <code>sc</code> to proxy <code>address</code> and close it.
+     *
+     * <p>Nothing happens unless <code>sc</code> is the client currently cached for <code>address</code>.
+     *
+     * @param address
+     *          proxy address
+     * @param sc
+     *          proxy client
+     */
+    public void removeClient(SocketAddress address, ProxyClient sc) {
+        if (address2Services.remove(address, sc)) {
+            logger.info("Remove client {} to host {}.", sc, address);
+            sc.close();
+        }
+    }
+
+    /**
+     * Create a client to proxy <code>address</code>.
+     *
+     * <p>If another client was cached for the address in the meantime, the newly built client is
+     * closed and the cached one is returned. Otherwise a handshake is sent over the new client.
+     *
+     * @param address
+     *          proxy address
+     * @return proxy client
+     */
+    public ProxyClient createClient(final SocketAddress address) {
+        final ProxyClient sc = clientBuilder.build(address);
+        ProxyClient oldSC = address2Services.putIfAbsent(address, sc);
+        if (null != oldSC) {
+            sc.close();
+            return oldSC;
+        } else {
+            final Stopwatch stopwatch = Stopwatch.createStarted();
+            FutureEventListener<ServerInfo> listener = new FutureEventListener<ServerInfo>() {
+                @Override
+                public void onSuccess(ServerInfo serverInfo) {
+                    notifyHandshakeSuccess(address, sc, serverInfo, true, stopwatch);
+                }
+                @Override
+                public void onFailure(Throwable cause) {
+                    notifyHandshakeFailure(address, sc, cause, stopwatch);
+                }
+            };
+            // send a ping message after creating connections.
+            handshake(address, sc, listener, true, true);
+            return sc;
+        }
+    }
+
+    /**
+     * Handshake with a given proxy.
+     *
+     * <p>Depending on the client config, the handshake either carries a client info (ownership
+     * flag and stream name regex) or is a plain handshake.
+     *
+     * @param address
+     *          proxy address
+     * @param sc
+     *          proxy client
+     * @param listener
+     *          listener on handshake result
+     * @param logging
+     *          whether to log that the handshake is being sent
+     * @param getOwnerships
+     *          value set on the client info's get-ownerships flag; not used by the plain handshake
+     */
+    private void handshake(SocketAddress address,
+                           ProxyClient sc,
+                           FutureEventListener<ServerInfo> listener,
+                           boolean logging,
+                           boolean getOwnerships) {
+        if (clientConfig.getHandshakeWithClientInfo()) {
+            ClientInfo clientInfo = new ClientInfo();
+            clientInfo.setGetOwnerships(getOwnerships);
+            clientInfo.setStreamNameRegex(clientConfig.getStreamNameRegex());
+            if (logging) {
+                logger.info("Handshaking with {} : {}", address, clientInfo);
+            }
+            sc.getService().handshakeWithClientInfo(clientInfo)
+                    .addEventListener(listener);
+        } else {
+            if (logging) {
+                logger.info("Handshaking with {}", address);
+            }
+            sc.getService().handshake().addEventListener(listener);
+        }
+    }
+
+    /**
+     * Handshake with all proxies.
+     *
+     * <p>NOTE: this is a synchronous call. It waits up to one minute for all handshakes to
+     * complete. If the calling thread is interrupted while waiting, the wait ends early and the
+     * thread's interrupt status is restored.
+     */
+    public void handshake() {
+        Set<SocketAddress> hostsSnapshot = hostProvider.getHosts();
+        logger.info("Handshaking with {} hosts.", hostsSnapshot.size());
+        final CountDownLatch latch = new CountDownLatch(hostsSnapshot.size());
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        for (SocketAddress host: hostsSnapshot) {
+            final SocketAddress address = host;
+            final ProxyClient client = getClient(address);
+            handshake(address, client, new FutureEventListener<ServerInfo>() {
+                @Override
+                public void onSuccess(ServerInfo serverInfo) {
+                    notifyHandshakeSuccess(address, client, serverInfo, true, stopwatch);
+                    latch.countDown();
+                }
+                @Override
+                public void onFailure(Throwable cause) {
+                    notifyHandshakeFailure(address, client, cause, stopwatch);
+                    latch.countDown();
+                }
+            }, true, true);
+        }
+        try {
+            if (!latch.await(1, TimeUnit.MINUTES)) {
+                logger.warn("Timed out on handshaking with {} hosts : {} handshakes still pending.",
+                        hostsSnapshot.size(), latch.getCount());
+            }
+        } catch (InterruptedException e) {
+            // restore the interrupt status so that callers can observe the interruption
+            Thread.currentThread().interrupt();
+            logger.warn("Interrupted on handshaking with servers : ", e);
+        }
+    }
+
+    /**
+     * Return number of proxies managed by client manager.
+     *
+     * @return number of proxies managed by client manager.
+     */
+    public int getNumProxies() {
+        return address2Services.size();
+    }
+
+    /**
+     * Return all clients.
+     *
+     * @return immutable copy of the address-to-client mapping.
+     */
+    public Map<SocketAddress, ProxyClient> getAllClients() {
+        return ImmutableMap.copyOf(address2Services);
+    }
+
+    /**
+     * Close the manager: cancel the pending periodic handshake and close all cached clients.
+     */
+    public void close() {
+        closed = true;
+        Timeout task = periodicHandshakeTask;
+        if (null != task) {
+            task.cancel();
+        }
+        for (ProxyClient sc : address2Services.values()) {
+            sc.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java
new file mode 100644
index 0000000..0a6b076
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import org.apache.distributedlog.thrift.service.ServerInfo;
+import java.net.SocketAddress;
+
+/**
+ * Listener on the outcome of handshakes with proxies.
+ */
+public interface ProxyListener {
+ /**
+ * Called when a handshake with a proxy succeeded.
+ *
+ * @param address
+ * proxy address
+ * @param client
+ * proxy client that executes handshaking
+ * @param serverInfo
+ * proxy's server info returned by the handshake
+ */
+ void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo);
+
+ /**
+ * Called when a handshake with a proxy failed.
+ *
+ * @param address
+ * proxy address
+ * @param client
+ * proxy client that executes handshaking
+ * @param cause
+ * failure reason
+ */
+ void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause);
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java
new file mode 100644
index 0000000..4161afb
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Clients that interact with individual proxies.
+ */
+package org.apache.distributedlog.client.proxy;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java
new file mode 100644
index 0000000..2ac5be3
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.resolver;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Default implementation of {@link RegionResolver}.
+ *
+ * <p>A region is taken from the explicit overrides when one exists for the address. Otherwise it
+ * is derived from the host name: the first dot-separated part is split on <code>-</code>, and if
+ * that yields exactly four labels the first label is the region. In every other case the default
+ * region is used. Resolved regions are cached per address.
+ */
+public class DefaultRegionResolver implements RegionResolver {
+
+    private static final String DEFAULT_REGION = "default-region";
+
+    private final Map<SocketAddress, String> regionOverrides =
+        new HashMap<SocketAddress, String>();
+    private final ConcurrentMap<SocketAddress, String> regionMap =
+        new ConcurrentHashMap<SocketAddress, String>();
+
+    /**
+     * Create a resolver without any region overrides.
+     */
+    public DefaultRegionResolver() {
+    }
+
+    /**
+     * Create a resolver with the given region overrides.
+     *
+     * @param regionOverrides
+     *          mapping from address to region that takes precedence over host name based
+     *          resolution; its entries are copied
+     */
+    public DefaultRegionResolver(Map<SocketAddress, String> regionOverrides) {
+        this.regionOverrides.putAll(regionOverrides);
+    }
+
+    @Override
+    public String resolveRegion(SocketAddress address) {
+        final String cached = regionMap.get(address);
+        if (null != cached) {
+            return cached;
+        }
+        // not cached yet: resolve and remember the result
+        final String resolved = doResolveRegion(address);
+        regionMap.put(address, resolved);
+        return resolved;
+    }
+
+    /**
+     * Resolve the region of <code>address</code> without consulting the cache.
+     */
+    private String doResolveRegion(SocketAddress address) {
+        final String override = regionOverrides.get(address);
+        if (null != override) {
+            return override;
+        }
+
+        final String domainName = (address instanceof InetSocketAddress)
+            ? ((InetSocketAddress) address).getHostName()
+            : address.toString();
+        final String[] parts = domainName.split("\\.");
+        if (parts.length <= 0) {
+            return DEFAULT_REGION;
+        }
+        // only host names made of exactly four dash-separated labels carry a region
+        final String[] labels = parts[0].split("-");
+        return 4 == labels.length ? labels[0] : DEFAULT_REGION;
+    }
+
+    @Override
+    public void removeCachedHost(SocketAddress address) {
+        regionMap.remove(address);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java
new file mode 100644
index 0000000..023799c
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.resolver;
+
+import java.net.SocketAddress;
+
+/**
+ * Resolver that maps a socket address to the name of the region it belongs to.
+ */
+public interface RegionResolver {
+
+ /**
+ * Resolve address to region.
+ *
+ * @param address
+ * socket address
+ * @return name of the region the address belongs to
+ */
+ String resolveRegion(SocketAddress address);
+
+ /**
+ * Remove the cached host, i.e. forget any cached resolution for the address.
+ *
+ * @param address
+ * socket address.
+ */
+ void removeCachedHost(SocketAddress address);
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java
new file mode 100644
index 0000000..81cda2f
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Resolver to resolve network addresses.
+ */
+package org.apache.distributedlog.client.resolver;