You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:15 UTC

[08/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorService.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorService.java
deleted file mode 100644
index b1e2879..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorService.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import com.twitter.common.zookeeper.ServerSet;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogConstants;
-import org.apache.distributedlog.DistributedLogManager;
-import org.apache.distributedlog.LogSegmentMetadata;
-import org.apache.distributedlog.callback.LogSegmentListener;
-import org.apache.distributedlog.callback.NamespaceListener;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.client.serverset.DLZkServerSet;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.stats.Stat;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Duration;
-import com.twitter.util.FutureEventListener;
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Monitor Service.
- */
-public class MonitorService implements NamespaceListener {
-
-    private static final Logger logger = LoggerFactory.getLogger(MonitorService.class);
-
-    private DistributedLogNamespace dlNamespace = null;
-    private MonitorServiceClient dlClient = null;
-    private DLZkServerSet[] zkServerSets = null;
-    private final ScheduledExecutorService executorService =
-            Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
-    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
-    private final Map<String, StreamChecker> knownStreams = new HashMap<String, StreamChecker>();
-
-    // Settings
-    private int regionId = DistributedLogConstants.LOCAL_REGION_ID;
-    private int interval = 100;
-    private String streamRegex = null;
-    private boolean watchNamespaceChanges = false;
-    private boolean handshakeWithClientInfo = false;
-    private int heartbeatEveryChecks = 0;
-    private int instanceId = -1;
-    private int totalInstances = -1;
-    private boolean isThriftMux = false;
-
-    // Options
-    private final Optional<String> uriArg;
-    private final Optional<String> confFileArg;
-    private final Optional<String> serverSetArg;
-    private final Optional<Integer> intervalArg;
-    private final Optional<Integer> regionIdArg;
-    private final Optional<String> streamRegexArg;
-    private final Optional<Integer> instanceIdArg;
-    private final Optional<Integer> totalInstancesArg;
-    private final Optional<Integer> heartbeatEveryChecksArg;
-    private final Optional<Boolean> handshakeWithClientInfoArg;
-    private final Optional<Boolean> watchNamespaceChangesArg;
-    private final Optional<Boolean> isThriftMuxArg;
-
-    // Stats
-    private final StatsProvider statsProvider;
-    private final StatsReceiver statsReceiver;
-    private final StatsReceiver monitorReceiver;
-    private final Stat successStat;
-    private final Stat failureStat;
-    private final Gauge<Number> numOfStreamsGauge;
-    // Hash Function
-    private final HashFunction hashFunction = Hashing.md5();
-
-    /**
-     * Monitors a single stream.
-     *
-     * <p>The first {@link #run()} opens the log and registers this checker as a log segment
-     * listener. Checking starts once {@link #onSegmentsUpdated(List)} reports that the first
-     * entry of the segment list carries the configured region id; from then on every
-     * completed request reschedules the next one after {@code interval} milliseconds until
-     * the checker is closed or the stream stops matching the region.
-     */
-    class StreamChecker implements Runnable, FutureEventListener<Void>, LogSegmentListener {
-        private final String name;
-        // set by close(); prevents any further rescheduling
-        private volatile boolean closed = false;
-        // true while this checker is expected to keep issuing checks for the stream
-        private volatile boolean checking = false;
-        // measures the latency of the request that is currently in flight
-        private final Stopwatch stopwatch = Stopwatch.createUnstarted();
-        private DistributedLogManager dlm = null;
-        private int numChecks = 0;
-
-        StreamChecker(String name) {
-            this.name = name;
-        }
-
-        /**
-         * Opens the log on the first invocation; on later invocations sends one check or
-         * heartbeat request whose completion is reported back to this listener.
-         */
-        @Override
-        public void run() {
-            if (null == dlm) {
-                try {
-                    dlm = dlNamespace.openLog(name);
-                    dlm.registerListener(this);
-                } catch (IOException e) {
-                    // registerListener failed after the log was opened: release the manager
-                    // so that the retry below starts from scratch
-                    if (null != dlm) {
-                        try {
-                            dlm.close();
-                        } catch (IOException e1) {
-                            logger.error("Failed to close dlm for {} : ", name, e1);
-                        }
-                        dlm = null;
-                    }
-                    executorService.schedule(this, interval, TimeUnit.MILLISECONDS);
-                }
-            } else {
-                stopwatch.reset().start();
-                boolean sendHeartBeat;
-                if (heartbeatEveryChecks > 0) {
-                    synchronized (this) {
-                        ++numChecks;
-                        // wrap around before the counter can overflow
-                        if (numChecks >= Integer.MAX_VALUE) {
-                            numChecks = 0;
-                        }
-                        sendHeartBeat = (numChecks % heartbeatEveryChecks) == 0;
-                    }
-                } else {
-                    sendHeartBeat = false;
-                }
-                if (sendHeartBeat) {
-                    dlClient.heartbeat(name).addEventListener(this);
-                } else {
-                    dlClient.check(name).addEventListener(this);
-                }
-            }
-        }
-
-        /**
-         * Starts checking when the first reported segment belongs to the configured region
-         * and stops checking otherwise.
-         *
-         * @param segments the segment list reported by the log
-         */
-        @Override
-        public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
-            if (segments.size() > 0 && segments.get(0).getRegionId() == regionId) {
-                if (!checking) {
-                    logger.info("Start checking stream {}.", name);
-                    checking = true;
-                    run();
-                }
-            } else {
-                if (checking) {
-                    logger.info("Stop checking stream {}.", name);
-                    // actually stop: scheduleCheck() only stops rescheduling once the flag is cleared
-                    checking = false;
-                }
-            }
-        }
-
-        @Override
-        public void onLogStreamDeleted() {
-            logger.info("Stream {} is deleted", name);
-        }
-
-        /** Records the request latency as a success and schedules the next check. */
-        @Override
-        public void onSuccess(Void value) {
-            successStat.add(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-            scheduleCheck();
-        }
-
-        /** Records the request latency as a failure and schedules the next check. */
-        @Override
-        public void onFailure(Throwable cause) {
-            failureStat.add(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-            scheduleCheck();
-        }
-
-        /** Schedules the next run unless the checker is closed or no longer checking. */
-        private void scheduleCheck() {
-            if (closed) {
-                return;
-            }
-            if (!checking) {
-                return;
-            }
-            try {
-                executorService.schedule(this, interval, TimeUnit.MILLISECONDS);
-            } catch (RejectedExecutionException ree) {
-                logger.error("Failed to schedule checking stream {} in {} ms : ",
-                        new Object[] { name, interval, ree });
-            }
-        }
-
-        /** Marks the checker closed and closes the underlying log manager, if opened. */
-        private void close() {
-            closed = true;
-            if (null != dlm) {
-                try {
-                    dlm.close();
-                } catch (IOException e) {
-                    logger.error("Failed to close dlm for {} : ", name, e);
-                }
-            }
-        }
-    }
-
-    /**
-     * Creates a monitor service from its (optional) command line settings.
-     *
-     * <p>The constructor only stores its arguments and prepares the stats objects; the
-     * options are evaluated later by {@link #runServer()}.
-     *
-     * @param uriArg distributedlog uri
-     * @param confFileArg distributedlog configuration file
-     * @param serverSetArg comma separated proxy server set paths
-     * @param intervalArg check interval
-     * @param regionIdArg region id
-     * @param streamRegexArg regex used to filter streams
-     * @param instanceIdArg id of this monitor instance
-     * @param totalInstancesArg total number of monitor instances
-     * @param heartbeatEveryChecksArg number of checks after which a heartbeat is sent
-     * @param handshakeWithClientInfoArg whether to handshake with client info
-     * @param watchNamespaceChangesArg whether to watch namespace changes
-     * @param isThriftMuxArg whether thriftmux is used
-     * @param statsReceiver receiver under which the "monitor" scope is created
-     * @param statsProvider provider used to expose the number-of-streams gauge
-     */
-    MonitorService(Optional<String> uriArg,
-                   Optional<String> confFileArg,
-                   Optional<String> serverSetArg,
-                   Optional<Integer> intervalArg,
-                   Optional<Integer> regionIdArg,
-                   Optional<String> streamRegexArg,
-                   Optional<Integer> instanceIdArg,
-                   Optional<Integer> totalInstancesArg,
-                   Optional<Integer> heartbeatEveryChecksArg,
-                   Optional<Boolean> handshakeWithClientInfoArg,
-                   Optional<Boolean> watchNamespaceChangesArg,
-                   Optional<Boolean> isThriftMuxArg,
-                   StatsReceiver statsReceiver,
-                   StatsProvider statsProvider) {
-        // connection related options
-        this.uriArg = uriArg;
-        this.confFileArg = confFileArg;
-        this.serverSetArg = serverSetArg;
-
-        // checking related options
-        this.intervalArg = intervalArg;
-        this.regionIdArg = regionIdArg;
-        this.streamRegexArg = streamRegexArg;
-        this.heartbeatEveryChecksArg = heartbeatEveryChecksArg;
-
-        // sharding related options
-        this.instanceIdArg = instanceIdArg;
-        this.totalInstancesArg = totalInstancesArg;
-
-        // flags
-        this.handshakeWithClientInfoArg = handshakeWithClientInfoArg;
-        this.watchNamespaceChangesArg = watchNamespaceChangesArg;
-        this.isThriftMuxArg = isThriftMuxArg;
-
-        // stats: everything is reported below the "monitor" scope
-        this.statsProvider = statsProvider;
-        this.statsReceiver = statsReceiver;
-        this.monitorReceiver = this.statsReceiver.scope("monitor");
-        this.successStat = this.monitorReceiver.stat0("success");
-        this.failureStat = this.monitorReceiver.stat0("failure");
-        this.numOfStreamsGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                // number of streams this instance currently knows about
-                return knownStreams.size();
-            }
-        };
-    }
-
-    /**
-     * Applies the command line options, loads the configuration, starts the stats provider,
-     * builds the monitor client and starts monitoring.
-     *
-     * @throws IllegalArgumentException if the uri or server set is missing, if the instance
-     *         id / total instances combination is invalid, or if no server set path is given
-     * @throws IOException if the configuration file cannot be loaded or the monitor fails
-     *         to start
-     */
-    public void runServer() throws IllegalArgumentException, IOException {
-        checkArgument(uriArg.isPresent(),
-                "No distributedlog uri provided.");
-        checkArgument(serverSetArg.isPresent(),
-                "No proxy server set provided.");
-        if (intervalArg.isPresent()) {
-            interval = intervalArg.get();
-        }
-        if (regionIdArg.isPresent()) {
-            regionId = regionIdArg.get();
-        }
-        if (streamRegexArg.isPresent()) {
-            streamRegex = streamRegexArg.get();
-        }
-        if (instanceIdArg.isPresent()) {
-            instanceId = instanceIdArg.get();
-        }
-        if (totalInstancesArg.isPresent()) {
-            totalInstances = totalInstancesArg.get();
-        }
-        if (heartbeatEveryChecksArg.isPresent()) {
-            heartbeatEveryChecks = heartbeatEveryChecksArg.get();
-        }
-        // both values default to -1, so they are effectively mandatory
-        if (instanceId < 0 || totalInstances <= 0 || instanceId >= totalInstances) {
-            throw new IllegalArgumentException("Invalid instance id or total instances number.");
-        }
-        // the boolean options are flags: their presence alone enables the feature
-        handshakeWithClientInfo = handshakeWithClientInfoArg.isPresent();
-        watchNamespaceChanges = watchNamespaceChangesArg.isPresent();
-        isThriftMux = isThriftMuxArg.isPresent();
-        URI uri = URI.create(uriArg.get());
-        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
-        if (confFileArg.isPresent()) {
-            String configFile = confFileArg.get();
-            try {
-                dlConf.loadConf(new File(configFile).toURI().toURL());
-            } catch (ConfigurationException e) {
-                // keep the cause so that the actual configuration problem is reported
-                throw new IOException("Failed to load distributedlog configuration from " + configFile + ".", e);
-            } catch (MalformedURLException e) {
-                throw new IOException("Failed to load distributedlog configuration from malformed "
-                        + configFile + ".", e);
-            }
-        }
-        logger.info("Starting stats provider : {}.", statsProvider.getClass());
-        statsProvider.start(dlConf);
-        String[] serverSetPaths = StringUtils.split(serverSetArg.get(), ",");
-        if (serverSetPaths.length == 0) {
-            throw new IllegalArgumentException("Invalid serverset paths provided : " + serverSetArg.get());
-        }
-
-        // the first server set is the local one, all others are treated as remotes
-        ServerSet[] serverSets = createServerSets(serverSetPaths);
-        ServerSet local = serverSets[0];
-        ServerSet[] remotes  = new ServerSet[serverSets.length - 1];
-        System.arraycopy(serverSets, 1, remotes, 0, remotes.length);
-
-        ClientBuilder finagleClientBuilder = ClientBuilder.get()
-            .connectTimeout(Duration.fromSeconds(1))
-            .tcpConnectTimeout(Duration.fromSeconds(1))
-            .requestTimeout(Duration.fromSeconds(2))
-            .keepAlive(true)
-            .failFast(false);
-
-        // host connection limits are only configured for the non-thriftmux client
-        if (!isThriftMux) {
-            finagleClientBuilder = finagleClientBuilder
-                .hostConnectionLimit(2)
-                .hostConnectionCoresize(2);
-        }
-
-        // NOTE(review): the redirect backoff start (100) is larger than the max (50) --
-        // confirm whether the two values are swapped
-        dlClient = DistributedLogClientBuilder.newBuilder()
-                .name("monitor")
-                .thriftmux(isThriftMux)
-                .clientId(ClientId$.MODULE$.apply("monitor"))
-                .redirectBackoffMaxMs(50)
-                .redirectBackoffStartMs(100)
-                .requestTimeoutMs(2000)
-                .maxRedirects(2)
-                .serverSets(local, remotes)
-                .streamNameRegex(streamRegex)
-                .handshakeWithClientInfo(handshakeWithClientInfo)
-                .clientBuilder(finagleClientBuilder)
-                .statsReceiver(monitorReceiver.scope("client"))
-                .buildMonitorClient();
-        runMonitor(dlConf, uri);
-    }
-
-    /**
-     * Parses every server set path, remembers the resulting zookeeper server sets (so that
-     * they can be closed later) and returns their server sets in the same order.
-     *
-     * @param serverSetPaths server set paths to parse
-     * @return one server set per path, in path order
-     */
-    ServerSet[] createServerSets(String[] serverSetPaths) {
-        final int count = serverSetPaths.length;
-        ServerSet[] result = new ServerSet[count];
-        zkServerSets = new DLZkServerSet[count];
-        int idx = 0;
-        for (String path : serverSetPaths) {
-            DLZkServerSet parsed = parseServerSet(path);
-            zkServerSets[idx] = parsed;
-            result[idx] = parsed.getServerSet();
-            idx++;
-        }
-        return result;
-    }
-
-    /**
-     * Builds the zookeeper server set for a server set path.
-     *
-     * @param serverSetPath server set path, interpreted as a URI
-     * @return the zookeeper server set for the path
-     */
-    protected DLZkServerSet parseServerSet(String serverSetPath) {
-        final URI serverSetUri = URI.create(serverSetPath);
-        // NOTE(review): 60000 is presumably a zookeeper timeout in milliseconds -- confirm
-        return DLZkServerSet.of(serverSetUri, 60000);
-    }
-
-    /**
-     * Reconciles the known streams with the given stream list.
-     *
-     * <p>A stream is handled by this instance when it matches the stream regex (if any) and
-     * its hash, modulo the total number of instances, equals this instance's id. Streams
-     * that are no longer in the list are removed and closed; new streams get a checker
-     * that is run immediately.
-     *
-     * @param streams the current streams of the namespace
-     */
-    @Override
-    public void onStreamsChanged(Iterator<String> streams) {
-        Set<String> newSet = new HashSet<String>();
-        while (streams.hasNext()) {
-            String s = streams.next();
-            if (null == streamRegex || s.matches(streamRegex)) {
-                // Take the remainder before the absolute value: Math.abs(Integer.MIN_VALUE)
-                // is still negative, which would leave such a stream without any owner.
-                // For every other hash the result is the same as abs(hash) % totalInstances.
-                int bucket = Math.abs(hashFunction.hashUnencodedChars(s).asInt() % totalInstances);
-                if (bucket == instanceId) {
-                    newSet.add(s);
-                }
-            }
-        }
-        List<StreamChecker> tasksToCancel = new ArrayList<StreamChecker>();
-        synchronized (knownStreams) {
-            Set<String> knownStreamSet = new HashSet<String>(knownStreams.keySet());
-            Set<String> removedStreams = Sets.difference(knownStreamSet, newSet).immutableCopy();
-            Set<String> addedStreams = Sets.difference(newSet, knownStreamSet).immutableCopy();
-            for (String s : removedStreams) {
-                StreamChecker task = knownStreams.remove(s);
-                if (null != task) {
-                    logger.info("Removed stream {}", s);
-                    tasksToCancel.add(task);
-                }
-            }
-            for (String s : addedStreams) {
-                if (!knownStreams.containsKey(s)) {
-                    logger.info("Added stream {}", s);
-                    StreamChecker sc = new StreamChecker(s);
-                    knownStreams.put(s, sc);
-                    sc.run();
-                }
-            }
-        }
-        // close the removed checkers outside of the lock
-        for (StreamChecker sc : tasksToCancel) {
-            sc.close();
-        }
-    }
-
-    /**
-     * Registers the number-of-streams gauge, builds the namespace and starts tracking its
-     * streams: either continuously through a namespace listener or once from the current
-     * list of logs.
-     *
-     * @param conf distributedlog configuration used to build the namespace
-     * @param dlUri uri of the namespace
-     * @throws IOException if the namespace cannot be built or its logs cannot be listed
-     */
-    void runMonitor(DistributedLogConfiguration conf, URI dlUri) throws IOException {
-        statsProvider.getStatsLogger("monitor").registerGauge("num_streams", numOfStreamsGauge);
-
-        logger.info("Construct dl namespace @ {}", dlUri);
-        dlNamespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(dlUri).build();
-
-        if (!watchNamespaceChanges) {
-            // one-shot mode: only the streams that exist right now are monitored
-            onStreamsChanged(dlNamespace.getLogs());
-            return;
-        }
-        dlNamespace.registerNamespaceListener(this);
-    }
-
-    /**
-     * Close the server.
-     *
-     * <p>Closes the monitor client, the zookeeper server sets and the namespace, shuts the
-     * executor down (waiting up to one minute), stops the stats provider and finally
-     * releases any thread blocked in {@link #join()}. If the calling thread is interrupted
-     * while waiting for the executor, the executor is shut down immediately and the
-     * interrupt status is restored before returning.
-     */
-    public void close() {
-        logger.info("Closing monitor service.");
-        if (null != dlClient) {
-            dlClient.close();
-        }
-        if (null != zkServerSets) {
-            for (DLZkServerSet zkServerSet : zkServerSets) {
-                zkServerSet.close();
-            }
-        }
-        if (null != dlNamespace) {
-            dlNamespace.close();
-        }
-        boolean interrupted = false;
-        executorService.shutdown();
-        try {
-            if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
-                executorService.shutdownNow();
-            }
-        } catch (InterruptedException e) {
-            logger.error("Interrupted on waiting shutting down monitor executor service : ", e);
-            // do not leave the executor running just because the wait was cut short
-            executorService.shutdownNow();
-            interrupted = true;
-        }
-        if (null != statsProvider) {
-            // clean up the gauges
-            unregisterGauge();
-            statsProvider.stop();
-        }
-        keepAliveLatch.countDown();
-        logger.info("Closed monitor service.");
-        if (interrupted) {
-            // restore the interrupt status for the caller once the cleanup is done
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    /**
-     * Blocks the calling thread until the keep-alive latch is released, which happens at
-     * the end of {@link #close()}.
-     *
-     * @throws InterruptedException if the calling thread is interrupted while waiting
-     */
-    public void join() throws InterruptedException {
-        keepAliveLatch.await();
-    }
-
-    /**
-     * Clean up the gauge before we close to help GC.
-     *
-     * <p>Unregisters the "num_streams" gauge from the "monitor" stats logger.
-     */
-    private void unregisterGauge(){
-        statsProvider.getStatsLogger("monitor").unregisterGauge("num_streams", numOfStreamsGauge);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java
deleted file mode 100644
index 1f45b13..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static org.apache.distributedlog.util.CommandLineUtils.getOptionalBooleanArg;
-import static org.apache.distributedlog.util.CommandLineUtils.getOptionalIntegerArg;
-import static org.apache.distributedlog.util.CommandLineUtils.getOptionalStringArg;
-
-import com.twitter.finagle.stats.NullStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.io.IOException;
-import org.apache.bookkeeper.stats.NullStatsProvider;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The launcher to run monitor service.
- */
-public class MonitorServiceApp {
-
-    private static final Logger logger = LoggerFactory.getLogger(MonitorServiceApp.class);
-
-    static final String USAGE = "MonitorService [-u <uri>] [-c <conf>] [-s serverset]";
-
-    final String[] args;
-    final Options options = new Options();
-
-    /**
-     * Creates the launcher and registers every command line option that
-     * {@code runCmd} reads.
-     *
-     * @param args raw command line arguments
-     */
-    private MonitorServiceApp(String[] args) {
-        this.args = args;
-        // prepare options
-        options.addOption("u", "uri", true, "DistributedLog URI");
-        options.addOption("c", "conf", true, "DistributedLog Configuration File");
-        options.addOption("s", "serverset", true, "Proxy Server Set");
-        options.addOption("i", "interval", true, "Check interval");
-        options.addOption("d", "region", true, "Region ID");
-        options.addOption("p", "provider", true, "DistributedLog Stats Provider");
-        options.addOption("f", "filter", true, "Filter streams by regex");
-        options.addOption("w", "watch", false, "Watch stream changes under a given namespace");
-        options.addOption("n", "instance_id", true, "Instance ID");
-        options.addOption("t", "total_instances", true, "Total instances");
-        options.addOption("hck", "heartbeat-num-checks", true, "Send a heartbeat after num checks");
-        options.addOption("hsci", "handshake-with-client-info", false, "Enable handshaking with client info");
-        // "mx" is read when the service is created, so it has to be a known option;
-        // without this registration thriftmux could never be enabled
-        options.addOption("mx", "thriftmux", false, "Is thriftmux enabled");
-    }
-
-    /** Prints the usage line followed by the description of every registered option. */
-    void printUsage() {
-        new HelpFormatter().printHelp(USAGE, options);
-    }
-
-    /**
-     * Parses the command line and runs the monitor service.
-     *
-     * <p>Unparsable or invalid arguments are reported together with the usage; any such
-     * problem, as well as a failure to start the service, terminates the JVM with exit
-     * code -1.
-     */
-    private void run() {
-        try {
-            logger.info("Running monitor service.");
-            BasicParser parser = new BasicParser();
-            CommandLine cmdline = parser.parse(options, args);
-            runCmd(cmdline);
-        } catch (ParseException pe) {
-            // tell the user what was wrong instead of only printing the usage
-            logger.error("Failed to parse command line arguments : {}", pe.getMessage());
-            printUsage();
-            Runtime.getRuntime().exit(-1);
-        } catch (IllegalArgumentException iae) {
-            // missing or inconsistent arguments are rejected with this exception when the
-            // service starts; treat them like a usage error rather than an uncaught failure
-            logger.error("Invalid arguments for monitor service : ", iae);
-            printUsage();
-            Runtime.getRuntime().exit(-1);
-        } catch (IOException ie) {
-            logger.error("Failed to start monitor service : ", ie);
-            Runtime.getRuntime().exit(-1);
-        }
-    }
-
-    /**
-     * Creates the monitor service from the parsed command line, starts it, registers a
-     * shutdown hook that closes it and then blocks until the service has been closed.
-     *
-     * @param cmdline parsed command line
-     * @throws IOException if the monitor service fails to start
-     */
-    void runCmd(CommandLine cmdline) throws IOException {
-        // stats are dropped unless a provider class is given with -p
-        StatsProvider statsProvider = new NullStatsProvider();
-        if (cmdline.hasOption("p")) {
-            String providerClass = cmdline.getOptionValue("p");
-            statsProvider = ReflectionUtils.newInstance(providerClass, StatsProvider.class);
-        }
-        StatsReceiver statsReceiver = NullStatsReceiver.get();
-
-        final MonitorService monitorService = new MonitorService(
-                getOptionalStringArg(cmdline, "u"),
-                getOptionalStringArg(cmdline, "c"),
-                getOptionalStringArg(cmdline, "s"),
-                getOptionalIntegerArg(cmdline, "i"),
-                getOptionalIntegerArg(cmdline, "d"),
-                getOptionalStringArg(cmdline, "f"),
-                getOptionalIntegerArg(cmdline, "n"),
-                getOptionalIntegerArg(cmdline, "t"),
-                getOptionalIntegerArg(cmdline, "hck"),
-                getOptionalBooleanArg(cmdline, "hsci"),
-                getOptionalBooleanArg(cmdline, "w"),
-                getOptionalBooleanArg(cmdline, "mx"),
-                statsReceiver,
-                statsProvider);
-
-        monitorService.runServer();
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                logger.info("Closing monitor service.");
-                monitorService.close();
-                logger.info("Closed monitor service.");
-            }
-        });
-        try {
-            monitorService.join();
-        } catch (InterruptedException ie) {
-            logger.warn("Interrupted when waiting monitor service to be finished : ", ie);
-            // keep the interrupt visible to the caller instead of swallowing it
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    /**
-     * Entry point of the monitor service launcher.
-     *
-     * @param args command line arguments
-     */
-    public static void main(String[] args) {
-        new MonitorServiceApp(args).run();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ResponseUtils.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
deleted file mode 100644
index a2691d3..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
-import org.apache.distributedlog.thrift.service.BulkWriteResponse;
-import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-
-/**
- * Factory methods for the response headers and write responses returned by the write
- * proxy service.
- */
-public class ResponseUtils {
-
-    /** Returns a header carrying {@code REQUEST_DENIED}. */
-    public static ResponseHeader deniedHeader() {
-        return new ResponseHeader(StatusCode.REQUEST_DENIED);
-    }
-
-    /** Returns a header carrying {@code STREAM_UNAVAILABLE}. */
-    public static ResponseHeader streamUnavailableHeader() {
-        return new ResponseHeader(StatusCode.STREAM_UNAVAILABLE);
-    }
-
-    /** Returns a header carrying {@code SUCCESS}. */
-    public static ResponseHeader successHeader() {
-        return new ResponseHeader(StatusCode.SUCCESS);
-    }
-
-    /** Returns a {@code FOUND} header whose location is the given owner. */
-    public static ResponseHeader ownerToHeader(String owner) {
-        ResponseHeader found = new ResponseHeader(StatusCode.FOUND);
-        return found.setLocation(owner);
-    }
-
-    /**
-     * Translates a failure into a response header.
-     *
-     * <p>A {@link DLException} contributes its own code and message (plus the current
-     * owner as location for an {@link OwnershipAcquireFailedException}); anything else is
-     * reported as an internal server error.
-     */
-    public static ResponseHeader exceptionToHeader(Throwable t) {
-        ResponseHeader header = new ResponseHeader();
-        if (!(t instanceof DLException)) {
-            header.setCode(StatusCode.INTERNAL_SERVER_ERROR);
-            header.setErrMsg("Internal server error : " + t.getMessage());
-            return header;
-        }
-        DLException dlException = (DLException) t;
-        if (dlException instanceof OwnershipAcquireFailedException) {
-            OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) dlException;
-            header.setLocation(oafe.getCurrentOwner());
-        }
-        header.setCode(dlException.getCode());
-        header.setErrMsg(dlException.getMessage());
-        return header;
-    }
-
-    /** Wraps the given header into a write response. */
-    public static WriteResponse write(ResponseHeader responseHeader) {
-        return new WriteResponse(responseHeader);
-    }
-
-    /** Returns a write response with a success header. */
-    public static WriteResponse writeSuccess() {
-        ResponseHeader header = successHeader();
-        return new WriteResponse(header);
-    }
-
-    /** Returns a write response with a denied header. */
-    public static WriteResponse writeDenied() {
-        ResponseHeader header = deniedHeader();
-        return new WriteResponse(header);
-    }
-
-    /** Wraps the given header into a bulk write response. */
-    public static BulkWriteResponse bulkWrite(ResponseHeader responseHeader) {
-        return new BulkWriteResponse(responseHeader);
-    }
-
-    /** Returns a bulk write response with a success header. */
-    public static BulkWriteResponse bulkWriteSuccess() {
-        ResponseHeader header = successHeader();
-        return new BulkWriteResponse(header);
-    }
-
-    /** Returns a bulk write response with a denied header. */
-    public static BulkWriteResponse bulkWriteDenied() {
-        ResponseHeader header = deniedHeader();
-        return new BulkWriteResponse(header);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java
deleted file mode 100644
index 436145d..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-/**
- * List of feature keys used by distributedlog server.
- *
- * <p>NOTE(review): the meaning of each key below is inferred from its name only; the
- * code that evaluates these features is not part of this file -- confirm before relying
- * on the descriptions.
- */
-public enum ServerFeatureKeys {
-
-    // presumably: the region stops accepting new streams
-    REGION_STOP_ACCEPT_NEW_STREAM,
-    // presumably: service rate limiting is turned off
-    SERVICE_RATE_LIMIT_DISABLED,
-    // presumably: checksum handling of the service is turned off
-    SERVICE_CHECKSUM_DISABLED,
-    // presumably: the service-wide (global) limiter is turned off
-    SERVICE_GLOBAL_LIMITER_DISABLED
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/StatsFilter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/StatsFilter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/StatsFilter.java
deleted file mode 100644
index ee64580..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/StatsFilter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import com.google.common.base.Stopwatch;
-import com.twitter.finagle.Service;
-import com.twitter.finagle.SimpleFilter;
-import com.twitter.util.Future;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-/**
- * Track distributedlog server finagle-service stats.
- */
-class StatsFilter<Req, Rep> extends SimpleFilter<Req, Rep> {
-
-    private final StatsLogger stats;
-    private final Counter outstandingAsync;
-    private final OpStatsLogger serviceExec;
-
-    @Override
-    public Future<Rep> apply(Req req, Service<Req, Rep> service) {
-        Future<Rep> result = null;
-        outstandingAsync.inc();
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        try {
-            result = service.apply(req);
-            serviceExec.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-        } finally {
-            outstandingAsync.dec();
-            if (null == result) {
-                serviceExec.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-            }
-        }
-        return result;
-    }
-
-    public StatsFilter(StatsLogger stats) {
-        this.stats = stats;
-        this.outstandingAsync = stats.getCounter("outstandingAsync");
-        this.serviceExec = stats.getOpStatsLogger("serviceExec");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java
deleted file mode 100644
index ee64fc7..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.announcer;
-
-import java.io.IOException;
-
-/**
- * Announce service information.
- */
-public interface Announcer {
-
-    /**
-     * Announce service info.
-     */
-    void announce() throws IOException;
-
-    /**
-     * Unannounce the service info.
-     */
-    void unannounce() throws IOException;
-
-    /**
-     * Close the announcer.
-     */
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java
deleted file mode 100644
index 5a1277a..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.announcer;
-
-import java.io.IOException;
-
-/**
- * A no-op implementation of {@link Announcer}.
- */
-public class NOPAnnouncer implements Announcer {
-    @Override
-    public void announce() throws IOException {
-        // nop
-    }
-
-    @Override
-    public void unannounce() throws IOException {
-        // nop
-    }
-
-    @Override
-    public void close() {
-        // nop
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java
deleted file mode 100644
index df4a8e2..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.announcer;
-
-import com.twitter.common.zookeeper.Group;
-import com.twitter.common.zookeeper.ServerSet;
-import org.apache.distributedlog.client.serverset.DLZkServerSet;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * ServerSet based announcer.
- */
-public class ServerSetAnnouncer implements Announcer {
-
-    private static final Logger logger = LoggerFactory.getLogger(ServerSetAnnouncer.class);
-
-    final String localAddr;
-    final InetSocketAddress serviceEndpoint;
-    final Map<String, InetSocketAddress> additionalEndpoints;
-    final int shardId;
-
-    // ServerSet
-    DLZkServerSet zkServerSet;
-
-    // Service Status
-    ServerSet.EndpointStatus serviceStatus = null;
-
-    /**
-     * Announce server infos.
-     *
-     * @param servicePort
-     *          service port
-     * @param statsPort
-     *          stats port
-     * @param shardId
-     *          shard id
-     */
-    public ServerSetAnnouncer(URI uri,
-                              int servicePort,
-                              int statsPort,
-                              int shardId) throws UnknownHostException {
-        this.shardId = shardId;
-        this.localAddr = InetAddress.getLocalHost().getHostAddress();
-        // service endpoint
-        this.serviceEndpoint = new InetSocketAddress(localAddr, servicePort);
-        // stats endpoint
-        InetSocketAddress statsEndpoint = new InetSocketAddress(localAddr, statsPort);
-        this.additionalEndpoints = new HashMap<String, InetSocketAddress>();
-        this.additionalEndpoints.put("aurora", statsEndpoint);
-        this.additionalEndpoints.put("stats", statsEndpoint);
-        this.additionalEndpoints.put("service", serviceEndpoint);
-        this.additionalEndpoints.put("thrift", serviceEndpoint);
-
-        // Create zookeeper and server set
-        this.zkServerSet = DLZkServerSet.of(uri, 60000);
-    }
-
-    @Override
-    public synchronized void announce() throws IOException {
-        try {
-            serviceStatus =
-                    zkServerSet.getServerSet().join(serviceEndpoint, additionalEndpoints, shardId);
-        } catch (Group.JoinException e) {
-            throw new IOException("Failed to announce service : ", e);
-        } catch (InterruptedException e) {
-            logger.warn("Interrupted on announcing service : ", e);
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    @Override
-    public synchronized void unannounce() throws IOException {
-        if (null == serviceStatus) {
-            logger.warn("No service to unannounce.");
-            return;
-        }
-        try {
-            serviceStatus.leave();
-        } catch (ServerSet.UpdateException e) {
-            throw new IOException("Failed to unannounce service : ", e);
-        }
-    }
-
-    @Override
-    public void close() {
-        zkServerSet.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/package-info.java
deleted file mode 100644
index 6559bb3..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Announcers to announce servers to server set.
- */
-package org.apache.distributedlog.service.announcer;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java
deleted file mode 100644
index cdffaa3..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-
-/**
- * Balancer Interface.
- *
- * <p>A balancer is used for balance the streams across the proxy cluster.
- */
-public interface Balancer {
-
-    /**
-     * Rebalance all the streams from <i>source</i> to others.
-     *
-     * @param source
-     *          source target name.
-     * @param rebalanceConcurrency
-     *          the concurrency to move streams for re-balance.
-     * @param rebalanceRateLimiter
-     *          the rate limiting to move streams for re-balance.
-     */
-    void balanceAll(String source,
-                    int rebalanceConcurrency,
-                    Optional<RateLimiter> rebalanceRateLimiter);
-
-    /**
-     * Balance the streams across all targets.
-     *
-     * @param rebalanceWaterMark
-     *          rebalance water mark. if number of streams of a given target is less than
-     *          the water mark, no streams will be re-balanced from this target.
-     * @param rebalanceTolerancePercentage
-     *          tolerance percentage for the balancer. if number of streams of a given target is
-     *          less than average + average * <i>tolerancePercentage</i> / 100.0, no streams will
-     *          be re-balanced from that target.
-     * @param rebalanceConcurrency
-     *          the concurrency to move streams for re-balance.
-     * @param rebalanceRateLimiter
-     *          the rate limiting to move streams for re-balance.
-     */
-    void balance(int rebalanceWaterMark,
-                 double rebalanceTolerancePercentage,
-                 int rebalanceConcurrency,
-                 Optional<RateLimiter> rebalanceRateLimiter);
-
-    /**
-     * Close the balancer.
-     */
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java
deleted file mode 100644
index 964c1cc..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-import com.twitter.common.zookeeper.ServerSet;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.client.serverset.DLZkServerSet;
-import org.apache.distributedlog.impl.BKNamespaceDriver;
-import org.apache.distributedlog.service.ClientUtils;
-import org.apache.distributedlog.service.DLSocketAddress;
-import org.apache.distributedlog.service.DistributedLogClient;
-import org.apache.distributedlog.service.DistributedLogClientBuilder;
-import org.apache.distributedlog.tools.Tool;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Tool to rebalance cluster.
- */
-public class BalancerTool extends Tool {
-
-    private static final Logger logger = LoggerFactory.getLogger(BalancerTool.class);
-
-    static DistributedLogClientBuilder createDistributedLogClientBuilder(ServerSet serverSet) {
-        return DistributedLogClientBuilder.newBuilder()
-                        .name("rebalancer_tool")
-                        .clientId(ClientId$.MODULE$.apply("rebalancer_tool"))
-                        .maxRedirects(2)
-                        .serverSet(serverSet)
-                        .clientBuilder(ClientBuilder.get()
-                                .connectionTimeout(Duration.fromSeconds(2))
-                                .tcpConnectTimeout(Duration.fromSeconds(2))
-                                .requestTimeout(Duration.fromSeconds(10))
-                                .hostConnectionLimit(1)
-                                .hostConnectionCoresize(1)
-                                .keepAlive(true)
-                                .failFast(false));
-    }
-
-    /**
-     * Base Command to run balancer.
-     */
-    protected abstract static class BalancerCommand extends OptsCommand {
-
-        protected Options options = new Options();
-        protected int rebalanceWaterMark = 0;
-        protected double rebalanceTolerancePercentage = 0.0f;
-        protected int rebalanceConcurrency = 1;
-        protected Double rate = null;
-        protected Optional<RateLimiter> rateLimiter;
-
-        BalancerCommand(String name, String description) {
-            super(name, description);
-            options.addOption("rwm", "rebalance-water-mark", true, "Rebalance water mark per proxy");
-            options.addOption("rtp", "rebalance-tolerance-percentage", true,
-                "Rebalance tolerance percentage per proxy");
-            options.addOption("rc", "rebalance-concurrency", true, "Concurrency to rebalance stream distribution");
-            options.addOption("r", "rate", true, "Rebalance rate");
-        }
-
-        Optional<RateLimiter> getRateLimiter() {
-            return rateLimiter;
-        }
-
-        @Override
-        protected Options getOptions() {
-            return options;
-        }
-
-        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-            if (cmdline.hasOption("rwm")) {
-                this.rebalanceWaterMark = Integer.parseInt(cmdline.getOptionValue("rwm"));
-            }
-            if (cmdline.hasOption("rtp")) {
-                this.rebalanceTolerancePercentage = Double.parseDouble(cmdline.getOptionValue("rtp"));
-            }
-            if (cmdline.hasOption("rc")) {
-                this.rebalanceConcurrency = Integer.parseInt(cmdline.getOptionValue("rc"));
-            }
-            if (cmdline.hasOption("r")) {
-                this.rate = Double.parseDouble(cmdline.getOptionValue("r"));
-            }
-            checkArgument(rebalanceWaterMark >= 0,
-                    "Rebalance Water Mark should be a non-negative number");
-            checkArgument(rebalanceTolerancePercentage >= 0.0f,
-                    "Rebalance Tolerance Percentage should be a non-negative number");
-            checkArgument(rebalanceConcurrency > 0,
-                    "Rebalance Concurrency should be a positive number");
-            if (null == rate || rate <= 0.0f) {
-                rateLimiter = Optional.absent();
-            } else {
-                rateLimiter = Optional.of(RateLimiter.create(rate));
-            }
-        }
-
-        @Override
-        protected int runCmd(CommandLine cmdline) throws Exception {
-            try {
-                parseCommandLine(cmdline);
-            } catch (ParseException pe) {
-                println("ERROR: fail to parse commandline : '" + pe.getMessage() + "'");
-                printUsage();
-                return -1;
-            }
-            return executeCommand(cmdline);
-        }
-
-        protected abstract int executeCommand(CommandLine cmdline) throws Exception;
-    }
-
-    /**
-     * Command to balance streams within a cluster.
-     */
-    protected static class ClusterBalancerCommand extends BalancerCommand {
-
-        protected URI uri;
-        protected String source = null;
-
-        protected ClusterBalancerCommand() {
-            super("clusterbalancer", "Balance streams inside a cluster");
-            options.addOption("u", "uri", true, "DistributedLog URI");
-            options.addOption("sp", "source-proxy", true, "Source proxy to balance");
-        }
-
-        @Override
-        protected String getUsage() {
-            return "clusterbalancer [options]";
-        }
-
-        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-            super.parseCommandLine(cmdline);
-            if (!cmdline.hasOption("u")) {
-                throw new ParseException("No proxy serverset provided.");
-            }
-            uri = URI.create(cmdline.getOptionValue("u"));
-            if (cmdline.hasOption("sp")) {
-                String sourceProxyStr = cmdline.getOptionValue("sp");
-                try {
-                    DLSocketAddress.parseSocketAddress(sourceProxyStr);
-                } catch (IllegalArgumentException iae) {
-                    throw new ParseException("Invalid source proxy " + sourceProxyStr + " : " + iae.getMessage());
-                }
-                this.source = sourceProxyStr;
-            }
-        }
-
-        @Override
-        protected int executeCommand(CommandLine cmdline) throws Exception {
-            DLZkServerSet serverSet = DLZkServerSet.of(uri, 60000);
-            logger.info("Created serverset for {}", uri);
-            try {
-                DistributedLogClientBuilder clientBuilder =
-                        createDistributedLogClientBuilder(serverSet.getServerSet());
-                ClusterBalancer balancer = new ClusterBalancer(clientBuilder);
-                try {
-                    return runBalancer(clientBuilder, balancer);
-                } finally {
-                    balancer.close();
-                }
-            } finally {
-                serverSet.close();
-            }
-        }
-
-        protected int runBalancer(DistributedLogClientBuilder clientBuilder,
-                                  ClusterBalancer balancer)
-                throws Exception {
-            if (null == source) {
-                balancer.balance(
-                    rebalanceWaterMark,
-                    rebalanceTolerancePercentage,
-                    rebalanceConcurrency,
-                    getRateLimiter());
-            } else {
-                balanceFromSource(clientBuilder, balancer, source, getRateLimiter());
-            }
-            return 0;
-        }
-
-        protected void balanceFromSource(DistributedLogClientBuilder clientBuilder,
-                                         ClusterBalancer balancer,
-                                         String source,
-                                         Optional<RateLimiter> rateLimiter)
-                throws Exception {
-            InetSocketAddress sourceAddr = DLSocketAddress.parseSocketAddress(source);
-            DistributedLogClientBuilder sourceClientBuilder =
-                    DistributedLogClientBuilder.newBuilder(clientBuilder)
-                            .host(sourceAddr);
-
-            Pair<DistributedLogClient, MonitorServiceClient> clientPair =
-                    ClientUtils.buildClient(sourceClientBuilder);
-            try {
-                Await.result(clientPair.getRight().setAcceptNewStream(false));
-                logger.info("Disable accepting new stream on proxy {}.", source);
-                balancer.balanceAll(source, rebalanceConcurrency, rateLimiter);
-            } finally {
-                clientPair.getLeft().close();
-            }
-        }
-    }
-
-    /**
-     * Command to balance streams between regions.
-     */
-    protected static class RegionBalancerCommand extends BalancerCommand {
-
-        protected URI region1;
-        protected URI region2;
-        protected String source = null;
-
-        protected RegionBalancerCommand() {
-            super("regionbalancer", "Balance streams between regions");
-            options.addOption("rs", "regions", true, "DistributedLog Region URI: uri1[,uri2]");
-            options.addOption("s", "source", true, "DistributedLog Source Region to balance");
-        }
-
-        @Override
-        protected String getUsage() {
-            return "regionbalancer [options]";
-        }
-
-        @Override
-        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-            super.parseCommandLine(cmdline);
-            if (!cmdline.hasOption("rs")) {
-                throw new ParseException("No regions provided.");
-            }
-            String regionsStr = cmdline.getOptionValue("rs");
-            String[] regions = regionsStr.split(",");
-            if (regions.length != 2) {
-                throw new ParseException("Invalid regions provided. Expected : serverset1[,serverset2]");
-            }
-            region1 = URI.create(regions[0]);
-            region2 = URI.create(regions[1]);
-            if (cmdline.hasOption("s")) {
-                source = cmdline.getOptionValue("s");
-            }
-        }
-
-        @Override
-        protected int executeCommand(CommandLine cmdline) throws Exception {
-            DLZkServerSet serverSet1 = DLZkServerSet.of(region1, 60000);
-            logger.info("Created serverset for {}", region1);
-            DLZkServerSet serverSet2 = DLZkServerSet.of(region2, 60000);
-            logger.info("Created serverset for {}", region2);
-            try {
-                DistributedLogClientBuilder builder1 =
-                        createDistributedLogClientBuilder(serverSet1.getServerSet());
-                Pair<DistributedLogClient, MonitorServiceClient> pair1 =
-                        ClientUtils.buildClient(builder1);
-                DistributedLogClientBuilder builder2 =
-                        createDistributedLogClientBuilder(serverSet2.getServerSet());
-                Pair<DistributedLogClient, MonitorServiceClient> pair2 =
-                        ClientUtils.buildClient(builder2);
-                try {
-                    SimpleBalancer balancer = new SimpleBalancer(
-                            BKNamespaceDriver.getZKServersFromDLUri(region1), pair1.getLeft(), pair1.getRight(),
-                            BKNamespaceDriver.getZKServersFromDLUri(region2), pair2.getLeft(), pair2.getRight());
-                    try {
-                        return runBalancer(balancer);
-                    } finally {
-                        balancer.close();
-                    }
-                } finally {
-                    pair1.getLeft().close();
-                    pair2.getLeft().close();
-                }
-            } finally {
-                serverSet1.close();
-                serverSet2.close();
-            }
-        }
-
-        protected int runBalancer(SimpleBalancer balancer) throws Exception {
-            if (null == source) {
-                balancer.balance(
-                    rebalanceWaterMark,
-                    rebalanceTolerancePercentage,
-                    rebalanceConcurrency,
-                    getRateLimiter());
-            } else {
-                balancer.balanceAll(source, rebalanceConcurrency, getRateLimiter());
-            }
-            return 0;
-        }
-    }
-
-    public BalancerTool() {
-        super();
-        addCommand(new ClusterBalancerCommand());
-        addCommand(new RegionBalancerCommand());
-    }
-
-    @Override
-    protected String getName() {
-        return "balancer";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java
deleted file mode 100644
index 4c9e075..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import java.util.Map;
-
-/**
- * Utils for balancer.
- */
-public class BalancerUtils {
-
-    /**
-     * Util function to calculate how many streams to balance for <i>nodeToRebalance</i>,
-     * based on the load distribution <i>loadDistribution</i>.
-     *
-     * @param nodeToRebalance
-     *          node to rebalance
-     * @param loadDistribution
-     *          load distribution map
-     * @param rebalanceWaterMark
-     *          if number of streams of <i>nodeToRebalance</i>
-     *          is less than <i>rebalanceWaterMark</i>, no streams will be re-balanced.
-     * @param tolerancePercentage
-     *          tolerance percentage for the balancer. if number of streams of <i>nodeToRebalance</i>
-     *          is less than average + average * <i>tolerancePercentage</i> / 100.0, no streams will
-     *          be re-balanced.
-     * @param <K>
-     * @return number of streams to rebalance
-     */
-    public static <K> int calculateNumStreamsToRebalance(K nodeToRebalance,
-                                                         Map<K, Integer> loadDistribution,
-                                                         int rebalanceWaterMark,
-                                                         double tolerancePercentage) {
-        Integer myLoad = loadDistribution.get(nodeToRebalance);
-        if (null == myLoad || myLoad <= rebalanceWaterMark) {
-            return 0;
-        }
-
-        long totalLoad = 0L;
-        int numNodes = loadDistribution.size();
-
-        for (Map.Entry<K, Integer> entry : loadDistribution.entrySet()) {
-            if (null == entry.getKey() || null == entry.getValue()) {
-                continue;
-            }
-            totalLoad += entry.getValue();
-        }
-
-        double averageLoad = ((double) totalLoad) / numNodes;
-        long permissibleLoad =
-                Math.max(1L, (long) Math.ceil(averageLoad + averageLoad * tolerancePercentage / 100.0f));
-
-        if (myLoad <= permissibleLoad) {
-            return 0;
-        }
-
-        return Math.max(0, myLoad - (int) Math.ceil(averageLoad));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java
deleted file mode 100644
index 5add339..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.service.ClientUtils;
-import org.apache.distributedlog.service.DLSocketAddress;
-import org.apache.distributedlog.service.DistributedLogClient;
-import org.apache.distributedlog.service.DistributedLogClientBuilder;
-import com.twitter.util.Await;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import java.io.Serializable;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A balancer balances ownerships with a cluster of targets.
- */
-public class ClusterBalancer implements Balancer {
-
-    private static final Logger logger = LoggerFactory.getLogger(ClusterBalancer.class);
-
-    /**
-     * Represent a single host. Ordered by number of streams in desc order.
-     */
-    static class Host {
-
-        final SocketAddress address;
-        final Set<String> streams;
-        final DistributedLogClientBuilder clientBuilder;
-        DistributedLogClient client = null;
-        MonitorServiceClient monitor = null;
-
-        Host(SocketAddress address, Set<String> streams,
-             DistributedLogClientBuilder clientBuilder) {
-            this.address = address;
-            this.streams = streams;
-            this.clientBuilder = clientBuilder;
-        }
-
-        private void initializeClientsIfNeeded() {
-            if (null == client) {
-                Pair<DistributedLogClient, MonitorServiceClient> clientPair =
-                        createDistributedLogClient(address, clientBuilder);
-                client = clientPair.getLeft();
-                monitor = clientPair.getRight();
-            }
-        }
-
-        synchronized DistributedLogClient getClient() {
-            initializeClientsIfNeeded();
-            return client;
-        }
-
-        synchronized MonitorServiceClient getMonitor() {
-            initializeClientsIfNeeded();
-            return monitor;
-        }
-
-        synchronized void close() {
-            if (null != client) {
-                client.close();
-            }
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder sb = new StringBuilder();
-            sb.append("Host(").append(address).append(")");
-            return sb.toString();
-        }
-    }
-
-    static class HostComparator implements Comparator<Host>, Serializable {
-        private static final long serialVersionUID = 7984973796525102538L;
-
-        @Override
-        public int compare(Host h1, Host h2) {
-            return h2.streams.size() - h1.streams.size();
-        }
-    }
-
-    protected final DistributedLogClientBuilder clientBuilder;
-    protected final DistributedLogClient client;
-    protected final MonitorServiceClient monitor;
-
-    public ClusterBalancer(DistributedLogClientBuilder clientBuilder) {
-        this(clientBuilder, ClientUtils.buildClient(clientBuilder));
-    }
-
-    ClusterBalancer(DistributedLogClientBuilder clientBuilder,
-                    Pair<DistributedLogClient, MonitorServiceClient> clientPair) {
-        this.clientBuilder = clientBuilder;
-        this.client = clientPair.getLeft();
-        this.monitor = clientPair.getRight();
-    }
-
-    /**
-     * Build a new distributedlog client to a single host <i>host</i>.
-     *
-     * @param host
-     *          host to access
-     * @return distributedlog clients
-     */
-    static Pair<DistributedLogClient, MonitorServiceClient> createDistributedLogClient(
-            SocketAddress host, DistributedLogClientBuilder clientBuilder) {
-        DistributedLogClientBuilder newBuilder =
-                DistributedLogClientBuilder.newBuilder(clientBuilder).host(host);
-        return ClientUtils.buildClient(newBuilder);
-    }
-
-    @Override
-    public void balanceAll(String source,
-                           int rebalanceConcurrency, /* unused */
-                           Optional<RateLimiter> rebalanceRateLimiter) {
-        balance(0, 0.0f, rebalanceConcurrency, Optional.of(source), rebalanceRateLimiter);
-    }
-
-    @Override
-    public void balance(int rebalanceWaterMark,
-                        double rebalanceTolerancePercentage,
-                        int rebalanceConcurrency, /* unused */
-                        Optional<RateLimiter> rebalanceRateLimiter) {
-        Optional<String> source = Optional.absent();
-        balance(rebalanceWaterMark, rebalanceTolerancePercentage, rebalanceConcurrency, source, rebalanceRateLimiter);
-    }
-
-    public void balance(int rebalanceWaterMark,
-                        double rebalanceTolerancePercentage,
-                        int rebalanceConcurrency,
-                        Optional<String> source,
-                        Optional<RateLimiter> rebalanceRateLimiter) {
-        Map<SocketAddress, Set<String>> distribution = monitor.getStreamOwnershipDistribution();
-        if (distribution.size() <= 1) {
-            return;
-        }
-        SocketAddress sourceAddr = null;
-        if (source.isPresent()) {
-            sourceAddr = DLSocketAddress.parseSocketAddress(source.get());
-            logger.info("Balancer source is {}", sourceAddr);
-            if (!distribution.containsKey(sourceAddr)) {
-                return;
-            }
-        }
-        // Get the list of hosts ordered by number of streams in DESC order
-        List<Host> hosts = new ArrayList<Host>(distribution.size());
-        for (Map.Entry<SocketAddress, Set<String>> entry : distribution.entrySet()) {
-            Host host = new Host(entry.getKey(), entry.getValue(), clientBuilder);
-            hosts.add(host);
-        }
-        Collections.sort(hosts, new HostComparator());
-        try {
-
-            // find the host to move streams from.
-            int hostIdxMoveFrom = -1;
-            if (null != sourceAddr) {
-                for (Host host : hosts) {
-                    ++hostIdxMoveFrom;
-                    if (sourceAddr.equals(host.address)) {
-                        break;
-                    }
-                }
-            }
-
-            // compute the average load.
-            int totalStream = 0;
-            for (Host host : hosts) {
-                totalStream += host.streams.size();
-            }
-            double averageLoad;
-            if (hostIdxMoveFrom >= 0) {
-                averageLoad = ((double) totalStream / (hosts.size() - 1));
-            } else {
-                averageLoad = ((double) totalStream / hosts.size());
-            }
-
-            int moveFromLowWaterMark;
-            int moveToHighWaterMark =
-                Math.max(1, (int) (averageLoad + averageLoad * rebalanceTolerancePercentage / 100.0f));
-
-            if (hostIdxMoveFrom >= 0) {
-                moveFromLowWaterMark = Math.max(0, rebalanceWaterMark);
-                moveStreams(
-                        hosts,
-                        new AtomicInteger(hostIdxMoveFrom), moveFromLowWaterMark,
-                        new AtomicInteger(hosts.size() - 1), moveToHighWaterMark,
-                        rebalanceRateLimiter);
-                moveRemainingStreamsFromSource(hosts.get(hostIdxMoveFrom), hosts, rebalanceRateLimiter);
-            } else {
-                moveFromLowWaterMark = Math.max((int) Math.ceil(averageLoad), rebalanceWaterMark);
-                AtomicInteger moveFrom = new AtomicInteger(0);
-                AtomicInteger moveTo = new AtomicInteger(hosts.size() - 1);
-                while (moveFrom.get() < moveTo.get()) {
-                    moveStreams(hosts, moveFrom, moveFromLowWaterMark,
-                        moveTo, moveToHighWaterMark, rebalanceRateLimiter);
-                    moveFrom.incrementAndGet();
-                }
-            }
-        } finally {
-            for (Host host : hosts) {
-                host.close();
-            }
-        }
-    }
-
-    void moveStreams(List<Host> hosts,
-                     AtomicInteger hostIdxMoveFrom,
-                     int moveFromLowWaterMark,
-                     AtomicInteger hostIdxMoveTo,
-                     int moveToHighWaterMark,
-                     Optional<RateLimiter> rateLimiter) {
-        if (hostIdxMoveFrom.get() < 0 || hostIdxMoveFrom.get() >= hosts.size()
-                || hostIdxMoveTo.get() < 0 || hostIdxMoveTo.get() >= hosts.size()
-                || hostIdxMoveFrom.get() >= hostIdxMoveTo.get()) {
-            return;
-        }
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Moving streams : hosts = {}, from = {}, to = {} :"
-                + " from_low_water_mark = {}, to_high_water_mark = {}",
-                new Object[] {
-                    hosts,
-                    hostIdxMoveFrom.get(),
-                    hostIdxMoveTo.get(),
-                    moveFromLowWaterMark,
-                    moveToHighWaterMark });
-        }
-
-        Host hostMoveFrom = hosts.get(hostIdxMoveFrom.get());
-        int numStreamsOnFromHost = hostMoveFrom.streams.size();
-        if (numStreamsOnFromHost <= moveFromLowWaterMark) {
-            // do nothing
-            return;
-        }
-
-        int numStreamsToMove = numStreamsOnFromHost - moveFromLowWaterMark;
-        LinkedList<String> streamsToMove = new LinkedList<String>(hostMoveFrom.streams);
-        Collections.shuffle(streamsToMove);
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Try to move {} streams from host {} : streams = {}",
-                         new Object[] { numStreamsToMove, hostMoveFrom.address, streamsToMove });
-        }
-
-        while (numStreamsToMove-- > 0 && !streamsToMove.isEmpty()) {
-            if (rateLimiter.isPresent()) {
-                rateLimiter.get().acquire();
-            }
-
-            // pick a host to move
-            Host hostMoveTo = hosts.get(hostIdxMoveTo.get());
-            while (hostMoveTo.streams.size() >= moveToHighWaterMark) {
-                int hostIdx = hostIdxMoveTo.decrementAndGet();
-                logger.info("move to host : {}, from {}", hostIdx, hostIdxMoveFrom.get());
-                if (hostIdx <= hostIdxMoveFrom.get()) {
-                    return;
-                } else {
-                    hostMoveTo = hosts.get(hostIdx);
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Target host to move moved to host {} @ {}",
-                                hostIdx, hostMoveTo);
-                    }
-                }
-            }
-
-            // pick a stream
-            String stream = streamsToMove.remove();
-
-            // move the stream
-            if (moveStream(stream, hostMoveFrom, hostMoveTo)) {
-                hostMoveFrom.streams.remove(stream);
-                hostMoveTo.streams.add(stream);
-            }
-        }
-
-    }
-
-    void moveRemainingStreamsFromSource(Host source,
-                                        List<Host> hosts,
-                                        Optional<RateLimiter> rateLimiter) {
-        LinkedList<String> streamsToMove = new LinkedList<String>(source.streams);
-        Collections.shuffle(streamsToMove);
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Try to move remaining streams from {} : {}", source, streamsToMove);
-        }
-
-        int hostIdx = hosts.size() - 1;
-
-        while (!streamsToMove.isEmpty()) {
-            if (rateLimiter.isPresent()) {
-                rateLimiter.get().acquire();
-            }
-
-            Host target = hosts.get(hostIdx);
-            if (!target.address.equals(source.address)) {
-                String stream = streamsToMove.remove();
-                // move the stream
-                if (moveStream(stream, source, target)) {
-                    source.streams.remove(stream);
-                    target.streams.add(stream);
-                }
-            }
-            --hostIdx;
-            if (hostIdx < 0) {
-                hostIdx = hosts.size() - 1;
-            }
-        }
-    }
-
-    private boolean moveStream(String stream, Host from, Host to) {
-        try {
-            doMoveStream(stream, from, to);
-            return true;
-        } catch (Exception e) {
-            return false;
-        }
-    }
-
-    private void doMoveStream(final String stream, final Host from, final Host to) throws Exception {
-        logger.info("Moving stream {} from {} to {}.",
-                    new Object[] { stream, from.address, to.address });
-        Await.result(from.getClient().release(stream).flatMap(new Function<Void, Future<Void>>() {
-            @Override
-            public Future<Void> apply(Void result) {
-                logger.info("Released stream {} from {}.", stream, from.address);
-                return to.getMonitor().check(stream).addEventListener(new FutureEventListener<Void>() {
-
-                    @Override
-                    public void onSuccess(Void value) {
-                        logger.info("Moved stream {} from {} to {}.",
-                                    new Object[] { stream, from.address, to.address });
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        logger.info("Failed to move stream {} from {} to {} : ",
-                                    new Object[] { stream, from.address, to.address, cause });
-                    }
-                });
-            }
-        }));
-    }
-
-    @Override
-    public void close() {
-        client.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java
deleted file mode 100644
index 6a43179..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.Serializable;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.lang3.tuple.Pair;
-
-/**
- * A stream chooser based on number of streams.
- */
-class CountBasedStreamChooser implements StreamChooser, Serializable,
-        Comparator<Pair<SocketAddress, LinkedList<String>>> {
-
-    private static final long serialVersionUID = 4664153397369979203L;
-
-    final List<Pair<SocketAddress, LinkedList<String>>> streamsDistribution;
-
-    // pivot index in the list of hosts. the chooser will remove streams from the hosts before
-    // pivot, which will reduce their stream counts to make them equal to the stream count of the pivot.
-    int pivot;
-    int pivotCount;
-
-    // next index in the list of hosts to choose stream from.
-    int next;
-
-    CountBasedStreamChooser(Map<SocketAddress, Set<String>> streams) {
-        checkArgument(streams.size() > 0, "Only support no-empty streams distribution");
-        streamsDistribution = new ArrayList<Pair<SocketAddress, LinkedList<String>>>(streams.size());
-        for (Map.Entry<SocketAddress, Set<String>> entry : streams.entrySet()) {
-            LinkedList<String> randomizedStreams = new LinkedList<String>(entry.getValue());
-            Collections.shuffle(randomizedStreams);
-            streamsDistribution.add(Pair.of(entry.getKey(), randomizedStreams));
-        }
-        // sort the hosts by the number of streams in descending order
-        Collections.sort(streamsDistribution, this);
-        pivot = 0;
-        pivotCount = streamsDistribution.get(0).getValue().size();
-        findNextPivot();
-        next = 0;
-    }
-
-    private void findNextPivot() {
-        int prevPivotCount = pivotCount;
-        while (++pivot < streamsDistribution.size()) {
-            pivotCount = streamsDistribution.get(pivot).getValue().size();
-            if (pivotCount < prevPivotCount) {
-                return;
-            }
-        }
-        pivot = streamsDistribution.size();
-        pivotCount = 0;
-    }
-
-    @Override
-    public synchronized String choose() {
-        // reach the pivot
-        if (next == pivot) {
-            if (streamsDistribution.get(next - 1).getRight().size() > pivotCount) {
-                next = 0;
-            } else if (pivotCount == 0) { // the streams are empty now
-                return null;
-            } else {
-                findNextPivot();
-                next = 0;
-            }
-        }
-
-        // get stream count that next host to choose from
-        LinkedList<String> nextStreams = streamsDistribution.get(next).getRight();
-        if (nextStreams.size() == 0) {
-            return null;
-        }
-
-        String chosenStream = nextStreams.remove();
-        ++next;
-        return chosenStream;
-    }
-
-    @Override
-    public int compare(Pair<SocketAddress, LinkedList<String>> o1,
-                       Pair<SocketAddress, LinkedList<String>> o2) {
-        return o2.getValue().size() - o1.getValue().size();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java
deleted file mode 100644
index 4aefc5e..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-/**
- * A stream chooser that can only choose limited number of streams.
- */
-public class LimitedStreamChooser implements StreamChooser {
-
-  /**
-   * Create a limited stream chooser by {@code limit}.
-   *
-   * @param underlying the underlying stream chooser.
-   * @param limit the limit of number of streams to choose.
-   * @return the limited stream chooser.
-   */
-    public static LimitedStreamChooser of(StreamChooser underlying, int limit) {
-        return new LimitedStreamChooser(underlying, limit);
-    }
-
-    final StreamChooser underlying;
-    int limit;
-
-    LimitedStreamChooser(StreamChooser underlying, int limit) {
-        this.underlying = underlying;
-        this.limit = limit;
-    }
-
-    @Override
-    public synchronized String choose() {
-        if (limit <= 0) {
-            return null;
-        }
-        String s = underlying.choose();
-        if (s == null) {
-            limit = 0;
-            return null;
-        }
-        --limit;
-        return s;
-    }
-}