You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:27 UTC

[20/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
new file mode 100644
index 0000000..86d1c11
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.ownership;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.distributedlog.client.ClientConfig;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.Set;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Test Case for Ownership Cache.
+ */
+public class TestOwnershipCache {
+
+    @Rule
+    public TestName runtime = new TestName();
+
+    private static OwnershipCache createOwnershipCache() {
+        ClientConfig clientConfig = new ClientConfig();
+        return new OwnershipCache(clientConfig, null,
+                                  NullStatsReceiver.get(), NullStatsReceiver.get());
+    }
+
+    private static SocketAddress createSocketAddress(int port) {
+        return new InetSocketAddress("127.0.0.1", port);
+    }
+
+    @Test(timeout = 60000)
+    public void testUpdateOwner() {
+        OwnershipCache cache = createOwnershipCache();
+        SocketAddress addr = createSocketAddress(1000);
+        String stream = runtime.getMethodName();
+
+        assertTrue("Should successfully update owner if no owner exists before",
+                cache.updateOwner(stream, addr));
+        assertEquals("Owner should be " + addr + " for stream " + stream,
+                addr, cache.getOwner(stream));
+        assertTrue("Should successfully update owner if old owner is same",
+                cache.updateOwner(stream, addr));
+        assertEquals("Owner should be " + addr + " for stream " + stream,
+                addr, cache.getOwner(stream));
+    }
+
+    @Test(timeout = 60000)
+    public void testRemoveOwnerFromStream() {
+        OwnershipCache cache = createOwnershipCache();
+        int initialPort = 2000;
+        int numProxies = 2;
+        int numStreamsPerProxy = 2;
+        for (int i = 0; i < numProxies; i++) {
+            SocketAddress addr = createSocketAddress(initialPort + i);
+            for (int j = 0; j < numStreamsPerProxy; j++) {
+                String stream = runtime.getMethodName() + "_" + i + "_" + j;
+                cache.updateOwner(stream, addr);
+            }
+        }
+        Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+                numProxies * numStreamsPerProxy, ownershipMap.size());
+        Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
+        assertEquals("There should be " + numProxies + " proxies cached",
+                numProxies, ownershipDistribution.size());
+
+        String stream = runtime.getMethodName() + "_0_0";
+        SocketAddress owner = createSocketAddress(initialPort);
+
+        // remove non-existent mapping won't change anything
+        SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999);
+        cache.removeOwnerFromStream(stream, nonExistentAddr, "remove-non-existent-addr");
+        assertEquals("Owner " + owner + " should not be removed",
+                owner, cache.getOwner(stream));
+        ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+                numProxies * numStreamsPerProxy, ownershipMap.size());
+
+        // remove existent mapping should remove ownership mapping
+        cache.removeOwnerFromStream(stream, owner, "remove-owner");
+        assertNull("Owner " + owner + " should be removed", cache.getOwner(stream));
+        ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should be " + (numProxies * numStreamsPerProxy - 1) + " entries left in cache",
+                numProxies * numStreamsPerProxy - 1, ownershipMap.size());
+        ownershipDistribution = cache.getStreamOwnershipDistribution();
+        assertEquals("There should still be " + numProxies + " proxies cached",
+                numProxies, ownershipDistribution.size());
+        Set<String> ownedStreams = ownershipDistribution.get(owner);
+        assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned for " + owner,
+                numStreamsPerProxy - 1, ownedStreams.size());
+        assertFalse("Stream " + stream + " should not be owned by " + owner,
+                ownedStreams.contains(stream));
+    }
+
+    @Test(timeout = 60000)
+    public void testRemoveAllStreamsFromOwner() {
+        OwnershipCache cache = createOwnershipCache();
+        int initialPort = 2000;
+        int numProxies = 2;
+        int numStreamsPerProxy = 2;
+        for (int i = 0; i < numProxies; i++) {
+            SocketAddress addr = createSocketAddress(initialPort + i);
+            for (int j = 0; j < numStreamsPerProxy; j++) {
+                String stream = runtime.getMethodName() + "_" + i + "_" + j;
+                cache.updateOwner(stream, addr);
+            }
+        }
+        Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+                numProxies * numStreamsPerProxy, ownershipMap.size());
+        Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
+        assertEquals("There should be " + numProxies + " proxies cached",
+                numProxies, ownershipDistribution.size());
+
+        SocketAddress owner = createSocketAddress(initialPort);
+
+        // remove non-existent host won't change anything
+        SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999);
+        cache.removeAllStreamsFromOwner(nonExistentAddr);
+        ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should still be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+                numProxies * numStreamsPerProxy, ownershipMap.size());
+        ownershipDistribution = cache.getStreamOwnershipDistribution();
+        assertEquals("There should still be " + numProxies + " proxies cached",
+                numProxies, ownershipDistribution.size());
+
+        // remove existent host should remove ownership mapping
+        cache.removeAllStreamsFromOwner(owner);
+        ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should be " + ((numProxies - 1) * numStreamsPerProxy) + " entries left in cache",
+                (numProxies - 1) * numStreamsPerProxy, ownershipMap.size());
+        ownershipDistribution = cache.getStreamOwnershipDistribution();
+        assertEquals("There should be " + (numProxies - 1) + " proxies cached",
+                numProxies - 1, ownershipDistribution.size());
+        assertFalse("Host " + owner + " should not be cached",
+                ownershipDistribution.containsKey(owner));
+    }
+
+    @Test(timeout = 60000)
+    public void testReplaceOwner() {
+        OwnershipCache cache = createOwnershipCache();
+        int initialPort = 2000;
+        int numProxies = 2;
+        int numStreamsPerProxy = 2;
+        for (int i = 0; i < numProxies; i++) {
+            SocketAddress addr = createSocketAddress(initialPort + i);
+            for (int j = 0; j < numStreamsPerProxy; j++) {
+                String stream = runtime.getMethodName() + "_" + i + "_" + j;
+                cache.updateOwner(stream, addr);
+            }
+        }
+        Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+                numProxies * numStreamsPerProxy, ownershipMap.size());
+        Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
+        assertEquals("There should be " + numProxies + " proxies cached",
+                numProxies, ownershipDistribution.size());
+
+        String stream = runtime.getMethodName() + "_0_0";
+        SocketAddress oldOwner = createSocketAddress(initialPort);
+        SocketAddress newOwner = createSocketAddress(initialPort + 999);
+
+        cache.updateOwner(stream, newOwner);
+        assertEquals("Owner of " + stream + " should be changed from " + oldOwner + " to " + newOwner,
+                newOwner, cache.getOwner(stream));
+        ownershipMap = cache.getStreamOwnerMapping();
+        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+                numProxies * numStreamsPerProxy, ownershipMap.size());
+        assertEquals("Owner of " + stream + " should be " + newOwner,
+                newOwner, ownershipMap.get(stream));
+        ownershipDistribution = cache.getStreamOwnershipDistribution();
+        assertEquals("There should be " + (numProxies + 1) + " proxies cached",
+                numProxies + 1, ownershipDistribution.size());
+        Set<String> oldOwnedStreams = ownershipDistribution.get(oldOwner);
+        assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + oldOwner,
+                numStreamsPerProxy - 1, oldOwnedStreams.size());
+        assertFalse("Stream " + stream + " should not be owned by " + oldOwner,
+                oldOwnedStreams.contains(stream));
+        Set<String> newOwnedStreams = ownershipDistribution.get(newOwner);
+        assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + newOwner,
+                1, newOwnedStreams.size());
+        assertTrue("Stream " + stream + " should be owned by " + newOwner,
+                newOwnedStreams.contains(stream));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java
new file mode 100644
index 0000000..8ef33bd
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import org.apache.distributedlog.thrift.service.BulkWriteResponse;
+import org.apache.distributedlog.thrift.service.ClientInfo;
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import org.apache.distributedlog.thrift.service.HeartbeatOptions;
+import org.apache.distributedlog.thrift.service.ServerInfo;
+import org.apache.distributedlog.thrift.service.WriteContext;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import com.twitter.util.Future;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Mock DistributedLog Related Services.
+ */
+public class MockDistributedLogServices {
+
+    /**
+     * Mock basic service.
+     */
+    static class MockBasicService implements DistributedLogService.ServiceIface {
+
+        @Override
+        public Future<ServerInfo> handshake() {
+            return Future.value(new ServerInfo());
+        }
+
+        @Override
+        public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
+            return Future.value(new ServerInfo());
+        }
+
+        @Override
+        public Future<WriteResponse> heartbeat(String stream, WriteContext ctx) {
+            return Future.value(new WriteResponse());
+        }
+
+        @Override
+        public Future<WriteResponse> heartbeatWithOptions(String stream,
+                                                          WriteContext ctx,
+                                                          HeartbeatOptions options) {
+            return Future.value(new WriteResponse());
+        }
+
+        @Override
+        public Future<WriteResponse> write(String stream,
+                                           ByteBuffer data) {
+            return Future.value(new WriteResponse());
+        }
+
+        @Override
+        public Future<WriteResponse> writeWithContext(String stream,
+                                                      ByteBuffer data,
+                                                      WriteContext ctx) {
+            return Future.value(new WriteResponse());
+        }
+
+        @Override
+        public Future<BulkWriteResponse> writeBulkWithContext(String stream,
+                                                              List<ByteBuffer> data,
+                                                              WriteContext ctx) {
+            return Future.value(new BulkWriteResponse());
+        }
+
+        @Override
+        public Future<WriteResponse> truncate(String stream,
+                                              String dlsn,
+                                              WriteContext ctx) {
+            return Future.value(new WriteResponse());
+        }
+
+        @Override
+        public Future<WriteResponse> release(String stream,
+                                             WriteContext ctx) {
+            return Future.value(new WriteResponse());
+        }
+
+        @Override
+        public Future<WriteResponse> create(String stream, WriteContext ctx) {
+            return Future.value(new WriteResponse());
+        }
+
+        @Override
+        public Future<WriteResponse> delete(String stream,
+                                            WriteContext ctx) {
+            return Future.value(new WriteResponse());
+        }
+
+        @Override
+        public Future<WriteResponse> getOwner(String stream, WriteContext ctx) {
+            return Future.value(new WriteResponse());
+        }
+
+        @Override
+        public Future<Void> setAcceptNewStream(boolean enabled) {
+            return Future.value(null);
+        }
+    }
+
+    /**
+     * Mock server info service.
+     */
+    public static class MockServerInfoService extends MockBasicService {
+
+        protected ServerInfo serverInfo;
+
+        public MockServerInfoService() {
+            serverInfo = new ServerInfo();
+        }
+
+        public void updateServerInfo(ServerInfo serverInfo) {
+            this.serverInfo = serverInfo;
+        }
+
+        @Override
+        public Future<ServerInfo> handshake() {
+            return Future.value(serverInfo);
+        }
+
+        @Override
+        public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
+            return Future.value(serverInfo);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java
new file mode 100644
index 0000000..e38c2ed
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Mock Proxy Client Builder.
+ */
+class MockProxyClientBuilder implements ProxyClient.Builder {
+
+    static class MockProxyClient extends ProxyClient {
+        MockProxyClient(SocketAddress address,
+                        DistributedLogService.ServiceIface service) {
+            super(address, new MockThriftClient(), service);
+        }
+    }
+
+    private final ConcurrentMap<SocketAddress, MockProxyClient> clients =
+            new ConcurrentHashMap<SocketAddress, MockProxyClient>();
+
+    public void provideProxyClient(SocketAddress address,
+                                   MockProxyClient proxyClient) {
+        clients.put(address, proxyClient);
+    }
+
+    @Override
+    public ProxyClient build(SocketAddress address) {
+        return clients.get(address);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java
new file mode 100644
index 0000000..ad1c878
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import com.twitter.finagle.Service;
+import com.twitter.finagle.thrift.ThriftClientRequest;
+import com.twitter.util.Future;
+
+/**
+ * Mock Thrift Client.
+ */
+class MockThriftClient extends Service<ThriftClientRequest, byte[]> {
+    @Override
+    public Future<byte[]> apply(ThriftClientRequest request) {
+        return Future.value(request.message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java
new file mode 100644
index 0000000..6d9a471
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.distributedlog.client.ClientConfig;
+import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockBasicService;
+import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockServerInfoService;
+import org.apache.distributedlog.client.proxy.MockProxyClientBuilder.MockProxyClient;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.client.stats.ClientStats;
+import org.apache.distributedlog.thrift.service.ServerInfo;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.tuple.Pair;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Test Proxy Client Manager.
+ */
+public class TestProxyClientManager {
+
+    @Rule
+    public TestName runtime = new TestName();
+
+    static class TestHostProvider implements HostProvider {
+
+        Set<SocketAddress> hosts = new HashSet<SocketAddress>();
+
+        synchronized void addHost(SocketAddress host) {
+            hosts.add(host);
+        }
+
+        @Override
+        public synchronized Set<SocketAddress> getHosts() {
+            return ImmutableSet.copyOf(hosts);
+        }
+
+    }
+
+    private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder,
+                                                               long periodicHandshakeIntervalMs) {
+        HostProvider provider = new TestHostProvider();
+        return createProxyClientManager(builder, provider, periodicHandshakeIntervalMs);
+    }
+
+    private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder,
+                                                               HostProvider hostProvider,
+                                                               long periodicHandshakeIntervalMs) {
+        ClientConfig clientConfig = new ClientConfig();
+        clientConfig.setPeriodicHandshakeIntervalMs(periodicHandshakeIntervalMs);
+        clientConfig.setPeriodicOwnershipSyncIntervalMs(-1);
+        HashedWheelTimer dlTimer = new HashedWheelTimer(
+                new ThreadFactoryBuilder().setNameFormat("TestProxyClientManager-timer-%d").build(),
+                clientConfig.getRedirectBackoffStartMs(),
+                TimeUnit.MILLISECONDS);
+        return new ProxyClientManager(clientConfig, builder, dlTimer, hostProvider,
+                new ClientStats(NullStatsReceiver.get(), false, new DefaultRegionResolver()));
+    }
+
+    private static SocketAddress createSocketAddress(int port) {
+        return new InetSocketAddress("127.0.0.1", port);
+    }
+
+    private static MockProxyClient createMockProxyClient(SocketAddress address) {
+        return new MockProxyClient(address, new MockBasicService());
+    }
+
+    private static Pair<MockProxyClient, MockServerInfoService> createMockProxyClient(
+            SocketAddress address, ServerInfo serverInfo) {
+        MockServerInfoService service = new MockServerInfoService();
+        MockProxyClient proxyClient = new MockProxyClient(address, service);
+        service.updateServerInfo(serverInfo);
+        return Pair.of(proxyClient, service);
+    }
+
+    @Test(timeout = 60000)
+    public void testBasicCreateRemove() throws Exception {
+        SocketAddress address = createSocketAddress(1000);
+        MockProxyClientBuilder builder = new MockProxyClientBuilder();
+        MockProxyClient mockProxyClient = createMockProxyClient(address);
+        builder.provideProxyClient(address, mockProxyClient);
+
+        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
+        assertEquals("There should be no clients in the manager",
+                0, clientManager.getNumProxies());
+        ProxyClient proxyClient =  clientManager.createClient(address);
+        assertEquals("Create client should build the proxy client",
+                1, clientManager.getNumProxies());
+        assertTrue("The client returned should be the same client that builder built",
+                mockProxyClient == proxyClient);
+    }
+
+    @Test(timeout = 60000)
+    public void testGetShouldCreateClient() throws Exception {
+        SocketAddress address = createSocketAddress(2000);
+        MockProxyClientBuilder builder = new MockProxyClientBuilder();
+        MockProxyClient mockProxyClient = createMockProxyClient(address);
+        builder.provideProxyClient(address, mockProxyClient);
+
+        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
+        assertEquals("There should be no clients in the manager",
+                0, clientManager.getNumProxies());
+        ProxyClient proxyClient =  clientManager.getClient(address);
+        assertEquals("Get client should build the proxy client",
+                1, clientManager.getNumProxies());
+        assertTrue("The client returned should be the same client that builder built",
+                mockProxyClient == proxyClient);
+    }
+
+    @Test(timeout = 60000)
+    public void testConditionalRemoveClient() throws Exception {
+        SocketAddress address = createSocketAddress(3000);
+        MockProxyClientBuilder builder = new MockProxyClientBuilder();
+        MockProxyClient mockProxyClient = createMockProxyClient(address);
+        MockProxyClient anotherMockProxyClient = createMockProxyClient(address);
+        builder.provideProxyClient(address, mockProxyClient);
+
+        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
+        assertEquals("There should be no clients in the manager",
+                0, clientManager.getNumProxies());
+        clientManager.createClient(address);
+        assertEquals("Create client should build the proxy client",
+                1, clientManager.getNumProxies());
+        clientManager.removeClient(address, anotherMockProxyClient);
+        assertEquals("Conditional remove should not remove proxy client",
+                1, clientManager.getNumProxies());
+        clientManager.removeClient(address, mockProxyClient);
+        assertEquals("Conditional remove should remove proxy client",
+                0, clientManager.getNumProxies());
+    }
+
+    @Test(timeout = 60000)
+    public void testRemoveClient() throws Exception {
+        SocketAddress address = createSocketAddress(3000);
+        MockProxyClientBuilder builder = new MockProxyClientBuilder();
+        MockProxyClient mockProxyClient = createMockProxyClient(address);
+        builder.provideProxyClient(address, mockProxyClient);
+
+        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
+        assertEquals("There should be no clients in the manager",
+                0, clientManager.getNumProxies());
+        clientManager.createClient(address);
+        assertEquals("Create client should build the proxy client",
+                1, clientManager.getNumProxies());
+        clientManager.removeClient(address);
+        assertEquals("Remove should remove proxy client",
+                0, clientManager.getNumProxies());
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateClientShouldHandshake() throws Exception {
+        SocketAddress address = createSocketAddress(3000);
+        MockProxyClientBuilder builder = new MockProxyClientBuilder();
+        ServerInfo serverInfo = new ServerInfo();
+        serverInfo.putToOwnerships(runtime.getMethodName() + "_stream",
+                runtime.getMethodName() + "_owner");
+        Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
+                createMockProxyClient(address, serverInfo);
+        builder.provideProxyClient(address, mockProxyClient.getLeft());
+
+        final AtomicReference<ServerInfo> resultHolder = new AtomicReference<ServerInfo>(null);
+        final CountDownLatch doneLatch = new CountDownLatch(1);
+        ProxyListener listener = new ProxyListener() {
+            @Override
+            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
+                resultHolder.set(serverInfo);
+                doneLatch.countDown();
+            }
+            @Override
+            public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
+            }
+        };
+
+        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
+        clientManager.registerProxyListener(listener);
+        assertEquals("There should be no clients in the manager",
+                0, clientManager.getNumProxies());
+        clientManager.createClient(address);
+        assertEquals("Create client should build the proxy client",
+                1, clientManager.getNumProxies());
+
+        // When a client is created, it would handshake with that proxy
+        doneLatch.await();
+        assertEquals("Handshake should return server info",
+                serverInfo, resultHolder.get());
+    }
+
+    @Test(timeout = 60000)
+    public void testHandshake() throws Exception {
+        final int numHosts = 3;
+        final int numStreamsPerHost = 3;
+        final int initialPort = 4000;
+
+        MockProxyClientBuilder builder = new MockProxyClientBuilder();
+        Map<SocketAddress, ServerInfo> serverInfoMap =
+                new HashMap<SocketAddress, ServerInfo>();
+        for (int i = 0; i < numHosts; i++) {
+            SocketAddress address = createSocketAddress(initialPort + i);
+            ServerInfo serverInfo = new ServerInfo();
+            for (int j = 0; j < numStreamsPerHost; j++) {
+                serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j,
+                        address.toString());
+            }
+            Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
+                    createMockProxyClient(address, serverInfo);
+            builder.provideProxyClient(address, mockProxyClient.getLeft());
+            serverInfoMap.put(address, serverInfo);
+        }
+
+        final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>();
+        final CountDownLatch doneLatch = new CountDownLatch(2 * numHosts);
+        ProxyListener listener = new ProxyListener() {
+            @Override
+            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
+                synchronized (results) {
+                    results.put(address, serverInfo);
+                }
+                doneLatch.countDown();
+            }
+
+            @Override
+            public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
+            }
+        };
+
+        TestHostProvider rs = new TestHostProvider();
+        ProxyClientManager clientManager = createProxyClientManager(builder, rs, 0L);
+        clientManager.registerProxyListener(listener);
+        assertEquals("There should be no clients in the manager",
+                0, clientManager.getNumProxies());
+        for (int i = 0; i < numHosts; i++) {
+            rs.addHost(createSocketAddress(initialPort + i));
+        }
+        // handshake would handshake with 3 hosts again
+        clientManager.handshake();
+        doneLatch.await();
+        assertEquals("Handshake should return server info",
+                numHosts, results.size());
+        assertTrue("Handshake should get all server infos",
+                Maps.difference(serverInfoMap, results).areEqual());
+    }
+
+    @Test(timeout = 60000)
+    public void testPeriodicHandshake() throws Exception {
+        final int numHosts = 3;
+        final int numStreamsPerHost = 3;
+        final int initialPort = 5000;
+
+        MockProxyClientBuilder builder = new MockProxyClientBuilder();
+        Map<SocketAddress, ServerInfo> serverInfoMap =
+                new HashMap<SocketAddress, ServerInfo>();
+        Map<SocketAddress, MockServerInfoService> mockServiceMap =
+                new HashMap<SocketAddress, MockServerInfoService>();
+        final Map<SocketAddress, CountDownLatch> hostDoneLatches =
+                new HashMap<SocketAddress, CountDownLatch>();
+        for (int i = 0; i < numHosts; i++) {
+            SocketAddress address = createSocketAddress(initialPort + i);
+            ServerInfo serverInfo = new ServerInfo();
+            for (int j = 0; j < numStreamsPerHost; j++) {
+                serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j,
+                        address.toString());
+            }
+            Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
+                    createMockProxyClient(address, serverInfo);
+            builder.provideProxyClient(address, mockProxyClient.getLeft());
+            serverInfoMap.put(address, serverInfo);
+            mockServiceMap.put(address, mockProxyClient.getRight());
+            hostDoneLatches.put(address, new CountDownLatch(2));
+        }
+
+        final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>();
+        final CountDownLatch doneLatch = new CountDownLatch(numHosts);
+        ProxyListener listener = new ProxyListener() {
+            @Override
+            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
+                synchronized (results) {
+                    results.put(address, serverInfo);
+                    CountDownLatch latch = hostDoneLatches.get(address);
+                    if (null != latch) {
+                        latch.countDown();
+                    }
+                }
+                doneLatch.countDown();
+            }
+
+            @Override
+            public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
+            }
+        };
+
+        TestHostProvider rs = new TestHostProvider();
+        ProxyClientManager clientManager = createProxyClientManager(builder, rs, 50L);
+        clientManager.setPeriodicHandshakeEnabled(false);
+        clientManager.registerProxyListener(listener);
+
+        assertEquals("There should be no clients in the manager",
+                0, clientManager.getNumProxies());
+        for (int i = 0; i < numHosts; i++) {
+            SocketAddress address = createSocketAddress(initialPort + i);
+            rs.addHost(address);
+            clientManager.createClient(address);
+        }
+
+        // make sure the first 3 handshakes going through
+        doneLatch.await();
+
+        assertEquals("Handshake should return server info",
+                numHosts, results.size());
+        assertTrue("Handshake should get all server infos",
+                Maps.difference(serverInfoMap, results).areEqual());
+
+        // update server info
+        for (int i = 0; i < numHosts; i++) {
+            SocketAddress address = createSocketAddress(initialPort + i);
+            ServerInfo serverInfo = new ServerInfo();
+            for (int j = 0; j < numStreamsPerHost; j++) {
+                serverInfo.putToOwnerships(runtime.getMethodName() + "_new_stream_" + j,
+                        address.toString());
+            }
+            MockServerInfoService service = mockServiceMap.get(address);
+            serverInfoMap.put(address, serverInfo);
+            service.updateServerInfo(serverInfo);
+        }
+
+        clientManager.setPeriodicHandshakeEnabled(true);
+        for (int i = 0; i < numHosts; i++) {
+            SocketAddress address = createSocketAddress(initialPort + i);
+            CountDownLatch latch = hostDoneLatches.get(address);
+            latch.await();
+        }
+
+        assertTrue("Periodic handshake should update all server infos",
+                Maps.difference(serverInfoMap, results).areEqual());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java
new file mode 100644
index 0000000..f44cddd
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.service.DLSocketAddress;
+import com.twitter.finagle.Address;
+import com.twitter.finagle.Addresses;
+import com.twitter.finagle.ChannelWriteException;
+import com.twitter.finagle.NoBrokersAvailableException;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link ConsistentHashRoutingService}.
+ */
+public class TestConsistentHashRoutingService {
+
+    @Test(timeout = 60000)
+    public void testBlackoutHost() throws Exception {
+        TestName name = new TestName();
+        RoutingService routingService = ConsistentHashRoutingService.newBuilder()
+                .serverSet(new NameServerSet(name))
+                .resolveFromName(true)
+                .numReplicas(997)
+                .blackoutSeconds(2)
+                .build();
+
+        InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 3181);
+        Address address = Addresses.newInetAddress(inetAddress);
+        List<Address> addresses = new ArrayList<Address>(1);
+        addresses.add(address);
+        name.changeAddrs(addresses);
+
+        routingService.startService();
+
+        RoutingService.RoutingContext routingContext =
+                RoutingService.RoutingContext.of(new DefaultRegionResolver());
+
+        String streamName = "test-blackout-host";
+        assertEquals(inetAddress, routingService.getHost(streamName, routingContext));
+        routingService.removeHost(inetAddress, new ChannelWriteException(new IOException("test exception")));
+        try {
+            routingService.getHost(streamName, routingContext);
+            fail("Should fail to get host since no brokers are available");
+        } catch (NoBrokersAvailableException nbae) {
+            // expected
+        }
+
+        TimeUnit.SECONDS.sleep(3);
+        assertEquals(inetAddress, routingService.getHost(streamName, routingContext));
+
+        routingService.stopService();
+    }
+
+    @Test(timeout = 60000)
+    public void testPerformServerSetChangeOnName() throws Exception {
+        TestName name = new TestName();
+        ConsistentHashRoutingService routingService = (ConsistentHashRoutingService)
+                ConsistentHashRoutingService.newBuilder()
+                        .serverSet(new NameServerSet(name))
+                        .resolveFromName(true)
+                        .numReplicas(997)
+                        .build();
+
+        int basePort = 3180;
+        int numHosts = 4;
+        List<Address> addresses1 = Lists.newArrayListWithExpectedSize(4);
+        List<Address> addresses2 = Lists.newArrayListWithExpectedSize(4);
+        List<Address> addresses3 = Lists.newArrayListWithExpectedSize(4);
+
+        // fill up the addresses1
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+            Address address = Addresses.newInetAddress(inetAddress);
+            addresses1.add(address);
+        }
+        // fill up the addresses2 - overlap with addresses1
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 2 + i);
+            Address address = Addresses.newInetAddress(inetAddress);
+            addresses2.add(address);
+        }
+        // fill up the addresses3 - not overlap with addresses2
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i);
+            Address address = Addresses.newInetAddress(inetAddress);
+            addresses3.add(address);
+        }
+
+        final List<SocketAddress> leftAddresses = Lists.newArrayList();
+        final List<SocketAddress> joinAddresses = Lists.newArrayList();
+
+        RoutingService.RoutingListener routingListener = new RoutingService.RoutingListener() {
+            @Override
+            public void onServerLeft(SocketAddress address) {
+                synchronized (leftAddresses) {
+                    leftAddresses.add(address);
+                    leftAddresses.notifyAll();
+                }
+            }
+
+            @Override
+            public void onServerJoin(SocketAddress address) {
+                synchronized (joinAddresses) {
+                    joinAddresses.add(address);
+                    joinAddresses.notifyAll();
+                }
+            }
+        };
+
+        routingService.registerListener(routingListener);
+        name.changeAddrs(addresses1);
+
+        routingService.startService();
+
+        synchronized (joinAddresses) {
+            while (joinAddresses.size() < numHosts) {
+                joinAddresses.wait();
+            }
+        }
+
+        // validate 4 nodes joined
+        synchronized (joinAddresses) {
+            assertEquals(numHosts, joinAddresses.size());
+        }
+        synchronized (leftAddresses) {
+            assertEquals(0, leftAddresses.size());
+        }
+        assertEquals(numHosts, routingService.shardId2Address.size());
+        assertEquals(numHosts, routingService.address2ShardId.size());
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+
+        // update addresses2 - 2 new hosts joined, 2 old hosts left
+        name.changeAddrs(addresses2);
+        synchronized (joinAddresses) {
+            while (joinAddresses.size() < numHosts + 2) {
+                joinAddresses.wait();
+            }
+        }
+        synchronized (leftAddresses) {
+            while (leftAddresses.size() < numHosts - 2) {
+                leftAddresses.wait();
+            }
+        }
+        assertEquals(numHosts, routingService.shardId2Address.size());
+        assertEquals(numHosts, routingService.address2ShardId.size());
+
+        // first 2 shards should leave
+        for (int i = 0; i < 2; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+            assertFalse(routingService.address2ShardId.containsKey(inetAddress));
+        }
+
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 2 + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+
+        // update addresses3 - 2 new hosts joined, 2 old hosts left
+        name.changeAddrs(addresses3);
+        synchronized (joinAddresses) {
+            while (joinAddresses.size() < numHosts + 2 + numHosts) {
+                joinAddresses.wait();
+            }
+        }
+        synchronized (leftAddresses) {
+            while (leftAddresses.size() < numHosts - 2 + numHosts) {
+                leftAddresses.wait();
+            }
+        }
+        assertEquals(numHosts, routingService.shardId2Address.size());
+        assertEquals(numHosts, routingService.address2ShardId.size());
+
+        // first 6 shards should leave
+        for (int i = 0; i < 2 + numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+            assertFalse(routingService.address2ShardId.containsKey(inetAddress));
+        }
+        // new 4 shards should exist
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+
+    }
+
+    private static class TestServerSetWatcher implements ServerSetWatcher {
+
+        final LinkedBlockingQueue<ImmutableSet<DLSocketAddress>> changeQueue =
+                new LinkedBlockingQueue<ImmutableSet<DLSocketAddress>>();
+        final CopyOnWriteArrayList<ServerSetMonitor> monitors =
+                new CopyOnWriteArrayList<ServerSetMonitor>();
+
+        @Override
+        public void watch(ServerSetMonitor monitor) throws MonitorException {
+            monitors.add(monitor);
+            ImmutableSet<DLSocketAddress> change;
+            while ((change = changeQueue.poll()) != null) {
+                notifyChanges(change);
+            }
+        }
+
+        void notifyChanges(ImmutableSet<DLSocketAddress> addresses) {
+            if (monitors.isEmpty()) {
+                changeQueue.add(addresses);
+            } else {
+                for (ServerSetMonitor monitor : monitors) {
+                    monitor.onChange(addresses);
+                }
+            }
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testPerformServerSetChangeOnServerSet() throws Exception {
+        TestServerSetWatcher serverSetWatcher = new TestServerSetWatcher();
+        ConsistentHashRoutingService routingService = new ConsistentHashRoutingService(
+                serverSetWatcher, 997, Integer.MAX_VALUE, NullStatsReceiver.get());
+
+        int basePort = 3180;
+        int numHosts = 4;
+        Set<DLSocketAddress> addresses1 = Sets.newConcurrentHashSet();
+        Set<DLSocketAddress> addresses2 = Sets.newConcurrentHashSet();
+        Set<DLSocketAddress> addresses3 = Sets.newConcurrentHashSet();
+
+        // fill up the addresses1
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+            DLSocketAddress dsa = new DLSocketAddress(i, inetAddress);
+            addresses1.add(dsa);
+        }
+        // fill up the addresses2 - overlap with addresses1
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + numHosts + i);
+            DLSocketAddress dsa = new DLSocketAddress(i + 2, inetAddress);
+            addresses2.add(dsa);
+        }
+        // fill up the addresses3 - not overlap with addresses2
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i);
+            DLSocketAddress dsa = new DLSocketAddress(i, inetAddress);
+            addresses3.add(dsa);
+        }
+
+        final List<SocketAddress> leftAddresses = Lists.newArrayList();
+        final List<SocketAddress> joinAddresses = Lists.newArrayList();
+
+        RoutingService.RoutingListener routingListener = new RoutingService.RoutingListener() {
+            @Override
+            public void onServerLeft(SocketAddress address) {
+                synchronized (leftAddresses) {
+                    leftAddresses.add(address);
+                    leftAddresses.notifyAll();
+                }
+            }
+
+            @Override
+            public void onServerJoin(SocketAddress address) {
+                synchronized (joinAddresses) {
+                    joinAddresses.add(address);
+                    joinAddresses.notifyAll();
+                }
+            }
+        };
+
+        routingService.registerListener(routingListener);
+        serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses1));
+
+        routingService.startService();
+
+        synchronized (joinAddresses) {
+            while (joinAddresses.size() < numHosts) {
+                joinAddresses.wait();
+            }
+        }
+
+        // validate 4 nodes joined
+        synchronized (joinAddresses) {
+            assertEquals(numHosts, joinAddresses.size());
+        }
+        synchronized (leftAddresses) {
+            assertEquals(0, leftAddresses.size());
+        }
+        assertEquals(numHosts, routingService.shardId2Address.size());
+        assertEquals(numHosts, routingService.address2ShardId.size());
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            assertEquals(i, shardId);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+
+        // update addresses2 - 2 new hosts joined, 2 old hosts left
+        serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses2));
+        synchronized (joinAddresses) {
+            while (joinAddresses.size() < numHosts + 2) {
+                joinAddresses.wait();
+            }
+        }
+        synchronized (leftAddresses) {
+            while (leftAddresses.size() < 2) {
+                leftAddresses.wait();
+            }
+        }
+
+        assertEquals(numHosts + 2, routingService.shardId2Address.size());
+        assertEquals(numHosts + 2, routingService.address2ShardId.size());
+        // first 2 shards should not leave
+        for (int i = 0; i < 2; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            assertEquals(i, shardId);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + numHosts + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            assertEquals(i + 2, shardId);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+
+        // update addresses3
+        serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses3));
+        synchronized (joinAddresses) {
+            while (joinAddresses.size() < numHosts + 2 + numHosts) {
+                joinAddresses.wait();
+            }
+        }
+        synchronized (leftAddresses) {
+            while (leftAddresses.size() < 2 + numHosts) {
+                leftAddresses.wait();
+            }
+        }
+        assertEquals(numHosts + 2, routingService.shardId2Address.size());
+        assertEquals(numHosts + 2, routingService.address2ShardId.size());
+
+        // first 4 shards should leave
+        for (int i = 0; i < numHosts; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            assertEquals(i, shardId);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+        // the other 2 shards should be still there
+        for (int i = 0; i < 2; i++) {
+            InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + numHosts + 2 + i);
+            assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+            int shardId = routingService.address2ShardId.get(inetAddress);
+            assertEquals(numHosts + i, shardId);
+            SocketAddress sa = routingService.shardId2Address.get(shardId);
+            assertNotNull(sa);
+            assertEquals(inetAddress, sa);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java
new file mode 100644
index 0000000..59665b9
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.net.pool.DynamicHostSet;
+import com.twitter.thrift.Endpoint;
+import com.twitter.thrift.ServiceInstance;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Case for `inet` name resolution.
+ */
+public class TestInetNameResolution {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestRoutingService.class);
+
+    @Test(timeout = 10000)
+    public void testInetNameResolution() throws Exception {
+        String nameStr = "inet!127.0.0.1:3181";
+        final CountDownLatch resolved = new CountDownLatch(1);
+        final AtomicBoolean validationFailed = new AtomicBoolean(false);
+
+        NameServerSet serverSet = new NameServerSet(nameStr);
+        serverSet.watch(new DynamicHostSet.HostChangeMonitor<ServiceInstance>() {
+            @Override
+            public void onChange(ImmutableSet<ServiceInstance> hostSet) {
+                if (hostSet.size() > 1) {
+                    logger.error("HostSet has more elements than expected {}", hostSet);
+                    validationFailed.set(true);
+                    resolved.countDown();
+                } else if (hostSet.size() == 1) {
+                    ServiceInstance serviceInstance = hostSet.iterator().next();
+                    Endpoint endpoint = serviceInstance.getAdditionalEndpoints().get("thrift");
+                    InetSocketAddress address = new InetSocketAddress(endpoint.getHost(), endpoint.getPort());
+                    if (endpoint.getPort() != 3181) {
+                        logger.error("Port does not match the expected port {}", endpoint.getPort());
+                        validationFailed.set(true);
+                    } else if (!address.getAddress().getHostAddress().equals("127.0.0.1")) {
+                        logger.error("Host address does not match the expected address {}",
+                            address.getAddress().getHostAddress());
+                        validationFailed.set(true);
+                    }
+                    resolved.countDown();
+                }
+            }
+        });
+
+        resolved.await();
+        Assert.assertEquals(false, validationFailed.get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java
new file mode 100644
index 0000000..151663e
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Sets;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import com.twitter.finagle.NoBrokersAvailableException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link RegionsRoutingService}.
+ */
+public class TestRegionsRoutingService {
+
+    @Test(timeout = 60000)
+    public void testRoutingListener() throws Exception {
+        int numRoutingServices = 5;
+        RoutingService.Builder[] routingServiceBuilders = new RoutingService.Builder[numRoutingServices];
+        Set<SocketAddress> hosts = new HashSet<SocketAddress>();
+        Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, String>();
+        for (int i = 0; i < numRoutingServices; i++) {
+            String finagleNameStr = "inet!127.0.0.1:" + (3181 + i);
+            routingServiceBuilders[i] = RoutingUtils.buildRoutingService(finagleNameStr);
+            SocketAddress address = new InetSocketAddress("127.0.0.1", 3181 + i);
+            hosts.add(address);
+            regionMap.put(address, "region-" + i);
+        }
+
+        final CountDownLatch doneLatch = new CountDownLatch(numRoutingServices);
+        final AtomicInteger numHostsLeft = new AtomicInteger(0);
+        final Set<SocketAddress> jointHosts = new HashSet<SocketAddress>();
+        RegionsRoutingService regionsRoutingService =
+                RegionsRoutingService.newBuilder()
+                    .routingServiceBuilders(routingServiceBuilders)
+                    .resolver(new DefaultRegionResolver(regionMap))
+                    .build();
+        regionsRoutingService.registerListener(new RoutingService.RoutingListener() {
+            @Override
+            public void onServerLeft(SocketAddress address) {
+                numHostsLeft.incrementAndGet();
+            }
+
+            @Override
+            public void onServerJoin(SocketAddress address) {
+                jointHosts.add(address);
+                doneLatch.countDown();
+            }
+        });
+
+        regionsRoutingService.startService();
+
+        doneLatch.await();
+
+        assertEquals(numRoutingServices, jointHosts.size());
+        assertEquals(0, numHostsLeft.get());
+        assertTrue(Sets.difference(hosts, jointHosts).immutableCopy().isEmpty());
+    }
+
+    @Test(timeout = 60000)
+    public void testGetHost() throws Exception {
+        int numRoutingServices = 3;
+        RoutingService.Builder[] routingServiceBuilders = new RoutingService.Builder[numRoutingServices];
+        Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, String>();
+        for (int i = 0; i < numRoutingServices; i++) {
+            String finagleNameStr = "inet!127.0.0.1:" + (3181 + i);
+            routingServiceBuilders[i] = RoutingUtils.buildRoutingService(finagleNameStr);
+            SocketAddress address = new InetSocketAddress("127.0.0.1", 3181 + i);
+            regionMap.put(address, "region-" + i);
+        }
+
+        RegionsRoutingService regionsRoutingService =
+                RegionsRoutingService.newBuilder()
+                    .resolver(new DefaultRegionResolver(regionMap))
+                    .routingServiceBuilders(routingServiceBuilders)
+                    .build();
+        regionsRoutingService.startService();
+
+        RoutingService.RoutingContext routingContext =
+                RoutingService.RoutingContext.of(new DefaultRegionResolver())
+                        .addTriedHost(new InetSocketAddress("127.0.0.1", 3183), StatusCode.WRITE_EXCEPTION);
+        assertEquals(new InetSocketAddress("127.0.0.1", 3181),
+                regionsRoutingService.getHost("any", routingContext));
+
+        routingContext =
+                RoutingService.RoutingContext.of(new DefaultRegionResolver())
+                        .addTriedHost(new InetSocketAddress("127.0.0.1", 3181), StatusCode.WRITE_EXCEPTION);
+        assertEquals(new InetSocketAddress("127.0.0.1", 3182),
+                regionsRoutingService.getHost("any", routingContext));
+
+        // add 3182 to routing context as tried host
+        routingContext.addTriedHost(new InetSocketAddress("127.0.0.1", 3182), StatusCode.WRITE_EXCEPTION);
+        assertEquals(new InetSocketAddress("127.0.0.1", 3183),
+                regionsRoutingService.getHost("any", routingContext));
+
+        // add 3183 to routing context as tried host
+        routingContext.addTriedHost(new InetSocketAddress("127.0.0.1", 3183), StatusCode.WRITE_EXCEPTION);
+        try {
+            regionsRoutingService.getHost("any", routingContext);
+            fail("Should fail to get host since all regions are tried.");
+        } catch (NoBrokersAvailableException nbae) {
+            // expected
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java
new file mode 100644
index 0000000..d2d61a9
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import com.twitter.finagle.Address;
+import com.twitter.finagle.Addresses;
+import com.twitter.finagle.addr.WeightedAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Case for {@link RoutingService}.
+ */
+@RunWith(Parameterized.class)
+public class TestRoutingService {
+
+    static final Logger LOG = LoggerFactory.getLogger(TestRoutingService.class);
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> configs() {
+        ArrayList<Object[]> list = new ArrayList<Object[]>();
+        for (int i = 0; i <= 1; i++) {
+            for (int j = 0; j <= 1; j++) {
+                for (int k = 0; k <= 1; k++) {
+                    list.add(new Boolean[] {i == 1, j == 1, k == 1});
+                }
+            }
+        }
+        return list;
+    }
+
+    private final boolean consistentHash;
+    private final boolean weightedAddresses;
+    private final boolean asyncResolution;
+
+    public TestRoutingService(boolean consistentHash, boolean weightedAddresses, boolean asyncResolution) {
+        this.consistentHash = consistentHash;
+        this.weightedAddresses = weightedAddresses;
+        this.asyncResolution = asyncResolution;
+    }
+
+    private List<Address> getAddresses(boolean weightedAddresses) {
+        ArrayList<Address> addresses = new ArrayList<Address>();
+        addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.1", 3181)));
+        addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.2", 3181)));
+        addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.3", 3181)));
+        addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.4", 3181)));
+        addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.5", 3181)));
+        addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.6", 3181)));
+        addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.7", 3181)));
+
+        if (weightedAddresses) {
+            ArrayList<Address> wAddresses = new ArrayList<Address>();
+            for (Address address: addresses) {
+                wAddresses.add(WeightedAddress.apply(address, 1.0));
+            }
+            return wAddresses;
+        } else {
+            return addresses;
+        }
+    }
+
+    private void testRoutingServiceHelper(boolean consistentHash,
+                                          boolean weightedAddresses,
+                                          boolean asyncResolution)
+        throws Exception {
+        ExecutorService executorService = null;
+        final List<Address> addresses = getAddresses(weightedAddresses);
+        final TestName name = new TestName();
+        RoutingService routingService;
+        if (consistentHash) {
+            routingService = ConsistentHashRoutingService.newBuilder()
+                    .serverSet(new NameServerSet(name))
+                    .resolveFromName(true)
+                    .numReplicas(997)
+                    .build();
+        } else {
+            routingService = ServerSetRoutingService.newServerSetRoutingServiceBuilder()
+                    .serverSetWatcher(new TwitterServerSetWatcher(new NameServerSet(name), true)).build();
+        }
+
+        if (asyncResolution) {
+            executorService = Executors.newSingleThreadExecutor();
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    name.changeAddrs(addresses);
+                }
+            });
+        } else {
+            name.changeAddrs(addresses);
+        }
+        routingService.startService();
+
+        HashSet<SocketAddress> mapping = new HashSet<SocketAddress>();
+
+        for (int i = 0; i < 1000; i++) {
+            for (int j = 0; j < 5; j++) {
+                String stream = "TestStream-" + i + "-" + j;
+                mapping.add(routingService.getHost(stream,
+                        RoutingService.RoutingContext.of(new DefaultRegionResolver())));
+            }
+        }
+
+        assertEquals(mapping.size(), addresses.size());
+
+        if (null != executorService) {
+            executorService.shutdown();
+        }
+
+    }
+
+    @Test(timeout = 5000)
+    public void testRoutingService() throws Exception {
+        testRoutingServiceHelper(this.consistentHash, this.weightedAddresses, this.asyncResolution);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
new file mode 100644
index 0000000..ab0cb58
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.speculative;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import com.twitter.util.CountDownLatch;
+import com.twitter.util.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test {@link TestDefaultSpeculativeRequestExecutionPolicy}.
+ */
+public class TestDefaultSpeculativeRequestExecutionPolicy {
+
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testInvalidBackoffMultiplier() throws Exception {
+        new DefaultSpeculativeRequestExecutionPolicy(100, 200, -1);
+    }
+
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testInvalidMaxSpeculativeTimeout() throws Exception {
+        new DefaultSpeculativeRequestExecutionPolicy(100, Integer.MAX_VALUE, 2);
+    }
+
+    @Test(timeout = 20000)
+    public void testSpeculativeRequests() throws Exception {
+        DefaultSpeculativeRequestExecutionPolicy policy =
+                new DefaultSpeculativeRequestExecutionPolicy(10, 10000, 2);
+        SpeculativeRequestExecutor executor = mock(SpeculativeRequestExecutor.class);
+
+        final AtomicInteger callCount = new AtomicInteger(0);
+        final CountDownLatch latch = new CountDownLatch(3);
+
+        Mockito.doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                try {
+                    return Future.value(callCount.incrementAndGet() < 3);
+                } finally {
+                    latch.countDown();
+                }
+            }
+        }).when(executor).issueSpeculativeRequest();
+
+        ScheduledExecutorService executorService =
+                Executors.newSingleThreadScheduledExecutor();
+        policy.initiateSpeculativeRequest(executorService, executor);
+
+        latch.await();
+
+        assertEquals(40, policy.getNextSpeculativeRequestTimeout());
+    }
+
+    @Test(timeout = 20000)
+    public void testSpeculativeRequestsWithMaxTimeout() throws Exception {
+        DefaultSpeculativeRequestExecutionPolicy policy =
+                new DefaultSpeculativeRequestExecutionPolicy(10, 15, 2);
+        SpeculativeRequestExecutor executor = mock(SpeculativeRequestExecutor.class);
+
+        final AtomicInteger callCount = new AtomicInteger(0);
+        final CountDownLatch latch = new CountDownLatch(3);
+
+        Mockito.doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                try {
+                    return Future.value(callCount.incrementAndGet() < 3);
+                } finally {
+                    latch.countDown();
+                }
+            }
+        }).when(executor).issueSpeculativeRequest();
+
+        ScheduledExecutorService executorService =
+                Executors.newSingleThreadScheduledExecutor();
+        policy.initiateSpeculativeRequest(executorService, executor);
+
+        latch.await();
+
+        assertEquals(15, policy.getNextSpeculativeRequestTimeout());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java
new file mode 100644
index 0000000..d2df9a5
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static org.junit.Assert.assertFalse;
+
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Duration;
+import org.junit.Test;
+
+/**
+ * Test Case of {@link org.apache.distributedlog.service.DistributedLogClientBuilder}.
+ */
+public class TestDistributedLogClientBuilder {
+
+    @Test(timeout = 60000)
+    public void testBuildClientsFromSameBuilder() throws Exception {
+        DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
+                .name("build-clients-from-same-builder")
+                .clientId(ClientId$.MODULE$.apply("test-builder"))
+                .finagleNameStr("inet!127.0.0.1:7001")
+                .streamNameRegex(".*")
+                .handshakeWithClientInfo(true)
+                .clientBuilder(ClientBuilder.get()
+                    .hostConnectionLimit(1)
+                    .connectTimeout(Duration.fromSeconds(1))
+                    .tcpConnectTimeout(Duration.fromSeconds(1))
+                    .requestTimeout(Duration.fromSeconds(10)));
+        DistributedLogClient client1 = builder.build();
+        DistributedLogClient client2 = builder.build();
+        assertFalse(client1 == client2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/resources/log4j.properties b/distributedlog-proxy-client/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3e51059
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/resources/log4j.properties
@@ -0,0 +1,51 @@
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+#
+# DisributedLog Logging Configuration
+#
+
+# Example with rolling log file
+log4j.rootLogger=INFO, CONSOLE
+
+#disable zookeeper logging
+log4j.logger.org.apache.zookeeper=OFF
+#Set the bookkeeper level to warning
+log4j.logger.org.apache.bookkeeper=INFO
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+# Add ROLLINGFILE to rootLogger to get log file output
+#    Log DEBUG level and above messages to a log file
+#log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
+#log4j.appender.ROLLINGFILE.Threshold=INFO
+#log4j.appender.ROLLINGFILE.File=distributedlog.log
+#log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+#log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd-HH-mm
+#log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.Threshold=TRACE
+log4j.appender.R.File=target/error.log
+log4j.appender.R.MaxFileSize=200MB
+log4j.appender.R.MaxBackupIndex=7
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n