You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:52 UTC

[47/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
deleted file mode 100644
index 1077cd0..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
+++ /dev/null
@@ -1,1200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.LogRecordSetBuffer;
-import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
-import com.twitter.distributedlog.client.ownership.OwnershipCache;
-import com.twitter.distributedlog.client.proxy.ClusterClient;
-import com.twitter.distributedlog.client.proxy.HostProvider;
-import com.twitter.distributedlog.client.proxy.ProxyClient;
-import com.twitter.distributedlog.client.proxy.ProxyClientManager;
-import com.twitter.distributedlog.client.proxy.ProxyListener;
-import com.twitter.distributedlog.client.resolver.RegionResolver;
-import com.twitter.distributedlog.client.routing.RoutingService;
-import com.twitter.distributedlog.client.routing.RoutingService.RoutingContext;
-import com.twitter.distributedlog.client.stats.ClientStats;
-import com.twitter.distributedlog.client.stats.OpStats;
-import com.twitter.distributedlog.exceptions.DLClientClosedException;
-import com.twitter.distributedlog.exceptions.DLException;
-import com.twitter.distributedlog.exceptions.ServiceUnavailableException;
-import com.twitter.distributedlog.exceptions.StreamUnavailableException;
-import com.twitter.distributedlog.service.DLSocketAddress;
-import com.twitter.distributedlog.service.DistributedLogClient;
-import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
-import com.twitter.distributedlog.thrift.service.HeartbeatOptions;
-import com.twitter.distributedlog.thrift.service.ResponseHeader;
-import com.twitter.distributedlog.thrift.service.ServerInfo;
-import com.twitter.distributedlog.thrift.service.ServerStatus;
-import com.twitter.distributedlog.thrift.service.StatusCode;
-import com.twitter.distributedlog.thrift.service.WriteContext;
-import com.twitter.distributedlog.thrift.service.WriteResponse;
-import com.twitter.distributedlog.util.ProtocolUtils;
-import com.twitter.finagle.CancelledRequestException;
-import com.twitter.finagle.ConnectionFailedException;
-import com.twitter.finagle.Failure;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.RequestTimeoutException;
-import com.twitter.finagle.ServiceException;
-import com.twitter.finagle.ServiceTimeoutException;
-import com.twitter.finagle.WriteException;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.finagle.thrift.ClientId;
-import com.twitter.util.Duration;
-import com.twitter.util.Function;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.thrift.TApplicationException;
-import org.jboss.netty.channel.ChannelException;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.Seq;
-import scala.runtime.AbstractFunction1;
-
-
-/**
- * Implementation of distributedlog client.
- */
-public class DistributedLogClientImpl implements DistributedLogClient, MonitorServiceClient,
-        RoutingService.RoutingListener, ProxyListener, HostProvider {
-
-    private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientImpl.class);
-
-    private final String clientName;
-    private final ClientId clientId;
-    private final ClientConfig clientConfig;
-    private final RoutingService routingService;
-    private final ProxyClient.Builder clientBuilder;
-    private final boolean streamFailfast;
-    private final Pattern streamNameRegexPattern;
-
-    // Timer
-    private final HashedWheelTimer dlTimer;
-
-    // region resolver
-    private final RegionResolver regionResolver;
-
-    // Ownership maintenance
-    private final OwnershipCache ownershipCache;
-    // Channel/Client management
-    private final ProxyClientManager clientManager;
-    // Cluster Client (for routing service)
-    private final Optional<ClusterClient> clusterClient;
-
-    // Close Status
-    private boolean closed = false;
-    private final ReentrantReadWriteLock closeLock =
-            new ReentrantReadWriteLock();
-
-    abstract class StreamOp implements TimerTask {
-        final String stream;
-
-        final AtomicInteger tries = new AtomicInteger(0);
-        final RoutingContext routingContext = RoutingContext.of(regionResolver);
-        final WriteContext ctx = new WriteContext();
-        final Stopwatch stopwatch;
-        final OpStats opStats;
-        SocketAddress nextAddressToSend;
-
-        StreamOp(final String stream, final OpStats opStats) {
-            this.stream = stream;
-            this.stopwatch = Stopwatch.createStarted();
-            this.opStats = opStats;
-        }
-
-        boolean shouldTimeout() {
-            long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
-            return shouldTimeout(elapsedMs);
-        }
-
-        boolean shouldTimeout(long elapsedMs) {
-            return clientConfig.getRequestTimeoutMs() > 0
-                && elapsedMs >= clientConfig.getRequestTimeoutMs();
-        }
-
-        void send(SocketAddress address) {
-            long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
-            if (clientConfig.getMaxRedirects() > 0
-                && tries.get() >= clientConfig.getMaxRedirects()) {
-                fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs),
-                        "Exhausted max redirects in " + elapsedMs + " ms"));
-                return;
-            } else if (shouldTimeout(elapsedMs)) {
-                fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs),
-                        "Exhausted max request timeout " + clientConfig.getRequestTimeoutMs()
-                                + " in " + elapsedMs + " ms"));
-                return;
-            }
-            synchronized (this) {
-                String addrStr = address.toString();
-                if (ctx.isSetTriedHosts() && ctx.getTriedHosts().contains(addrStr)) {
-                    nextAddressToSend = address;
-                    dlTimer.newTimeout(this,
-                            Math.min(clientConfig.getRedirectBackoffMaxMs(),
-                                    tries.get() * clientConfig.getRedirectBackoffStartMs()),
-                            TimeUnit.MILLISECONDS);
-                } else {
-                    doSend(address);
-                }
-            }
-        }
-
-        abstract Future<ResponseHeader> sendRequest(ProxyClient sc);
-
-        void doSend(SocketAddress address) {
-            ctx.addToTriedHosts(address.toString());
-            if (clientConfig.isChecksumEnabled()) {
-                Long crc32 = computeChecksum();
-                if (null != crc32) {
-                    ctx.setCrc32(crc32);
-                }
-            }
-            tries.incrementAndGet();
-            sendWriteRequest(address, this);
-        }
-
-        void beforeComplete(ProxyClient sc, ResponseHeader responseHeader) {
-            ownershipCache.updateOwner(stream, sc.getAddress());
-        }
-
-        void complete(SocketAddress address) {
-            stopwatch.stop();
-            opStats.completeRequest(address,
-                    stopwatch.elapsed(TimeUnit.MICROSECONDS), tries.get());
-        }
-
-        void fail(SocketAddress address, Throwable t) {
-            stopwatch.stop();
-            opStats.failRequest(address,
-                    stopwatch.elapsed(TimeUnit.MICROSECONDS), tries.get());
-        }
-
-        Long computeChecksum() {
-            return null;
-        }
-
-        @Override
-        public synchronized void run(Timeout timeout) throws Exception {
-            if (!timeout.isCancelled() && null != nextAddressToSend) {
-                doSend(nextAddressToSend);
-            } else {
-                fail(null, new CancelledRequestException());
-            }
-        }
-    }
-
-    class BulkWriteOp extends StreamOp {
-
-        final List<ByteBuffer> data;
-        final ArrayList<Promise<DLSN>> results;
-
-        BulkWriteOp(final String name, final List<ByteBuffer> data) {
-            super(name, clientStats.getOpStats("bulk_write"));
-            this.data = data;
-
-            // This could take a while (relatively speaking) for very large inputs. We probably don't want
-            // to go so large for other reasons though.
-            this.results = new ArrayList<Promise<DLSN>>(data.size());
-            for (int i = 0; i < data.size(); i++) {
-                checkNotNull(data.get(i));
-                this.results.add(new Promise<DLSN>());
-            }
-        }
-
-        @Override
-        Future<ResponseHeader> sendRequest(final ProxyClient sc) {
-            return sc.getService().writeBulkWithContext(stream, data, ctx)
-                .addEventListener(new FutureEventListener<BulkWriteResponse>() {
-                @Override
-                public void onSuccess(BulkWriteResponse response) {
-                    // For non-success case, the ResponseHeader handler (the caller) will handle it.
-                    // Note success in this case means no finagle errors have occurred
-                    // (such as finagle connection issues). In general code != SUCCESS means there's some error
-                    // reported by dlog service. The caller will handle such errors.
-                    if (response.getHeader().getCode() == StatusCode.SUCCESS) {
-                        beforeComplete(sc, response.getHeader());
-                        BulkWriteOp.this.complete(sc.getAddress(), response);
-                        if (response.getWriteResponses().size() == 0 && data.size() > 0) {
-                            logger.error("non-empty bulk write got back empty response without failure for stream {}",
-                                stream);
-                        }
-                    }
-                }
-                @Override
-                public void onFailure(Throwable cause) {
-                    // Handled by the ResponseHeader listener (attached by the caller).
-                }
-            }).map(new AbstractFunction1<BulkWriteResponse, ResponseHeader>() {
-                @Override
-                public ResponseHeader apply(BulkWriteResponse response) {
-                    // We need to return the ResponseHeader to the caller's listener to process DLOG errors.
-                    return response.getHeader();
-                }
-            });
-        }
-
-        void complete(SocketAddress address, BulkWriteResponse bulkWriteResponse) {
-            super.complete(address);
-            Iterator<WriteResponse> writeResponseIterator = bulkWriteResponse.getWriteResponses().iterator();
-            Iterator<Promise<DLSN>> resultIterator = results.iterator();
-
-            // Fill in errors from thrift responses.
-            while (resultIterator.hasNext() && writeResponseIterator.hasNext()) {
-                Promise<DLSN> result = resultIterator.next();
-                WriteResponse writeResponse = writeResponseIterator.next();
-                if (StatusCode.SUCCESS == writeResponse.getHeader().getCode()) {
-                    result.setValue(DLSN.deserialize(writeResponse.getDlsn()));
-                } else {
-                    result.setException(DLException.of(writeResponse.getHeader()));
-                }
-            }
-
-            // Should never happen, but just in case so there's some record.
-            if (bulkWriteResponse.getWriteResponses().size() != data.size()) {
-                logger.error("wrong number of results, response = {} records = {}",
-                    bulkWriteResponse.getWriteResponses().size(), data.size());
-            }
-        }
-
-        @Override
-        void fail(SocketAddress address, Throwable t) {
-
-            // StreamOp.fail is called to fail the overall request. In case of BulkWriteOp we take the request level
-            // exception to apply to the first write. In fact for request level exceptions no request has ever been
-            // attempted, but logically we associate the error with the first write.
-            super.fail(address, t);
-            Iterator<Promise<DLSN>> resultIterator = results.iterator();
-
-            // Fail the first write with the batch level failure.
-            if (resultIterator.hasNext()) {
-                Promise<DLSN> result = resultIterator.next();
-                result.setException(t);
-            }
-
-            // Fail the remaining writes as cancelled requests.
-            while (resultIterator.hasNext()) {
-                Promise<DLSN> result = resultIterator.next();
-                result.setException(new CancelledRequestException());
-            }
-        }
-
-        @SuppressWarnings("unchecked")
-        List<Future<DLSN>> result() {
-            return (List) results;
-        }
-    }
-
-    abstract class AbstractWriteOp extends StreamOp {
-
-        final Promise<WriteResponse> result = new Promise<WriteResponse>();
-        Long crc32 = null;
-
-        AbstractWriteOp(final String name, final OpStats opStats) {
-            super(name, opStats);
-        }
-
-        void complete(SocketAddress address, WriteResponse response) {
-            super.complete(address);
-            result.setValue(response);
-        }
-
-        @Override
-        void fail(SocketAddress address, Throwable t) {
-            super.fail(address, t);
-            result.setException(t);
-        }
-
-        @Override
-        Long computeChecksum() {
-            if (null == crc32) {
-                crc32 = ProtocolUtils.streamOpCRC32(stream);
-            }
-            return crc32;
-        }
-
-        @Override
-        Future<ResponseHeader> sendRequest(final ProxyClient sc) {
-            return this.sendWriteRequest(sc).addEventListener(new FutureEventListener<WriteResponse>() {
-                @Override
-                public void onSuccess(WriteResponse response) {
-                    if (response.getHeader().getCode() == StatusCode.SUCCESS) {
-                        beforeComplete(sc, response.getHeader());
-                        AbstractWriteOp.this.complete(sc.getAddress(), response);
-                    }
-                }
-                @Override
-                public void onFailure(Throwable cause) {
-                    // handled by the ResponseHeader listener
-                }
-            }).map(new AbstractFunction1<WriteResponse, ResponseHeader>() {
-                @Override
-                public ResponseHeader apply(WriteResponse response) {
-                    return response.getHeader();
-                }
-            });
-        }
-
-        abstract Future<WriteResponse> sendWriteRequest(ProxyClient sc);
-    }
-
-    class WriteOp extends AbstractWriteOp {
-        final ByteBuffer data;
-
-        WriteOp(final String name, final ByteBuffer data) {
-            super(name, clientStats.getOpStats("write"));
-            this.data = data;
-        }
-
-        @Override
-        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
-            return sc.getService().writeWithContext(stream, data, ctx);
-        }
-
-        @Override
-        Long computeChecksum() {
-            if (null == crc32) {
-                byte[] dataBytes = new byte[data.remaining()];
-                data.duplicate().get(dataBytes);
-                crc32 = ProtocolUtils.writeOpCRC32(stream, dataBytes);
-            }
-            return crc32;
-        }
-
-        Future<DLSN> result() {
-            return result.map(new AbstractFunction1<WriteResponse, DLSN>() {
-                @Override
-                public DLSN apply(WriteResponse response) {
-                    return DLSN.deserialize(response.getDlsn());
-                }
-            });
-        }
-    }
-
-    class TruncateOp extends AbstractWriteOp {
-        final DLSN dlsn;
-
-        TruncateOp(String name, DLSN dlsn) {
-            super(name, clientStats.getOpStats("truncate"));
-            this.dlsn = dlsn;
-        }
-
-        @Override
-        Long computeChecksum() {
-            if (null == crc32) {
-                crc32 = ProtocolUtils.truncateOpCRC32(stream, dlsn);
-            }
-            return crc32;
-        }
-
-        @Override
-        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
-            return sc.getService().truncate(stream, dlsn.serialize(), ctx);
-        }
-
-        Future<Boolean> result() {
-            return result.map(new AbstractFunction1<WriteResponse, Boolean>() {
-                @Override
-                public Boolean apply(WriteResponse response) {
-                    return true;
-                }
-            });
-        }
-    }
-
-    class WriteRecordSetOp extends WriteOp {
-
-        WriteRecordSetOp(String name, LogRecordSetBuffer recordSet) {
-            super(name, recordSet.getBuffer());
-            ctx.setIsRecordSet(true);
-        }
-
-    }
-
-
-    class ReleaseOp extends AbstractWriteOp {
-
-        ReleaseOp(String name) {
-            super(name, clientStats.getOpStats("release"));
-        }
-
-        @Override
-        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
-            return sc.getService().release(stream, ctx);
-        }
-
-        @Override
-        void beforeComplete(ProxyClient sc, ResponseHeader header) {
-            ownershipCache.removeOwnerFromStream(stream, sc.getAddress(), "Stream Deleted");
-        }
-
-        Future<Void> result() {
-            return result.map(new AbstractFunction1<WriteResponse, Void>() {
-                @Override
-                public Void apply(WriteResponse response) {
-                    return null;
-                }
-            });
-        }
-    }
-
-    class DeleteOp extends AbstractWriteOp {
-
-        DeleteOp(String name) {
-            super(name, clientStats.getOpStats("delete"));
-        }
-
-        @Override
-        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
-            return sc.getService().delete(stream, ctx);
-        }
-
-        @Override
-        void beforeComplete(ProxyClient sc, ResponseHeader header) {
-            ownershipCache.removeOwnerFromStream(stream, sc.getAddress(), "Stream Deleted");
-        }
-
-        Future<Void> result() {
-            return result.map(new AbstractFunction1<WriteResponse, Void>() {
-                @Override
-                public Void apply(WriteResponse v1) {
-                    return null;
-                }
-            });
-        }
-    }
-
-    class CreateOp extends AbstractWriteOp {
-
-        CreateOp(String name) {
-            super(name, clientStats.getOpStats("create"));
-        }
-
-        @Override
-        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
-            return sc.getService().create(stream, ctx);
-        }
-
-        @Override
-        void beforeComplete(ProxyClient sc, ResponseHeader header) {
-            ownershipCache.updateOwner(stream, sc.getAddress());
-        }
-
-        Future<Void> result() {
-            return result.map(new AbstractFunction1<WriteResponse, Void>() {
-                @Override
-                public Void apply(WriteResponse v1) {
-                    return null;
-                }
-            }).voided();
-        }
-    }
-
-    class HeartbeatOp extends AbstractWriteOp {
-        HeartbeatOptions options;
-
-        HeartbeatOp(String name, boolean sendReaderHeartBeat) {
-            super(name, clientStats.getOpStats("heartbeat"));
-            options = new HeartbeatOptions();
-            options.setSendHeartBeatToReader(sendReaderHeartBeat);
-        }
-
-        @Override
-        Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
-            return sc.getService().heartbeatWithOptions(stream, ctx, options);
-        }
-
-        Future<Void> result() {
-            return result.map(new AbstractFunction1<WriteResponse, Void>() {
-                @Override
-                public Void apply(WriteResponse response) {
-                    return null;
-                }
-            });
-        }
-    }
-
-    // Stats
-    private final ClientStats clientStats;
-
-    public DistributedLogClientImpl(String name,
-                                    ClientId clientId,
-                                    RoutingService routingService,
-                                    ClientBuilder clientBuilder,
-                                    ClientConfig clientConfig,
-                                    Optional<ClusterClient> clusterClient,
-                                    StatsReceiver statsReceiver,
-                                    StatsReceiver streamStatsReceiver,
-                                    RegionResolver regionResolver,
-                                    boolean enableRegionStats) {
-        this.clientName = name;
-        this.clientId = clientId;
-        this.routingService = routingService;
-        this.clientConfig = clientConfig;
-        this.streamFailfast = clientConfig.getStreamFailfast();
-        this.streamNameRegexPattern = Pattern.compile(clientConfig.getStreamNameRegex());
-        this.regionResolver = regionResolver;
-        // Build the timer
-        this.dlTimer = new HashedWheelTimer(
-                new ThreadFactoryBuilder().setNameFormat("DLClient-" + name + "-timer-%d").build(),
-                this.clientConfig.getRedirectBackoffStartMs(),
-                TimeUnit.MILLISECONDS);
-        // register routing listener
-        this.routingService.registerListener(this);
-        // build the ownership cache
-        this.ownershipCache = new OwnershipCache(this.clientConfig, this.dlTimer, statsReceiver, streamStatsReceiver);
-        // Client Stats
-        this.clientStats = new ClientStats(statsReceiver, enableRegionStats, regionResolver);
-        // Client Manager
-        this.clientBuilder = ProxyClient.newBuilder(clientName, clientId, clientBuilder, clientConfig, clientStats);
-        this.clientManager = new ProxyClientManager(
-                this.clientConfig,  // client config
-                this.clientBuilder, // client builder
-                this.dlTimer,       // timer
-                this,               // host provider
-                clientStats);       // client stats
-        this.clusterClient = clusterClient;
-        this.clientManager.registerProxyListener(this);
-
-        // Cache Stats
-        StatsReceiver cacheStatReceiver = statsReceiver.scope("cache");
-        Seq<String> numCachedStreamsGaugeName =
-                scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("num_streams")).toList();
-        cacheStatReceiver.provideGauge(numCachedStreamsGaugeName, new Function0<Object>() {
-            @Override
-            public Object apply() {
-                return (float) ownershipCache.getNumCachedStreams();
-            }
-        });
-        Seq<String> numCachedHostsGaugeName =
-                scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("num_hosts")).toList();
-        cacheStatReceiver.provideGauge(numCachedHostsGaugeName, new Function0<Object>() {
-            @Override
-            public Object apply() {
-                return (float) clientManager.getNumProxies();
-            }
-        });
-
-        logger.info("Build distributedlog client : name = {}, client_id = {}, routing_service = {},"
-            + " stats_receiver = {}, thriftmux = {}",
-            new Object[] {
-                name,
-                clientId,
-                routingService.getClass(),
-                statsReceiver.getClass(),
-                clientConfig.getThriftMux()
-            });
-    }
-
-    @Override
-    public Set<SocketAddress> getHosts() {
-        Set<SocketAddress> hosts = Sets.newHashSet();
-        // if using server side routing, we only handshake with the hosts in ownership cache.
-        if (!clusterClient.isPresent()) {
-            hosts.addAll(this.routingService.getHosts());
-        }
-        hosts.addAll(this.ownershipCache.getStreamOwnershipDistribution().keySet());
-        return hosts;
-    }
-
-    @Override
-    public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
-        if (null != serverInfo
-            && serverInfo.isSetServerStatus()
-            && ServerStatus.DOWN == serverInfo.getServerStatus()) {
-            logger.info("{} is detected as DOWN during handshaking", address);
-            // server is shutting down
-            handleServiceUnavailable(address, client, Optional.<StreamOp>absent());
-            return;
-        }
-
-        if (null != serverInfo && serverInfo.isSetOwnerships()) {
-            Map<String, String> ownerships = serverInfo.getOwnerships();
-            logger.debug("Handshaked with {} : {} ownerships returned.", address, ownerships.size());
-            for (Map.Entry<String, String> entry : ownerships.entrySet()) {
-                Matcher matcher = streamNameRegexPattern.matcher(entry.getKey());
-                if (!matcher.matches()) {
-                    continue;
-                }
-                updateOwnership(entry.getKey(), entry.getValue());
-            }
-        } else {
-            logger.debug("Handshaked with {} : no ownerships returned", address);
-        }
-    }
-
-    @Override
-    public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
-        cause = showRootCause(Optional.<StreamOp>absent(), cause);
-        handleRequestException(address, client, Optional.<StreamOp>absent(), cause);
-    }
-
-    @VisibleForTesting
-    public void handshake() {
-        clientManager.handshake();
-        logger.info("Handshaked with {} hosts, cached {} streams",
-                clientManager.getNumProxies(), ownershipCache.getNumCachedStreams());
-    }
-
-    @Override
-    public void onServerLeft(SocketAddress address) {
-        onServerLeft(address, null);
-    }
-
-    private void onServerLeft(SocketAddress address, ProxyClient sc) {
-        ownershipCache.removeAllStreamsFromOwner(address);
-        if (null == sc) {
-            clientManager.removeClient(address);
-        } else {
-            clientManager.removeClient(address, sc);
-        }
-    }
-
-    @Override
-    public void onServerJoin(SocketAddress address) {
-        // we only pre-create connection for client-side routing
-        // if it is server side routing, we only know the exact proxy address
-        // when #getOwner.
-        if (!clusterClient.isPresent()) {
-            clientManager.createClient(address);
-        }
-    }
-
-    public void close() {
-        closeLock.writeLock().lock();
-        try {
-            if (closed) {
-                return;
-            }
-            closed = true;
-        } finally {
-            closeLock.writeLock().unlock();
-        }
-        clientManager.close();
-        routingService.unregisterListener(this);
-        routingService.stopService();
-        dlTimer.stop();
-    }
-
-    @Override
-    public Future<Void> check(String stream) {
-        final HeartbeatOp op = new HeartbeatOp(stream, false);
-        sendRequest(op);
-        return op.result();
-    }
-
-    @Override
-    public Future<Void> heartbeat(String stream) {
-        final HeartbeatOp op = new HeartbeatOp(stream, true);
-        sendRequest(op);
-        return op.result();
-    }
-
-    @Override
-    public Map<SocketAddress, Set<String>> getStreamOwnershipDistribution() {
-        return ownershipCache.getStreamOwnershipDistribution();
-    }
-
-    @Override
-    public Future<Void> setAcceptNewStream(boolean enabled) {
-        Map<SocketAddress, ProxyClient> snapshot = clientManager.getAllClients();
-        List<Future<Void>> futures = new ArrayList<Future<Void>>(snapshot.size());
-        for (Map.Entry<SocketAddress, ProxyClient> entry : snapshot.entrySet()) {
-            futures.add(entry.getValue().getService().setAcceptNewStream(enabled));
-        }
-        return Future.collect(futures).map(new Function<List<Void>, Void>() {
-            @Override
-            public Void apply(List<Void> list) {
-                return null;
-            }
-        });
-    }
-
-    @Override
-    public Future<DLSN> write(String stream, ByteBuffer data) {
-        final WriteOp op = new WriteOp(stream, data);
-        sendRequest(op);
-        return op.result();
-    }
-
-    @Override
-    public Future<DLSN> writeRecordSet(String stream, final LogRecordSetBuffer recordSet) {
-        final WriteRecordSetOp op = new WriteRecordSetOp(stream, recordSet);
-        sendRequest(op);
-        return op.result();
-    }
-
-    @Override
-    public List<Future<DLSN>> writeBulk(String stream, List<ByteBuffer> data) {
-        if (data.size() > 0) {
-            final BulkWriteOp op = new BulkWriteOp(stream, data);
-            sendRequest(op);
-            return op.result();
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    @Override
-    public Future<Boolean> truncate(String stream, DLSN dlsn) {
-        final TruncateOp op = new TruncateOp(stream, dlsn);
-        sendRequest(op);
-        return op.result();
-    }
-
-    @Override
-    public Future<Void> delete(String stream) {
-        final DeleteOp op = new DeleteOp(stream);
-        sendRequest(op);
-        return op.result();
-    }
-
-    @Override
-    public Future<Void> release(String stream) {
-        final ReleaseOp op = new ReleaseOp(stream);
-        sendRequest(op);
-        return op.result();
-    }
-
-    @Override
-    public Future<Void> create(String stream) {
-        final CreateOp op = new CreateOp(stream);
-        sendRequest(op);
-        return op.result();
-    }
-
-    private void sendRequest(final StreamOp op) {
-        closeLock.readLock().lock();
-        try {
-            if (closed) {
-                op.fail(null, new DLClientClosedException("Client " + clientName + " is closed."));
-            } else {
-                doSend(op, null);
-            }
-        } finally {
-            closeLock.readLock().unlock();
-        }
-    }
-
-    /**
-     * Send the stream operation by routing service, excluding previous address if it is not null.
-     *
-     * @param op
-     *          stream operation.
-     * @param previousAddr
-     *          previous tried address.
-     */
-    private void doSend(final StreamOp op, final SocketAddress previousAddr) {
-        if (null != previousAddr) {
-            op.routingContext.addTriedHost(previousAddr, StatusCode.WRITE_EXCEPTION);
-        }
-        // Get host first
-        final SocketAddress address = ownershipCache.getOwner(op.stream);
-        if (null == address || op.routingContext.isTriedHost(address)) {
-            getOwner(op).addEventListener(new FutureEventListener<SocketAddress>() {
-                @Override
-                public void onFailure(Throwable cause) {
-                    op.fail(null, cause);
-                }
-
-                @Override
-                public void onSuccess(SocketAddress ownerAddr) {
-                    op.send(ownerAddr);
-                }
-            });
-        } else {
-            op.send(address);
-        }
-    }
-
-    private void retryGetOwnerFromResourcePlacementServer(final StreamOp op,
-                                                final Promise<SocketAddress> getOwnerPromise,
-                                                final Throwable cause) {
-        if (op.shouldTimeout()) {
-            op.fail(null, cause);
-            return;
-        }
-        getOwnerFromResourcePlacementServer(op, getOwnerPromise);
-    }
-
-    private void getOwnerFromResourcePlacementServer(final StreamOp op,
-                                                     final Promise<SocketAddress> getOwnerPromise) {
-        clusterClient.get().getService().getOwner(op.stream, op.ctx)
-            .addEventListener(new FutureEventListener<WriteResponse>() {
-                @Override
-                public void onFailure(Throwable cause) {
-                    getOwnerPromise.updateIfEmpty(new Throw<SocketAddress>(cause));
-                }
-
-                @Override
-                public void onSuccess(WriteResponse value) {
-                    if (StatusCode.FOUND == value.getHeader().getCode()
-                          && null != value.getHeader().getLocation()) {
-                        try {
-                            InetSocketAddress addr = DLSocketAddress.deserialize(
-                                value.getHeader().getLocation()
-                            ).getSocketAddress();
-                            getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr));
-                        } catch (IOException e) {
-                            // retry from the routing server again
-                            logger.error("ERROR in getOwner", e);
-                            retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise, e);
-                            return;
-                        }
-                    } else {
-                        // retry from the routing server again
-                        retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise,
-                                new StreamUnavailableException("Stream " + op.stream + "'s owner is unknown"));
-                    }
-                }
-            });
-    }
-
-    private Future<SocketAddress> getOwner(final StreamOp op) {
-        if (clusterClient.isPresent()) {
-            final Promise<SocketAddress> getOwnerPromise = new Promise<SocketAddress>();
-            getOwnerFromResourcePlacementServer(op, getOwnerPromise);
-            return getOwnerPromise;
-        }
-        // pickup host by hashing
-        try {
-            return Future.value(routingService.getHost(op.stream, op.routingContext));
-        } catch (NoBrokersAvailableException nbae) {
-            return Future.exception(nbae);
-        }
-    }
-
-    private void sendWriteRequest(final SocketAddress addr, final StreamOp op) {
-        // Get corresponding finagle client
-        final ProxyClient sc = clientManager.getClient(addr);
-        final long startTimeNanos = System.nanoTime();
-        // write the request to that host.
-        op.sendRequest(sc).addEventListener(new FutureEventListener<ResponseHeader>() {
-            @Override
-            public void onSuccess(ResponseHeader header) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Received response; header: {}", header);
-                }
-                clientStats.completeProxyRequest(addr, header.getCode(), startTimeNanos);
-                // update routing context
-                op.routingContext.addTriedHost(addr, header.getCode());
-                switch (header.getCode()) {
-                    case SUCCESS:
-                        // success handling is done per stream op
-                        break;
-                    case FOUND:
-                        handleRedirectResponse(header, op, addr);
-                        break;
-                    // for overcapacity, dont report failure since this normally happens quite a bit
-                    case OVER_CAPACITY:
-                        logger.debug("Failed to write request to {} : {}", op.stream, header);
-                        op.fail(addr, DLException.of(header));
-                        break;
-                    // for responses that indicate the requests definitely failed,
-                    // we should fail them immediately (e.g. TOO_LARGE_RECORD, METADATA_EXCEPTION)
-                    case NOT_IMPLEMENTED:
-                    case METADATA_EXCEPTION:
-                    case LOG_EMPTY:
-                    case LOG_NOT_FOUND:
-                    case TRUNCATED_TRANSACTION:
-                    case END_OF_STREAM:
-                    case TRANSACTION_OUT_OF_ORDER:
-                    case INVALID_STREAM_NAME:
-                    case REQUEST_DENIED:
-                    case TOO_LARGE_RECORD:
-                    case CHECKSUM_FAILED:
-                    // status code NOT_READY is returned if failfast is enabled in the server. don't redirect
-                    // since the proxy may still own the stream.
-                    case STREAM_NOT_READY:
-                        op.fail(addr, DLException.of(header));
-                        break;
-                    case SERVICE_UNAVAILABLE:
-                        handleServiceUnavailable(addr, sc, Optional.of(op));
-                        break;
-                    case REGION_UNAVAILABLE:
-                        // region is unavailable, redirect the request to hosts in other region
-                        redirect(op, null);
-                        break;
-                    // Proxy was overloaded and refused to try to acquire the stream. Don't remove ownership, since
-                    // we didn't have it in the first place.
-                    case TOO_MANY_STREAMS:
-                        handleRedirectableError(addr, op, header);
-                        break;
-                    case STREAM_UNAVAILABLE:
-                    case ZOOKEEPER_ERROR:
-                    case LOCKING_EXCEPTION:
-                    case UNEXPECTED:
-                    case INTERRUPTED:
-                    case BK_TRANSMIT_ERROR:
-                    case FLUSH_TIMEOUT:
-                    default:
-                        // when we are receiving these exceptions from proxy, it means proxy or the stream is closed
-                        // redirect the request.
-                        ownershipCache.removeOwnerFromStream(op.stream, addr, header.getCode().name());
-                        handleRedirectableError(addr, op, header);
-                        break;
-                }
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                Optional<StreamOp> opOptional = Optional.of(op);
-                cause = showRootCause(opOptional, cause);
-                clientStats.failProxyRequest(addr, cause, startTimeNanos);
-                handleRequestException(addr, sc, opOptional, cause);
-            }
-        });
-    }
-
-    // Response Handlers
-
-    Throwable showRootCause(Optional<StreamOp> op, Throwable cause) {
-        if (cause instanceof Failure) {
-            Failure failure = (Failure) cause;
-            if (failure.isFlagged(Failure.Wrapped())) {
-                try {
-                    // if it is a wrapped failure, unwrap it first
-                    cause = failure.show();
-                } catch (IllegalArgumentException iae) {
-                    if (op.isPresent()) {
-                        logger.warn("Failed to unwrap finagle failure of stream {} : ", op.get().stream, iae);
-                    } else {
-                        logger.warn("Failed to unwrap finagle failure : ", iae);
-                    }
-                }
-            }
-        }
-        return cause;
-    }
-
-    private void handleRedirectableError(SocketAddress addr,
-                                         StreamOp op,
-                                         ResponseHeader header) {
-        if (streamFailfast) {
-            op.fail(addr, DLException.of(header));
-        } else {
-            redirect(op, null);
-        }
-    }
-
-    void handleServiceUnavailable(SocketAddress addr,
-                                  ProxyClient sc,
-                                  Optional<StreamOp> op) {
-        // service is unavailable, remove it out of routing service
-        routingService.removeHost(addr, new ServiceUnavailableException(addr + " is unavailable now."));
-        onServerLeft(addr);
-        if (op.isPresent()) {
-            ownershipCache.removeOwnerFromStream(op.get().stream, addr, addr + " is unavailable now.");
-            // redirect the request to other host.
-            redirect(op.get(), null);
-        }
-    }
-
-    void handleRequestException(SocketAddress addr,
-                                ProxyClient sc,
-                                Optional<StreamOp> op,
-                                Throwable cause) {
-        boolean resendOp = false;
-        boolean removeOwnerFromStream = false;
-        SocketAddress previousAddr = addr;
-        String reason = cause.getMessage();
-        if (cause instanceof ConnectionFailedException || cause instanceof java.net.ConnectException) {
-            routingService.removeHost(addr, cause);
-            onServerLeft(addr, sc);
-            removeOwnerFromStream = true;
-            // redirect the request to other host.
-            resendOp = true;
-        } else if (cause instanceof ChannelException) {
-            // java.net.ConnectException typically means connection is refused remotely
-            // no process listening on remote address/port.
-            if (cause.getCause() instanceof java.net.ConnectException) {
-                routingService.removeHost(addr, cause.getCause());
-                onServerLeft(addr);
-                reason = cause.getCause().getMessage();
-            } else {
-                routingService.removeHost(addr, cause);
-                reason = cause.getMessage();
-            }
-            removeOwnerFromStream = true;
-            // redirect the request to other host.
-            resendOp = true;
-        } else if (cause instanceof ServiceTimeoutException) {
-            // redirect the request to itself again, which will backoff for a while
-            resendOp = true;
-            previousAddr = null;
-        } else if (cause instanceof WriteException) {
-            // redirect the request to other host.
-            resendOp = true;
-        } else if (cause instanceof ServiceException) {
-            // redirect the request to other host.
-            clientManager.removeClient(addr, sc);
-            resendOp = true;
-        } else if (cause instanceof TApplicationException) {
-            handleTApplicationException(cause, op, addr, sc);
-        } else if (cause instanceof Failure) {
-            handleFinagleFailure((Failure) cause, op, addr);
-        } else {
-            // Default handler
-            handleException(cause, op, addr);
-        }
-
-        if (op.isPresent()) {
-            if (removeOwnerFromStream) {
-                ownershipCache.removeOwnerFromStream(op.get().stream, addr, reason);
-            }
-            if (resendOp) {
-                doSend(op.get(), previousAddr);
-            }
-        }
-    }
-
-    /**
-     * Redirect the request to new proxy <i>newAddr</i>. If <i>newAddr</i> is null,
-     * it would pick up a host from routing service.
-     *
-     * @param op
-     *          stream operation
-     * @param newAddr
-     *          new proxy address
-     */
-    void redirect(StreamOp op, SocketAddress newAddr) {
-        ownershipCache.getOwnershipStatsLogger().onRedirect(op.stream);
-        if (null != newAddr) {
-            logger.debug("Redirect request {} to new owner {}.", op, newAddr);
-            op.send(newAddr);
-        } else {
-            doSend(op, null);
-        }
-    }
-
-    void handleFinagleFailure(Failure failure,
-                              Optional<StreamOp> op,
-                              SocketAddress addr) {
-        if (failure.isFlagged(Failure.Restartable())) {
-            if (op.isPresent()) {
-                // redirect the request to other host
-                doSend(op.get(), addr);
-            }
-        } else {
-            // fail the request if it is other types of failures
-            handleException(failure, op, addr);
-        }
-    }
-
-    void handleException(Throwable cause,
-                         Optional<StreamOp> op,
-                         SocketAddress addr) {
-        // RequestTimeoutException: fail it and let client decide whether to retry or not.
-
-        // FailedFastException:
-        // We don't actually know when FailedFastException will be thrown
-        // so properly we just throw it back to application to let application
-        // handle it.
-
-        // Other Exceptions: as we don't know how to handle them properly so throw them to client
-        if (op.isPresent()) {
-            logger.error("Failed to write request to {} @ {} : {}",
-                    new Object[]{op.get().stream, addr, cause.toString()});
-            op.get().fail(addr, cause);
-        }
-    }
-
-    void handleTApplicationException(Throwable cause,
-                                     Optional<StreamOp> op,
-                                     SocketAddress addr,
-                                     ProxyClient sc) {
-        TApplicationException ex = (TApplicationException) cause;
-        if (ex.getType() == TApplicationException.UNKNOWN_METHOD) {
-            // if we encountered unknown method exception on thrift server, it means this proxy
-            // has problem. we should remove it from routing service, clean up ownerships
-            routingService.removeHost(addr, cause);
-            onServerLeft(addr, sc);
-            if (op.isPresent()) {
-                ownershipCache.removeOwnerFromStream(op.get().stream, addr, cause.getMessage());
-                doSend(op.get(), addr);
-            }
-        } else {
-            handleException(cause, op, addr);
-        }
-    }
-
-    void handleRedirectResponse(ResponseHeader header, StreamOp op, SocketAddress curAddr) {
-        SocketAddress ownerAddr = null;
-        if (header.isSetLocation()) {
-            String owner = header.getLocation();
-            try {
-                ownerAddr = DLSocketAddress.deserialize(owner).getSocketAddress();
-                // if we are receiving a direct request to same host, we won't try the same host.
-                // as the proxy will shut itself down if it redirects client to itself.
-                if (curAddr.equals(ownerAddr)) {
-                    logger.warn("Request to stream {} is redirected to same server {}!", op.stream, curAddr);
-                    ownerAddr = null;
-                } else {
-                    // update ownership when redirects.
-                    ownershipCache.updateOwner(op.stream, ownerAddr);
-                }
-            } catch (IOException e) {
-                ownerAddr = null;
-            }
-        }
-        redirect(op, ownerAddr);
-    }
-
-    void updateOwnership(String stream, String location) {
-        try {
-            SocketAddress ownerAddr = DLSocketAddress.deserialize(location).getSocketAddress();
-            // update ownership
-            ownershipCache.updateOwner(stream, ownerAddr);
-        } catch (IOException e) {
-            logger.warn("Invalid ownership {} found for stream {} : ",
-                new Object[] { location, stream, e });
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java
deleted file mode 100644
index 8ccbbfc..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
-import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Ticker;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.LogRecordSet;
-import com.twitter.distributedlog.LogRecordSetBuffer;
-import com.twitter.distributedlog.client.speculative.DefaultSpeculativeRequestExecutionPolicy;
-import com.twitter.distributedlog.client.speculative.SpeculativeRequestExecutionPolicy;
-import com.twitter.distributedlog.client.speculative.SpeculativeRequestExecutor;
-import com.twitter.distributedlog.exceptions.LogRecordTooLongException;
-import com.twitter.distributedlog.exceptions.WriteException;
-import com.twitter.distributedlog.io.CompressionCodec;
-import com.twitter.distributedlog.service.DistributedLogClient;
-import com.twitter.finagle.IndividualRequestTimeoutException;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Write to multiple streams.
- */
-public class DistributedLogMultiStreamWriter implements Runnable {
-
-    /**
-     * Create a new builder to create a multi stream writer.
-     *
-     * @return a new builder to create a multi stream writer.
-     */
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /**
-     * Builder for the multi stream writer.
-     */
-    public static class Builder {
-
-        private DistributedLogClient client = null;
-        private List<String> streams = null;
-        private int bufferSize = 16 * 1024; // 16k
-        private long flushIntervalMicros = 2000; // 2ms
-        private CompressionCodec.Type codec = CompressionCodec.Type.NONE;
-        private ScheduledExecutorService executorService = null;
-        private long requestTimeoutMs = 500; // 500ms
-        private int firstSpeculativeTimeoutMs = 50; // 50ms
-        private int maxSpeculativeTimeoutMs = 200; // 200ms
-        private float speculativeBackoffMultiplier = 2;
-        private Ticker ticker = Ticker.systemTicker();
-
-        private Builder() {}
-
-        /**
-         * Set the distributedlog client used for multi stream writer.
-         *
-         * @param client
-         *          distributedlog client
-         * @return builder
-         */
-        public Builder client(DistributedLogClient client) {
-            this.client = client;
-            return this;
-        }
-
-        /**
-         * Set the list of streams to write to.
-         *
-         * @param streams
-         *          list of streams to write
-         * @return builder
-         */
-        public Builder streams(List<String> streams) {
-            this.streams = streams;
-            return this;
-        }
-
-        /**
-         * Set the output buffer size.
-         *
-         * <p>If output buffer size is 0, the writes will be transmitted to
-         * wire immediately.
-         *
-         * @param bufferSize
-         *          output buffer size
-         * @return builder
-         */
-        public Builder bufferSize(int bufferSize) {
-            this.bufferSize = bufferSize;
-            return this;
-        }
-
-        /**
-         * Set the flush interval in milliseconds.
-         *
-         * @param flushIntervalMs
-         *          flush interval in milliseconds.
-         * @return builder
-         */
-        public Builder flushIntervalMs(int flushIntervalMs) {
-            this.flushIntervalMicros = TimeUnit.MILLISECONDS.toMicros(flushIntervalMs);
-            return this;
-        }
-
-        /**
-         * Set the flush interval in microseconds.
-         *
-         * @param flushIntervalMicros
-         *          flush interval in microseconds.
-         * @return builder
-         */
-        public Builder flushIntervalMicros(int flushIntervalMicros) {
-            this.flushIntervalMicros = flushIntervalMicros;
-            return this;
-        }
-
-        /**
-         * Set compression codec.
-         *
-         * @param codec compression codec.
-         * @return builder
-         */
-        public Builder compressionCodec(CompressionCodec.Type codec) {
-            this.codec = codec;
-            return this;
-        }
-
-        /**
-         * Set the scheduler to flush output buffers.
-         *
-         * @param executorService
-         *          executor service to flush output buffers.
-         * @return builder
-         */
-        public Builder scheduler(ScheduledExecutorService executorService) {
-            this.executorService = executorService;
-            return this;
-        }
-
-        /**
-         * Set request timeout in milliseconds.
-         *
-         * @param requestTimeoutMs
-         *          request timeout in milliseconds.
-         * @return builder
-         */
-        public Builder requestTimeoutMs(long requestTimeoutMs) {
-            this.requestTimeoutMs = requestTimeoutMs;
-            return this;
-        }
-
-        /**
-         * Set the first speculative timeout in milliseconds.
-         *
-         * <p>The multi-streams writer does speculative writes on streams.
-         * The write issues first write request to a stream, if the write request
-         * doesn't respond within speculative timeout. it issues next write request
-         * to a different stream. It does such speculative retries until receive
-         * a success or request timeout ({@link #requestTimeoutMs(long)}).
-         *
-         * <p>This setting is to configure the first speculative timeout, in milliseconds.
-         *
-         * @param timeoutMs
-         *          timeout in milliseconds
-         * @return builder
-         */
-        public Builder firstSpeculativeTimeoutMs(int timeoutMs) {
-            this.firstSpeculativeTimeoutMs = timeoutMs;
-            return this;
-        }
-
-        /**
-         * Set the max speculative timeout in milliseconds.
-         *
-         * <p>The multi-streams writer does speculative writes on streams.
-         * The write issues first write request to a stream, if the write request
-         * doesn't respond within speculative timeout. it issues next write request
-         * to a different stream. It does such speculative retries until receive
-         * a success or request timeout ({@link #requestTimeoutMs(long)}).
-         *
-         * <p>This setting is to configure the max speculative timeout, in milliseconds.
-         *
-         * @param timeoutMs
-         *          timeout in milliseconds
-         * @return builder
-         */
-        public Builder maxSpeculativeTimeoutMs(int timeoutMs) {
-            this.maxSpeculativeTimeoutMs = timeoutMs;
-            return this;
-        }
-
-        /**
-         * Set the speculative timeout backoff multiplier.
-         *
-         * <p>The multi-streams writer does speculative writes on streams.
-         * The write issues first write request to a stream, if the write request
-         * doesn't respond within speculative timeout. it issues next write request
-         * to a different stream. It does such speculative retries until receive
-         * a success or request timeout ({@link #requestTimeoutMs(long)}).
-         *
-         * <p>This setting is to configure the speculative timeout backoff multiplier.
-         *
-         * @param multiplier
-         *          backoff multiplier
-         * @return builder
-         */
-        public Builder speculativeBackoffMultiplier(float multiplier) {
-            this.speculativeBackoffMultiplier = multiplier;
-            return this;
-        }
-
-        /**
-         * Ticker for timing.
-         *
-         * @param ticker
-         *          ticker
-         * @return builder
-         * @see Ticker
-         */
-        public Builder clockTicker(Ticker ticker) {
-            this.ticker = ticker;
-            return this;
-        }
-
-        /**
-         * Build the multi stream writer.
-         *
-         * @return the multi stream writer.
-         */
-        public DistributedLogMultiStreamWriter build() {
-            checkArgument((null != streams && !streams.isEmpty()),
-                    "No streams provided");
-            checkNotNull(client,
-                    "No distributedlog client provided");
-            checkNotNull(codec,
-                    "No compression codec provided");
-            checkArgument(firstSpeculativeTimeoutMs > 0
-                    && firstSpeculativeTimeoutMs <= maxSpeculativeTimeoutMs
-                    && speculativeBackoffMultiplier > 0
-                    && maxSpeculativeTimeoutMs < requestTimeoutMs,
-                    "Invalid speculative timeout settings");
-            return new DistributedLogMultiStreamWriter(
-                streams,
-                client,
-                Math.min(bufferSize, MAX_LOGRECORDSET_SIZE),
-                flushIntervalMicros,
-                requestTimeoutMs,
-                firstSpeculativeTimeoutMs,
-                maxSpeculativeTimeoutMs,
-                speculativeBackoffMultiplier,
-                codec,
-                ticker,
-                executorService);
-        }
-    }
-
-    /**
-     * Pending Write Request.
-     */
-    class PendingWriteRequest implements FutureEventListener<DLSN>,
-            SpeculativeRequestExecutor {
-
-        private final LogRecordSetBuffer recordSet;
-        private AtomicBoolean complete = new AtomicBoolean(false);
-        private final Stopwatch stopwatch = Stopwatch.createStarted(clockTicker);
-        private int nextStream;
-        private int numTriedStreams = 0;
-
-        PendingWriteRequest(LogRecordSetBuffer recordSet) {
-            this.recordSet = recordSet;
-            this.nextStream = Math.abs(nextStreamId.incrementAndGet()) % numStreams;
-        }
-
-        synchronized String sendNextWrite() {
-            long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
-            if (elapsedMs > requestTimeoutMs || numTriedStreams >= numStreams) {
-                fail(new IndividualRequestTimeoutException(Duration.fromMilliseconds(elapsedMs)));
-                return null;
-            }
-            try {
-                return sendWriteToStream(nextStream);
-            } finally {
-                nextStream = (nextStream + 1) % numStreams;
-                ++numTriedStreams;
-            }
-        }
-
-        synchronized String sendWriteToStream(int streamId) {
-            String stream = getStream(streamId);
-            client.writeRecordSet(stream, recordSet)
-                    .addEventListener(this);
-            return stream;
-        }
-
-        @Override
-        public void onSuccess(DLSN dlsn) {
-            if (!complete.compareAndSet(false, true)) {
-                return;
-            }
-            recordSet.completeTransmit(
-                    dlsn.getLogSegmentSequenceNo(),
-                    dlsn.getEntryId(),
-                    dlsn.getSlotId());
-        }
-
-        @Override
-        public void onFailure(Throwable cause) {
-            sendNextWrite();
-        }
-
-        private void fail(Throwable cause) {
-            if (!complete.compareAndSet(false, true)) {
-                return;
-            }
-            recordSet.abortTransmit(cause);
-        }
-
-        @Override
-        public Future<Boolean> issueSpeculativeRequest() {
-            return Future.value(!complete.get() && null != sendNextWrite());
-        }
-    }
-
-    private final int numStreams;
-    private final List<String> streams;
-    private final DistributedLogClient client;
-    private final int bufferSize;
-    private final long requestTimeoutMs;
-    private final SpeculativeRequestExecutionPolicy speculativePolicy;
-    private final Ticker clockTicker;
-    private final CompressionCodec.Type codec;
-    private final ScheduledExecutorService scheduler;
-    private final boolean ownScheduler;
-    private final AtomicInteger nextStreamId;
-    private LogRecordSet.Writer recordSetWriter;
-
-    private DistributedLogMultiStreamWriter(List<String> streams,
-                                            DistributedLogClient client,
-                                            int bufferSize,
-                                            long flushIntervalMicros,
-                                            long requestTimeoutMs,
-                                            int firstSpecultiveTimeoutMs,
-                                            int maxSpeculativeTimeoutMs,
-                                            float speculativeBackoffMultiplier,
-                                            CompressionCodec.Type codec,
-                                            Ticker clockTicker,
-                                            ScheduledExecutorService scheduler) {
-        this.streams = Lists.newArrayList(streams);
-        this.numStreams = this.streams.size();
-        this.client = client;
-        this.bufferSize = bufferSize;
-        this.requestTimeoutMs = requestTimeoutMs;
-        this.codec = codec;
-        this.clockTicker = clockTicker;
-        if (null == scheduler) {
-            this.scheduler = Executors.newSingleThreadScheduledExecutor(
-                    new ThreadFactoryBuilder()
-                            .setDaemon(true)
-                            .setNameFormat("MultiStreamWriterFlushThread-%d")
-                            .build());
-            this.ownScheduler = true;
-        } else {
-            this.scheduler = scheduler;
-            this.ownScheduler = false;
-        }
-        this.speculativePolicy = new DefaultSpeculativeRequestExecutionPolicy(
-                firstSpecultiveTimeoutMs,
-                maxSpeculativeTimeoutMs,
-                speculativeBackoffMultiplier);
-        // shuffle the streams
-        Collections.shuffle(this.streams);
-        this.nextStreamId = new AtomicInteger(0);
-        this.recordSetWriter = newRecordSetWriter();
-
-        if (flushIntervalMicros > 0) {
-            this.scheduler.scheduleAtFixedRate(
-                    this,
-                    flushIntervalMicros,
-                    flushIntervalMicros,
-                    TimeUnit.MICROSECONDS);
-        }
-    }
-
-    String getStream(int streamId) {
-        return streams.get(streamId);
-    }
-
-    synchronized LogRecordSet.Writer getLogRecordSetWriter() {
-        return recordSetWriter;
-    }
-
-    private LogRecordSet.Writer newRecordSetWriter() {
-        return LogRecordSet.newWriter(
-                bufferSize,
-                codec);
-    }
-
-    public synchronized Future<DLSN> write(ByteBuffer buffer) {
-        int logRecordSize = buffer.remaining();
-        if (logRecordSize > MAX_LOGRECORD_SIZE) {
-            return Future.exception(new LogRecordTooLongException(
-                    "Log record of size " + logRecordSize + " written when only "
-                            + MAX_LOGRECORD_SIZE + " is allowed"));
-        }
-        // if exceed max number of bytes
-        if ((recordSetWriter.getNumBytes() + logRecordSize) > MAX_LOGRECORDSET_SIZE) {
-            flush();
-        }
-        Promise<DLSN> writePromise = new Promise<DLSN>();
-        try {
-            recordSetWriter.writeRecord(buffer, writePromise);
-        } catch (LogRecordTooLongException e) {
-            return Future.exception(e);
-        } catch (WriteException e) {
-            recordSetWriter.abortTransmit(e);
-            recordSetWriter = newRecordSetWriter();
-            return Future.exception(e);
-        }
-        if (recordSetWriter.getNumBytes() >= bufferSize) {
-            flush();
-        }
-        return writePromise;
-    }
-
-    @Override
-    public void run() {
-        flush();
-    }
-
-    private void flush() {
-        LogRecordSet.Writer recordSetToFlush;
-        synchronized (this) {
-            if (recordSetWriter.getNumRecords() == 0) {
-                return;
-            }
-            recordSetToFlush = recordSetWriter;
-            recordSetWriter = newRecordSetWriter();
-        }
-        transmit(recordSetToFlush);
-    }
-
-    private void transmit(LogRecordSet.Writer recordSetToFlush) {
-        PendingWriteRequest writeRequest =
-                new PendingWriteRequest(recordSetToFlush);
-        this.speculativePolicy.initiateSpeculativeRequest(scheduler, writeRequest);
-    }
-
-    public void close() {
-        if (ownScheduler) {
-            this.scheduler.shutdown();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/MonitorServiceClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/MonitorServiceClient.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/MonitorServiceClient.java
deleted file mode 100644
index e541578..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/MonitorServiceClient.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.monitor;
-
-import com.twitter.util.Future;
-import java.net.SocketAddress;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Interface for distributedlog monitor service.
- */
-public interface MonitorServiceClient {
-
-    /**
-     * Check a given stream.
-     *
-     * @param stream
-     *          stream.
-     * @return check result.
-     */
-    Future<Void> check(String stream);
-
-    /**
-     * Send heartbeat to the stream and its readers.
-     *
-     * @param stream
-     *          stream.
-     * @return check result.
-     */
-    Future<Void> heartbeat(String stream);
-
-    /**
-     * Get current ownership distribution from current monitor service view.
-     *
-     * @return current ownership distribution
-     */
-    Map<SocketAddress, Set<String>> getStreamOwnershipDistribution();
-
-    /**
-     * Enable/Disable accepting new stream on a given proxy.
-     *
-     * @param enabled
-     *          flag to enable/disable accepting new streams on a given proxy
-     * @return void
-     */
-    Future<Void> setAcceptNewStream(boolean enabled);
-
-    /**
-     * Close the client.
-     */
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/package-info.java
deleted file mode 100644
index c4e7df0..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/monitor/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * DistributedLog Monitor Client.
- */
-package com.twitter.distributedlog.client.monitor;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/OwnershipCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/OwnershipCache.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/OwnershipCache.java
deleted file mode 100644
index 387d727..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/OwnershipCache.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.ownership;
-
-import com.google.common.collect.ImmutableMap;
-import com.twitter.distributedlog.client.ClientConfig;
-import com.twitter.distributedlog.client.stats.OwnershipStatsLogger;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.net.SocketAddress;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Client Side Ownership Cache.
- */
-public class OwnershipCache implements TimerTask {
-
-    private static final Logger logger = LoggerFactory.getLogger(OwnershipCache.class);
-
-    private final ConcurrentHashMap<String, SocketAddress> stream2Addresses =
-            new ConcurrentHashMap<String, SocketAddress>();
-    private final ConcurrentHashMap<SocketAddress, Set<String>> address2Streams =
-            new ConcurrentHashMap<SocketAddress, Set<String>>();
-    private final ClientConfig clientConfig;
-    private final HashedWheelTimer timer;
-
-    // Stats
-    private final OwnershipStatsLogger ownershipStatsLogger;
-
-    public OwnershipCache(ClientConfig clientConfig,
-                          HashedWheelTimer timer,
-                          StatsReceiver statsReceiver,
-                          StatsReceiver streamStatsReceiver) {
-        this.clientConfig = clientConfig;
-        this.timer = timer;
-        this.ownershipStatsLogger = new OwnershipStatsLogger(statsReceiver, streamStatsReceiver);
-        scheduleDumpOwnershipCache();
-    }
-
-    private void scheduleDumpOwnershipCache() {
-        if (clientConfig.isPeriodicDumpOwnershipCacheEnabled()
-            && clientConfig.getPeriodicDumpOwnershipCacheIntervalMs() > 0) {
-            timer.newTimeout(this, clientConfig.getPeriodicDumpOwnershipCacheIntervalMs(),
-                    TimeUnit.MILLISECONDS);
-        }
-    }
-
-    @Override
-    public void run(Timeout timeout) throws Exception {
-        if (timeout.isCancelled()) {
-            return;
-        }
-        logger.info("Ownership cache : {} streams cached, {} hosts cached",
-                stream2Addresses.size(), address2Streams.size());
-        logger.info("Cached streams : {}", stream2Addresses);
-        scheduleDumpOwnershipCache();
-    }
-
-    public OwnershipStatsLogger getOwnershipStatsLogger() {
-        return ownershipStatsLogger;
-    }
-
-    /**
-     * Update ownership of <i>stream</i> to <i>addr</i>.
-     *
-     * @param stream
-     *          Stream Name.
-     * @param addr
-     *          Owner Address.
-     * @return true if owner is updated
-     */
-    public boolean updateOwner(String stream, SocketAddress addr) {
-        // update ownership
-        SocketAddress oldAddr = stream2Addresses.putIfAbsent(stream, addr);
-        if (null != oldAddr && oldAddr.equals(addr)) {
-            return true;
-        }
-        if (null != oldAddr) {
-            if (stream2Addresses.replace(stream, oldAddr, addr)) {
-                // Store the relevant mappings for this topic and host combination
-                logger.info("Storing ownership for stream : {}, old host : {}, new host : {}.",
-                        new Object[] { stream, oldAddr, addr });
-                StringBuilder sb = new StringBuilder();
-                sb.append("Ownership changed '")
-                  .append(oldAddr).append("' -> '").append(addr).append("'");
-                removeOwnerFromStream(stream, oldAddr, sb.toString());
-
-                // update stats
-                ownershipStatsLogger.onRemove(stream);
-                ownershipStatsLogger.onAdd(stream);
-            } else {
-                logger.warn("Ownership of stream : {} has been changed from {} to {} when storing host : {}.",
-                        new Object[] { stream, oldAddr, stream2Addresses.get(stream), addr });
-                return false;
-            }
-        } else {
-            logger.info("Storing ownership for stream : {}, host : {}.", stream, addr);
-            // update stats
-            ownershipStatsLogger.onAdd(stream);
-        }
-
-        Set<String> streamsForHost = address2Streams.get(addr);
-        if (null == streamsForHost) {
-            Set<String> newStreamsForHost = new HashSet<String>();
-            streamsForHost = address2Streams.putIfAbsent(addr, newStreamsForHost);
-            if (null == streamsForHost) {
-                streamsForHost = newStreamsForHost;
-            }
-        }
-        synchronized (streamsForHost) {
-            // check whether the ownership changed, since it might happend after replace succeed
-            if (addr.equals(stream2Addresses.get(stream))) {
-                streamsForHost.add(stream);
-            }
-        }
-        return true;
-    }
-
-    /**
-     * Get the cached owner for stream <code>stream</code>.
-     *
-     * @param stream
-     *          stream to lookup ownership
-     * @return owner's address
-     */
-    public SocketAddress getOwner(String stream) {
-        SocketAddress address = stream2Addresses.get(stream);
-        if (null == address) {
-            ownershipStatsLogger.onMiss(stream);
-        } else {
-            ownershipStatsLogger.onHit(stream);
-        }
-        return address;
-    }
-
-    /**
-     * Remove the owner <code>addr</code> from <code>stream</code> for a given <code>reason</code>.
-     *
-     * @param stream stream name
-     * @param addr owner address
-     * @param reason reason to remove ownership
-     */
-    public void removeOwnerFromStream(String stream, SocketAddress addr, String reason) {
-        if (stream2Addresses.remove(stream, addr)) {
-            logger.info("Removed stream to host mapping for (stream: {} -> host: {}) : reason = '{}'.",
-                    new Object[] { stream, addr, reason });
-        }
-        Set<String> streamsForHost = address2Streams.get(addr);
-        if (null != streamsForHost) {
-            synchronized (streamsForHost) {
-                if (streamsForHost.remove(stream)) {
-                    logger.info("Removed stream ({}) from host {} : reason = '{}'.",
-                            new Object[] { stream, addr, reason });
-                    if (streamsForHost.isEmpty()) {
-                        address2Streams.remove(addr, streamsForHost);
-                    }
-                    ownershipStatsLogger.onRemove(stream);
-                }
-            }
-        }
-    }
-
-    /**
-     * Remove all streams from host <code>addr</code>.
-     *
-     * @param addr
-     *          host to remove ownerships
-     */
-    public void removeAllStreamsFromOwner(SocketAddress addr) {
-        logger.info("Remove streams mapping for host {}", addr);
-        Set<String> streamsForHost = address2Streams.get(addr);
-        if (null != streamsForHost) {
-            synchronized (streamsForHost) {
-                for (String s : streamsForHost) {
-                    if (stream2Addresses.remove(s, addr)) {
-                        logger.info("Removing mapping for stream : {} from host : {}", s, addr);
-                        ownershipStatsLogger.onRemove(s);
-                    }
-                }
-                address2Streams.remove(addr, streamsForHost);
-            }
-        }
-    }
-
-    /**
-     * Get the number cached streams.
-     *
-     * @return number cached streams.
-     */
-    public int getNumCachedStreams() {
-        return stream2Addresses.size();
-    }
-
-    /**
-     * Get the stream ownership distribution across proxies.
-     *
-     * @return stream ownership distribution
-     */
-    public Map<SocketAddress, Set<String>> getStreamOwnershipDistribution() {
-        return ImmutableMap.copyOf(address2Streams);
-    }
-
-    /**
-     * Get the stream ownership mapping.
-     *
-     * @return stream ownership mapping.
-     */
-    public Map<String, SocketAddress> getStreamOwnerMapping() {
-        return stream2Addresses;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/package-info.java
deleted file mode 100644
index 721702e..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ownership/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Utils for managing ownership at client side.
- */
-package com.twitter.distributedlog.client.ownership;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/package-info.java
deleted file mode 100644
index aa167fb..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * DistributedLog Client.
- */
-package com.twitter.distributedlog.client;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java
deleted file mode 100644
index f8bdae7..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.proxy;
-
-import com.twitter.distributedlog.thrift.service.DistributedLogService;
-import com.twitter.finagle.Service;
-import com.twitter.finagle.thrift.ThriftClientRequest;
-import com.twitter.util.Future;
-import scala.runtime.BoxedUnit;
-
-/**
- * Cluster client.
- */
-public class ClusterClient {
-
-    private final Service<ThriftClientRequest, byte[]> client;
-    private final DistributedLogService.ServiceIface service;
-
-    public ClusterClient(Service<ThriftClientRequest, byte[]> client,
-                         DistributedLogService.ServiceIface service) {
-        this.client = client;
-        this.service = service;
-    }
-
-    public Service<ThriftClientRequest, byte[]> getClient() {
-        return client;
-    }
-
-    public DistributedLogService.ServiceIface getService() {
-        return service;
-    }
-
-    public Future<BoxedUnit> close() {
-        return client.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/HostProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/HostProvider.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/HostProvider.java
deleted file mode 100644
index 4878c1c..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/HostProvider.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.proxy;
-
-import java.net.SocketAddress;
-import java.util.Set;
-
-/**
- * Provider to provider list of hosts for handshaking.
- */
-public interface HostProvider {
-
-    /**
-     * Get the list of hosts for handshaking.
-     *
-     * @return list of hosts for handshaking.
-     */
-    Set<SocketAddress> getHosts();
-
-}