You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:33 UTC

[26/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
deleted file mode 100644
index 0e2a152..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
+++ /dev/null
@@ -1,608 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.twitter.common.zookeeper.ServerSet;
-import org.apache.distributedlog.client.ClientConfig;
-import org.apache.distributedlog.client.DistributedLogClientImpl;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.client.proxy.ClusterClient;
-import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
-import org.apache.distributedlog.client.resolver.RegionResolver;
-import org.apache.distributedlog.client.routing.RegionsRoutingService;
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.client.routing.RoutingUtils;
-import org.apache.distributedlog.thrift.service.DistributedLogService;
-import com.twitter.finagle.Name;
-import com.twitter.finagle.Resolver$;
-import com.twitter.finagle.Service;
-import com.twitter.finagle.ThriftMux;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.finagle.thrift.ClientId;
-import com.twitter.finagle.thrift.ThriftClientFramedCodec;
-import com.twitter.finagle.thrift.ThriftClientRequest;
-import com.twitter.util.Duration;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.util.Random;
-import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-
-/**
- * Builder to build {@link DistributedLogClient}.
- */
-public final class DistributedLogClientBuilder {
-
-    private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientBuilder.class);
-
-    private static final Random random = new Random(System.currentTimeMillis());
-
-    private String name = null;
-    private ClientId clientId = null;
-    private RoutingService.Builder routingServiceBuilder = null;
-    private ClientBuilder clientBuilder = null;
-    private String serverRoutingServiceFinagleName = null;
-    private StatsReceiver statsReceiver = new NullStatsReceiver();
-    private StatsReceiver streamStatsReceiver = new NullStatsReceiver();
-    private ClientConfig clientConfig = new ClientConfig();
-    private boolean enableRegionStats = false;
-    private final RegionResolver regionResolver = new DefaultRegionResolver();
-
-    /**
-     * Create a client builder.
-     *
-     * @return client builder
-     */
-    public static DistributedLogClientBuilder newBuilder() {
-        return new DistributedLogClientBuilder();
-    }
-
-    /**
-     * Create a new client builder from an existing {@code builder}.
-     *
-     * @param builder the existing builder.
-     * @return a new client builder.
-     */
-    public static DistributedLogClientBuilder newBuilder(DistributedLogClientBuilder builder) {
-        DistributedLogClientBuilder newBuilder = new DistributedLogClientBuilder();
-        newBuilder.name = builder.name;
-        newBuilder.clientId = builder.clientId;
-        newBuilder.clientBuilder = builder.clientBuilder;
-        newBuilder.routingServiceBuilder = builder.routingServiceBuilder;
-        newBuilder.statsReceiver = builder.statsReceiver;
-        newBuilder.streamStatsReceiver = builder.streamStatsReceiver;
-        newBuilder.enableRegionStats = builder.enableRegionStats;
-        newBuilder.serverRoutingServiceFinagleName = builder.serverRoutingServiceFinagleName;
-        newBuilder.clientConfig = ClientConfig.newConfig(builder.clientConfig);
-        return newBuilder;
-    }
-
-    // private constructor
-    private DistributedLogClientBuilder() {}
-
-    /**
-     * Client Name.
-     *
-     * @param name
-     *          client name
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder name(String name) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.name = name;
-        return newBuilder;
-    }
-
-    /**
-     * Client ID.
-     *
-     * @param clientId
-     *          client id
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder clientId(ClientId clientId) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientId = clientId;
-        return newBuilder;
-    }
-
-    /**
-     * Serverset to access proxy services.
-     *
-     * @param serverSet
-     *          server set.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder serverSet(ServerSet serverSet) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(serverSet);
-        newBuilder.enableRegionStats = false;
-        return newBuilder;
-    }
-
-    /**
-     * Server Sets to access proxy services.
-     *
-     * <p>The <i>local</i> server set will be tried first then <i>remotes</i>.
-     *
-     * @param local local server set.
-     * @param remotes remote server sets.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder serverSets(ServerSet local, ServerSet...remotes) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        RoutingService.Builder[] builders = new RoutingService.Builder[remotes.length + 1];
-        builders[0] = RoutingUtils.buildRoutingService(local);
-        for (int i = 1; i < builders.length; i++) {
-            builders[i] = RoutingUtils.buildRoutingService(remotes[i - 1]);
-        }
-        newBuilder.routingServiceBuilder = RegionsRoutingService.newBuilder()
-                .resolver(regionResolver)
-                .routingServiceBuilders(builders);
-        newBuilder.enableRegionStats = remotes.length > 0;
-        return newBuilder;
-    }
-
-    /**
-     * Name to access proxy services.
-     *
-     * @param finagleNameStr
-     *          finagle name string.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder finagleNameStr(String finagleNameStr) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr);
-        newBuilder.enableRegionStats = false;
-        return newBuilder;
-    }
-
-    /**
-     * Finagle name strs to access proxy services.
-     *
-     * <p>The <i>local</i> finagle name str will be tried first, then <i>remotes</i>.
-     *
-     * @param local local finagle name str.
-     * @param remotes remote finagle name strs.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder finagleNameStrs(String local, String...remotes) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        RoutingService.Builder[] builders = new RoutingService.Builder[remotes.length + 1];
-        builders[0] = RoutingUtils.buildRoutingService(local);
-        for (int i = 1; i < builders.length; i++) {
-            builders[i] = RoutingUtils.buildRoutingService(remotes[i - 1]);
-        }
-        newBuilder.routingServiceBuilder = RegionsRoutingService.newBuilder()
-                .routingServiceBuilders(builders)
-                .resolver(regionResolver);
-        newBuilder.enableRegionStats = remotes.length > 0;
-        return newBuilder;
-    }
-
-    /**
-     * URI to access proxy services.
-     *
-     * <p>Assumes the write proxies are announced under `.write_proxy` of the provided namespace uri.
-     * The builder converts the dl uri (e.g. distributedlog://{zkserver}/path/to/namespace) to a
-     * zookeeper serverset based finagle name str (`zk!{zkserver}!/path/to/namespace/.write_proxy`).
-     *
-     * @param uri namespace uri to access the serverset of write proxies
-     * @return distributedlog builder
-     */
-    public DistributedLogClientBuilder uri(URI uri) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        String zkServers = uri.getAuthority().replace(";", ",");
-        String[] zkServerList = StringUtils.split(zkServers, ',');
-        String finagleNameStr = String.format(
-                "zk!%s!%s/.write_proxy",
-                zkServerList[random.nextInt(zkServerList.length)], // zk server
-                uri.getPath());
-        newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr);
-        newBuilder.enableRegionStats = false;
-        return newBuilder;
-    }
-
-    /**
-     * Address of write proxy to connect.
-     *
-     * @param address
-     *          write proxy address.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder host(SocketAddress address) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(address);
-        newBuilder.enableRegionStats = false;
-        return newBuilder;
-    }
-
-    private DistributedLogClientBuilder routingServiceBuilder(RoutingService.Builder builder) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.routingServiceBuilder = builder;
-        newBuilder.enableRegionStats = false;
-        return newBuilder;
-    }
-
-    /**
-     * Routing Service to access proxy services.
-     *
-     * @param routingService
-     *          routing service
-     * @return client builder.
-     */
-    @VisibleForTesting
-    public DistributedLogClientBuilder routingService(RoutingService routingService) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(routingService);
-        newBuilder.enableRegionStats = false;
-        return newBuilder;
-    }
-
-    /**
-     * Stats receiver to expose client stats.
-     *
-     * @param statsReceiver
-     *          stats receiver.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder statsReceiver(StatsReceiver statsReceiver) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.statsReceiver = statsReceiver;
-        return newBuilder;
-    }
-
-    /**
-     * Stream Stats Receiver to expose per stream stats.
-     *
-     * @param streamStatsReceiver
-     *          stream stats receiver
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder streamStatsReceiver(StatsReceiver streamStatsReceiver) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.streamStatsReceiver = streamStatsReceiver;
-        return newBuilder;
-    }
-
-    /**
-     * Set underlying finagle client builder.
-     *
-     * @param builder
-     *          finagle client builder.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder clientBuilder(ClientBuilder builder) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientBuilder = builder;
-        return newBuilder;
-    }
-
-    /**
-     * Backoff time when redirecting to an already retried host.
-     *
-     * @param ms
-     *          backoff time.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder redirectBackoffStartMs(int ms) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setRedirectBackoffStartMs(ms);
-        return newBuilder;
-    }
-
-    /**
-     * Max backoff time when redirecting to an already retried host.
-     *
-     * @param ms
-     *          backoff time.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder redirectBackoffMaxMs(int ms) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setRedirectBackoffMaxMs(ms);
-        return newBuilder;
-    }
-
-    /**
-     * Max redirects that are allowed per request.
-     *
-     * <p>If <i>redirects</i> are exhausted, fail the request immediately.
-     *
-     * @param redirects
-     *          max redirects allowed before failing a request.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder maxRedirects(int redirects) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setMaxRedirects(redirects);
-        return newBuilder;
-    }
-
-    /**
-     * Timeout per request in millis.
-     *
-     * @param timeoutMs
-     *          timeout per request in millis.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder requestTimeoutMs(int timeoutMs) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setRequestTimeoutMs(timeoutMs);
-        return newBuilder;
-    }
-
-    /**
-     * Set thriftmux enabled.
-     *
-     * @param enabled
-     *          is thriftmux enabled
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder thriftmux(boolean enabled) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setThriftMux(enabled);
-        return newBuilder;
-    }
-
-    /**
-     * Set failfast stream exception handling enabled.
-     *
-     * @param enabled
-     *          is failfast exception handling enabled
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder streamFailfast(boolean enabled) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setStreamFailfast(enabled);
-        return newBuilder;
-    }
-
-    /**
-     * Set the regex to match stream names that the client cares about.
-     *
-     * @param nameRegex
-     *          stream name regex
-     * @return client builder
-     */
-    public DistributedLogClientBuilder streamNameRegex(String nameRegex) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setStreamNameRegex(nameRegex);
-        return newBuilder;
-    }
-
-    /**
-     * Whether to use the new handshake endpoint to exchange ownership cache.
-     *
-     * <p>Enable this when the servers are updated to support handshaking with client info.
-     *
-     * @param enabled
-     *          new handshake endpoint is enabled.
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder handshakeWithClientInfo(boolean enabled) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setHandshakeWithClientInfo(enabled);
-        return newBuilder;
-    }
-
-    /**
-     * Set the periodic handshake interval in milliseconds.
-     *
-     * <p>Every <code>intervalMs</code>, the DL client will handshake with existing proxies again.
-     * If the interval is less than ownership sync interval, the handshake won't sync ownerships. Otherwise, it will.
-     *
-     * @see #periodicOwnershipSyncIntervalMs(long)
-     * @param intervalMs
-     *          handshake interval
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder periodicHandshakeIntervalMs(long intervalMs) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setPeriodicHandshakeIntervalMs(intervalMs);
-        return newBuilder;
-    }
-
-    /**
-     * Set the periodic ownership sync interval in milliseconds.
-     *
-     * <p>If periodic handshake is enabled, the handshake will sync ownership if the elapsed time is larger than
-     * sync interval.
-     *
-     * @see #periodicHandshakeIntervalMs(long)
-     * @param intervalMs
-     *          interval that handshake should sync ownerships.
-     * @return client builder
-     */
-    public DistributedLogClientBuilder periodicOwnershipSyncIntervalMs(long intervalMs) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setPeriodicOwnershipSyncIntervalMs(intervalMs);
-        return newBuilder;
-    }
-
-    /**
-     * Enable/Disable periodic dumping ownership cache.
-     *
-     * @param enabled
-     *          flag to enable/disable periodic dumping ownership cache
-     * @return client builder.
-     */
-    public DistributedLogClientBuilder periodicDumpOwnershipCache(boolean enabled) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setPeriodicDumpOwnershipCacheEnabled(enabled);
-        return newBuilder;
-    }
-
-    /**
-     * Set periodic dumping ownership cache interval.
-     *
-     * @param intervalMs
-     *          interval on dumping ownership cache, in millis.
-     * @return client builder
-     */
-    public DistributedLogClientBuilder periodicDumpOwnershipCacheIntervalMs(long intervalMs) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setPeriodicDumpOwnershipCacheIntervalMs(intervalMs);
-        return newBuilder;
-    }
-
-    /**
-     * Enable handshake tracing.
-     *
-     * @param enabled
-     *          flag to enable/disable handshake tracing
-     * @return client builder
-     */
-    public DistributedLogClientBuilder handshakeTracing(boolean enabled) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setHandshakeTracingEnabled(enabled);
-        return newBuilder;
-    }
-
-    /**
-     * Enable checksum on requests to the proxy.
-     *
-     * @param enabled
-     *          flag to enable/disable checksum
-     * @return client builder
-     */
-    public DistributedLogClientBuilder checksum(boolean enabled) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig.setChecksumEnabled(enabled);
-        return newBuilder;
-    }
-
-    /**
-     * Configure the finagle name string for the server-side routing service.
-     *
-     * @param nameStr name string of the server-side routing service
-     * @return client builder
-     */
-    public DistributedLogClientBuilder serverRoutingServiceFinagleNameStr(String nameStr) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.serverRoutingServiceFinagleName = nameStr;
-        return newBuilder;
-    }
-
-    DistributedLogClientBuilder clientConfig(ClientConfig clientConfig) {
-        DistributedLogClientBuilder newBuilder = newBuilder(this);
-        newBuilder.clientConfig = ClientConfig.newConfig(clientConfig);
-        return newBuilder;
-    }
-
-    /**
-     * Build distributedlog client.
-     *
-     * @return distributedlog client.
-     */
-    public DistributedLogClient build() {
-        return buildClient();
-    }
-
-    /**
-     * Build monitor service client.
-     *
-     * @return monitor service client.
-     */
-    public MonitorServiceClient buildMonitorClient() {
-
-        return buildClient();
-    }
-
-    @SuppressWarnings("unchecked")
-    ClusterClient buildServerRoutingServiceClient(String serverRoutingServiceFinagleName) {
-        ClientBuilder builder = this.clientBuilder;
-        if (null == builder) {
-            builder = ClientBuilder.get()
-                    .tcpConnectTimeout(Duration.fromMilliseconds(200))
-                    .connectTimeout(Duration.fromMilliseconds(200))
-                    .requestTimeout(Duration.fromSeconds(1))
-                    .retries(20);
-            if (!clientConfig.getThriftMux()) {
-                builder = builder.hostConnectionLimit(1);
-            }
-        }
-        if (clientConfig.getThriftMux()) {
-            builder = builder.stack(ThriftMux.client().withClientId(clientId));
-        } else {
-            builder = builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId)));
-        }
-
-        Name name;
-        try {
-            name = Resolver$.MODULE$.eval(serverRoutingServiceFinagleName);
-        } catch (Exception exc) {
-            logger.error("Exception in Resolver.eval for name {}", serverRoutingServiceFinagleName, exc);
-            throw new RuntimeException(exc);
-        }
-
-        // build the client
-        Service<ThriftClientRequest, byte[]> client =
-                ClientBuilder.safeBuildFactory(
-                        builder.dest(name).reportTo(statsReceiver.scope("routing"))
-                ).toService();
-        DistributedLogService.ServiceIface service =
-                new DistributedLogService.ServiceToClient(client, new TBinaryProtocol.Factory());
-        return new ClusterClient(client, service);
-    }
-
-    DistributedLogClientImpl buildClient() {
-        checkNotNull(name, "No name provided.");
-        checkNotNull(clientId, "No client id provided.");
-        checkNotNull(routingServiceBuilder, "No routing service builder provided.");
-        checkNotNull(statsReceiver, "No stats receiver provided.");
-        if (null == streamStatsReceiver) {
-            streamStatsReceiver = new NullStatsReceiver();
-        }
-
-        Optional<ClusterClient> serverRoutingServiceClient = Optional.absent();
-        if (null != serverRoutingServiceFinagleName) {
-            serverRoutingServiceClient = Optional.of(
-                    buildServerRoutingServiceClient(serverRoutingServiceFinagleName));
-        }
-
-        RoutingService routingService = routingServiceBuilder
-                .statsReceiver(statsReceiver.scope("routing"))
-                .build();
-        DistributedLogClientImpl clientImpl =
-                new DistributedLogClientImpl(
-                        name,
-                        clientId,
-                        routingService,
-                        clientBuilder,
-                        clientConfig,
-                        serverRoutingServiceClient,
-                        statsReceiver,
-                        streamStatsReceiver,
-                        regionResolver,
-                        enableRegionStats);
-        routingService.startService();
-        clientImpl.handshake();
-        return clientImpl;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java
deleted file mode 100644
index 033882f..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * DistributedLog Service Client.
- */
-package org.apache.distributedlog.service;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/resources/findbugsExclude.xml b/distributedlog-client/src/main/resources/findbugsExclude.xml
deleted file mode 100644
index 05ee085..0000000
--- a/distributedlog-client/src/main/resources/findbugsExclude.xml
+++ /dev/null
@@ -1,23 +0,0 @@
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-//-->
-<FindBugsFilter>
-  <Match>
-    <!-- generated code, we can't be held responsible for findbugs in it //-->
-    <Class name="~org\.apache\.distributedlog\.thrift.*" />
-  </Match>
-</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java
deleted file mode 100644
index d7494de..0000000
--- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.Lists;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.LogRecord;
-import org.apache.distributedlog.LogRecordSet;
-import org.apache.distributedlog.LogRecordSetBuffer;
-import org.apache.distributedlog.exceptions.LogRecordTooLongException;
-import org.apache.distributedlog.io.CompressionCodec;
-import org.apache.distributedlog.service.DistributedLogClient;
-import com.twitter.finagle.IndividualRequestTimeoutException;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Test {@link DistributedLogMultiStreamWriter}.
- */
-public class TestDistributedLogMultiStreamWriter {
-
-    @Test(timeout = 20000, expected = IllegalArgumentException.class)
-    public void testBuildWithNullStreams() throws Exception {
-        DistributedLogMultiStreamWriter.newBuilder()
-                .build();
-    }
-
-    @Test(timeout = 20000, expected = IllegalArgumentException.class)
-    public void testBuildWithEmptyStreamList() throws Exception {
-        DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.<String>newArrayList())
-                .build();
-    }
-
-    @Test(timeout = 20000, expected = NullPointerException.class)
-    public void testBuildWithNullClient() throws Exception {
-        DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.newArrayList("stream1", "stream2"))
-                .build();
-    }
-
-    @Test(timeout = 20000, expected = NullPointerException.class)
-    public void testBuildWithNullCodec() throws Exception {
-        DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.newArrayList("stream1", "stream2"))
-                .client(mock(DistributedLogClient.class))
-                .compressionCodec(null)
-                .build();
-    }
-
-    @Test(timeout = 20000, expected = IllegalArgumentException.class)
-    public void testBuildWithInvalidSpeculativeSettings1()
-            throws Exception {
-        DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.newArrayList("stream1", "stream2"))
-                .client(mock(DistributedLogClient.class))
-                .compressionCodec(CompressionCodec.Type.LZ4)
-                .firstSpeculativeTimeoutMs(-1)
-                .build();
-    }
-
-    @Test(timeout = 20000, expected = IllegalArgumentException.class)
-    public void testBuildWithInvalidSpeculativeSettings2()
-            throws Exception {
-        DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.newArrayList("stream1", "stream2"))
-                .client(mock(DistributedLogClient.class))
-                .compressionCodec(CompressionCodec.Type.LZ4)
-                .firstSpeculativeTimeoutMs(10)
-                .maxSpeculativeTimeoutMs(5)
-                .build();
-    }
-
-    @Test(timeout = 20000, expected = IllegalArgumentException.class)
-    public void testBuildWithInvalidSpeculativeSettings3()
-            throws Exception {
-        DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.newArrayList("stream1", "stream2"))
-                .client(mock(DistributedLogClient.class))
-                .compressionCodec(CompressionCodec.Type.LZ4)
-                .firstSpeculativeTimeoutMs(10)
-                .maxSpeculativeTimeoutMs(20)
-                .speculativeBackoffMultiplier(-1)
-                .build();
-    }
-
-    @Test(timeout = 20000, expected = IllegalArgumentException.class)
-    public void testBuildWithInvalidSpeculativeSettings4()
-            throws Exception {
-        DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.newArrayList("stream1", "stream2"))
-                .client(mock(DistributedLogClient.class))
-                .compressionCodec(CompressionCodec.Type.LZ4)
-                .firstSpeculativeTimeoutMs(10)
-                .maxSpeculativeTimeoutMs(20)
-                .speculativeBackoffMultiplier(2)
-                .requestTimeoutMs(10)
-                .build();
-    }
-
-    @Test(timeout = 20000)
-    public void testBuildMultiStreamWriter()
-            throws Exception {
-        DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.newArrayList("stream1", "stream2"))
-                .client(mock(DistributedLogClient.class))
-                .compressionCodec(CompressionCodec.Type.LZ4)
-                .firstSpeculativeTimeoutMs(10)
-                .maxSpeculativeTimeoutMs(20)
-                .speculativeBackoffMultiplier(2)
-                .requestTimeoutMs(50)
-                .build();
-        assertTrue(true);
-    }
-
-    @Test(timeout = 20000)
-    public void testBuildWithPeriodicalFlushEnabled() throws Exception {
-        ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
-        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.newArrayList("stream1", "stream2"))
-                .client(mock(DistributedLogClient.class))
-                .compressionCodec(CompressionCodec.Type.LZ4)
-                .firstSpeculativeTimeoutMs(10)
-                .maxSpeculativeTimeoutMs(20)
-                .speculativeBackoffMultiplier(2)
-                .requestTimeoutMs(50)
-                .flushIntervalMs(1000)
-                .scheduler(executorService)
-                .build();
-        verify(executorService, times(1)).scheduleAtFixedRate(writer, 1000000, 1000000, TimeUnit.MICROSECONDS);
-    }
-
-    @Test(timeout = 20000)
-    public void testBuildWithPeriodicalFlushDisabled() throws Exception {
-        ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
-        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.newArrayList("stream1", "stream2"))
-                .client(mock(DistributedLogClient.class))
-                .compressionCodec(CompressionCodec.Type.LZ4)
-                .firstSpeculativeTimeoutMs(10)
-                .maxSpeculativeTimeoutMs(20)
-                .speculativeBackoffMultiplier(2)
-                .requestTimeoutMs(50)
-                .flushIntervalMs(0)
-                .scheduler(executorService)
-                .build();
-        verify(executorService, times(0)).scheduleAtFixedRate(writer, 1000, 1000, TimeUnit.MILLISECONDS);
-        writer.close();
-    }
-
-    @Test(timeout = 20000)
-    public void testFlushWhenBufferIsFull() throws Exception {
-        DistributedLogClient client = mock(DistributedLogClient.class);
-        when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
-                .thenReturn(Future.value(new DLSN(1L, 1L, 999L)));
-
-        ScheduledExecutorService executorService =
-                Executors.newSingleThreadScheduledExecutor();
-        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.newArrayList("stream1", "stream2"))
-                .client(client)
-                .compressionCodec(CompressionCodec.Type.LZ4)
-                .firstSpeculativeTimeoutMs(100000)
-                .maxSpeculativeTimeoutMs(200000)
-                .speculativeBackoffMultiplier(2)
-                .requestTimeoutMs(500000)
-                .flushIntervalMs(0)
-                .bufferSize(0)
-                .scheduler(executorService)
-                .build();
-
-        ByteBuffer buffer = ByteBuffer.wrap("test".getBytes(UTF_8));
-        writer.write(buffer);
-
-        verify(client, times(1)).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
-
-        writer.close();
-    }
-
-    @Test(timeout = 20000)
-    public void testFlushWhenExceedMaxLogRecordSetSize()
-            throws Exception {
-        DistributedLogClient client = mock(DistributedLogClient.class);
-        when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
-                .thenReturn(Future.value(new DLSN(1L, 1L, 999L)));
-        ScheduledExecutorService executorService =
-                Executors.newSingleThreadScheduledExecutor();
-        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.newArrayList("stream1", "stream2"))
-                .client(client)
-                .compressionCodec(CompressionCodec.Type.LZ4)
-                .firstSpeculativeTimeoutMs(100000)
-                .maxSpeculativeTimeoutMs(200000)
-                .speculativeBackoffMultiplier(2)
-                .requestTimeoutMs(500000)
-                .flushIntervalMs(0)
-                .bufferSize(Integer.MAX_VALUE)
-                .scheduler(executorService)
-                .build();
-
-        byte[] data = new byte[LogRecord.MAX_LOGRECORD_SIZE - 3 * 100];
-        ByteBuffer buffer1 = ByteBuffer.wrap(data);
-        writer.write(buffer1);
-        verify(client, times(0)).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
-        LogRecordSet.Writer recordSetWriter1 = writer.getLogRecordSetWriter();
-        assertEquals(1, recordSetWriter1.getNumRecords());
-        assertEquals(LogRecordSet.HEADER_LEN + 4 + data.length, recordSetWriter1.getNumBytes());
-
-        ByteBuffer buffer2 = ByteBuffer.wrap(data);
-        writer.write(buffer2);
-        verify(client, times(1)).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
-        LogRecordSet.Writer recordSetWriter2 = writer.getLogRecordSetWriter();
-        assertEquals(1, recordSetWriter2.getNumRecords());
-        assertEquals(LogRecordSet.HEADER_LEN + 4 + data.length, recordSetWriter2.getNumBytes());
-        assertTrue(recordSetWriter1 != recordSetWriter2);
-
-        writer.close();
-    }
-
-    @Test(timeout = 20000)
-    public void testWriteTooLargeRecord() throws Exception {
-        DistributedLogClient client = mock(DistributedLogClient.class);
-        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.newArrayList("stream1", "stream2"))
-                .client(client)
-                .compressionCodec(CompressionCodec.Type.LZ4)
-                .firstSpeculativeTimeoutMs(100000)
-                .maxSpeculativeTimeoutMs(200000)
-                .speculativeBackoffMultiplier(2)
-                .requestTimeoutMs(5000000)
-                .flushIntervalMs(0)
-                .bufferSize(0)
-                .build();
-
-        byte[] data = new byte[LogRecord.MAX_LOGRECORD_SIZE + 10];
-        ByteBuffer buffer = ByteBuffer.wrap(data);
-        Future<DLSN> writeFuture = writer.write(buffer);
-        assertTrue(writeFuture.isDefined());
-        try {
-            Await.result(writeFuture);
-            fail("Should fail on writing too long record");
-        } catch (LogRecordTooLongException lrtle) {
-            // expected
-        }
-        writer.close();
-    }
-
-    @Test(timeout = 20000)
-    public void testSpeculativeWrite() throws Exception {
-        DistributedLogClient client = mock(DistributedLogClient.class);
-        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.newArrayList("stream1", "stream2"))
-                .client(client)
-                .compressionCodec(CompressionCodec.Type.LZ4)
-                .firstSpeculativeTimeoutMs(10)
-                .maxSpeculativeTimeoutMs(20)
-                .speculativeBackoffMultiplier(2)
-                .requestTimeoutMs(5000000)
-                .flushIntervalMs(0)
-                .bufferSize(0)
-                .build();
-
-        final String secondStream = writer.getStream(1);
-
-        final DLSN dlsn = new DLSN(99L, 88L, 0L);
-
-        Mockito.doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
-                Object[] arguments = invocation.getArguments();
-                String stream = (String) arguments[0];
-                if (stream.equals(secondStream)) {
-                    return Future.value(dlsn);
-                } else {
-                    return new Promise<DLSN>();
-                }
-            }
-        }).when(client).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
-
-        byte[] data = "test-test".getBytes(UTF_8);
-        ByteBuffer buffer = ByteBuffer.wrap(data);
-        Future<DLSN> writeFuture = writer.write(buffer);
-        DLSN writeDLSN = Await.result(writeFuture);
-        assertEquals(dlsn, writeDLSN);
-        writer.close();
-    }
-
-    @Test(timeout = 20000)
-    public void testPeriodicalFlush() throws Exception {
-        DistributedLogClient client = mock(DistributedLogClient.class);
-        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.newArrayList("stream1", "stream2"))
-                .client(client)
-                .compressionCodec(CompressionCodec.Type.LZ4)
-                .firstSpeculativeTimeoutMs(10)
-                .maxSpeculativeTimeoutMs(20)
-                .speculativeBackoffMultiplier(2)
-                .requestTimeoutMs(5000000)
-                .flushIntervalMs(10)
-                .bufferSize(Integer.MAX_VALUE)
-                .build();
-
-        final DLSN dlsn = new DLSN(99L, 88L, 0L);
-
-        Mockito.doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
-                return Future.value(dlsn);
-            }
-        }).when(client).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
-
-        byte[] data = "test-test".getBytes(UTF_8);
-        ByteBuffer buffer = ByteBuffer.wrap(data);
-        Future<DLSN> writeFuture = writer.write(buffer);
-        DLSN writeDLSN = Await.result(writeFuture);
-        assertEquals(dlsn, writeDLSN);
-        writer.close();
-    }
-
-    @Test(timeout = 20000)
-    public void testFailRequestAfterRetriedAllStreams() throws Exception {
-        DistributedLogClient client = mock(DistributedLogClient.class);
-        when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
-                .thenReturn(new Promise<DLSN>());
-        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
-                .streams(Lists.newArrayList("stream1", "stream2"))
-                .client(client)
-                .compressionCodec(CompressionCodec.Type.LZ4)
-                .firstSpeculativeTimeoutMs(10)
-                .maxSpeculativeTimeoutMs(20)
-                .speculativeBackoffMultiplier(2)
-                .requestTimeoutMs(5000000)
-                .flushIntervalMs(10)
-                .bufferSize(Integer.MAX_VALUE)
-                .build();
-
-        byte[] data = "test-test".getBytes(UTF_8);
-        ByteBuffer buffer = ByteBuffer.wrap(data);
-        Future<DLSN> writeFuture = writer.write(buffer);
-        try {
-            Await.result(writeFuture);
-            fail("Should fail the request after retries all streams");
-        } catch (IndividualRequestTimeoutException e) {
-            long timeoutMs = e.timeout().inMilliseconds();
-            assertTrue(timeoutMs >= (10 + 20) && timeoutMs < 5000000);
-        }
-        writer.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
deleted file mode 100644
index 86d1c11..0000000
--- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.ownership;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.distributedlog.client.ClientConfig;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Map;
-import java.util.Set;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Test Case for Ownership Cache.
- */
-public class TestOwnershipCache {
-
-    @Rule
-    public TestName runtime = new TestName();
-
-    private static OwnershipCache createOwnershipCache() {
-        ClientConfig clientConfig = new ClientConfig();
-        return new OwnershipCache(clientConfig, null,
-                                  NullStatsReceiver.get(), NullStatsReceiver.get());
-    }
-
-    private static SocketAddress createSocketAddress(int port) {
-        return new InetSocketAddress("127.0.0.1", port);
-    }
-
-    @Test(timeout = 60000)
-    public void testUpdateOwner() {
-        OwnershipCache cache = createOwnershipCache();
-        SocketAddress addr = createSocketAddress(1000);
-        String stream = runtime.getMethodName();
-
-        assertTrue("Should successfully update owner if no owner exists before",
-                cache.updateOwner(stream, addr));
-        assertEquals("Owner should be " + addr + " for stream " + stream,
-                addr, cache.getOwner(stream));
-        assertTrue("Should successfully update owner if old owner is same",
-                cache.updateOwner(stream, addr));
-        assertEquals("Owner should be " + addr + " for stream " + stream,
-                addr, cache.getOwner(stream));
-    }
-
-    @Test(timeout = 60000)
-    public void testRemoveOwnerFromStream() {
-        OwnershipCache cache = createOwnershipCache();
-        int initialPort = 2000;
-        int numProxies = 2;
-        int numStreamsPerProxy = 2;
-        for (int i = 0; i < numProxies; i++) {
-            SocketAddress addr = createSocketAddress(initialPort + i);
-            for (int j = 0; j < numStreamsPerProxy; j++) {
-                String stream = runtime.getMethodName() + "_" + i + "_" + j;
-                cache.updateOwner(stream, addr);
-            }
-        }
-        Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
-                numProxies * numStreamsPerProxy, ownershipMap.size());
-        Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
-        assertEquals("There should be " + numProxies + " proxies cached",
-                numProxies, ownershipDistribution.size());
-
-        String stream = runtime.getMethodName() + "_0_0";
-        SocketAddress owner = createSocketAddress(initialPort);
-
-        // remove non-existent mapping won't change anything
-        SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999);
-        cache.removeOwnerFromStream(stream, nonExistentAddr, "remove-non-existent-addr");
-        assertEquals("Owner " + owner + " should not be removed",
-                owner, cache.getOwner(stream));
-        ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
-                numProxies * numStreamsPerProxy, ownershipMap.size());
-
-        // remove existent mapping should remove ownership mapping
-        cache.removeOwnerFromStream(stream, owner, "remove-owner");
-        assertNull("Owner " + owner + " should be removed", cache.getOwner(stream));
-        ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should be " + (numProxies * numStreamsPerProxy - 1) + " entries left in cache",
-                numProxies * numStreamsPerProxy - 1, ownershipMap.size());
-        ownershipDistribution = cache.getStreamOwnershipDistribution();
-        assertEquals("There should still be " + numProxies + " proxies cached",
-                numProxies, ownershipDistribution.size());
-        Set<String> ownedStreams = ownershipDistribution.get(owner);
-        assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned for " + owner,
-                numStreamsPerProxy - 1, ownedStreams.size());
-        assertFalse("Stream " + stream + " should not be owned by " + owner,
-                ownedStreams.contains(stream));
-    }
-
-    @Test(timeout = 60000)
-    public void testRemoveAllStreamsFromOwner() {
-        OwnershipCache cache = createOwnershipCache();
-        int initialPort = 2000;
-        int numProxies = 2;
-        int numStreamsPerProxy = 2;
-        for (int i = 0; i < numProxies; i++) {
-            SocketAddress addr = createSocketAddress(initialPort + i);
-            for (int j = 0; j < numStreamsPerProxy; j++) {
-                String stream = runtime.getMethodName() + "_" + i + "_" + j;
-                cache.updateOwner(stream, addr);
-            }
-        }
-        Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
-                numProxies * numStreamsPerProxy, ownershipMap.size());
-        Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
-        assertEquals("There should be " + numProxies + " proxies cached",
-                numProxies, ownershipDistribution.size());
-
-        SocketAddress owner = createSocketAddress(initialPort);
-
-        // remove non-existent host won't change anything
-        SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999);
-        cache.removeAllStreamsFromOwner(nonExistentAddr);
-        ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should still be " + (numProxies * numStreamsPerProxy) + " entries in cache",
-                numProxies * numStreamsPerProxy, ownershipMap.size());
-        ownershipDistribution = cache.getStreamOwnershipDistribution();
-        assertEquals("There should still be " + numProxies + " proxies cached",
-                numProxies, ownershipDistribution.size());
-
-        // remove existent host should remove ownership mapping
-        cache.removeAllStreamsFromOwner(owner);
-        ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should be " + ((numProxies - 1) * numStreamsPerProxy) + " entries left in cache",
-                (numProxies - 1) * numStreamsPerProxy, ownershipMap.size());
-        ownershipDistribution = cache.getStreamOwnershipDistribution();
-        assertEquals("There should be " + (numProxies - 1) + " proxies cached",
-                numProxies - 1, ownershipDistribution.size());
-        assertFalse("Host " + owner + " should not be cached",
-                ownershipDistribution.containsKey(owner));
-    }
-
-    @Test(timeout = 60000)
-    public void testReplaceOwner() {
-        OwnershipCache cache = createOwnershipCache();
-        int initialPort = 2000;
-        int numProxies = 2;
-        int numStreamsPerProxy = 2;
-        for (int i = 0; i < numProxies; i++) {
-            SocketAddress addr = createSocketAddress(initialPort + i);
-            for (int j = 0; j < numStreamsPerProxy; j++) {
-                String stream = runtime.getMethodName() + "_" + i + "_" + j;
-                cache.updateOwner(stream, addr);
-            }
-        }
-        Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
-                numProxies * numStreamsPerProxy, ownershipMap.size());
-        Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
-        assertEquals("There should be " + numProxies + " proxies cached",
-                numProxies, ownershipDistribution.size());
-
-        String stream = runtime.getMethodName() + "_0_0";
-        SocketAddress oldOwner = createSocketAddress(initialPort);
-        SocketAddress newOwner = createSocketAddress(initialPort + 999);
-
-        cache.updateOwner(stream, newOwner);
-        assertEquals("Owner of " + stream + " should be changed from " + oldOwner + " to " + newOwner,
-                newOwner, cache.getOwner(stream));
-        ownershipMap = cache.getStreamOwnerMapping();
-        assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
-                numProxies * numStreamsPerProxy, ownershipMap.size());
-        assertEquals("Owner of " + stream + " should be " + newOwner,
-                newOwner, ownershipMap.get(stream));
-        ownershipDistribution = cache.getStreamOwnershipDistribution();
-        assertEquals("There should be " + (numProxies + 1) + " proxies cached",
-                numProxies + 1, ownershipDistribution.size());
-        Set<String> oldOwnedStreams = ownershipDistribution.get(oldOwner);
-        assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + oldOwner,
-                numStreamsPerProxy - 1, oldOwnedStreams.size());
-        assertFalse("Stream " + stream + " should not be owned by " + oldOwner,
-                oldOwnedStreams.contains(stream));
-        Set<String> newOwnedStreams = ownershipDistribution.get(newOwner);
-        assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + newOwner,
-                1, newOwnedStreams.size());
-        assertTrue("Stream " + stream + " should be owned by " + newOwner,
-                newOwnedStreams.contains(stream));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java
deleted file mode 100644
index 8ef33bd..0000000
--- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.proxy;
-
-import org.apache.distributedlog.thrift.service.BulkWriteResponse;
-import org.apache.distributedlog.thrift.service.ClientInfo;
-import org.apache.distributedlog.thrift.service.DistributedLogService;
-import org.apache.distributedlog.thrift.service.HeartbeatOptions;
-import org.apache.distributedlog.thrift.service.ServerInfo;
-import org.apache.distributedlog.thrift.service.WriteContext;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import com.twitter.util.Future;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- * Mock DistributedLog Related Services.
- */
-public class MockDistributedLogServices {
-
-    /**
-     * Mock basic service.
-     */
-    static class MockBasicService implements DistributedLogService.ServiceIface {
-
-        @Override
-        public Future<ServerInfo> handshake() {
-            return Future.value(new ServerInfo());
-        }
-
-        @Override
-        public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
-            return Future.value(new ServerInfo());
-        }
-
-        @Override
-        public Future<WriteResponse> heartbeat(String stream, WriteContext ctx) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> heartbeatWithOptions(String stream,
-                                                          WriteContext ctx,
-                                                          HeartbeatOptions options) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> write(String stream,
-                                           ByteBuffer data) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> writeWithContext(String stream,
-                                                      ByteBuffer data,
-                                                      WriteContext ctx) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<BulkWriteResponse> writeBulkWithContext(String stream,
-                                                              List<ByteBuffer> data,
-                                                              WriteContext ctx) {
-            return Future.value(new BulkWriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> truncate(String stream,
-                                              String dlsn,
-                                              WriteContext ctx) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> release(String stream,
-                                             WriteContext ctx) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> create(String stream, WriteContext ctx) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> delete(String stream,
-                                            WriteContext ctx) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<WriteResponse> getOwner(String stream, WriteContext ctx) {
-            return Future.value(new WriteResponse());
-        }
-
-        @Override
-        public Future<Void> setAcceptNewStream(boolean enabled) {
-            return Future.value(null);
-        }
-    }
-
-    /**
-     * Mock server info service.
-     */
-    public static class MockServerInfoService extends MockBasicService {
-
-        protected ServerInfo serverInfo;
-
-        public MockServerInfoService() {
-            serverInfo = new ServerInfo();
-        }
-
-        public void updateServerInfo(ServerInfo serverInfo) {
-            this.serverInfo = serverInfo;
-        }
-
-        @Override
-        public Future<ServerInfo> handshake() {
-            return Future.value(serverInfo);
-        }
-
-        @Override
-        public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
-            return Future.value(serverInfo);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java
deleted file mode 100644
index e38c2ed..0000000
--- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.proxy;
-
-import org.apache.distributedlog.thrift.service.DistributedLogService;
-import java.net.SocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Mock Proxy Client Builder.
- */
-class MockProxyClientBuilder implements ProxyClient.Builder {
-
-    static class MockProxyClient extends ProxyClient {
-        MockProxyClient(SocketAddress address,
-                        DistributedLogService.ServiceIface service) {
-            super(address, new MockThriftClient(), service);
-        }
-    }
-
-    private final ConcurrentMap<SocketAddress, MockProxyClient> clients =
-            new ConcurrentHashMap<SocketAddress, MockProxyClient>();
-
-    public void provideProxyClient(SocketAddress address,
-                                   MockProxyClient proxyClient) {
-        clients.put(address, proxyClient);
-    }
-
-    @Override
-    public ProxyClient build(SocketAddress address) {
-        return clients.get(address);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java
deleted file mode 100644
index ad1c878..0000000
--- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.proxy;
-
-import com.twitter.finagle.Service;
-import com.twitter.finagle.thrift.ThriftClientRequest;
-import com.twitter.util.Future;
-
-/**
- * Mock Thrift Client.
- */
-class MockThriftClient extends Service<ThriftClientRequest, byte[]> {
-    @Override
-    public Future<byte[]> apply(ThriftClientRequest request) {
-        return Future.value(request.message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java
deleted file mode 100644
index 6d9a471..0000000
--- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.proxy;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.distributedlog.client.ClientConfig;
-import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockBasicService;
-import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockServerInfoService;
-import org.apache.distributedlog.client.proxy.MockProxyClientBuilder.MockProxyClient;
-import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
-import org.apache.distributedlog.client.stats.ClientStats;
-import org.apache.distributedlog.thrift.service.ServerInfo;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.lang3.tuple.Pair;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Test Proxy Client Manager.
- */
-public class TestProxyClientManager {
-
-    @Rule
-    public TestName runtime = new TestName();
-
-    static class TestHostProvider implements HostProvider {
-
-        Set<SocketAddress> hosts = new HashSet<SocketAddress>();
-
-        synchronized void addHost(SocketAddress host) {
-            hosts.add(host);
-        }
-
-        @Override
-        public synchronized Set<SocketAddress> getHosts() {
-            return ImmutableSet.copyOf(hosts);
-        }
-
-    }
-
-    private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder,
-                                                               long periodicHandshakeIntervalMs) {
-        HostProvider provider = new TestHostProvider();
-        return createProxyClientManager(builder, provider, periodicHandshakeIntervalMs);
-    }
-
-    private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder,
-                                                               HostProvider hostProvider,
-                                                               long periodicHandshakeIntervalMs) {
-        ClientConfig clientConfig = new ClientConfig();
-        clientConfig.setPeriodicHandshakeIntervalMs(periodicHandshakeIntervalMs);
-        clientConfig.setPeriodicOwnershipSyncIntervalMs(-1);
-        HashedWheelTimer dlTimer = new HashedWheelTimer(
-                new ThreadFactoryBuilder().setNameFormat("TestProxyClientManager-timer-%d").build(),
-                clientConfig.getRedirectBackoffStartMs(),
-                TimeUnit.MILLISECONDS);
-        return new ProxyClientManager(clientConfig, builder, dlTimer, hostProvider,
-                new ClientStats(NullStatsReceiver.get(), false, new DefaultRegionResolver()));
-    }
-
-    private static SocketAddress createSocketAddress(int port) {
-        return new InetSocketAddress("127.0.0.1", port);
-    }
-
-    private static MockProxyClient createMockProxyClient(SocketAddress address) {
-        return new MockProxyClient(address, new MockBasicService());
-    }
-
-    private static Pair<MockProxyClient, MockServerInfoService> createMockProxyClient(
-            SocketAddress address, ServerInfo serverInfo) {
-        MockServerInfoService service = new MockServerInfoService();
-        MockProxyClient proxyClient = new MockProxyClient(address, service);
-        service.updateServerInfo(serverInfo);
-        return Pair.of(proxyClient, service);
-    }
-
-    @Test(timeout = 60000)
-    public void testBasicCreateRemove() throws Exception {
-        SocketAddress address = createSocketAddress(1000);
-        MockProxyClientBuilder builder = new MockProxyClientBuilder();
-        MockProxyClient mockProxyClient = createMockProxyClient(address);
-        builder.provideProxyClient(address, mockProxyClient);
-
-        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
-        assertEquals("There should be no clients in the manager",
-                0, clientManager.getNumProxies());
-        ProxyClient proxyClient =  clientManager.createClient(address);
-        assertEquals("Create client should build the proxy client",
-                1, clientManager.getNumProxies());
-        assertTrue("The client returned should be the same client that builder built",
-                mockProxyClient == proxyClient);
-    }
-
-    @Test(timeout = 60000)
-    public void testGetShouldCreateClient() throws Exception {
-        SocketAddress address = createSocketAddress(2000);
-        MockProxyClientBuilder builder = new MockProxyClientBuilder();
-        MockProxyClient mockProxyClient = createMockProxyClient(address);
-        builder.provideProxyClient(address, mockProxyClient);
-
-        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
-        assertEquals("There should be no clients in the manager",
-                0, clientManager.getNumProxies());
-        ProxyClient proxyClient =  clientManager.getClient(address);
-        assertEquals("Get client should build the proxy client",
-                1, clientManager.getNumProxies());
-        assertTrue("The client returned should be the same client that builder built",
-                mockProxyClient == proxyClient);
-    }
-
-    @Test(timeout = 60000)
-    public void testConditionalRemoveClient() throws Exception {
-        SocketAddress address = createSocketAddress(3000);
-        MockProxyClientBuilder builder = new MockProxyClientBuilder();
-        MockProxyClient mockProxyClient = createMockProxyClient(address);
-        MockProxyClient anotherMockProxyClient = createMockProxyClient(address);
-        builder.provideProxyClient(address, mockProxyClient);
-
-        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
-        assertEquals("There should be no clients in the manager",
-                0, clientManager.getNumProxies());
-        clientManager.createClient(address);
-        assertEquals("Create client should build the proxy client",
-                1, clientManager.getNumProxies());
-        clientManager.removeClient(address, anotherMockProxyClient);
-        assertEquals("Conditional remove should not remove proxy client",
-                1, clientManager.getNumProxies());
-        clientManager.removeClient(address, mockProxyClient);
-        assertEquals("Conditional remove should remove proxy client",
-                0, clientManager.getNumProxies());
-    }
-
-    @Test(timeout = 60000)
-    public void testRemoveClient() throws Exception {
-        SocketAddress address = createSocketAddress(3000);
-        MockProxyClientBuilder builder = new MockProxyClientBuilder();
-        MockProxyClient mockProxyClient = createMockProxyClient(address);
-        builder.provideProxyClient(address, mockProxyClient);
-
-        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
-        assertEquals("There should be no clients in the manager",
-                0, clientManager.getNumProxies());
-        clientManager.createClient(address);
-        assertEquals("Create client should build the proxy client",
-                1, clientManager.getNumProxies());
-        clientManager.removeClient(address);
-        assertEquals("Remove should remove proxy client",
-                0, clientManager.getNumProxies());
-    }
-
-    @Test(timeout = 60000)
-    public void testCreateClientShouldHandshake() throws Exception {
-        SocketAddress address = createSocketAddress(3000);
-        MockProxyClientBuilder builder = new MockProxyClientBuilder();
-        ServerInfo serverInfo = new ServerInfo();
-        serverInfo.putToOwnerships(runtime.getMethodName() + "_stream",
-                runtime.getMethodName() + "_owner");
-        Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
-                createMockProxyClient(address, serverInfo);
-        builder.provideProxyClient(address, mockProxyClient.getLeft());
-
-        final AtomicReference<ServerInfo> resultHolder = new AtomicReference<ServerInfo>(null);
-        final CountDownLatch doneLatch = new CountDownLatch(1);
-        ProxyListener listener = new ProxyListener() {
-            @Override
-            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
-                resultHolder.set(serverInfo);
-                doneLatch.countDown();
-            }
-            @Override
-            public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
-            }
-        };
-
-        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
-        clientManager.registerProxyListener(listener);
-        assertEquals("There should be no clients in the manager",
-                0, clientManager.getNumProxies());
-        clientManager.createClient(address);
-        assertEquals("Create client should build the proxy client",
-                1, clientManager.getNumProxies());
-
-        // When a client is created, it would handshake with that proxy
-        doneLatch.await();
-        assertEquals("Handshake should return server info",
-                serverInfo, resultHolder.get());
-    }
-
-    @Test(timeout = 60000)
-    public void testHandshake() throws Exception {
-        final int numHosts = 3;
-        final int numStreamsPerHost = 3;
-        final int initialPort = 4000;
-
-        MockProxyClientBuilder builder = new MockProxyClientBuilder();
-        Map<SocketAddress, ServerInfo> serverInfoMap =
-                new HashMap<SocketAddress, ServerInfo>();
-        for (int i = 0; i < numHosts; i++) {
-            SocketAddress address = createSocketAddress(initialPort + i);
-            ServerInfo serverInfo = new ServerInfo();
-            for (int j = 0; j < numStreamsPerHost; j++) {
-                serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j,
-                        address.toString());
-            }
-            Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
-                    createMockProxyClient(address, serverInfo);
-            builder.provideProxyClient(address, mockProxyClient.getLeft());
-            serverInfoMap.put(address, serverInfo);
-        }
-
-        final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>();
-        final CountDownLatch doneLatch = new CountDownLatch(2 * numHosts);
-        ProxyListener listener = new ProxyListener() {
-            @Override
-            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
-                synchronized (results) {
-                    results.put(address, serverInfo);
-                }
-                doneLatch.countDown();
-            }
-
-            @Override
-            public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
-            }
-        };
-
-        TestHostProvider rs = new TestHostProvider();
-        ProxyClientManager clientManager = createProxyClientManager(builder, rs, 0L);
-        clientManager.registerProxyListener(listener);
-        assertEquals("There should be no clients in the manager",
-                0, clientManager.getNumProxies());
-        for (int i = 0; i < numHosts; i++) {
-            rs.addHost(createSocketAddress(initialPort + i));
-        }
-        // handshake would handshake with 3 hosts again
-        clientManager.handshake();
-        doneLatch.await();
-        assertEquals("Handshake should return server info",
-                numHosts, results.size());
-        assertTrue("Handshake should get all server infos",
-                Maps.difference(serverInfoMap, results).areEqual());
-    }
-
-    @Test(timeout = 60000)
-    public void testPeriodicHandshake() throws Exception {
-        final int numHosts = 3;
-        final int numStreamsPerHost = 3;
-        final int initialPort = 5000;
-
-        MockProxyClientBuilder builder = new MockProxyClientBuilder();
-        Map<SocketAddress, ServerInfo> serverInfoMap =
-                new HashMap<SocketAddress, ServerInfo>();
-        Map<SocketAddress, MockServerInfoService> mockServiceMap =
-                new HashMap<SocketAddress, MockServerInfoService>();
-        final Map<SocketAddress, CountDownLatch> hostDoneLatches =
-                new HashMap<SocketAddress, CountDownLatch>();
-        for (int i = 0; i < numHosts; i++) {
-            SocketAddress address = createSocketAddress(initialPort + i);
-            ServerInfo serverInfo = new ServerInfo();
-            for (int j = 0; j < numStreamsPerHost; j++) {
-                serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j,
-                        address.toString());
-            }
-            Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
-                    createMockProxyClient(address, serverInfo);
-            builder.provideProxyClient(address, mockProxyClient.getLeft());
-            serverInfoMap.put(address, serverInfo);
-            mockServiceMap.put(address, mockProxyClient.getRight());
-            hostDoneLatches.put(address, new CountDownLatch(2));
-        }
-
-        final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>();
-        final CountDownLatch doneLatch = new CountDownLatch(numHosts);
-        ProxyListener listener = new ProxyListener() {
-            @Override
-            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
-                synchronized (results) {
-                    results.put(address, serverInfo);
-                    CountDownLatch latch = hostDoneLatches.get(address);
-                    if (null != latch) {
-                        latch.countDown();
-                    }
-                }
-                doneLatch.countDown();
-            }
-
-            @Override
-            public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
-            }
-        };
-
-        TestHostProvider rs = new TestHostProvider();
-        ProxyClientManager clientManager = createProxyClientManager(builder, rs, 50L);
-        clientManager.setPeriodicHandshakeEnabled(false);
-        clientManager.registerProxyListener(listener);
-
-        assertEquals("There should be no clients in the manager",
-                0, clientManager.getNumProxies());
-        for (int i = 0; i < numHosts; i++) {
-            SocketAddress address = createSocketAddress(initialPort + i);
-            rs.addHost(address);
-            clientManager.createClient(address);
-        }
-
-        // make sure the first 3 handshakes going through
-        doneLatch.await();
-
-        assertEquals("Handshake should return server info",
-                numHosts, results.size());
-        assertTrue("Handshake should get all server infos",
-                Maps.difference(serverInfoMap, results).areEqual());
-
-        // update server info
-        for (int i = 0; i < numHosts; i++) {
-            SocketAddress address = createSocketAddress(initialPort + i);
-            ServerInfo serverInfo = new ServerInfo();
-            for (int j = 0; j < numStreamsPerHost; j++) {
-                serverInfo.putToOwnerships(runtime.getMethodName() + "_new_stream_" + j,
-                        address.toString());
-            }
-            MockServerInfoService service = mockServiceMap.get(address);
-            serverInfoMap.put(address, serverInfo);
-            service.updateServerInfo(serverInfo);
-        }
-
-        clientManager.setPeriodicHandshakeEnabled(true);
-        for (int i = 0; i < numHosts; i++) {
-            SocketAddress address = createSocketAddress(initialPort + i);
-            CountDownLatch latch = hostDoneLatches.get(address);
-            latch.await();
-        }
-
-        assertTrue("Periodic handshake should update all server infos",
-                Maps.difference(serverInfoMap, results).areEqual());
-    }
-
-}