You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:48 UTC

[43/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java
new file mode 100644
index 0000000..b3f3368
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecordSet;
+import org.apache.distributedlog.LogRecordSetBuffer;
+import org.apache.distributedlog.client.speculative.DefaultSpeculativeRequestExecutionPolicy;
+import org.apache.distributedlog.client.speculative.SpeculativeRequestExecutionPolicy;
+import org.apache.distributedlog.client.speculative.SpeculativeRequestExecutor;
+import org.apache.distributedlog.exceptions.LogRecordTooLongException;
+import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.service.DistributedLogClient;
+import com.twitter.finagle.IndividualRequestTimeoutException;
+import com.twitter.util.Duration;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Write to multiple streams.
+ */
+public class DistributedLogMultiStreamWriter implements Runnable {
+
+    /**
+     * Create a new {@link Builder} used to assemble a multi stream writer.
+     *
+     * <p>Every call returns a fresh builder instance.
+     *
+     * @return a new builder to create a multi stream writer.
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for the multi stream writer.
+     *
+     * <p>A distributedlog client and a non-empty list of streams are mandatory;
+     * every other setting starts from the default shown on its field below.
+     */
+    public static class Builder {
+
+        private DistributedLogClient client = null;
+        private List<String> streams = null;
+        private int bufferSize = 16 * 1024; // 16k
+        private long flushIntervalMicros = 2000; // 2ms
+        private CompressionCodec.Type codec = CompressionCodec.Type.NONE;
+        private ScheduledExecutorService executorService = null;
+        private long requestTimeoutMs = 500; // 500ms
+        private int firstSpeculativeTimeoutMs = 50; // 50ms
+        private int maxSpeculativeTimeoutMs = 200; // 200ms
+        private float speculativeBackoffMultiplier = 2;
+        private Ticker ticker = Ticker.systemTicker();
+
+        private Builder() {}
+
+        /**
+         * Set the distributedlog client used for multi stream writer.
+         *
+         * @param client
+         *          distributedlog client
+         * @return builder
+         */
+        public Builder client(DistributedLogClient client) {
+            this.client = client;
+            return this;
+        }
+
+        /**
+         * Set the list of streams to write to.
+         *
+         * @param streams
+         *          list of streams to write
+         * @return builder
+         */
+        public Builder streams(List<String> streams) {
+            this.streams = streams;
+            return this;
+        }
+
+        /**
+         * Set the output buffer size.
+         *
+         * <p>If output buffer size is 0, the writes will be transmitted to
+         * wire immediately. {@link #build()} caps the value at
+         * {@code MAX_LOGRECORDSET_SIZE}.
+         *
+         * @param bufferSize
+         *          output buffer size
+         * @return builder
+         */
+        public Builder bufferSize(int bufferSize) {
+            this.bufferSize = bufferSize;
+            return this;
+        }
+
+        /**
+         * Set the flush interval in milliseconds.
+         *
+         * <p>The value is stored in microseconds, so this call and
+         * {@link #flushIntervalMicros(int)} override each other.
+         *
+         * @param flushIntervalMs
+         *          flush interval in milliseconds.
+         * @return builder
+         */
+        public Builder flushIntervalMs(int flushIntervalMs) {
+            this.flushIntervalMicros = TimeUnit.MILLISECONDS.toMicros(flushIntervalMs);
+            return this;
+        }
+
+        /**
+         * Set the flush interval in microseconds.
+         *
+         * <p>The writer only schedules its periodic flush when the interval is
+         * greater than zero.
+         *
+         * @param flushIntervalMicros
+         *          flush interval in microseconds.
+         * @return builder
+         */
+        public Builder flushIntervalMicros(int flushIntervalMicros) {
+            this.flushIntervalMicros = flushIntervalMicros;
+            return this;
+        }
+
+        /**
+         * Set compression codec.
+         *
+         * @param codec compression codec.
+         * @return builder
+         */
+        public Builder compressionCodec(CompressionCodec.Type codec) {
+            this.codec = codec;
+            return this;
+        }
+
+        /**
+         * Set the scheduler to flush output buffers.
+         *
+         * <p>When no scheduler is set, the writer creates its own
+         * single-threaded daemon scheduler and shuts it down on close.
+         *
+         * @param executorService
+         *          executor service to flush output buffers.
+         * @return builder
+         */
+        public Builder scheduler(ScheduledExecutorService executorService) {
+            this.executorService = executorService;
+            return this;
+        }
+
+        /**
+         * Set request timeout in milliseconds.
+         *
+         * @param requestTimeoutMs
+         *          request timeout in milliseconds.
+         * @return builder
+         */
+        public Builder requestTimeoutMs(long requestTimeoutMs) {
+            this.requestTimeoutMs = requestTimeoutMs;
+            return this;
+        }
+
+        /**
+         * Set the first speculative timeout in milliseconds.
+         *
+         * <p>The multi-streams writer does speculative writes on streams.
+         * The writer issues the first write request to a stream; if that request
+         * does not respond within the speculative timeout, it issues the next
+         * write request to a different stream. It keeps issuing such speculative
+         * retries until it receives a success or hits the request timeout
+         * ({@link #requestTimeoutMs(long)}).
+         *
+         * <p>This setting is to configure the first speculative timeout, in milliseconds.
+         *
+         * @param timeoutMs
+         *          timeout in milliseconds
+         * @return builder
+         */
+        public Builder firstSpeculativeTimeoutMs(int timeoutMs) {
+            this.firstSpeculativeTimeoutMs = timeoutMs;
+            return this;
+        }
+
+        /**
+         * Set the max speculative timeout in milliseconds.
+         *
+         * <p>The multi-streams writer does speculative writes on streams.
+         * The writer issues the first write request to a stream; if that request
+         * does not respond within the speculative timeout, it issues the next
+         * write request to a different stream. It keeps issuing such speculative
+         * retries until it receives a success or hits the request timeout
+         * ({@link #requestTimeoutMs(long)}).
+         *
+         * <p>This setting is to configure the max speculative timeout, in milliseconds.
+         *
+         * @param timeoutMs
+         *          timeout in milliseconds
+         * @return builder
+         */
+        public Builder maxSpeculativeTimeoutMs(int timeoutMs) {
+            this.maxSpeculativeTimeoutMs = timeoutMs;
+            return this;
+        }
+
+        /**
+         * Set the speculative timeout backoff multiplier.
+         *
+         * <p>The multi-streams writer does speculative writes on streams.
+         * The writer issues the first write request to a stream; if that request
+         * does not respond within the speculative timeout, it issues the next
+         * write request to a different stream. It keeps issuing such speculative
+         * retries until it receives a success or hits the request timeout
+         * ({@link #requestTimeoutMs(long)}).
+         *
+         * <p>This setting is to configure the speculative timeout backoff multiplier.
+         *
+         * @param multiplier
+         *          backoff multiplier
+         * @return builder
+         */
+        public Builder speculativeBackoffMultiplier(float multiplier) {
+            this.speculativeBackoffMultiplier = multiplier;
+            return this;
+        }
+
+        /**
+         * Ticker for timing.
+         *
+         * @param ticker
+         *          ticker
+         * @return builder
+         * @see Ticker
+         */
+        public Builder clockTicker(Ticker ticker) {
+            this.ticker = ticker;
+            return this;
+        }
+
+        /**
+         * Build the multi stream writer.
+         *
+         * <p>The configured buffer size is capped at {@code MAX_LOGRECORDSET_SIZE}.
+         *
+         * @return the multi stream writer.
+         * @throws IllegalArgumentException if no streams were provided, or if the
+         *          speculative settings are inconsistent (first timeout must be
+         *          positive and not above the max timeout, the multiplier must be
+         *          positive, and the max timeout must be below the request timeout)
+         * @throws NullPointerException if the client, the codec or the ticker is null
+         */
+        public DistributedLogMultiStreamWriter build() {
+            checkArgument((null != streams && !streams.isEmpty()),
+                    "No streams provided");
+            checkNotNull(client,
+                    "No distributedlog client provided");
+            checkNotNull(codec,
+                    "No compression codec provided");
+            // Fail here rather than at the first transmit, where the stopwatch of a
+            // pending write request is started from this ticker.
+            checkNotNull(ticker,
+                    "No ticker provided");
+            checkArgument(firstSpeculativeTimeoutMs > 0
+                    && firstSpeculativeTimeoutMs <= maxSpeculativeTimeoutMs
+                    && speculativeBackoffMultiplier > 0
+                    && maxSpeculativeTimeoutMs < requestTimeoutMs,
+                    "Invalid speculative timeout settings");
+            return new DistributedLogMultiStreamWriter(
+                streams,
+                client,
+                Math.min(bufferSize, MAX_LOGRECORDSET_SIZE),
+                flushIntervalMicros,
+                requestTimeoutMs,
+                firstSpeculativeTimeoutMs,
+                maxSpeculativeTimeoutMs,
+                speculativeBackoffMultiplier,
+                codec,
+                ticker,
+                executorService);
+        }
+    }
+
+    /**
+     * Pending Write Request.
+     *
+     * <p>Carries one record set through speculative writes: each attempt goes to
+     * the next stream in round-robin order, starting from an index derived from
+     * the writer's shared counter. The request completes at most once, either
+     * with the first successful write or with a terminal failure once the
+     * request timeout has elapsed or every stream has been tried.
+     */
+    class PendingWriteRequest implements FutureEventListener<DLSN>,
+            SpeculativeRequestExecutor {
+
+        private final LogRecordSetBuffer recordSet;
+        // flipped exactly once, by whichever of onSuccess() / fail() gets there first
+        private final AtomicBoolean complete = new AtomicBoolean(false);
+        private final Stopwatch stopwatch = Stopwatch.createStarted(clockTicker);
+        private int nextStream;
+        private int numTriedStreams = 0;
+
+        PendingWriteRequest(LogRecordSetBuffer recordSet) {
+            this.recordSet = recordSet;
+            // Clear the sign bit instead of calling Math.abs(): when the shared counter
+            // wraps around to Integer.MIN_VALUE, Math.abs() returns it unchanged and the
+            // modulo could produce a negative stream index.
+            this.nextStream = (nextStreamId.incrementAndGet() & Integer.MAX_VALUE) % numStreams;
+        }
+
+        /**
+         * Send the record set to the next stream in rotation.
+         *
+         * @return the stream written to, or null when the request was failed because
+         *         the request timeout elapsed or all streams have been tried
+         */
+        synchronized String sendNextWrite() {
+            long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+            if (elapsedMs > requestTimeoutMs || numTriedStreams >= numStreams) {
+                fail(new IndividualRequestTimeoutException(Duration.fromMilliseconds(elapsedMs)));
+                return null;
+            }
+            // Advance the cursor before issuing the write, so that a listener fired
+            // synchronously from within the write (e.g. on an already-failed future)
+            // re-enters with the next stream rather than the one just attempted.
+            int streamId = nextStream;
+            nextStream = (nextStream + 1) % numStreams;
+            ++numTriedStreams;
+            return sendWriteToStream(streamId);
+        }
+
+        /**
+         * Write the record set to the stream at the given index and register this
+         * request as the listener of the write result.
+         *
+         * @param streamId index into the writer's stream list
+         * @return name of the stream written to
+         */
+        synchronized String sendWriteToStream(int streamId) {
+            String stream = getStream(streamId);
+            client.writeRecordSet(stream, recordSet)
+                    .addEventListener(this);
+            return stream;
+        }
+
+        @Override
+        public void onSuccess(DLSN dlsn) {
+            // only the first result completes the record set
+            if (!complete.compareAndSet(false, true)) {
+                return;
+            }
+            recordSet.completeTransmit(
+                    dlsn.getLogSegmentSequenceNo(),
+                    dlsn.getEntryId(),
+                    dlsn.getSlotId());
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            // Same guard as issueSpeculativeRequest(): once the request has completed,
+            // a late failure from another stream must not trigger a further write of
+            // the same record set.
+            if (complete.get()) {
+                return;
+            }
+            sendNextWrite();
+        }
+
+        // Abort the record set with the given cause unless the request already completed.
+        private void fail(Throwable cause) {
+            if (!complete.compareAndSet(false, true)) {
+                return;
+            }
+            recordSet.abortTransmit(cause);
+        }
+
+        /**
+         * Issue the next speculative write unless the request already completed.
+         *
+         * @return a future holding true when another write was sent, false otherwise
+         */
+        @Override
+        public Future<Boolean> issueSpeculativeRequest() {
+            return Future.value(!complete.get() && null != sendNextWrite());
+        }
+    }
+
+    private final int numStreams;
+    private final List<String> streams;
+    private final DistributedLogClient client;
+    private final int bufferSize;
+    private final long requestTimeoutMs;
+    private final SpeculativeRequestExecutionPolicy speculativePolicy;
+    private final Ticker clockTicker;
+    private final CompressionCodec.Type codec;
+    private final ScheduledExecutorService scheduler;
+    private final boolean ownScheduler;
+    private final AtomicInteger nextStreamId;
+    private LogRecordSet.Writer recordSetWriter;
+
+    /**
+     * Create the writer from already validated settings.
+     *
+     * <p>Keeps a private, shuffled copy of the stream list, creates a
+     * single-threaded daemon scheduler when none is supplied, and schedules this
+     * writer as a fixed-rate flush task when {@code flushIntervalMicros} is positive.
+     */
+    private DistributedLogMultiStreamWriter(List<String> streams,
+                                            DistributedLogClient client,
+                                            int bufferSize,
+                                            long flushIntervalMicros,
+                                            long requestTimeoutMs,
+                                            int firstSpeculativeTimeoutMs,
+                                            int maxSpeculativeTimeoutMs,
+                                            float speculativeBackoffMultiplier,
+                                            CompressionCodec.Type codec,
+                                            Ticker clockTicker,
+                                            ScheduledExecutorService scheduler) {
+        // private copy of the streams, shuffled below
+        this.streams = Lists.newArrayList(streams);
+        this.numStreams = this.streams.size();
+
+        this.client = client;
+        this.codec = codec;
+        this.bufferSize = bufferSize;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.clockTicker = clockTicker;
+
+        // a scheduler created here is owned by the writer and shut down in close()
+        this.ownScheduler = (null == scheduler);
+        this.scheduler = this.ownScheduler
+                ? Executors.newSingleThreadScheduledExecutor(
+                        new ThreadFactoryBuilder()
+                                .setDaemon(true)
+                                .setNameFormat("MultiStreamWriterFlushThread-%d")
+                                .build())
+                : scheduler;
+
+        this.speculativePolicy = new DefaultSpeculativeRequestExecutionPolicy(
+                firstSpeculativeTimeoutMs,
+                maxSpeculativeTimeoutMs,
+                speculativeBackoffMultiplier);
+
+        Collections.shuffle(this.streams);
+        this.nextStreamId = new AtomicInteger(0);
+        this.recordSetWriter = newRecordSetWriter();
+
+        // periodic flush is optional: a non-positive interval disables it
+        if (flushIntervalMicros <= 0) {
+            return;
+        }
+        this.scheduler.scheduleAtFixedRate(
+                this, flushIntervalMicros, flushIntervalMicros, TimeUnit.MICROSECONDS);
+    }
+
+    /**
+     * Look up a stream name by its position in the writer's stream list.
+     *
+     * @param streamId index into the stream list
+     * @return name of the stream at that index
+     */
+    String getStream(int streamId) {
+        return streams.get(streamId);
+    }
+
+    /**
+     * Get the record set writer currently accumulating records.
+     *
+     * <p>Reads the field under the writer's monitor, the same monitor that
+     * {@code write} and {@code flush} hold when they swap the writer.
+     *
+     * @return the current record set writer
+     */
+    synchronized LogRecordSet.Writer getLogRecordSetWriter() {
+        return recordSetWriter;
+    }
+
+    // Create an empty record set writer using the configured buffer size and codec.
+    private LogRecordSet.Writer newRecordSetWriter() {
+        return LogRecordSet.newWriter(bufferSize, codec);
+    }
+
+    /**
+     * Append a record to the current record set.
+     *
+     * <p>The current record set is flushed first when adding the record would
+     * push it beyond {@code MAX_LOGRECORDSET_SIZE}, and flushed afterwards once
+     * it holds at least the configured buffer size.
+     *
+     * @param buffer record payload; its remaining bytes form the record
+     * @return the promise handed to the record set writer together with the
+     *         record, or a failed future when the record is longer than
+     *         {@code MAX_LOGRECORD_SIZE} or the record set writer rejects it
+     */
+    public synchronized Future<DLSN> write(ByteBuffer buffer) {
+        final int recordSize = buffer.remaining();
+        if (recordSize > MAX_LOGRECORD_SIZE) {
+            return Future.exception(new LogRecordTooLongException(
+                    "Log record of size " + recordSize + " written when only "
+                            + MAX_LOGRECORD_SIZE + " is allowed"));
+        }
+
+        // make room: the pending set plus this record must fit in one record set
+        final boolean exceedsRecordSet =
+                (recordSetWriter.getNumBytes() + recordSize) > MAX_LOGRECORDSET_SIZE;
+        if (exceedsRecordSet) {
+            flush();
+        }
+
+        final Promise<DLSN> result = new Promise<DLSN>();
+        try {
+            recordSetWriter.writeRecord(buffer, result);
+        } catch (LogRecordTooLongException tooLong) {
+            return Future.exception(tooLong);
+        } catch (WriteException writeFailure) {
+            // the current record set is abandoned and replaced by a fresh one
+            recordSetWriter.abortTransmit(writeFailure);
+            recordSetWriter = newRecordSetWriter();
+            return Future.exception(writeFailure);
+        }
+
+        final boolean bufferFull = recordSetWriter.getNumBytes() >= bufferSize;
+        if (bufferFull) {
+            flush();
+        }
+        return result;
+    }
+
+    /**
+     * Periodic flush task: the constructor schedules this writer at a fixed rate
+     * when the flush interval is positive.
+     */
+    @Override
+    public void run() {
+        flush();
+    }
+
+    private void flush() {
+        LogRecordSet.Writer recordSetToFlush;
+        synchronized (this) {
+            if (recordSetWriter.getNumRecords() == 0) {
+                return;
+            }
+            recordSetToFlush = recordSetWriter;
+            recordSetWriter = newRecordSetWriter();
+        }
+        transmit(recordSetToFlush);
+    }
+
+    // Wrap the record set in a pending write request and hand it, together with
+    // the scheduler, to the speculative request policy.
+    private void transmit(LogRecordSet.Writer recordSetToFlush) {
+        speculativePolicy.initiateSpeculativeRequest(
+                scheduler,
+                new PendingWriteRequest(recordSetToFlush));
+    }
+
+    /**
+     * Close the writer.
+     *
+     * <p>Shuts down the scheduler only when the writer created it itself; a
+     * scheduler supplied through the builder is left running. This method does
+     * not flush the current record set.
+     */
+    public void close() {
+        if (!ownScheduler) {
+            return;
+        }
+        scheduler.shutdown();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/MonitorServiceClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/MonitorServiceClient.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/MonitorServiceClient.java
new file mode 100644
index 0000000..ed6269b
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/MonitorServiceClient.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.monitor;
+
+import com.twitter.util.Future;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Interface for distributedlog monitor service.
+ *
+ * <p>The request methods return a {@link Future} rather than a direct result.
+ */
+public interface MonitorServiceClient {
+
+    /**
+     * Check a given stream.
+     *
+     * @param stream
+     *          stream.
+     * @return future of the check request.
+     */
+    Future<Void> check(String stream);
+
+    /**
+     * Send heartbeat to the stream and its readers.
+     *
+     * @param stream
+     *          stream.
+     * @return future of the heartbeat request.
+     */
+    Future<Void> heartbeat(String stream);
+
+    /**
+     * Get current ownership distribution from current monitor service view.
+     *
+     * @return current ownership distribution, as a map from an address to the
+     *         set of stream names associated with it
+     */
+    Map<SocketAddress, Set<String>> getStreamOwnershipDistribution();
+
+    /**
+     * Enable/Disable accepting new stream on a given proxy.
+     *
+     * @param enabled
+     *          flag to enable/disable accepting new streams on a given proxy
+     * @return future of the request; it carries no value.
+     */
+    Future<Void> setAcceptNewStream(boolean enabled);
+
+    /**
+     * Close the client.
+     */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/package-info.java
new file mode 100644
index 0000000..d7e2c94
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/monitor/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * DistributedLog Monitor Client.
+ */
+package org.apache.distributedlog.client.monitor;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/OwnershipCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/OwnershipCache.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/OwnershipCache.java
new file mode 100644
index 0000000..f3c24ca
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/OwnershipCache.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.ownership;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.distributedlog.client.ClientConfig;
+import org.apache.distributedlog.client.stats.OwnershipStatsLogger;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.net.SocketAddress;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client Side Ownership Cache.
+ */
+public class OwnershipCache implements TimerTask {
+
+    private static final Logger logger = LoggerFactory.getLogger(OwnershipCache.class);
+
+    private final ConcurrentHashMap<String, SocketAddress> stream2Addresses =
+            new ConcurrentHashMap<String, SocketAddress>();
+    private final ConcurrentHashMap<SocketAddress, Set<String>> address2Streams =
+            new ConcurrentHashMap<SocketAddress, Set<String>>();
+    private final ClientConfig clientConfig;
+    private final HashedWheelTimer timer;
+
+    // Stats
+    private final OwnershipStatsLogger ownershipStatsLogger;
+
+    /**
+     * Create the ownership cache and, when enabled in the client config,
+     * schedule the first periodic dump of the cache.
+     *
+     * @param clientConfig client configuration (read for the periodic dump settings)
+     * @param timer timer used to schedule the periodic dump
+     * @param statsReceiver stats receiver passed to the ownership stats logger
+     * @param streamStatsReceiver stream stats receiver passed to the ownership stats logger
+     */
+    public OwnershipCache(ClientConfig clientConfig,
+                          HashedWheelTimer timer,
+                          StatsReceiver statsReceiver,
+                          StatsReceiver streamStatsReceiver) {
+        this.ownershipStatsLogger =
+                new OwnershipStatsLogger(statsReceiver, streamStatsReceiver);
+        this.timer = timer;
+        this.clientConfig = clientConfig;
+        // must come last: reads both the config and the timer
+        scheduleDumpOwnershipCache();
+    }
+
+    // Arm a one-shot timeout that dumps the cache, provided the periodic dump is
+    // enabled and its interval is positive; run() re-arms it after each dump.
+    private void scheduleDumpOwnershipCache() {
+        if (!clientConfig.isPeriodicDumpOwnershipCacheEnabled()) {
+            return;
+        }
+        if (clientConfig.getPeriodicDumpOwnershipCacheIntervalMs() <= 0) {
+            return;
+        }
+        timer.newTimeout(
+                this,
+                clientConfig.getPeriodicDumpOwnershipCacheIntervalMs(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Timer callback: log the size and content of the cache, then schedule the
+     * next dump. A cancelled timeout is ignored.
+     */
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        if (!timeout.isCancelled()) {
+            logger.info("Ownership cache : {} streams cached, {} hosts cached",
+                    stream2Addresses.size(), address2Streams.size());
+            logger.info("Cached streams : {}", stream2Addresses);
+            scheduleDumpOwnershipCache();
+        }
+    }
+
+    /**
+     * Get the stats logger that this cache reports ownership events to.
+     *
+     * @return the ownership stats logger
+     */
+    public OwnershipStatsLogger getOwnershipStatsLogger() {
+        return ownershipStatsLogger;
+    }
+
+    /**
+     * Update ownership of <i>stream</i> to <i>addr</i>.
+     *
+     * <p>The stream-to-address mapping is updated first, without locking, via
+     * {@code putIfAbsent} / {@code replace}; the per-host stream set is updated
+     * afterwards under that set's monitor.
+     *
+     * @param stream
+     *          Stream Name.
+     * @param addr
+     *          Owner Address.
+     * @return true if the owner is <i>addr</i> after the call (newly stored or
+     *         already cached); false if another update changed the owner
+     *         concurrently and this one was dropped
+     */
+    public boolean updateOwner(String stream, SocketAddress addr) {
+        // update ownership
+        SocketAddress oldAddr = stream2Addresses.putIfAbsent(stream, addr);
+        // the cached owner is already addr: nothing to do
+        if (null != oldAddr && oldAddr.equals(addr)) {
+            return true;
+        }
+        if (null != oldAddr) {
+            // a different owner is cached: swap it only if it is still the one we saw
+            if (stream2Addresses.replace(stream, oldAddr, addr)) {
+                // Store the relevant mappings for this topic and host combination
+                logger.info("Storing ownership for stream : {}, old host : {}, new host : {}.",
+                        new Object[] { stream, oldAddr, addr });
+                StringBuilder sb = new StringBuilder();
+                sb.append("Ownership changed '")
+                  .append(oldAddr).append("' -> '").append(addr).append("'");
+                // drops the stream from the old host's set
+                removeOwnerFromStream(stream, oldAddr, sb.toString());
+
+                // update stats
+                // NOTE(review): removeOwnerFromStream also calls onRemove when it removes
+                // the stream from the old host's set - confirm the second call is intended.
+                ownershipStatsLogger.onRemove(stream);
+                ownershipStatsLogger.onAdd(stream);
+            } else {
+                // lost the race against a concurrent owner change
+                logger.warn("Ownership of stream : {} has been changed from {} to {} when storing host : {}.",
+                        new Object[] { stream, oldAddr, stream2Addresses.get(stream), addr });
+                return false;
+            }
+        } else {
+            logger.info("Storing ownership for stream : {}, host : {}.", stream, addr);
+            // update stats
+            ownershipStatsLogger.onAdd(stream);
+        }
+
+        // get or create the set of streams owned by addr
+        Set<String> streamsForHost = address2Streams.get(addr);
+        if (null == streamsForHost) {
+            Set<String> newStreamsForHost = new HashSet<String>();
+            streamsForHost = address2Streams.putIfAbsent(addr, newStreamsForHost);
+            if (null == streamsForHost) {
+                streamsForHost = newStreamsForHost;
+            }
+        }
+        synchronized (streamsForHost) {
+            // check whether the ownership changed, since it might have happened after the replace succeeded
+            if (addr.equals(stream2Addresses.get(stream))) {
+                streamsForHost.add(stream);
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Look up the cached owner of <code>stream</code>, recording a cache hit or
+     * miss on the ownership stats logger.
+     *
+     * @param stream
+     *          stream to lookup ownership
+     * @return owner's address, or null when no owner is cached for the stream
+     */
+    public SocketAddress getOwner(String stream) {
+        final SocketAddress owner = stream2Addresses.get(stream);
+        if (null != owner) {
+            ownershipStatsLogger.onHit(stream);
+            return owner;
+        }
+        ownershipStatsLogger.onMiss(stream);
+        return null;
+    }
+
+    /**
+     * Drop the ownership of <code>stream</code> by <code>addr</code>, logging the
+     * given <code>reason</code>.
+     *
+     * <p>The stream-to-address entry is removed only while it still maps to
+     * <code>addr</code>; the stream is then removed from that host's stream set,
+     * and the set itself is dropped once it becomes empty.
+     *
+     * @param stream stream name
+     * @param addr owner address
+     * @param reason reason to remove ownership
+     */
+    public void removeOwnerFromStream(String stream, SocketAddress addr, String reason) {
+        final boolean mappingRemoved = stream2Addresses.remove(stream, addr);
+        if (mappingRemoved) {
+            logger.info("Removed stream to host mapping for (stream: {} -> host: {}) : reason = '{}'.",
+                    new Object[] { stream, addr, reason });
+        }
+
+        final Set<String> hostStreams = address2Streams.get(addr);
+        if (null == hostStreams) {
+            return;
+        }
+        synchronized (hostStreams) {
+            if (!hostStreams.remove(stream)) {
+                return;
+            }
+            logger.info("Removed stream ({}) from host {} : reason = '{}'.",
+                    new Object[] { stream, addr, reason });
+            if (hostStreams.isEmpty()) {
+                address2Streams.remove(addr, hostStreams);
+            }
+            ownershipStatsLogger.onRemove(stream);
+        }
+    }
+
+    /**
+     * Forget every stream cached for host <code>addr</code>.
+     *
+     * <p>Only stream entries that still map to <code>addr</code> are removed;
+     * the host's stream set is dropped afterwards.
+     *
+     * @param addr
+     *          host to remove ownerships
+     */
+    public void removeAllStreamsFromOwner(SocketAddress addr) {
+        logger.info("Remove streams mapping for host {}", addr);
+        final Set<String> hostStreams = address2Streams.get(addr);
+        if (null == hostStreams) {
+            return;
+        }
+        synchronized (hostStreams) {
+            for (String cached : hostStreams) {
+                if (!stream2Addresses.remove(cached, addr)) {
+                    continue;
+                }
+                logger.info("Removing mapping for stream : {} from host : {}", cached, addr);
+                ownershipStatsLogger.onRemove(cached);
+            }
+            address2Streams.remove(addr, hostStreams);
+        }
+    }
+
+    /**
+     * Get the number of cached streams.
+     *
+     * @return the number of entries in the stream-to-address mapping.
+     */
+    public int getNumCachedStreams() {
+        return stream2Addresses.size();
+    }
+
+    /**
+     * Get the stream ownership distribution across proxies.
+     *
+     * @return stream ownership distribution
+     */
+    public Map<SocketAddress, Set<String>> getStreamOwnershipDistribution() {
+        return ImmutableMap.copyOf(address2Streams);
+    }
+
+    /**
+     * Get the stream ownership mapping.
+     *
+     * <p>Returns the cache's own stream-to-address map, not a copy.
+     *
+     * @return stream ownership mapping.
+     */
+    public Map<String, SocketAddress> getStreamOwnerMapping() {
+        return stream2Addresses;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/package-info.java
new file mode 100644
index 0000000..486bd6f
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/ownership/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utils for managing ownership at client side.
+ */
+package org.apache.distributedlog.client.ownership;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/package-info.java
new file mode 100644
index 0000000..d22b0da
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * DistributedLog Client.
+ */
+package org.apache.distributedlog.client;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ClusterClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ClusterClient.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ClusterClient.java
new file mode 100644
index 0000000..9b5c7f6
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ClusterClient.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import com.twitter.finagle.Service;
+import com.twitter.finagle.thrift.ThriftClientRequest;
+import com.twitter.util.Future;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Cluster client: holds a finagle thrift {@link Service} together with the
+ * {@link DistributedLogService.ServiceIface} handed in with it.
+ */
+public class ClusterClient {
+
+    private final Service<ThriftClientRequest, byte[]> client;
+    private final DistributedLogService.ServiceIface service;
+
+    /**
+     * Create a cluster client.
+     *
+     * @param client
+     *          finagle service; this is the object closed by {@link #close()}
+     * @param service
+     *          distributedlog service interface returned by {@link #getService()}
+     */
+    public ClusterClient(Service<ThriftClientRequest, byte[]> client,
+                         DistributedLogService.ServiceIface service) {
+        this.client = client;
+        this.service = service;
+    }
+
+    /**
+     * Get the finagle service given at construction.
+     *
+     * @return finagle service
+     */
+    public Service<ThriftClientRequest, byte[]> getClient() {
+        return client;
+    }
+
+    /**
+     * Get the distributedlog service interface given at construction.
+     *
+     * @return distributedlog service interface
+     */
+    public DistributedLogService.ServiceIface getService() {
+        return service;
+    }
+
+    /**
+     * Close the finagle service.
+     *
+     * @return the future returned by the finagle service's <code>close()</code>
+     */
+    public Future<BoxedUnit> close() {
+        return client.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/HostProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/HostProvider.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/HostProvider.java
new file mode 100644
index 0000000..769cca8
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/HostProvider.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import java.net.SocketAddress;
+import java.util.Set;
+
+/**
+ * Provider that supplies the set of hosts for handshaking.
+ */
+public interface HostProvider {
+
+    /**
+     * Get the set of hosts for handshaking.
+     *
+     * @return set of hosts for handshaking.
+     */
+    Set<SocketAddress> getHosts();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java
new file mode 100644
index 0000000..6ef1d8e
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClient.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import org.apache.distributedlog.client.ClientConfig;
+import org.apache.distributedlog.client.stats.ClientStats;
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import com.twitter.finagle.Service;
+import com.twitter.finagle.ThriftMux;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId;
+import com.twitter.finagle.thrift.ThriftClientFramedCodec;
+import com.twitter.finagle.thrift.ThriftClientRequest;
+import com.twitter.util.Duration;
+import com.twitter.util.Future;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import scala.Option;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Client talks to a single proxy.
+ */
+public class ProxyClient {
+
+  /**
+   * Builder that creates a {@link ProxyClient} for a given proxy <code>address</code>.
+   */
+  public interface Builder {
+        /**
+         * Build a proxy client to <code>address</code>.
+         *
+         * @param address
+         *          proxy address
+         * @return proxy client talking to <code>address</code>
+         */
+        ProxyClient build(SocketAddress address);
+    }
+
+    /**
+     * Create the default {@link Builder}.
+     *
+     * @param clientName
+     *          name applied to the finagle client builder
+     * @param clientId
+     *          client id attached to the thrift mux stack or the framed thrift codec
+     * @param clientBuilder
+     *          finagle client builder to start from; when <code>null</code>, a default
+     *          builder is derived from <code>clientConfig</code>
+     * @param clientConfig
+     *          client config; its thrift mux flag selects the transport
+     * @param clientStats
+     *          client stats, used to obtain a per-address finagle stats receiver
+     * @return builder for proxy clients
+     */
+    public static Builder newBuilder(String clientName,
+                                     ClientId clientId,
+                                     ClientBuilder clientBuilder,
+                                     ClientConfig clientConfig,
+                                     ClientStats clientStats) {
+        return new DefaultBuilder(clientName, clientId, clientBuilder, clientConfig, clientStats);
+    }
+
+    /**
+     * Default Builder for {@link ProxyClient}.
+     *
+     * <p>The finagle client builder is fully configured once in the constructor; each call to
+     * {@link #build(SocketAddress)} only adds the target host and the stats receiver.
+     */
+    public static class DefaultBuilder implements Builder {
+
+        private final String clientName;
+        private final ClientId clientId;
+        private final ClientBuilder clientBuilder;
+        private final ClientStats clientStats;
+
+        private DefaultBuilder(String clientName,
+                               ClientId clientId,
+                               ClientBuilder clientBuilder,
+                               ClientConfig clientConfig,
+                               ClientStats clientStats) {
+            this.clientName = clientName;
+            this.clientId = clientId;
+            this.clientStats = clientStats;
+            // Start from the caller's builder (or a default one when none is given), apply the
+            // common settings, then select thrift mux or the framed thrift codec.
+            // setDefaultSettings reads this.clientName, which is assigned above.
+            ClientBuilder builder = setDefaultSettings(
+                    null == clientBuilder ? getDefaultClientBuilder(clientConfig) : clientBuilder);
+            this.clientBuilder = configureThriftMux(builder, clientId, clientConfig);
+        }
+
+        /**
+         * Select the transport: the thrift mux client stack when the config's thrift mux flag
+         * is set, the framed thrift codec otherwise. Both carry <code>clientId</code>.
+         */
+        @SuppressWarnings("unchecked")
+        private ClientBuilder configureThriftMux(ClientBuilder builder,
+                                                 ClientId clientId,
+                                                 ClientConfig clientConfig) {
+            if (clientConfig.getThriftMux()) {
+                return builder.stack(ThriftMux.client().withClientId(clientId));
+            } else {
+                return builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId)));
+            }
+        }
+
+        /**
+         * Builder used when the caller supplies none: 200ms tcp connect and connect timeouts,
+         * 1s request timeout, and a host connection limit of 1 when thrift mux is not used.
+         */
+        private ClientBuilder getDefaultClientBuilder(ClientConfig clientConfig) {
+            ClientBuilder builder = ClientBuilder.get()
+                .tcpConnectTimeout(Duration.fromMilliseconds(200))
+                .connectTimeout(Duration.fromMilliseconds(200))
+                .requestTimeout(Duration.fromSeconds(1));
+            if (!clientConfig.getThriftMux()) {
+                builder = builder.hostConnectionLimit(1);
+            }
+            return builder;
+        }
+
+        /**
+         * Apply the settings shared by every proxy client: client name, fail-fast off,
+         * no failure accrual, <code>retries(1)</code> and keep-alive.
+         */
+        @SuppressWarnings("unchecked")
+        private ClientBuilder setDefaultSettings(ClientBuilder builder) {
+            return builder.name(clientName)
+                   .failFast(false)
+                   .noFailureAccrual()
+                   // disable retries on finagle client builder, as there is only one host per finagle client
+                   // we should throw exception immediately on first failure, so DL client could quickly detect
+                   // failures and retry other proxies.
+                   // NOTE(review): presumably retries(1) means "one try in total" in finagle - confirm.
+                   .retries(1)
+                   .keepAlive(true);
+        }
+
+        /**
+         * Build a proxy client to <code>address</code>.
+         *
+         * <p>The address is cast to {@link InetSocketAddress}; any other kind of
+         * {@link SocketAddress} fails with a {@link ClassCastException}.
+         */
+        @Override
+        @SuppressWarnings("unchecked")
+        public ProxyClient build(SocketAddress address) {
+            Service<ThriftClientRequest, byte[]> client =
+                ClientBuilder.safeBuildFactory(
+                        clientBuilder
+                                .hosts((InetSocketAddress) address)
+                                .reportTo(clientStats.getFinagleStatsReceiver(address))
+                ).toService();
+            // Thrift client stub layered on the finagle service, using the binary protocol.
+            DistributedLogService.ServiceIface service =
+                    new DistributedLogService.ServiceToClient(client, new TBinaryProtocol.Factory());
+            return new ProxyClient(address, client, service);
+        }
+
+    }
+
+    private final SocketAddress address;
+    private final Service<ThriftClientRequest, byte[]> client;
+    private final DistributedLogService.ServiceIface service;
+
+    /**
+     * Create a proxy client.
+     *
+     * @param address
+     *          proxy address
+     * @param client
+     *          finagle service; this is the object closed by {@link #close()}
+     * @param service
+     *          distributedlog service interface returned by {@link #getService()}
+     */
+    protected ProxyClient(SocketAddress address,
+                          Service<ThriftClientRequest, byte[]> client,
+                          DistributedLogService.ServiceIface service) {
+        this.address = address;
+        this.client  = client;
+        this.service = service;
+    }
+
+    /**
+     * Get the proxy address given at construction.
+     *
+     * @return proxy address
+     */
+    public SocketAddress getAddress() {
+        return address;
+    }
+
+    /**
+     * Get the finagle service given at construction.
+     *
+     * @return finagle service
+     */
+    public Service<ThriftClientRequest, byte[]> getClient() {
+        return client;
+    }
+
+    /**
+     * Get the distributedlog service interface given at construction.
+     *
+     * @return distributedlog service interface
+     */
+    public DistributedLogService.ServiceIface getService() {
+        return service;
+    }
+
+    /**
+     * Close the finagle service.
+     *
+     * @return the future returned by the finagle service's <code>close()</code>
+     */
+    public Future<BoxedUnit> close() {
+        return client.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java
new file mode 100644
index 0000000..17b70be
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyClientManager.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
+import org.apache.distributedlog.client.ClientConfig;
+import org.apache.distributedlog.client.stats.ClientStats;
+import org.apache.distributedlog.client.stats.OpStats;
+import org.apache.distributedlog.thrift.service.ClientInfo;
+import org.apache.distributedlog.thrift.service.ServerInfo;
+import com.twitter.util.FutureEventListener;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manager manages clients (channels) to proxies.
+ */
+public class ProxyClientManager implements TimerTask {
+
+    private static final Logger logger = LoggerFactory.getLogger(ProxyClientManager.class);
+
+    private final ClientConfig clientConfig;
+    private final ProxyClient.Builder clientBuilder;
+    private final HashedWheelTimer timer;
+    private final HostProvider hostProvider;
+    private volatile Timeout periodicHandshakeTask;
+    private final ConcurrentHashMap<SocketAddress, ProxyClient> address2Services =
+            new ConcurrentHashMap<SocketAddress, ProxyClient>();
+    private final CopyOnWriteArraySet<ProxyListener> proxyListeners =
+            new CopyOnWriteArraySet<ProxyListener>();
+    private volatile boolean closed = false;
+    private volatile boolean periodicHandshakeEnabled = true;
+    private final Stopwatch lastOwnershipSyncStopwatch;
+
+    private final OpStats handshakeStats;
+
+    /**
+     * Create a manager for the clients to proxies.
+     *
+     * <p>When the configured periodic handshake interval is positive, the first periodic
+     * handshake is scheduled on <code>timer</code> as the last step of construction.
+     *
+     * @param clientConfig
+     *          client config (handshake intervals, tracing, handshake mode)
+     * @param clientBuilder
+     *          builder used to create a client for each proxy address
+     * @param timer
+     *          timer that runs the periodic handshake
+     * @param hostProvider
+     *          provider of the hosts to handshake with
+     * @param clientStats
+     *          client stats; the "handshake" op stats are taken from it
+     */
+    public ProxyClientManager(ClientConfig clientConfig,
+                              ProxyClient.Builder clientBuilder,
+                              HashedWheelTimer timer,
+                              HostProvider hostProvider,
+                              ClientStats clientStats) {
+        this.clientConfig = clientConfig;
+        this.clientBuilder = clientBuilder;
+        this.timer = timer;
+        this.hostProvider = hostProvider;
+        this.handshakeStats = clientStats.getOpStats("handshake");
+        this.lastOwnershipSyncStopwatch = Stopwatch.createStarted();
+        // Schedule last: scheduling hands this object to the timer thread, so every field
+        // that run() reads (the stopwatch in particular) must already be assigned.
+        scheduleHandshake();
+    }
+
+    /**
+     * Schedule the next periodic handshake on the timer.
+     *
+     * <p>Nothing is scheduled when the configured periodic handshake interval is not positive.
+     * The resulting timeout is kept in <code>periodicHandshakeTask</code> so that
+     * <code>close()</code> can cancel it.
+     */
+    private void scheduleHandshake() {
+        if (clientConfig.getPeriodicHandshakeIntervalMs() > 0) {
+            periodicHandshakeTask = timer.newTimeout(this,
+                    clientConfig.getPeriodicHandshakeIntervalMs(), TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Turn the handshaking done by the periodic timer task on or off.
+     *
+     * <p>This only gates the handshakes themselves; the timer task checks the flag each time
+     * it fires and still reschedules itself while the flag is off.
+     *
+     * @param enabled
+     *          whether the periodic task should handshake with the hosts
+     */
+    void setPeriodicHandshakeEnabled(boolean enabled) {
+        this.periodicHandshakeEnabled = enabled;
+    }
+
+    /**
+     * Timer callback: handshake with all hosts (when periodic handshaking is enabled) and
+     * schedule the next run.
+     *
+     * <p>Nothing is done, and nothing is rescheduled, when the timeout was cancelled or the
+     * manager is closed. Otherwise the next run is always scheduled, even if starting the
+     * handshakes throws: a single failure (for example from the host provider or from
+     * creating a client) must not silently stop periodic handshaking for good.
+     *
+     * @param timeout
+     *          the timeout that fired
+     * @throws Exception
+     *          any exception raised while starting the handshakes
+     */
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        if (timeout.isCancelled() || closed) {
+            return;
+        }
+        try {
+            if (periodicHandshakeEnabled) {
+                doPeriodicHandshake();
+            }
+        } finally {
+            scheduleHandshake();
+        }
+    }
+
+    /**
+     * Start one round of handshakes with every host returned by the host provider.
+     *
+     * <p>Ownerships are requested only when the ownership sync interval has elapsed since the
+     * last sync; in that case a summary is logged once all hosts have answered, and the sync
+     * stopwatch is restarted after the handshakes have been issued.
+     */
+    private void doPeriodicHandshake() {
+        final boolean syncOwnerships = lastOwnershipSyncStopwatch.elapsed(TimeUnit.MILLISECONDS)
+            >= clientConfig.getPeriodicOwnershipSyncIntervalMs();
+
+        final Set<SocketAddress> hostsSnapshot = hostProvider.getHosts();
+        // counts down once per host; the round is complete when it reaches zero
+        final AtomicInteger numHosts = new AtomicInteger(hostsSnapshot.size());
+        final AtomicInteger numStreams = new AtomicInteger(0);
+        final AtomicInteger numSuccesses = new AtomicInteger(0);
+        final AtomicInteger numFailures = new AtomicInteger(0);
+        final ConcurrentMap<SocketAddress, Integer> streamDistributions =
+                new ConcurrentHashMap<SocketAddress, Integer>();
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        for (SocketAddress host : hostsSnapshot) {
+            final SocketAddress address = host;
+            final ProxyClient client = getClient(address);
+            handshake(address, client, new FutureEventListener<ServerInfo>() {
+                @Override
+                public void onSuccess(ServerInfo serverInfo) {
+                    numStreams.addAndGet(serverInfo.getOwnershipsSize());
+                    numSuccesses.incrementAndGet();
+                    notifyHandshakeSuccess(address, client, serverInfo, false, stopwatch);
+                    if (clientConfig.isHandshakeTracingEnabled()) {
+                        streamDistributions.putIfAbsent(address, serverInfo.getOwnershipsSize());
+                    }
+                    complete();
+                }
+
+                @Override
+                public void onFailure(Throwable cause) {
+                    numFailures.incrementAndGet();
+                    notifyHandshakeFailure(address, client, cause, stopwatch);
+                    complete();
+                }
+
+                private void complete() {
+                    if (0 == numHosts.decrementAndGet()) {
+                        if (syncOwnerships) {
+                            logger.info("Periodic handshaked with {} hosts : {} streams returned,"
+                                + " {} hosts succeeded, {} hosts failed",
+                                new Object[] {
+                                    hostsSnapshot.size(),
+                                    numStreams.get(),
+                                    numSuccesses.get(),
+                                    numFailures.get()});
+                            if (clientConfig.isHandshakeTracingEnabled()) {
+                                logger.info("Periodic handshaked stream distribution : {}", streamDistributions);
+                            }
+                        }
+                    }
+                }
+            }, false, syncOwnerships);
+        }
+
+        if (syncOwnerships) {
+            lastOwnershipSyncStopwatch.reset().start();
+        }
+    }
+
+    /**
+     * Register a proxy <code>listener</code> on proxy related changes.
+     *
+     * <p>Listeners are kept in a set, so registering the same listener again has no
+     * additional effect.
+     *
+     * @param listener
+     *          proxy listener
+     */
+    public void registerProxyListener(ProxyListener listener) {
+        proxyListeners.add(listener);
+    }
+
+    private void notifyHandshakeSuccess(SocketAddress address,
+                                        ProxyClient client,
+                                        ServerInfo serverInfo,
+                                        boolean logging,
+                                        Stopwatch stopwatch) {
+        if (logging) {
+            if (null != serverInfo && serverInfo.isSetOwnerships()) {
+                logger.info("Handshaked with {} : {} ownerships returned.",
+                        address, serverInfo.getOwnerships().size());
+            } else {
+                logger.info("Handshaked with {} : no ownerships returned", address);
+            }
+        }
+        handshakeStats.completeRequest(address, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1);
+        for (ProxyListener listener : proxyListeners) {
+            listener.onHandshakeSuccess(address, client, serverInfo);
+        }
+    }
+
+    private void notifyHandshakeFailure(SocketAddress address,
+                                        ProxyClient client,
+                                        Throwable cause,
+                                        Stopwatch stopwatch) {
+        handshakeStats.failRequest(address, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1);
+        for (ProxyListener listener : proxyListeners) {
+            listener.onHandshakeFailure(address, client, cause);
+        }
+    }
+
+    /**
+     * Look up the client to proxy <code>address</code>, creating one when none is cached.
+     *
+     * @param address
+     *          proxy address
+     * @return the cached proxy client, or a newly created one
+     */
+    public ProxyClient getClient(final SocketAddress address) {
+        final ProxyClient existing = address2Services.get(address);
+        return null == existing ? createClient(address) : existing;
+    }
+
+    /**
+     * Remove and close whatever client is currently registered for proxy
+     * <code>address</code>. Does nothing when no client is registered.
+     *
+     * @param address
+     *          proxy address
+     */
+    public void removeClient(SocketAddress address) {
+        final ProxyClient removed = address2Services.remove(address);
+        if (null == removed) {
+            return;
+        }
+        logger.info("Removed host {}.", address);
+        removed.close();
+    }
+
+    /**
+     * Remove and close the client <code>sc</code> to proxy <code>address</code>, but only
+     * if <code>sc</code> is still the client registered for that address.
+     *
+     * @param address
+     *          proxy address
+     * @param sc
+     *          proxy client expected to be registered for <code>address</code>
+     */
+    public void removeClient(SocketAddress address, ProxyClient sc) {
+        if (!address2Services.remove(address, sc)) {
+            return;
+        }
+        logger.info("Remove client {} to host {}.", sc, address);
+        sc.close();
+    }
+
+    /**
+     * Create a client to proxy <code>address</code> and register it.
+     *
+     * <p>If another client is already registered for the address, the newly built one is
+     * closed and the registered one is returned. Otherwise a handshake (requesting
+     * ownerships) is sent over the new client before it is returned.
+     *
+     * @param address
+     *          proxy address
+     * @return the client registered for <code>address</code>
+     */
+    public ProxyClient createClient(final SocketAddress address) {
+        final ProxyClient newClient = clientBuilder.build(address);
+        final ProxyClient existing = address2Services.putIfAbsent(address, newClient);
+        if (null != existing) {
+            // someone else registered a client first: discard ours and use theirs.
+            newClient.close();
+            return existing;
+        }
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        final FutureEventListener<ServerInfo> handshakeListener = new FutureEventListener<ServerInfo>() {
+            @Override
+            public void onSuccess(ServerInfo serverInfo) {
+                notifyHandshakeSuccess(address, newClient, serverInfo, true, stopwatch);
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                notifyHandshakeFailure(address, newClient, cause, stopwatch);
+            }
+        };
+        // send a handshake right after the client is created.
+        handshake(address, newClient, handshakeListener, true, true);
+        return newClient;
+    }
+
+    /**
+     * Handshake with a given proxy.
+     *
+     * @param address
+     *          proxy address
+     * @param sc
+     *          proxy client
+     * @param listener
+     *          listener on handshake result
+     */
+    private void handshake(SocketAddress address,
+                           ProxyClient sc,
+                           FutureEventListener<ServerInfo> listener,
+                           boolean logging,
+                           boolean getOwnerships) {
+        if (clientConfig.getHandshakeWithClientInfo()) {
+            ClientInfo clientInfo = new ClientInfo();
+            clientInfo.setGetOwnerships(getOwnerships);
+            clientInfo.setStreamNameRegex(clientConfig.getStreamNameRegex());
+            if (logging) {
+                logger.info("Handshaking with {} : {}", address, clientInfo);
+            }
+            sc.getService().handshakeWithClientInfo(clientInfo)
+                    .addEventListener(listener);
+        } else {
+            if (logging) {
+                logger.info("Handshaking with {}", address);
+            }
+            sc.getService().handshake().addEventListener(listener);
+        }
+    }
+
+    /**
+     * Handshake with all proxies.
+     *
+     * <p>NOTE: this is a synchronous call. It waits for every host returned by the host
+     * provider to answer (successfully or not), for at most one minute. If the wait times
+     * out, a warning is logged and the call returns. If the calling thread is interrupted,
+     * its interrupt status is restored and the call returns.
+     */
+    public void handshake() {
+        Set<SocketAddress> hostsSnapshot = hostProvider.getHosts();
+        logger.info("Handshaking with {} hosts.", hostsSnapshot.size());
+        final CountDownLatch latch = new CountDownLatch(hostsSnapshot.size());
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        for (SocketAddress host: hostsSnapshot) {
+            final SocketAddress address = host;
+            final ProxyClient client = getClient(address);
+            handshake(address, client, new FutureEventListener<ServerInfo>() {
+                @Override
+                public void onSuccess(ServerInfo serverInfo) {
+                    notifyHandshakeSuccess(address, client, serverInfo, true, stopwatch);
+                    latch.countDown();
+                }
+                @Override
+                public void onFailure(Throwable cause) {
+                    notifyHandshakeFailure(address, client, cause, stopwatch);
+                    latch.countDown();
+                }
+            }, true, true);
+        }
+        try {
+            if (!latch.await(1, TimeUnit.MINUTES)) {
+                logger.warn("Timed out on handshaking with {} hosts : {} hosts have not responded",
+                        hostsSnapshot.size(), latch.getCount());
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so callers further up can still observe it.
+            Thread.currentThread().interrupt();
+            logger.warn("Interrupted on handshaking with servers : ", e);
+        }
+    }
+
+    /**
+     * Return the number of proxy clients currently held by this manager.
+     *
+     * @return number of entries in the address-to-client map.
+     */
+    public int getNumProxies() {
+        return address2Services.size();
+    }
+
+    /**
+     * Return all clients.
+     *
+     * <p>The result is an immutable copy of the address-to-client map taken at call time.
+     *
+     * @return all clients, keyed by proxy address.
+     */
+    public Map<SocketAddress, ProxyClient> getAllClients() {
+        return ImmutableMap.copyOf(address2Services);
+    }
+
+    /**
+     * Close the manager: mark it closed, cancel the pending periodic handshake (if any) and
+     * close every client currently held.
+     *
+     * <p>The futures returned by the individual client closes are not waited on, and the
+     * clients are not removed from the address-to-client map.
+     */
+    public void close() {
+        closed = true;
+        // read the volatile field once so the null check and the cancel see the same task
+        Timeout task = periodicHandshakeTask;
+        if (null != task) {
+            task.cancel();
+        }
+        for (ProxyClient sc : address2Services.values()) {
+            sc.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java
new file mode 100644
index 0000000..0a6b076
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/ProxyListener.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import org.apache.distributedlog.thrift.service.ServerInfo;
+import java.net.SocketAddress;
+
+/**
+ * Listener on the outcome of handshakes with proxies.
+ */
+public interface ProxyListener {
+    /**
+     * Called when a handshake with a proxy succeeds.
+     *
+     * @param address
+     *          proxy address
+     * @param client
+     *          proxy client that executes handshaking
+     * @param serverInfo
+     *          proxy's server info
+     */
+    void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo);
+
+    /**
+     * Called when a handshake with a proxy fails.
+     *
+     * @param address
+     *          proxy address
+     * @param client
+     *          proxy client that executed the handshake
+     * @param cause
+     *          failure reason
+     */
+    void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause);
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java
new file mode 100644
index 0000000..4161afb
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/proxy/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Clients that interact with individual proxies.
+ */
+package org.apache.distributedlog.client.proxy;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java
new file mode 100644
index 0000000..2ac5be3
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/DefaultRegionResolver.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.resolver;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Default implementation of {@link RegionResolver}.
+ *
+ * <p>Explicit overrides win; otherwise the region is parsed from the host name (or the
+ * address's string form). Results are cached until {@link #removeCachedHost} is called.
+ */
+public class DefaultRegionResolver implements RegionResolver {
+
+    private static final String DEFAULT_REGION = "default-region";
+
+    // Regions already resolved, keyed by address.
+    private final ConcurrentMap<SocketAddress, String> regionMap =
+            new ConcurrentHashMap<SocketAddress, String>();
+    // Caller-supplied regions that take precedence over host name parsing.
+    private final Map<SocketAddress, String> regionOverrides =
+            new HashMap<SocketAddress, String>();
+
+    public DefaultRegionResolver() {
+    }
+
+    public DefaultRegionResolver(Map<SocketAddress, String> regionOverrides) {
+        this.regionOverrides.putAll(regionOverrides);
+    }
+
+    @Override
+    public String resolveRegion(SocketAddress address) {
+        String cached = regionMap.get(address);
+        if (null != cached) {
+            return cached;
+        }
+        String resolved = doResolveRegion(address);
+        regionMap.put(address, resolved);
+        return resolved;
+    }
+
+    @Override
+    public void removeCachedHost(SocketAddress address) {
+        regionMap.remove(address);
+    }
+
+    // Overrides first, then the first label of a four-label, dash-separated host name.
+    private String doResolveRegion(SocketAddress address) {
+        String override = regionOverrides.get(address);
+        if (null != override) {
+            return override;
+        }
+
+        String domainName = (address instanceof InetSocketAddress)
+                ? ((InetSocketAddress) address).getHostName()
+                : address.toString();
+
+        String[] parts = domainName.split("\\.");
+        if (0 == parts.length) {
+            return DEFAULT_REGION;
+        }
+        String[] labels = parts[0].split("-");
+        return (4 == labels.length) ? labels[0] : DEFAULT_REGION;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java
new file mode 100644
index 0000000..023799c
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/RegionResolver.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.resolver;
+
+import java.net.SocketAddress;
+
+/**
+ * Resolver that maps a socket address to the region it belongs to.
+ */
+public interface RegionResolver {
+
+    /**
+     * Resolve the region of the given address.
+     *
+     * @param address
+     *          socket address to resolve
+     * @return region that the address resolves to
+     */
+    String resolveRegion(SocketAddress address);
+
+    /**
+     * Remove the cached entry, if any, for the given address.
+     *
+     * @param address
+     *          socket address whose cached entry should be removed
+     */
+    void removeCachedHost(SocketAddress address);
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java
new file mode 100644
index 0000000..81cda2f
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/resolver/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Resolvers that map network addresses to regions.
+ */
+package org.apache.distributedlog.client.resolver;