You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:47 UTC

[42/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java
new file mode 100644
index 0000000..666fa31
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ConsistentHashRoutingService.java
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.common.zookeeper.ServerSet;
+import org.apache.distributedlog.service.DLSocketAddress;
+import com.twitter.finagle.ChannelException;
+import com.twitter.finagle.NoBrokersAvailableException;
+import com.twitter.finagle.stats.Counter;
+import com.twitter.finagle.stats.Gauge;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import com.twitter.finagle.stats.StatsReceiver;
+import com.twitter.util.Function0;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.tuple.Pair;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.Seq;
+
+/**
+ * Consistent Hashing Based {@link RoutingService}.
+ */
+public class ConsistentHashRoutingService extends ServerSetRoutingService {
+
+    private static final Logger logger = LoggerFactory.getLogger(ConsistentHashRoutingService.class);
+
+    @Deprecated // fixes blackoutSeconds at 300 and uses NullStatsReceiver; newBuilder() can set both
+    public static ConsistentHashRoutingService of(ServerSetWatcher serverSetWatcher, int numReplicas) {
+        return new ConsistentHashRoutingService(serverSetWatcher, numReplicas, 300, NullStatsReceiver.get());
+    }
+
+    /**
+     * Create a builder for a consistent hash based {@link RoutingService}.
+     *
+     * @return builder to build a consistent hash based {@link RoutingService}.
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for the consistent hash based routing service; serverSet and numReplicas are required.
+     */
+    public static class Builder implements RoutingService.Builder {
+
+        private ServerSet serverSet; // required: build() fails if still null
+        private boolean resolveFromName = false; // passed through to TwitterServerSetWatcher
+        private int numReplicas; // required: build() rejects values <= 0
+        private int blackoutSeconds = 300; // default blackout period, in seconds
+        private StatsReceiver statsReceiver = NullStatsReceiver.get();
+
+        private Builder() {} // obtain instances through newBuilder()
+
+        public Builder serverSet(ServerSet serverSet) {
+            this.serverSet = serverSet;
+            return this;
+        }
+
+        public Builder resolveFromName(boolean enabled) {
+            this.resolveFromName = enabled;
+            return this;
+        }
+
+        public Builder numReplicas(int numReplicas) {
+            this.numReplicas = numReplicas;
+            return this;
+        }
+
+        public Builder blackoutSeconds(int seconds) {
+            this.blackoutSeconds = seconds; // NOTE(review): not range-checked in build()
+            return this;
+        }
+
+        public Builder statsReceiver(StatsReceiver statsReceiver) {
+            this.statsReceiver = statsReceiver;
+            return this;
+        }
+
+        @Override
+        public RoutingService build() {
+            checkNotNull(serverSet, "No serverset provided.");
+            checkNotNull(statsReceiver, "No stats receiver provided.");
+            checkArgument(numReplicas > 0, "Invalid number of replicas : " + numReplicas);
+            return new ConsistentHashRoutingService(new TwitterServerSetWatcher(serverSet, resolveFromName),
+                numReplicas, blackoutSeconds, statsReceiver);
+        }
+    }
+
+    static class ConsistentHash { // hash ring that maps replica hashes to host addresses
+        private final HashFunction hashFunction;
+        private final int numOfReplicas; // number of ring points created per added host
+        private final SortedMap<Long, SocketAddress> circle; // only touched from synchronized methods
+
+        // Stats: one increment per add()/remove() call
+        protected final Counter hostAddedCounter;
+        protected final Counter hostRemovedCounter;
+
+        ConsistentHash(HashFunction hashFunction,
+                       int numOfReplicas,
+                       StatsReceiver statsReceiver) {
+            this.hashFunction = hashFunction;
+            this.numOfReplicas = numOfReplicas;
+            this.circle = new TreeMap<Long, SocketAddress>();
+
+            this.hostAddedCounter = statsReceiver.counter0("adds");
+            this.hostRemovedCounter = statsReceiver.counter0("removes");
+        }
+
+        private String replicaName(int shardId, int replica, String address) {
+            if (shardId < 0) {
+                shardId = UNKNOWN_SHARD_ID; // every negative shard id is named with the same id
+            }
+
+            StringBuilder sb = new StringBuilder(100); // builds "shard-<shardId>-<replica>-<address>"
+            sb.append("shard-");
+            sb.append(shardId);
+            sb.append('-');
+            sb.append(replica);
+            sb.append('-');
+            sb.append(address);
+
+            return sb.toString();
+        }
+
+        private Long replicaHash(int shardId, int replica, String address) {
+            return hashFunction.hashUnencodedChars(replicaName(shardId, replica, address)).asLong();
+        }
+
+        private Long replicaHash(int shardId, int replica, SocketAddress address) {
+            return replicaHash(shardId, replica, address.toString()); // hashes the address's toString() form
+        }
+
+        public synchronized void add(int shardId, SocketAddress address) {
+            String addressStr = address.toString();
+            for (int i = 0; i < numOfReplicas; i++) { // one ring point per replica index
+                Long hash = replicaHash(shardId, i, addressStr);
+                circle.put(hash, address); // an existing point with the same hash is overwritten
+            }
+            hostAddedCounter.incr();
+        }
+
+        public synchronized void remove(int shardId, SocketAddress address) {
+            for (int i = 0; i < numOfReplicas; i++) {
+                long hash = replicaHash(shardId, i, address);
+                SocketAddress oldAddress = circle.get(hash);
+                if (null != oldAddress && oldAddress.equals(address)) { // only drop points still owned by address
+                    circle.remove(hash);
+                }
+            }
+            hostRemovedCounter.incr(); // incremented even when no point was removed
+        }
+
+        public SocketAddress get(String key, RoutingContext rContext) {
+            long hash = hashFunction.hashUnencodedChars(key).asLong();
+            return find(hash, rContext); // null when no untried host exists
+        }
+
+        private synchronized SocketAddress find(long hash, RoutingContext rContext) {
+            if (circle.isEmpty()) {
+                return null;
+            }
+
+            Iterator<Map.Entry<Long, SocketAddress>> iterator =
+                    circle.tailMap(hash).entrySet().iterator(); // points with key >= hash, ascending
+            while (iterator.hasNext()) {
+                Map.Entry<Long, SocketAddress> entry = iterator.next();
+                if (!rContext.isTriedHost(entry.getValue())) {
+                    return entry.getValue();
+                }
+            }
+            // the tail map has been checked; wrap around to the points with key < hash
+            iterator = circle.headMap(hash).entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, SocketAddress> entry = iterator.next();
+                if (!rContext.isTriedHost(entry.getValue())) {
+                    return entry.getValue();
+                }
+            }
+
+            return null; // every host on the ring was already tried
+        }
+
+        private synchronized Pair<Long, SocketAddress> get(long hash) { // NOTE(review): no caller in this file
+            if (circle.isEmpty()) {
+                return null;
+            }
+
+            if (!circle.containsKey(hash)) {
+                SortedMap<Long, SocketAddress> tailMap = circle.tailMap(hash);
+                hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey(); // wrap to the first point
+            }
+            return Pair.of(hash, circle.get(hash));
+        }
+
+        synchronized void dumpHashRing() { // logs every ring point at INFO level
+            for (Map.Entry<Long, SocketAddress> entry : circle.entrySet()) {
+                logger.info(entry.getKey() + " : " + entry.getValue());
+            }
+        }
+
+    }
+
+    class BlackoutHost implements TimerTask { // timer task that decides the fate of a blacked-out host
+        final int shardId;
+        final SocketAddress address;
+
+        BlackoutHost(int shardId, SocketAddress address) {
+            this.shardId = shardId;
+            this.address = address;
+            numBlackoutHosts.incrementAndGet(); // counted from construction until run() is invoked
+        }
+
+        @Override
+        public void run(Timeout timeout) throws Exception {
+            numBlackoutHosts.decrementAndGet();
+            if (!timeout.isExpired()) { // only act on a timeout that actually expired
+                return;
+            }
+            Set<SocketAddress> removedList = new HashSet<SocketAddress>(); // required by join(); not read afterwards
+            boolean joined;
+            // add the host back to its shard unless the shard has been taken over meanwhile
+            synchronized (shardId2Address) {
+                SocketAddress curHost = shardId2Address.get(shardId);
+                if (null != curHost) {
+                    // a new host has already joined for this shard, so drop the blacked-out host.
+                    logger.info("Blackout Shard {} ({}) was already replaced by {} permanently.",
+                            new Object[] { shardId, address, curHost });
+                    joined = false;
+                } else {
+                    join(shardId, address, removedList);
+                    joined = true;
+                }
+            }
+            if (joined) { // listeners are notified outside the lock
+                for (RoutingListener listener : listeners) {
+                    listener.onServerJoin(address);
+                }
+            } else {
+                for (RoutingListener listener : listeners) {
+                    listener.onServerLeft(address);
+                }
+            }
+        }
+    }
+
+    protected final HashedWheelTimer hashedWheelTimer; // schedules BlackoutHost tasks
+    protected final HashFunction hashFunction = Hashing.md5(); // hash used for ring points and keys
+    protected final ConsistentHash circle;
+    protected final Map<Integer, SocketAddress> shardId2Address =
+            new HashMap<Integer, SocketAddress>(); // also the lock object for both maps in this file
+    protected final Map<SocketAddress, Integer> address2ShardId =
+            new HashMap<SocketAddress, Integer>(); // mutated under synchronized (shardId2Address)
+
+    // blackout period, in seconds (scheduled with TimeUnit.SECONDS)
+    protected final int blackoutSeconds;
+
+    // stats
+    protected final StatsReceiver statsReceiver;
+    protected final AtomicInteger numBlackoutHosts; // BlackoutHost tasks created but not yet run
+    protected final Gauge numBlackoutHostsGauge;
+    protected final Gauge numHostsGauge;
+
+    private static final int UNKNOWN_SHARD_ID = -1; // id used in replica names for any negative shard id
+
+    ConsistentHashRoutingService(ServerSetWatcher serverSetWatcher,
+                                 int numReplicas,
+                                 int blackoutSeconds,
+                                 StatsReceiver statsReceiver) {
+        super(serverSetWatcher);
+        this.circle = new ConsistentHash(hashFunction, numReplicas, statsReceiver.scope("ring"));
+        this.hashedWheelTimer = new HashedWheelTimer(new ThreadFactoryBuilder()
+                .setNameFormat("ConsistentHashRoutingService-Timer-%d").build());
+        this.blackoutSeconds = blackoutSeconds;
+        // stats: two gauges, both reported as float values
+        this.statsReceiver = statsReceiver;
+        this.numBlackoutHosts = new AtomicInteger(0);
+        this.numBlackoutHostsGauge = this.statsReceiver.addGauge(gaugeName("num_blackout_hosts"),
+                new Function0<Object>() {
+                    @Override
+                    public Object apply() {
+                        return (float) numBlackoutHosts.get();
+                    }
+                });
+        this.numHostsGauge = this.statsReceiver.addGauge(gaugeName("num_hosts"),
+                new Function0<Object>() {
+                    @Override
+                    public Object apply() {
+                        return (float) address2ShardId.size(); // NOTE(review): read without the shardId2Address lock
+                    }
+                });
+    }
+
+    private static Seq<String> gaugeName(String name) { // wraps one name as the Seq<String> passed to addGauge
+        return scala.collection.JavaConversions.asScalaBuffer(Arrays.asList(name)).toList();
+    }
+
+    @Override
+    public void startService() {
+        super.startService(); // parent service is started before the timer
+        this.hashedWheelTimer.start(); // timer that runs the BlackoutHost tasks
+    }
+
+    @Override
+    public void stopService() {
+        this.hashedWheelTimer.stop(); // timer is stopped first, in reverse order of startService()
+        super.stopService();
+    }
+
+    @Override
+    public Set<SocketAddress> getHosts() {
+        synchronized (shardId2Address) {
+            return ImmutableSet.copyOf(address2ShardId.keySet()); // immutable snapshot taken under the lock
+        }
+    }
+
+    @Override
+    public SocketAddress getHost(String key, RoutingContext rContext)
+            throws NoBrokersAvailableException {
+        SocketAddress host = circle.get(key, rContext); // null if the ring is empty or all hosts were tried
+        if (null != host) {
+            return host;
+        }
+        throw new NoBrokersAvailableException("No host found for " + key + ", routing context : " + rContext);
+    }
+
+    @Override
+    public void removeHost(SocketAddress host, Throwable reason) {
+        removeHostInternal(host, Optional.fromNullable(reason)); // a null reason removes the host without an NPE
+    }
+
+    private void removeHostInternal(SocketAddress host, Optional<Throwable> reason) { // absent reason = server set change
+        synchronized (shardId2Address) {
+            Integer shardId = address2ShardId.remove(host); // null when the host is not registered
+            if (null != shardId) {
+                SocketAddress curHost = shardId2Address.get(shardId);
+                if (null != curHost && curHost.equals(host)) { // keep the slot if another host owns it
+                    shardId2Address.remove(shardId);
+                }
+                circle.remove(shardId, host);
+                if (reason.isPresent()) {
+                    if (reason.get() instanceof ChannelException) {
+                        logger.info("Shard {} ({}) left due to ChannelException, black it out for {} seconds"
+                            + " (message = {})",
+                            new Object[] { shardId, host, blackoutSeconds, reason.get().toString() });
+                        BlackoutHost blackoutHost = new BlackoutHost(shardId, host); // re-evaluated after the blackout
+                        hashedWheelTimer.newTimeout(blackoutHost, blackoutSeconds, TimeUnit.SECONDS);
+                    } else {
+                        logger.info("Shard {} ({}) left due to exception {}",
+                                new Object[] { shardId, host, reason.get().toString() });
+                    }
+                } else {
+                    logger.info("Shard {} ({}) left after server set change",
+                                shardId, host);
+                }
+            } else if (reason.isPresent()) { // unregistered host: nothing to remove, only log
+                logger.info("Node {} left due to exception {}", host, reason.get().toString());
+            } else {
+                logger.info("Node {} left after server set change", host);
+            }
+        }
+    }
+
+    /**
+     * Bind <i>newHost</i> to <i>shardId</i>. The caller should synchronize on <i>shardId2Address</i>.
+     * @param shardId
+     *          Shard id of new host joined.
+     * @param newHost
+     *          New host joined.
+     * @param removedList
+     *          Receives the host previously bound to the shard, if one was replaced.
+     */
+    private void join(int shardId, SocketAddress newHost, Set<SocketAddress> removedList) {
+        SocketAddress oldHost = shardId2Address.put(shardId, newHost); // null when the shard slot was vacant
+        if (null != oldHost) {
+            // remove the old host only when a new shard is kicked in to replace it.
+            address2ShardId.remove(oldHost);
+            circle.remove(shardId, oldHost);
+            removedList.add(oldHost);
+            logger.info("Shard {} ({}) left permanently.", shardId, oldHost);
+        }
+        address2ShardId.put(newHost, shardId);
+        circle.add(shardId, newHost);
+        logger.info("Shard {} ({}) joined to replace ({}).", // oldHost is logged as null if nothing was replaced
+                    new Object[] { shardId, newHost, oldHost });
+    }
+
+    @Override // reconciles the shard maps and ring with the new server set, then notifies listeners
+    protected synchronized void performServerSetChange(ImmutableSet<DLSocketAddress> serviceInstances) {
+        Set<SocketAddress> joinedList = new HashSet<SocketAddress>();
+        Set<SocketAddress> removedList = new HashSet<SocketAddress>();
+
+        Map<Integer, SocketAddress> newMap = new HashMap<Integer, SocketAddress>(); // shard id -> address in the new set
+        synchronized (shardId2Address) {
+            for (DLSocketAddress serviceInstance : serviceInstances) {
+                if (serviceInstance.getShard() >= 0) {
+                    newMap.put(serviceInstance.getShard(), serviceInstance.getSocketAddress());
+                } else {
+                    Integer shard = address2ShardId.get(serviceInstance.getSocketAddress()); // reuse a known id
+                    if (null == shard) {
+                        // Assign a random negative shardId that is unused in both the current and the new map
+                        int shardId;
+                        do {
+                            shardId = Math.min(-1, (int) (Math.random() * Integer.MIN_VALUE));
+                        } while (null != shardId2Address.get(shardId) || newMap.containsKey(shardId));
+                        shard = shardId;
+                    }
+                    newMap.put(shard, serviceInstance.getSocketAddress());
+                }
+            }
+        }
+
+        Map<Integer, SocketAddress> left; // shards present now but missing from the new server set
+        synchronized (shardId2Address) {
+            MapDifference<Integer, SocketAddress> difference =
+                    Maps.difference(shardId2Address, newMap);
+            left = difference.entriesOnlyOnLeft();
+            for (Map.Entry<Integer, SocketAddress> shardEntry : left.entrySet()) {
+                int shard = shardEntry.getKey();
+                if (shard >= 0) {
+                    SocketAddress host = shardId2Address.get(shard);
+                    if (null != host) {
+                        // we don't proactively remove hosts that just disappeared from the serverset,
+                        // since it might just be because the serverset became flaky
+                        // address2ShardId.remove(host);
+                        // circle.remove(shard, host);
+                        logger.info("Shard {} ({}) left temporarily.", shard, host);
+                    }
+                } else {
+                    // shard id is negative - they are resolved from finagle name, whose instances don't have shard ids.
+                    // in this case, if they are removed from the serverset, we remove them directly
+                    SocketAddress host = shardEntry.getValue();
+                    if (null != host) {
+                        removeHostInternal(host, Optional.<Throwable>absent());
+                        removedList.add(host);
+                    }
+                }
+            }
+            // we need to find if any shards are replacing old shards
+            for (Map.Entry<Integer, SocketAddress> shard : newMap.entrySet()) {
+                SocketAddress oldHost = shardId2Address.get(shard.getKey());
+                SocketAddress newHost = shard.getValue();
+                if (!newHost.equals(oldHost)) { // new shard, or a different host for an existing shard
+                    join(shard.getKey(), newHost, removedList);
+                    joinedList.add(newHost);
+                }
+            }
+        }
+
+        for (SocketAddress addr : removedList) { // listeners are called after the lock is released
+            for (RoutingListener listener : listeners) {
+                listener.onServerLeft(addr);
+            }
+        }
+
+        for (SocketAddress addr : joinedList) {
+            for (RoutingListener listener : listeners) {
+                listener.onServerJoin(addr);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java
new file mode 100644
index 0000000..e51eb1e
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/NameServerSet.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.base.Command;
+import com.twitter.common.base.Commands;
+import com.twitter.common.zookeeper.Group;
+import com.twitter.common.zookeeper.ServerSet;
+import com.twitter.finagle.Addr;
+import com.twitter.finagle.Address;
+import com.twitter.finagle.Name;
+import com.twitter.finagle.Resolver$;
+import com.twitter.thrift.Endpoint;
+import com.twitter.thrift.ServiceInstance;
+import com.twitter.thrift.Status;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+/**
+ * Finagle Name based {@link ServerSet} implementation.
+ */
+class NameServerSet implements ServerSet {
+
+    private static final Logger logger = LoggerFactory.getLogger(NameServerSet.class);
+
+    private volatile Set<HostChangeMonitor<ServiceInstance>> watchers =
+        new HashSet<HostChangeMonitor<ServiceInstance>>(); // never reassigned here; used under synchronized (watchers)
+    private volatile ImmutableSet<ServiceInstance> hostSet = ImmutableSet.of(); // last host set applied
+    private AtomicBoolean resolutionPending = new AtomicBoolean(true); // set to false once a host set change is applied
+
+    public NameServerSet(String nameStr) { // evaluates nameStr with the finagle Resolver, then subscribes to it
+        Name name;
+        try {
+            name = Resolver$.MODULE$.eval(nameStr);
+        } catch (Exception exc) {
+            logger.error("Exception in Resolver.eval for name {}", nameStr, exc);
+            // Since this is called from various places that don't handle specific exceptions,
+            // we have no option other than to throw a runtime exception to halt the control flow.
+            // This should only happen in case of incorrect configuration. Having a log message
+            // would help identify the problem during tests.
+            throw new RuntimeException(exc);
+        }
+        initialize(name);
+    }
+
+    public NameServerSet(Name name) { // throws UnsupportedOperationException unless name is a TestName or Name.Bound
+        initialize(name);
+    }
+
+    private void initialize(Name name) { // subscribes respondToChanges to address updates of the name
+        if (name instanceof TestName) {
+            ((TestName) name).changes(new AbstractFunction1<Addr, BoxedUnit>() {
+                @Override
+                public BoxedUnit apply(Addr varAddr) {
+                    return NameServerSet.this.respondToChanges(varAddr);
+                }
+            });
+        } else if (name instanceof Name.Bound) {
+            ((Name.Bound) name).addr().changes().respond(new AbstractFunction1<Addr, BoxedUnit>() {
+                @Override
+                public BoxedUnit apply(Addr varAddr) {
+                    return NameServerSet.this.respondToChanges(varAddr);
+                }
+            });
+        } else { // any other Name subtype is rejected
+            logger.error("NameServerSet only supports Name.Bound. While the resolved name {} was {}",
+                name, name.getClass());
+            throw new UnsupportedOperationException("NameServerSet only supports Name.Bound");
+        }
+    }
+
+    private ServiceInstance endpointAddressToServiceInstance(Address endpointAddress) { // Address.Inet only
+        if (endpointAddress instanceof Address.Inet) {
+            InetSocketAddress inetSocketAddress = ((Address.Inet) endpointAddress).addr();
+            Endpoint endpoint = new Endpoint(inetSocketAddress.getHostString(), inetSocketAddress.getPort());
+            HashMap<String, Endpoint> map = new HashMap<String, Endpoint>();
+            map.put("thrift", endpoint); // the same endpoint is also listed under the "thrift" key
+            return new ServiceInstance(
+                endpoint,
+                map,
+                Status.ALIVE); // every converted instance is reported as ALIVE
+        } else {
+            logger.error("We expect InetSocketAddress while the resolved address {} was {}",
+                        endpointAddress, endpointAddress.getClass());
+            throw new UnsupportedOperationException("invalid endpoint address: " + endpointAddress);
+        }
+    }
+
+
+    private BoxedUnit respondToChanges(Addr addr) { // maps an Addr update to a host set and notifies watchers
+        ImmutableSet<ServiceInstance> oldHostSet = ImmutableSet.copyOf(hostSet);
+
+        ImmutableSet<ServiceInstance> newHostSet = oldHostSet;
+
+        if (addr instanceof Addr.Bound) {
+            scala.collection.immutable.Set<Address> endpointAddresses = ((Addr.Bound) addr).addrs();
+            scala.collection.Iterator<Address> endpointAddressesIterator = endpointAddresses.toIterator();
+            HashSet<ServiceInstance> serviceInstances = new HashSet<ServiceInstance>();
+            while (endpointAddressesIterator.hasNext()) {
+                serviceInstances.add(endpointAddressToServiceInstance(endpointAddressesIterator.next()));
+            }
+            newHostSet = ImmutableSet.copyOf(serviceInstances);
+
+        } else if (addr instanceof Addr.Failed) { // failure is treated as an empty host set
+            logger.error("Name resolution failed", ((Addr.Failed) addr).cause());
+            newHostSet = ImmutableSet.of();
+        } else if (addr.toString().equals("Pending")) { // matched by string form; the old set is kept
+            logger.info("Name resolution pending");
+            newHostSet = oldHostSet;
+        } else if (addr.toString().equals("Neg")) { // matched by string form; treated as an empty host set
+            newHostSet = ImmutableSet.of();
+        } else {
+            logger.error("Invalid Addr type: {}", addr.getClass().getName());
+            throw new UnsupportedOperationException("Invalid Addr type:" + addr.getClass().getName());
+        }
+
+        // Identity check, not equals(). NOTE(review): confirm that an empty -> empty update should be skipped.
+        if (oldHostSet != newHostSet) {
+            logger.info("NameServerSet updated: {} -> {}", hostSetToString(oldHostSet), hostSetToString(newHostSet));
+            resolutionPending.set(false);
+            hostSet = newHostSet;
+            synchronized (watchers) {
+                for (HostChangeMonitor<ServiceInstance> watcher: watchers) {
+                    watcher.onChange(newHostSet);
+                }
+            }
+
+        }
+
+        return BoxedUnit.UNIT;
+    }
+
+
+    private String hostSetToString(ImmutableSet<ServiceInstance> hostSet) { // e.g. "( host1:80 host2:81 )"
+        StringBuilder result = new StringBuilder();
+        result.append("(");
+        for (ServiceInstance serviceInstance : hostSet) {
+            Endpoint endpoint = serviceInstance.getServiceEndpoint();
+            result.append(String.format(" %s:%d", endpoint.getHost(), endpoint.getPort()));
+        }
+        result.append(" )"); // an empty set renders as "( )"
+
+        return result.toString();
+    }
+
+
+    /**
+     * Not supported by this implementation: always throws {@link UnsupportedOperationException}.
+     *
+     * @param endpoint the primary service endpoint
+     * @param additionalEndpoints and additional endpoints keyed by their logical name
+     * @param status the current service status
+     * @return never returns normally
+     * @throws Group.JoinException if there was a problem joining the server set
+     * @throws InterruptedException if interrupted while waiting to join the server set
+     * @deprecated The status field is deprecated. Please use {@link #join(java.net.InetSocketAddress, java.util.Map)}
+     */
+    @Override
+    public EndpointStatus join(InetSocketAddress endpoint,
+                               Map<String, InetSocketAddress> additionalEndpoints,
+                               Status status)
+            throws Group.JoinException, InterruptedException {
+        throw new UnsupportedOperationException("NameServerSet does not support join");
+    }
+
+    /**
+     * Not supported by this implementation: always throws {@link UnsupportedOperationException}.
+     *
+     * @param endpoint the primary service endpoint
+     * @param additionalEndpoints and additional endpoints keyed by their logical name
+     * @return never returns normally
+     * @throws Group.JoinException if there was a problem joining the server set
+     * @throws InterruptedException if interrupted while waiting to join the server set
+     */
+    @Override
+    public EndpointStatus join(InetSocketAddress endpoint, Map<String, InetSocketAddress> additionalEndpoints)
+            throws Group.JoinException, InterruptedException {
+        throw new UnsupportedOperationException("NameServerSet does not support join");
+    }
+
+    /**
+     * Not supported by this implementation: always throws {@link UnsupportedOperationException}.
+     *
+     * @param endpoint the primary service endpoint
+     * @param additionalEndpoints and additional endpoints keyed by their logical name
+     * @param shardId Unique shard identifier for this member of the service.
+     * @return never returns normally
+     * @throws Group.JoinException if there was a problem joining the server set
+     * @throws InterruptedException if interrupted while waiting to join the server set
+     */
+    @Override
+    public EndpointStatus join(InetSocketAddress endpoint,
+                               Map<String, InetSocketAddress> additionalEndpoints,
+                               int shardId)
+            throws Group.JoinException, InterruptedException {
+        throw new UnsupportedOperationException("NameServerSet does not support join");
+    }
+
+    /**
+     * Not supported by this implementation: always throws {@link UnsupportedOperationException}.
+     * The inherited contract (register a monitor for change notices and block until the initial
+     * server set has been delivered to it) is not implemented by this method; the watch method
+     * named in the deprecation note below is the one this class implements.
+     *
+     * @param monitor the server set monitor to call back when the host set changes
+     * @throws com.twitter.common.net.pool.DynamicHostSet.MonitorException if there is a problem monitoring the host set
+     * @deprecated Deprecated in favor of {@link #watch(com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor)}
+     */
+    @Deprecated
+    @Override
+    public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+        throw new UnsupportedOperationException("NameServerSet does not support monitor");
+    }
+
+    /**
+     * Registers a monitor to receive change notices for this server set. The monitor is added to
+     * the watcher set first; if a host set change has already been applied, the current host set
+     * is then delivered to the monitor right away, otherwise its first callback comes with the
+     * next host set change. This method does not block.
+     *
+     * @param monitor the server set monitor to call back when the host set changes
+     * @return {@link Commands#NOOP}; the monitor stays registered.
+     * @throws com.twitter.common.net.pool.DynamicHostSet.MonitorException if there is a problem monitoring the host set
+     */
+    @Override
+    public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+        // First add the monitor to the watchers so that it does not miss any changes and invoke
+        // the onChange method
+        synchronized (watchers) {
+            watchers.add(monitor);
+        }
+
+        if (!resolutionPending.get()) {
+            monitor.onChange(hostSet);
+        }
+
+        return Commands.NOOP; // Return value is not used
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java
new file mode 100644
index 0000000..d71cee3
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RegionsRoutingService.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Sets;
+import org.apache.distributedlog.client.resolver.RegionResolver;
+import com.twitter.finagle.NoBrokersAvailableException;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.net.SocketAddress;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Routing service that chains multiple per-region routing services.
+ *
+ * <p>Lifecycle calls, listener (un)registration and host removal are fanned out to every
+ * underlying service. Host selection walks the services in array order and returns the first
+ * host that is neither in an unavailable region nor already tried.
+ */
+public class RegionsRoutingService implements RoutingService {
+
+    private static final Logger logger = LoggerFactory.getLogger(RegionsRoutingService.class);
+
+    /**
+     * Create a multiple regions routing services based on a list of region routing {@code services}.
+     *
+     * <p>It is deprecated. Please use {@link Builder} to build multiple regions routing service.
+     *
+     * @param regionResolver region resolver
+     * @param services a list of region routing services.
+     * @return multiple regions routing service
+     * @see Builder
+     * @deprecated use {@link #newBuilder()} and {@link Builder} instead.
+     */
+    @Deprecated
+    public static RegionsRoutingService of(RegionResolver regionResolver,
+                                         RoutingService...services) {
+        return new RegionsRoutingService(regionResolver, services);
+    }
+
+    /**
+     * Create a builder to build a multiple-regions routing service.
+     *
+     * @return builder to build a multiple-regions routing service.
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder to build a multiple-regions routing service.
+     */
+    public static class Builder implements RoutingService.Builder {
+
+        private RegionResolver resolver;
+        private RoutingService.Builder[] routingServiceBuilders;
+        private StatsReceiver statsReceiver = NullStatsReceiver.get();
+
+        private Builder() {}
+
+        /**
+         * Set the builders of the per-region routing services, in routing order.
+         *
+         * @param builders routing service builders; the first one is treated as the local region
+         *          when stats scopes are assigned in {@link #build()}.
+         * @return this builder.
+         */
+        public Builder routingServiceBuilders(RoutingService.Builder...builders) {
+            this.routingServiceBuilders = builders;
+            return this;
+        }
+
+        /**
+         * Set the resolver used to map a host address to its region.
+         *
+         * @param regionResolver region resolver.
+         * @return this builder.
+         */
+        public Builder resolver(RegionResolver regionResolver) {
+            this.resolver = regionResolver;
+            return this;
+        }
+
+        /**
+         * Set the stats receiver that is scoped and handed to each per-region builder.
+         *
+         * @param statsReceiver stats receiver.
+         * @return this builder.
+         */
+        @Override
+        public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) {
+            this.statsReceiver = statsReceiver;
+            return this;
+        }
+
+        /**
+         * Build every per-region routing service and wrap them into a regions routing service.
+         *
+         * <p>The service at index 0 gets the stats scope {@code local}; the service at index
+         * {@code i > 0} gets the scope {@code remote_i}.
+         *
+         * @return the multiple-regions routing service.
+         * @throws NullPointerException if the builders, the resolver or the stats receiver is null.
+         */
+        @Override
+        public RegionsRoutingService build() {
+            checkNotNull(routingServiceBuilders, "No routing service builder provided.");
+            checkNotNull(resolver, "No region resolver provided.");
+            checkNotNull(statsReceiver, "No stats receiver provided");
+            RoutingService[] services = new RoutingService[routingServiceBuilders.length];
+            for (int i = 0; i < services.length; i++) {
+                String statsScope;
+                if (0 == i) {
+                    statsScope = "local";
+                } else {
+                    statsScope = "remote_" + i;
+                }
+                services[i] = routingServiceBuilders[i]
+                        .statsReceiver(statsReceiver.scope(statsScope))
+                        .build();
+            }
+            return new RegionsRoutingService(resolver, services);
+        }
+    }
+
+    protected final RegionResolver regionResolver;
+    protected final RoutingService[] routingServices;
+
+    private RegionsRoutingService(RegionResolver resolver,
+                                  RoutingService[] routingServices) {
+        this.regionResolver = resolver;
+        this.routingServices = routingServices;
+    }
+
+    /**
+     * Collect the hosts of all underlying routing services.
+     *
+     * @return a new set holding the union of the hosts known to every region.
+     */
+    @Override
+    public Set<SocketAddress> getHosts() {
+        Set<SocketAddress> hosts = Sets.newHashSet();
+        for (RoutingService rs : routingServices) {
+            hosts.addAll(rs.getHosts());
+        }
+        return hosts;
+    }
+
+    /**
+     * Start every underlying routing service, in array order.
+     */
+    @Override
+    public void startService() {
+        for (RoutingService service : routingServices) {
+            service.startService();
+        }
+        logger.info("Regions Routing Service Started");
+    }
+
+    /**
+     * Stop every underlying routing service, in array order.
+     */
+    @Override
+    public void stopService() {
+        for (RoutingService service : routingServices) {
+            service.stopService();
+        }
+        logger.info("Regions Routing Service Stopped");
+    }
+
+    /**
+     * Register the listener on every underlying routing service.
+     *
+     * @param listener routing listener.
+     * @return this routing service.
+     */
+    @Override
+    public RoutingService registerListener(RoutingListener listener) {
+        for (RoutingService service : routingServices) {
+            service.registerListener(listener);
+        }
+        return this;
+    }
+
+    /**
+     * Unregister the listener from every underlying routing service.
+     *
+     * @param listener routing listener.
+     * @return this routing service.
+     */
+    @Override
+    public RoutingService unregisterListener(RoutingListener listener) {
+        for (RoutingService service : routingServices) {
+            // Must remove the listener; calling registerListener here would re-add it instead.
+            service.unregisterListener(listener);
+        }
+        return this;
+    }
+
+    /**
+     * Pick a host for <i>key</i> by asking each region's routing service in order.
+     *
+     * <p>A candidate is skipped when the routing context has recorded unavailable regions and the
+     * candidate resolves to one of them, or when the candidate was already tried. A region that
+     * reports no brokers is logged at debug level and the next region is consulted.
+     *
+     * @param key
+     *          key to route the request.
+     * @param routingContext
+     *          routing context carrying tried hosts and unavailable regions.
+     * @return host to route the request
+     * @throws NoBrokersAvailableException if no region yields an acceptable host.
+     */
+    @Override
+    public SocketAddress getHost(String key, RoutingContext routingContext)
+            throws NoBrokersAvailableException {
+        for (RoutingService service : routingServices) {
+            try {
+                SocketAddress addr = service.getHost(key, routingContext);
+                if (routingContext.hasUnavailableRegions()) {
+                    // current region is unavailable
+                    String region = regionResolver.resolveRegion(addr);
+                    if (routingContext.isUnavailableRegion(region)) {
+                        continue;
+                    }
+                }
+                if (!routingContext.isTriedHost(addr)) {
+                    return addr;
+                }
+            } catch (NoBrokersAvailableException nbae) {
+                // if there isn't broker available in current service, try next service.
+                logger.debug("No brokers available in region {} : ", service, nbae);
+            }
+        }
+        throw new NoBrokersAvailableException("No host found for " + key + ", routing context : " + routingContext);
+    }
+
+    /**
+     * Remove the host from every underlying routing service.
+     *
+     * @param address
+     *          host address to remove
+     * @param reason
+     *          reason to remove the host
+     */
+    @Override
+    public void removeHost(SocketAddress address, Throwable reason) {
+        for (RoutingService service : routingServices) {
+            service.removeHost(address, reason);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java
new file mode 100644
index 0000000..ad73c17
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingService.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import org.apache.distributedlog.client.resolver.RegionResolver;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import com.twitter.finagle.NoBrokersAvailableException;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A routing service decides which host a request should be sent to.
+ */
+public interface RoutingService {
+
+    /**
+     * Builder to build routing service.
+     */
+    interface Builder {
+
+        /**
+         * Provide the stats receiver that the built routing service should use.
+         *
+         * @param statsReceiver
+         *          stats receiver
+         * @return a routing service builder
+         */
+        Builder statsReceiver(StatsReceiver statsReceiver);
+
+        /**
+         * Build the routing service.
+         *
+         * @return built routing service
+         */
+        RoutingService build();
+
+    }
+
+    /**
+     * Listener for server changes on routing service.
+     */
+    interface RoutingListener {
+        /**
+         * Triggered when a server left.
+         *
+         * @param address the server that left.
+         */
+        void onServerLeft(SocketAddress address);
+
+        /**
+         * Triggered when a server joined.
+         *
+         * @param address the server that joined.
+         */
+        void onServerJoin(SocketAddress address);
+    }
+
+    /**
+     * Routing Context of a request.
+     *
+     * <p>It records the hosts already tried (with the status code each one returned) and the
+     * regions found unavailable. All accessors are synchronized on the context.
+     */
+    class RoutingContext {
+
+        /**
+         * Create an empty routing context.
+         *
+         * @param resolver
+         *          resolver used to map a tried host to its region.
+         * @return a new routing context.
+         */
+        public static RoutingContext of(RegionResolver resolver) {
+            return new RoutingContext(resolver);
+        }
+
+        final RegionResolver regionResolver;
+        final Map<SocketAddress, StatusCode> triedHosts = new HashMap<SocketAddress, StatusCode>();
+        final Set<String> unavailableRegions = new HashSet<String>();
+
+        private RoutingContext(RegionResolver regionResolver) {
+            this.regionResolver = regionResolver;
+        }
+
+        /**
+         * Render the tried hosts of this context.
+         */
+        @Override
+        public synchronized String toString() {
+            StringBuilder text = new StringBuilder("(tried hosts=");
+            text.append(triedHosts);
+            text.append(")");
+            return text.toString();
+        }
+
+        /**
+         * Record a tried host in the routing context.
+         *
+         * <p>When the host answered {@code REGION_UNAVAILABLE}, its region is also recorded as
+         * unavailable.
+         *
+         * @param socketAddress
+         *          socket address of tried host.
+         * @param code
+         *          status code returned from tried host.
+         * @return routing context.
+         */
+        public synchronized RoutingContext addTriedHost(SocketAddress socketAddress, StatusCode code) {
+            triedHosts.put(socketAddress, code);
+            if (code == StatusCode.REGION_UNAVAILABLE) {
+                String region = regionResolver.resolveRegion(socketAddress);
+                unavailableRegions.add(region);
+            }
+            return this;
+        }
+
+        /**
+         * Tell whether the host <i>address</i> was already tried.
+         *
+         * @param address
+         *          socket address to check
+         * @return true if the address is already tried, otherwise false.
+         */
+        public synchronized boolean isTriedHost(SocketAddress address) {
+            return triedHosts.containsKey(address);
+        }
+
+        /**
+         * Tell whether any unavailable region was encountered.
+         *
+         * @return true if encountered unavailable regions, otherwise false.
+         */
+        public synchronized boolean hasUnavailableRegions() {
+            return unavailableRegions.size() > 0;
+        }
+
+        /**
+         * Tell whether the <i>region</i> was recorded as unavailable.
+         *
+         * @param region
+         *          region
+         * @return true if the region is unavailable, otherwise false.
+         */
+        public synchronized boolean isUnavailableRegion(String region) {
+            return unavailableRegions.contains(region);
+        }
+
+    }
+
+    /**
+     * Start routing service.
+     */
+    void startService();
+
+    /**
+     * Stop routing service.
+     */
+    void stopService();
+
+    /**
+     * Register routing listener.
+     *
+     * @param listener routing listener.
+     * @return routing service.
+     */
+    RoutingService registerListener(RoutingListener listener);
+
+    /**
+     * Unregister routing listener.
+     *
+     * @param listener routing listener.
+     * @return routing service.
+     */
+    RoutingService unregisterListener(RoutingListener listener);
+
+    /**
+     * Get all the hosts that are available in the routing service.
+     *
+     * @return all the hosts
+     */
+    Set<SocketAddress> getHosts();
+
+    /**
+     * Get the host to route the request by <i>key</i>.
+     *
+     * @param key
+     *          key to route the request.
+     * @param rContext
+     *          routing context.
+     * @return host to route the request
+     * @throws NoBrokersAvailableException if no host can be chosen for the request
+     */
+    SocketAddress getHost(String key, RoutingContext rContext)
+            throws NoBrokersAvailableException;
+
+    /**
+     * Remove the host <i>address</i> for a specific <i>reason</i>.
+     *
+     * @param address
+     *          host address to remove
+     * @param reason
+     *          reason to remove the host
+     */
+    void removeHost(SocketAddress address, Throwable reason);
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java
new file mode 100644
index 0000000..4ac22ce
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingServiceProvider.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import com.twitter.finagle.stats.StatsReceiver;
+
+/**
+ * Builder adapter that hands out an already constructed routing service.
+ */
+class RoutingServiceProvider implements RoutingService.Builder {
+
+    final RoutingService routingService;
+
+    RoutingServiceProvider(RoutingService routingService) {
+        this.routingService = routingService;
+    }
+
+    /**
+     * Return the routing service that was supplied at construction time.
+     */
+    @Override
+    public RoutingService build() {
+        return this.routingService;
+    }
+
+    /**
+     * Accept a stats receiver without using it; the wrapped service already exists.
+     */
+    @Override
+    public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) {
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java
new file mode 100644
index 0000000..8e8edd3
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/RoutingUtils.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import com.twitter.common.zookeeper.ServerSet;
+import java.net.SocketAddress;
+
+/**
+ * Utils for routing services.
+ */
+public class RoutingUtils {
+
+    private static final int NUM_CONSISTENT_HASH_REPLICAS = 997;
+
+    /**
+     * Building routing service from <code>finagleNameStr</code>.
+     *
+     * @param finagleNameStr
+     *          finagle name str of a service
+     * @return routing service builder
+     */
+    public static RoutingService.Builder buildRoutingService(String finagleNameStr) {
+        if (!finagleNameStr.startsWith("serverset!")
+                && !finagleNameStr.startsWith("inet!")
+                && !finagleNameStr.startsWith("zk!")) {
+            // We only support serverset based names at the moment
+            throw new UnsupportedOperationException("Finagle Name format not supported for name: " + finagleNameStr);
+        }
+        return buildRoutingService(new NameServerSet(finagleNameStr), true);
+    }
+
+    /**
+     * Building routing service from <code>serverSet</code>.
+     *
+     * @param serverSet
+     *          server set of a service
+     * @return routing service builder
+     */
+    public static RoutingService.Builder buildRoutingService(ServerSet serverSet) {
+        return buildRoutingService(serverSet, false);
+    }
+
+    /**
+     * Building routing service from <code>address</code>.
+     *
+     * @param address
+     *          host to route the requests
+     * @return routing service builder
+     */
+    public static RoutingService.Builder buildRoutingService(SocketAddress address) {
+        return SingleHostRoutingService.newBuilder().address(address);
+    }
+
+    /**
+     * Build routing service builder of a routing service <code>routingService</code>.
+     *
+     * @param routingService
+     *          routing service to provide
+     * @return routing service builder
+     */
+    public static RoutingService.Builder buildRoutingService(RoutingService routingService) {
+        return new RoutingServiceProvider(routingService);
+    }
+
+    private static RoutingService.Builder buildRoutingService(ServerSet serverSet,
+                                                              boolean resolveFromName) {
+        return ConsistentHashRoutingService.newBuilder()
+                .serverSet(serverSet)
+                .resolveFromName(resolveFromName)
+                .numReplicas(NUM_CONSISTENT_HASH_REPLICAS);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java
new file mode 100644
index 0000000..4fe8141
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.distributedlog.service.DLSocketAddress;
+import com.twitter.finagle.NoBrokersAvailableException;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Routing Service based on a given {@link com.twitter.common.zookeeper.ServerSet}.
+ *
+ * <p>The service is itself a thread: {@link #run()} registers a monitor on the serverset watcher,
+ * and every change delivered to that monitor updates the host set, notifies the registered
+ * listeners and rebuilds the sorted host list used for hashing. All reads and writes of
+ * {@code hostSet} and {@code hostList} happen while holding the {@code hostSet} monitor.
+ */
+class ServerSetRoutingService extends Thread implements RoutingService {
+
+    private static final Logger logger = LoggerFactory.getLogger(ServerSetRoutingService.class);
+
+    static ServerSetRoutingServiceBuilder newServerSetRoutingServiceBuilder() {
+        return new ServerSetRoutingServiceBuilder();
+    }
+
+    /**
+     * Builder to build {@link com.twitter.common.zookeeper.ServerSet} based routing service.
+     */
+    static class ServerSetRoutingServiceBuilder implements RoutingService.Builder {
+
+        private ServerSetWatcher serverSetWatcher;
+
+        private ServerSetRoutingServiceBuilder() {}
+
+        /**
+         * Set the watcher that delivers serverset changes to the built service.
+         *
+         * @param serverSetWatcher serverset watcher.
+         * @return this builder.
+         */
+        public ServerSetRoutingServiceBuilder serverSetWatcher(ServerSetWatcher serverSetWatcher) {
+            this.serverSetWatcher = serverSetWatcher;
+            return this;
+        }
+
+        /**
+         * Accept a stats receiver; this routing service does not use it.
+         */
+        @Override
+        public Builder statsReceiver(StatsReceiver statsReceiver) {
+            return this;
+        }
+
+        /**
+         * Build the routing service.
+         *
+         * @return serverset based routing service.
+         * @throws NullPointerException if no serverset watcher was provided.
+         */
+        @Override
+        public RoutingService build() {
+            checkNotNull(serverSetWatcher, "No serverset watcher provided.");
+            return new ServerSetRoutingService(this.serverSetWatcher);
+        }
+    }
+
+    /**
+     * Orders socket addresses by their string form, which gives the host list a stable order.
+     */
+    private static class HostComparator implements Comparator<SocketAddress> {
+
+        private static final HostComparator INSTANCE = new HostComparator();
+
+        @Override
+        public int compare(SocketAddress o1, SocketAddress o2) {
+            return o1.toString().compareTo(o2.toString());
+        }
+    }
+
+    private final ServerSetWatcher serverSetWatcher;
+
+    private final Set<SocketAddress> hostSet = new HashSet<SocketAddress>();
+    private List<SocketAddress> hostList = new ArrayList<SocketAddress>();
+    private final HashFunction hasher = Hashing.md5();
+
+    // Server Set Changes
+    private final AtomicReference<ImmutableSet<DLSocketAddress>> serverSetChange =
+            new AtomicReference<ImmutableSet<DLSocketAddress>>(null);
+    private final CountDownLatch changeLatch = new CountDownLatch(1);
+
+    // Listeners
+    protected final CopyOnWriteArraySet<RoutingListener> listeners =
+            new CopyOnWriteArraySet<RoutingListener>();
+
+    ServerSetRoutingService(ServerSetWatcher serverSetWatcher) {
+        super("ServerSetRoutingService");
+        this.serverSetWatcher = serverSetWatcher;
+    }
+
+    /**
+     * Get a snapshot of the currently known hosts.
+     *
+     * @return an immutable copy of the host set.
+     */
+    @Override
+    public Set<SocketAddress> getHosts() {
+        synchronized (hostSet) {
+            return ImmutableSet.copyOf(hostSet);
+        }
+    }
+
+    /**
+     * Start the service thread and wait up to one minute for the first serverset change.
+     *
+     * <p>A timeout or an interruption is only logged; the method still returns normally. On
+     * interruption the interrupt status of the calling thread is restored.
+     */
+    @Override
+    public void startService() {
+        start();
+        try {
+            if (!changeLatch.await(1, TimeUnit.MINUTES)) {
+                logger.warn("No serverset change received in 1 minute.");
+            }
+        } catch (InterruptedException e) {
+            // keep the interruption visible to the caller instead of silently swallowing it
+            Thread.currentThread().interrupt();
+            logger.warn("Interrupted waiting first serverset change : ", e);
+        }
+        logger.info("{} Routing Service Started.", getClass().getSimpleName());
+    }
+
+    /**
+     * Interrupt the service thread and wait for it to finish.
+     *
+     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored
+     * and the method returns after logging a warning.
+     */
+    @Override
+    public void stopService() {
+        // Interrupt the service thread (this object), not the caller: interrupting the caller
+        // would make the join() below fail immediately with an InterruptedException.
+        interrupt();
+        try {
+            join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.warn("Interrupted on waiting serverset routing service to finish : ", e);
+        }
+        logger.info("{} Routing Service Stopped.", getClass().getSimpleName());
+    }
+
+    @Override
+    public RoutingService registerListener(RoutingListener listener) {
+        listeners.add(listener);
+        return this;
+    }
+
+    @Override
+    public RoutingService unregisterListener(RoutingListener listener) {
+        listeners.remove(listener);
+        return this;
+    }
+
+    /**
+     * Pick a host for <i>key</i> by hashing the key onto the sorted host list.
+     *
+     * <p>If the hashed host was already tried, it is dropped from a copy of the list, the key is
+     * rehashed onto the remaining hosts and the copy is scanned circularly from that position
+     * for a host that was not tried yet.
+     *
+     * @param key
+     *          key to route the request.
+     * @param rContext
+     *          routing context carrying the hosts already tried.
+     * @return host to route the request
+     * @throws NoBrokersAvailableException if there is no host, or every host was already tried.
+     */
+    @Override
+    public SocketAddress getHost(String key, RoutingContext rContext)
+            throws NoBrokersAvailableException {
+        SocketAddress address = null;
+        synchronized (hostSet) {
+            if (0 != hostList.size()) {
+                int hashCode = hasher.hashUnencodedChars(key).asInt();
+                int hostId = signSafeMod(hashCode, hostList.size());
+                address = hostList.get(hostId);
+                if (rContext.isTriedHost(address)) {
+                    ArrayList<SocketAddress> newList = new ArrayList<SocketAddress>(hostList);
+                    newList.remove(hostId);
+                    if (newList.isEmpty()) {
+                        // The only known host was already tried. Rehashing onto an empty list
+                        // would divide by zero, so report "no host" instead.
+                        address = null;
+                    } else {
+                        // pickup a new host by rehashing it.
+                        hostId = signSafeMod(hashCode, newList.size());
+                        address = newList.get(hostId);
+                        int i = hostId;
+                        while (rContext.isTriedHost(address)) {
+                            i = (i + 1) % newList.size();
+                            if (i == hostId) {
+                                // wrapped around: every remaining host was tried
+                                address = null;
+                                break;
+                            }
+                            address = newList.get(i);
+                        }
+                    }
+                }
+            }
+        }
+        if (null == address) {
+            throw new NoBrokersAvailableException("No host is available.");
+        }
+        return address;
+    }
+
+    /**
+     * Remove <i>host</i> from the host set and rebuild the sorted host list.
+     *
+     * @param host
+     *          host address to remove
+     * @param reason
+     *          reason to remove the host; logged when the host was actually present.
+     */
+    @Override
+    public void removeHost(SocketAddress host, Throwable reason) {
+        synchronized (hostSet) {
+            if (hostSet.remove(host)) {
+                logger.info("Node {} left due to : ", host, reason);
+            }
+            hostList = new ArrayList<SocketAddress>(hostSet);
+            Collections.sort(hostList, HostComparator.INSTANCE);
+            logger.info("Host list becomes : {}.", hostList);
+        }
+    }
+
+    /**
+     * Register a monitor on the serverset watcher.
+     *
+     * <p>Each change is first published to {@code serverSetChange}. Only the caller that finds
+     * no change pending (previous value {@code null}) applies changes; it keeps applying the most
+     * recent value until it manages to reset the reference to {@code null}, so a value published
+     * in the meantime by another callback is picked up by the same loop. A failure to start
+     * watching is logged and terminates the JVM with exit code -1.
+     */
+    @Override
+    public void run() {
+        try {
+            serverSetWatcher.watch(new ServerSetWatcher.ServerSetMonitor() {
+                @Override
+                public void onChange(ImmutableSet<DLSocketAddress> serviceInstances) {
+                    ImmutableSet<DLSocketAddress> lastValue = serverSetChange.getAndSet(serviceInstances);
+                    if (null == lastValue) {
+                        ImmutableSet<DLSocketAddress> mostRecentValue;
+                        do {
+                            mostRecentValue = serverSetChange.get();
+                            performServerSetChange(mostRecentValue);
+                            changeLatch.countDown();
+                        } while (!serverSetChange.compareAndSet(mostRecentValue, null));
+                    }
+                }
+            });
+        } catch (Exception e) {
+            logger.error("Fail to monitor server set : ", e);
+            Runtime.getRuntime().exit(-1);
+        }
+    }
+
+    /**
+     * Apply a new serverset: update the host set, notify listeners, rebuild the host list.
+     *
+     * <p>Listeners are called after the {@code hostSet} monitor has been released.
+     *
+     * @param serverSet the complete new set of service instances.
+     */
+    protected synchronized void performServerSetChange(ImmutableSet<DLSocketAddress> serverSet) {
+        Set<SocketAddress> newSet = new HashSet<SocketAddress>();
+        for (DLSocketAddress serviceInstance : serverSet) {
+            newSet.add(serviceInstance.getSocketAddress());
+        }
+
+        Set<SocketAddress> removed;
+        Set<SocketAddress> added;
+        synchronized (hostSet) {
+            // immutableCopy() detaches the differences from hostSet before it is modified below
+            removed = Sets.difference(hostSet, newSet).immutableCopy();
+            added = Sets.difference(newSet, hostSet).immutableCopy();
+            for (SocketAddress node: removed) {
+                if (hostSet.remove(node)) {
+                    logger.info("Node {} left.", node);
+                }
+            }
+            for (SocketAddress node: added) {
+                if (hostSet.add(node)) {
+                    logger.info("Node {} joined.", node);
+                }
+            }
+        }
+
+        for (SocketAddress addr : removed) {
+            for (RoutingListener listener : listeners) {
+                listener.onServerLeft(addr);
+            }
+        }
+
+        for (SocketAddress addr : added) {
+            for (RoutingListener listener : listeners) {
+                listener.onServerJoin(addr);
+            }
+        }
+
+        synchronized (hostSet) {
+            hostList = new ArrayList<SocketAddress>(hostSet);
+            Collections.sort(hostList, HostComparator.INSTANCE);
+            logger.info("Host list becomes : {}.", hostList);
+        }
+
+    }
+
+    /**
+     * Compute {@code dividend mod divisor} with a result that is never negative.
+     *
+     * @param dividend value to reduce; may be negative.
+     * @param divisor modulus; expected to be positive.
+     * @return a value in {@code [0, divisor)} for a positive divisor.
+     * @throws ArithmeticException if {@code divisor} is zero.
+     */
+    static int signSafeMod(long dividend, int divisor) {
+        int mod = (int) (dividend % divisor);
+
+        if (mod < 0) {
+            mod += divisor;
+        }
+
+        return mod;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java
new file mode 100644
index 0000000..77b7beb
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.distributedlog.service.DLSocketAddress;
+
+/**
+ * Watches a server set and reports membership changes to a registered monitor.
+ */
+public interface ServerSetWatcher {
+
+    /**
+     * Exception thrown when the server set cannot be monitored.
+     */
+    class MonitorException extends Exception {
+
+        private static final long serialVersionUID = 392751505154339548L;
+
+        /**
+         * Create an exception with a description of the failure.
+         *
+         * @param msg description of the failure
+         */
+        public MonitorException(String msg) {
+            super(msg);
+        }
+
+        /**
+         * Create an exception with a description of the failure and its underlying cause.
+         *
+         * @param msg description of the failure
+         * @param cause the underlying failure
+         */
+        public MonitorException(String msg, Throwable cause) {
+            super(msg, cause);
+        }
+    }
+
+    /**
+     * An interface to an object that is interested in receiving notification whenever the host set changes.
+     */
+    interface ServerSetMonitor {
+
+        /**
+         * Called whenever the set of available services changes.
+         *
+         * <p>It happens either when a service dies, when a new instance comes on-line, or
+         * when an existing service advertises a status or health change.
+         *
+         * @param hostSet the current set of available service addresses
+         */
+        void onChange(ImmutableSet<DLSocketAddress> hostSet);
+    }
+
+    /**
+     * Registers a monitor to receive change notices for this server set as long as this jvm process is alive.
+     *
+     * <p>Blocks until the initial server set can be gathered and delivered to the monitor.
+     * The monitor will be notified if the membership set or parameters of existing members have
+     * changed.
+     *
+     * @param monitor the server set monitor to call back when the host set changes
+     * @throws MonitorException if there is a problem monitoring the host set
+     */
+    void watch(final ServerSetMonitor monitor) throws MonitorException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java
new file mode 100644
index 0000000..753a1af
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Sets;
+import com.twitter.finagle.NoBrokersAvailableException;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.net.SocketAddress;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * Single Host Routing Service.
+ *
+ * <p>A {@link RoutingService} that always routes to one configured address. The address can be
+ * replaced through {@link #setAddress(SocketAddress)}.
+ */
+public class SingleHostRoutingService implements RoutingService {
+
+    /**
+     * Create a routing service that routes to the given address.
+     *
+     * @param address the only host to route to
+     * @return a routing service bound to <i>address</i>
+     */
+    public static SingleHostRoutingService of(SocketAddress address) {
+        return new SingleHostRoutingService(address);
+    }
+
+    /**
+     * Builder to build single host based routing service.
+     *
+     * @return builder to build single host based routing service.
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder to build single host based routing service.
+     */
+    public static class Builder implements RoutingService.Builder {
+
+        private SocketAddress address;
+
+        private Builder() {}
+
+        /**
+         * Set the host that the built routing service routes to.
+         *
+         * @param address the only host to route to
+         * @return this builder
+         */
+        public Builder address(SocketAddress address) {
+            this.address = address;
+            return this;
+        }
+
+        /**
+         * Accepts a stats receiver to satisfy the builder contract; the receiver is not used.
+         *
+         * @param statsReceiver ignored
+         * @return this builder
+         */
+        @Override
+        public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) {
+            return this;
+        }
+
+        /**
+         * Build the routing service.
+         *
+         * @return a routing service bound to the configured address
+         * @throws NullPointerException if no address has been configured
+         */
+        @Override
+        public RoutingService build() {
+            checkNotNull(address, "Host is null");
+            return new SingleHostRoutingService(address);
+        }
+    }
+
+    private SocketAddress address;
+    private final CopyOnWriteArraySet<RoutingListener> listeners =
+            new CopyOnWriteArraySet<RoutingListener>();
+
+    SingleHostRoutingService(SocketAddress address) {
+        this.address = address;
+    }
+
+    /**
+     * Replace the host that this service routes to.
+     *
+     * <p>Registered listeners are not notified of the change.
+     *
+     * @param address the new host to route to
+     */
+    public void setAddress(SocketAddress address) {
+        this.address = address;
+    }
+
+    /**
+     * Return the known hosts.
+     *
+     * @return a newly created set containing only the current address
+     */
+    @Override
+    public Set<SocketAddress> getHosts() {
+        return Sets.newHashSet(address);
+    }
+
+    /**
+     * Start the service by announcing the current address to every registered listener.
+     */
+    @Override
+    public void startService() {
+        // Nothing to start; just tell the listeners that the single host is available.
+        for (RoutingListener listener : listeners) {
+            listener.onServerJoin(address);
+        }
+    }
+
+    @Override
+    public void stopService() {
+        // no-op
+    }
+
+    /**
+     * Register a listener for routing changes.
+     *
+     * @param listener listener to add
+     * @return this routing service
+     */
+    @Override
+    public RoutingService registerListener(RoutingListener listener) {
+        listeners.add(listener);
+        return this;
+    }
+
+    /**
+     * Unregister a previously registered listener.
+     *
+     * @param listener listener to remove
+     * @return this routing service
+     */
+    @Override
+    public RoutingService unregisterListener(RoutingListener listener) {
+        listeners.remove(listener);
+        // Return this service (not null) so calls can be chained, as registerListener does.
+        return this;
+    }
+
+    /**
+     * Return the single configured host.
+     *
+     * @param key routing key; not consulted by this implementation
+     * @param rContext routing context used to check whether the host was already tried
+     * @return the configured address
+     * @throws NoBrokersAvailableException if <i>rContext</i> reports the address as already tried
+     */
+    @Override
+    public SocketAddress getHost(String key, RoutingContext rContext)
+            throws NoBrokersAvailableException {
+        if (rContext.isTriedHost(address)) {
+            throw new NoBrokersAvailableException("No hosts is available : routing context = " + rContext);
+        }
+        return address;
+    }
+
+    @Override
+    public void removeHost(SocketAddress address, Throwable reason) {
+        // no-op
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java
new file mode 100644
index 0000000..2fc8de0
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import com.twitter.finagle.Addr;
+import com.twitter.finagle.Address;
+import com.twitter.finagle.Addrs;
+import com.twitter.finagle.Name;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A {@link Name} implementation for testing purpose.
+ *
+ * <p>It remembers the most recently supplied callback and hands it a bound address list
+ * whenever {@link #changeAddrs(List)} is invoked.
+ */
+public class TestName implements Name {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestName.class);
+
+    private AbstractFunction1<Addr, BoxedUnit> callback;
+
+    /**
+     * Remember the callback to invoke on address changes, replacing any previous one.
+     *
+     * @param callback function to apply to each new {@link Addr}
+     */
+    public void changes(AbstractFunction1<Addr, BoxedUnit> callback) {
+        this.callback = callback;
+    }
+
+    /**
+     * Push a new list of addresses to the remembered callback.
+     *
+     * <p>Does nothing when no callback has been supplied yet.
+     *
+     * @param addresses the addresses to publish
+     */
+    public void changeAddrs(List<Address> addresses) {
+        if (callback == null) {
+            return;
+        }
+        LOG.info("Sending a callback {}", addresses);
+        final Addr boundAddr = Addrs.newBoundAddr(addresses);
+        callback.apply(boundAddr);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java
new file mode 100644
index 0000000..1ff7c93
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.twitter.common.net.pool.DynamicHostSet;
+import com.twitter.common.zookeeper.ServerSet;
+import org.apache.distributedlog.service.DLSocketAddress;
+import com.twitter.thrift.Endpoint;
+import com.twitter.thrift.ServiceInstance;
+import java.net.InetSocketAddress;
+import java.util.Set;
+
+/**
+ * Twitter {@link ServerSet} based watcher.
+ *
+ * <p>Each {@link ServiceInstance} reported by the server set is translated into a
+ * {@link DLSocketAddress} built from its additional endpoint named "thrift".
+ */
+public class TwitterServerSetWatcher implements ServerSetWatcher {
+
+    private final ServerSet serverSet;
+    private final boolean resolvedFromName;
+
+    /**
+     * Construct a {@link ServerSet} based watcher.
+     *
+     * @param serverSet server set.
+     * @param resolvedFromName whether to resolve hosts from {@link com.twitter.finagle.Name}.
+     */
+    public TwitterServerSetWatcher(ServerSet serverSet,
+                                   boolean resolvedFromName) {
+        this.serverSet = serverSet;
+        this.resolvedFromName = resolvedFromName;
+    }
+
+    /**
+     * Registers a monitor to receive change notices for this server set as long as this jvm process is alive.
+     *
+     * <p>Blocks until the initial server set can be gathered and delivered to the monitor.
+     * The monitor will be notified if the membership set or parameters of existing members have
+     * changed.
+     *
+     * @param monitor the server set monitor to call back when the host set changes
+     * @throws MonitorException if there is a problem monitoring the host set
+     */
+    @Override
+    public void watch(final ServerSetMonitor monitor)
+            throws MonitorException {
+        final DynamicHostSet.HostChangeMonitor<ServiceInstance> translator =
+                new DynamicHostSet.HostChangeMonitor<ServiceInstance>() {
+                    @Override
+                    public void onChange(ImmutableSet<ServiceInstance> serviceInstances) {
+                        Set<DLSocketAddress> addresses = Sets.newHashSet();
+                        for (ServiceInstance instance : serviceInstances) {
+                            addresses.add(toDLSocketAddress(instance));
+                        }
+                        monitor.onChange(ImmutableSet.copyOf(addresses));
+                    }
+                };
+        try {
+            serverSet.watch(translator);
+        } catch (DynamicHostSet.MonitorException me) {
+            throw new MonitorException("Failed to monitor server set : ", me);
+        }
+    }
+
+    /**
+     * Translate a service instance into the socket address of its "thrift" endpoint.
+     *
+     * @param serviceInstance instance reported by the server set
+     * @return address carrying the instance's shard id, or -1 when hosts are resolved from name
+     */
+    private DLSocketAddress toDLSocketAddress(ServiceInstance serviceInstance) {
+        // NOTE(review): presumably every instance advertises a "thrift" endpoint; if one does not,
+        // the lookup yields null and the next statement throws - confirm against the registrations.
+        Endpoint thriftEndpoint = serviceInstance.getAdditionalEndpoints().get("thrift");
+        InetSocketAddress socketAddress =
+                new InetSocketAddress(thriftEndpoint.getHost(), thriftEndpoint.getPort());
+        int shardId;
+        if (resolvedFromName) {
+            shardId = -1;
+        } else {
+            shardId = serviceInstance.getShard();
+        }
+        return new DLSocketAddress(shardId, socketAddress);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java
new file mode 100644
index 0000000..352d755
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Routing Mechanisms to route the traffic to the owner of streams.
+ */
+package org.apache.distributedlog.client.routing;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java
new file mode 100644
index 0000000..93cdf7a
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.serverset;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.net.HostAndPort;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.zookeeper.ServerSet;
+import com.twitter.common.zookeeper.ServerSets;
+import com.twitter.common.zookeeper.ZooKeeperClient;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper over zookeeper client and its server set.
+ */
+public class DLZkServerSet {
+
+    private static final Logger logger = LoggerFactory.getLogger(DLZkServerSet.class);
+
+    static final String ZNODE_WRITE_PROXY = ".write_proxy";
+
+    private static String getZKServersFromDLUri(URI uri) {
+        return uri.getAuthority().replace(";", ",");
+    }
+
+    private static Iterable<InetSocketAddress> getZkAddresses(URI uri) {
+        String zkServers = getZKServersFromDLUri(uri);
+        String[] zkServerList = StringUtils.split(zkServers, ',');
+        ImmutableList.Builder<InetSocketAddress> builder = ImmutableList.builder();
+        for (String zkServer : zkServerList) {
+            HostAndPort hostAndPort = HostAndPort.fromString(zkServer).withDefaultPort(2181);
+            builder.add(InetSocketAddress.createUnresolved(
+                    hostAndPort.getHostText(),
+                    hostAndPort.getPort()));
+        }
+        return builder.build();
+    }
+
+    public static DLZkServerSet of(URI uri,
+                                   int zkSessionTimeoutMs) {
+        // Create zookeeper and server set
+        String zkPath = uri.getPath() + "/" + ZNODE_WRITE_PROXY;
+        Iterable<InetSocketAddress> zkAddresses = getZkAddresses(uri);
+        ZooKeeperClient zkClient =
+                new ZooKeeperClient(Amount.of(zkSessionTimeoutMs, Time.MILLISECONDS), zkAddresses);
+        ServerSet serverSet = ServerSets.create(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, zkPath);
+        return new DLZkServerSet(zkClient, serverSet);
+    }
+
+    private final ZooKeeperClient zkClient;
+    private final ServerSet zkServerSet;
+
+    public DLZkServerSet(ZooKeeperClient zkClient,
+                         ServerSet zkServerSet) {
+        this.zkClient = zkClient;
+        this.zkServerSet = zkServerSet;
+    }
+
+    public ZooKeeperClient getZkClient() {
+        return zkClient;
+    }
+
+    public ServerSet getServerSet() {
+        return zkServerSet;
+    }
+
+    public void close() {
+        zkClient.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java
new file mode 100644
index 0000000..38a7544
--- /dev/null
+++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utils related to server set.
+ */
+package org.apache.distributedlog.client.serverset;