You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:24 UTC

[17/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
new file mode 100644
index 0000000..b1e2879
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import com.twitter.common.zookeeper.ServerSet;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.callback.NamespaceListener;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.client.serverset.DLZkServerSet;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.stats.Stat;
+import com.twitter.finagle.stats.StatsReceiver;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Duration;
+import com.twitter.util.FutureEventListener;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Monitor Service.
+ */
+public class MonitorService implements NamespaceListener {
+
+    private static final Logger logger = LoggerFactory.getLogger(MonitorService.class);
+
+    private DistributedLogNamespace dlNamespace = null;
+    private MonitorServiceClient dlClient = null;
+    private DLZkServerSet[] zkServerSets = null;
+    private final ScheduledExecutorService executorService =
+            Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
+    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
+    private final Map<String, StreamChecker> knownStreams = new HashMap<String, StreamChecker>();
+
+    // Settings
+    private int regionId = DistributedLogConstants.LOCAL_REGION_ID;
+    private int interval = 100;
+    private String streamRegex = null;
+    private boolean watchNamespaceChanges = false;
+    private boolean handshakeWithClientInfo = false;
+    private int heartbeatEveryChecks = 0;
+    private int instanceId = -1;
+    private int totalInstances = -1;
+    private boolean isThriftMux = false;
+
+    // Options
+    private final Optional<String> uriArg;
+    private final Optional<String> confFileArg;
+    private final Optional<String> serverSetArg;
+    private final Optional<Integer> intervalArg;
+    private final Optional<Integer> regionIdArg;
+    private final Optional<String> streamRegexArg;
+    private final Optional<Integer> instanceIdArg;
+    private final Optional<Integer> totalInstancesArg;
+    private final Optional<Integer> heartbeatEveryChecksArg;
+    private final Optional<Boolean> handshakeWithClientInfoArg;
+    private final Optional<Boolean> watchNamespaceChangesArg;
+    private final Optional<Boolean> isThriftMuxArg;
+
+    // Stats
+    private final StatsProvider statsProvider;
+    private final StatsReceiver statsReceiver;
+    private final StatsReceiver monitorReceiver;
+    private final Stat successStat;
+    private final Stat failureStat;
+    private final Gauge<Number> numOfStreamsGauge;
+    // Hash Function
+    private final HashFunction hashFunction = Hashing.md5();
+
+    class StreamChecker implements Runnable, FutureEventListener<Void>, LogSegmentListener {
+        private final String name;
+        private volatile boolean closed = false;
+        private volatile boolean checking = false;
+        private final Stopwatch stopwatch = Stopwatch.createUnstarted();
+        private DistributedLogManager dlm = null;
+        private int numChecks = 0;
+
+        StreamChecker(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public void run() {
+            if (null == dlm) {
+                try {
+                    dlm = dlNamespace.openLog(name);
+                    dlm.registerListener(this);
+                } catch (IOException e) {
+                    if (null != dlm) {
+                        try {
+                            dlm.close();
+                        } catch (IOException e1) {
+                            logger.error("Failed to close dlm for {} : ", name, e1);
+                        }
+                        dlm = null;
+                    }
+                    executorService.schedule(this, interval, TimeUnit.MILLISECONDS);
+                }
+            } else {
+                stopwatch.reset().start();
+                boolean sendHeartBeat;
+                if (heartbeatEveryChecks > 0) {
+                    synchronized (this) {
+                        ++numChecks;
+                        if (numChecks >= Integer.MAX_VALUE) {
+                            numChecks = 0;
+                        }
+                        sendHeartBeat = (numChecks % heartbeatEveryChecks) == 0;
+                    }
+                } else {
+                    sendHeartBeat = false;
+                }
+                if (sendHeartBeat) {
+                    dlClient.heartbeat(name).addEventListener(this);
+                } else {
+                    dlClient.check(name).addEventListener(this);
+                }
+            }
+        }
+
+        @Override
+        public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
+            if (segments.size() > 0 && segments.get(0).getRegionId() == regionId) {
+                if (!checking) {
+                    logger.info("Start checking stream {}.", name);
+                    checking = true;
+                    run();
+                }
+            } else {
+                if (checking) {
+                    logger.info("Stop checking stream {}.", name);
+                }
+            }
+        }
+
+        @Override
+        public void onLogStreamDeleted() {
+            logger.info("Stream {} is deleted", name);
+        }
+
+        @Override
+        public void onSuccess(Void value) {
+            successStat.add(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            scheduleCheck();
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+            failureStat.add(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            scheduleCheck();
+        }
+
+        private void scheduleCheck() {
+            if (closed) {
+                return;
+            }
+            if (!checking) {
+                return;
+            }
+            try {
+                executorService.schedule(this, interval, TimeUnit.MILLISECONDS);
+            } catch (RejectedExecutionException ree) {
+                logger.error("Failed to schedule checking stream {} in {} ms : ",
+                        new Object[] { name, interval, ree });
+            }
+        }
+
+        private void close() {
+            closed = true;
+            if (null != dlm) {
+                try {
+                    dlm.close();
+                } catch (IOException e) {
+                    logger.error("Failed to close dlm for {} : ", name, e);
+                }
+            }
+        }
+    }
+
+    MonitorService(Optional<String> uriArg,
+                   Optional<String> confFileArg,
+                   Optional<String> serverSetArg,
+                   Optional<Integer> intervalArg,
+                   Optional<Integer> regionIdArg,
+                   Optional<String> streamRegexArg,
+                   Optional<Integer> instanceIdArg,
+                   Optional<Integer> totalInstancesArg,
+                   Optional<Integer> heartbeatEveryChecksArg,
+                   Optional<Boolean> handshakeWithClientInfoArg,
+                   Optional<Boolean> watchNamespaceChangesArg,
+                   Optional<Boolean> isThriftMuxArg,
+                   StatsReceiver statsReceiver,
+                   StatsProvider statsProvider) {
+        // options
+        this.uriArg = uriArg;
+        this.confFileArg = confFileArg;
+        this.serverSetArg = serverSetArg;
+        this.intervalArg = intervalArg;
+        this.regionIdArg = regionIdArg;
+        this.streamRegexArg = streamRegexArg;
+        this.instanceIdArg = instanceIdArg;
+        this.totalInstancesArg = totalInstancesArg;
+        this.heartbeatEveryChecksArg = heartbeatEveryChecksArg;
+        this.handshakeWithClientInfoArg = handshakeWithClientInfoArg;
+        this.watchNamespaceChangesArg = watchNamespaceChangesArg;
+        this.isThriftMuxArg = isThriftMuxArg;
+
+        // Stats
+        this.statsReceiver = statsReceiver;
+        this.monitorReceiver = statsReceiver.scope("monitor");
+        this.successStat = monitorReceiver.stat0("success");
+        this.failureStat = monitorReceiver.stat0("failure");
+        this.statsProvider = statsProvider;
+        this.numOfStreamsGauge = new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                return knownStreams.size();
+            }
+        };
+    }
+
+    public void runServer() throws IllegalArgumentException, IOException {
+        checkArgument(uriArg.isPresent(),
+                "No distributedlog uri provided.");
+        checkArgument(serverSetArg.isPresent(),
+                "No proxy server set provided.");
+        if (intervalArg.isPresent()) {
+            interval = intervalArg.get();
+        }
+        if (regionIdArg.isPresent()) {
+            regionId = regionIdArg.get();
+        }
+        if (streamRegexArg.isPresent()) {
+            streamRegex = streamRegexArg.get();
+        }
+        if (instanceIdArg.isPresent()) {
+            instanceId = instanceIdArg.get();
+        }
+        if (totalInstancesArg.isPresent()) {
+            totalInstances = totalInstancesArg.get();
+        }
+        if (heartbeatEveryChecksArg.isPresent()) {
+            heartbeatEveryChecks = heartbeatEveryChecksArg.get();
+        }
+        if (instanceId < 0 || totalInstances <= 0 || instanceId >= totalInstances) {
+            throw new IllegalArgumentException("Invalid instance id or total instances number.");
+        }
+        handshakeWithClientInfo = handshakeWithClientInfoArg.isPresent();
+        watchNamespaceChanges = watchNamespaceChangesArg.isPresent();
+        isThriftMux = isThriftMuxArg.isPresent();
+        URI uri = URI.create(uriArg.get());
+        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
+        if (confFileArg.isPresent()) {
+            String configFile = confFileArg.get();
+            try {
+                dlConf.loadConf(new File(configFile).toURI().toURL());
+            } catch (ConfigurationException e) {
+                throw new IOException("Failed to load distributedlog configuration from " + configFile + ".");
+            } catch (MalformedURLException e) {
+                throw new IOException("Failed to load distributedlog configuration from malformed "
+                        + configFile + ".");
+            }
+        }
+        logger.info("Starting stats provider : {}.", statsProvider.getClass());
+        statsProvider.start(dlConf);
+        String[] serverSetPaths = StringUtils.split(serverSetArg.get(), ",");
+        if (serverSetPaths.length == 0) {
+            throw new IllegalArgumentException("Invalid serverset paths provided : " + serverSetArg.get());
+        }
+
+        ServerSet[] serverSets = createServerSets(serverSetPaths);
+        ServerSet local = serverSets[0];
+        ServerSet[] remotes  = new ServerSet[serverSets.length - 1];
+        System.arraycopy(serverSets, 1, remotes, 0, remotes.length);
+
+        ClientBuilder finagleClientBuilder = ClientBuilder.get()
+            .connectTimeout(Duration.fromSeconds(1))
+            .tcpConnectTimeout(Duration.fromSeconds(1))
+            .requestTimeout(Duration.fromSeconds(2))
+            .keepAlive(true)
+            .failFast(false);
+
+        if (!isThriftMux) {
+            finagleClientBuilder = finagleClientBuilder
+                .hostConnectionLimit(2)
+                .hostConnectionCoresize(2);
+        }
+
+        dlClient = DistributedLogClientBuilder.newBuilder()
+                .name("monitor")
+                .thriftmux(isThriftMux)
+                .clientId(ClientId$.MODULE$.apply("monitor"))
+                .redirectBackoffMaxMs(50)
+                .redirectBackoffStartMs(100)
+                .requestTimeoutMs(2000)
+                .maxRedirects(2)
+                .serverSets(local, remotes)
+                .streamNameRegex(streamRegex)
+                .handshakeWithClientInfo(handshakeWithClientInfo)
+                .clientBuilder(finagleClientBuilder)
+                .statsReceiver(monitorReceiver.scope("client"))
+                .buildMonitorClient();
+        runMonitor(dlConf, uri);
+    }
+
+    ServerSet[] createServerSets(String[] serverSetPaths) {
+        ServerSet[] serverSets = new ServerSet[serverSetPaths.length];
+        zkServerSets = new DLZkServerSet[serverSetPaths.length];
+        for (int i = 0; i < serverSetPaths.length; i++) {
+            String serverSetPath = serverSetPaths[i];
+            zkServerSets[i] = parseServerSet(serverSetPath);
+            serverSets[i] = zkServerSets[i].getServerSet();
+        }
+        return serverSets;
+    }
+
+    protected DLZkServerSet parseServerSet(String serverSetPath) {
+        return DLZkServerSet.of(URI.create(serverSetPath), 60000);
+    }
+
+    @Override
+    public void onStreamsChanged(Iterator<String> streams) {
+        Set<String> newSet = new HashSet<String>();
+        while (streams.hasNext()) {
+            String s = streams.next();
+            if (null == streamRegex || s.matches(streamRegex)) {
+                if (Math.abs(hashFunction.hashUnencodedChars(s).asInt()) % totalInstances == instanceId) {
+                    newSet.add(s);
+                }
+            }
+        }
+        List<StreamChecker> tasksToCancel = new ArrayList<StreamChecker>();
+        synchronized (knownStreams) {
+            Set<String> knownStreamSet = new HashSet<String>(knownStreams.keySet());
+            Set<String> removedStreams = Sets.difference(knownStreamSet, newSet).immutableCopy();
+            Set<String> addedStreams = Sets.difference(newSet, knownStreamSet).immutableCopy();
+            for (String s : removedStreams) {
+                StreamChecker task = knownStreams.remove(s);
+                if (null != task) {
+                    logger.info("Removed stream {}", s);
+                    tasksToCancel.add(task);
+                }
+            }
+            for (String s : addedStreams) {
+                if (!knownStreams.containsKey(s)) {
+                    logger.info("Added stream {}", s);
+                    StreamChecker sc = new StreamChecker(s);
+                    knownStreams.put(s, sc);
+                    sc.run();
+                }
+            }
+        }
+        for (StreamChecker sc : tasksToCancel) {
+            sc.close();
+        }
+    }
+
+    void runMonitor(DistributedLogConfiguration conf, URI dlUri) throws IOException {
+        // stats
+        statsProvider.getStatsLogger("monitor").registerGauge("num_streams", numOfStreamsGauge);
+        logger.info("Construct dl namespace @ {}", dlUri);
+        dlNamespace = DistributedLogNamespaceBuilder.newBuilder()
+                .conf(conf)
+                .uri(dlUri)
+                .build();
+        if (watchNamespaceChanges) {
+            dlNamespace.registerNamespaceListener(this);
+        } else {
+            onStreamsChanged(dlNamespace.getLogs());
+        }
+    }
+
+    /**
+     * Close the server.
+     */
+    public void close() {
+        logger.info("Closing monitor service.");
+        if (null != dlClient) {
+            dlClient.close();
+        }
+        if (null != zkServerSets) {
+            for (DLZkServerSet zkServerSet : zkServerSets) {
+                zkServerSet.close();
+            }
+        }
+        if (null != dlNamespace) {
+            dlNamespace.close();
+        }
+        executorService.shutdown();
+        try {
+            if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
+                executorService.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            logger.error("Interrupted on waiting shutting down monitor executor service : ", e);
+        }
+        if (null != statsProvider) {
+            // clean up the gauges
+            unregisterGauge();
+            statsProvider.stop();
+        }
+        keepAliveLatch.countDown();
+        logger.info("Closed monitor service.");
+    }
+
+    public void join() throws InterruptedException {
+        keepAliveLatch.await();
+    }
+
+    /**
+     * clean up the gauge before we close to help GC.
+     */
+    private void unregisterGauge(){
+        statsProvider.getStatsLogger("monitor").unregisterGauge("num_streams", numOfStreamsGauge);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java
new file mode 100644
index 0000000..1f45b13
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static org.apache.distributedlog.util.CommandLineUtils.getOptionalBooleanArg;
+import static org.apache.distributedlog.util.CommandLineUtils.getOptionalIntegerArg;
+import static org.apache.distributedlog.util.CommandLineUtils.getOptionalStringArg;
+
+import com.twitter.finagle.stats.NullStatsReceiver;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.io.IOException;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The launcher to run monitor service.
+ */
+public class MonitorServiceApp {
+
+    private static final Logger logger = LoggerFactory.getLogger(MonitorServiceApp.class);
+
+    static final String USAGE = "MonitorService [-u <uri>] [-c <conf>] [-s serverset]";
+
+    final String[] args;
+    final Options options = new Options();
+
+    private MonitorServiceApp(String[] args) {
+        this.args = args;
+        // prepare options
+        options.addOption("u", "uri", true, "DistributedLog URI");
+        options.addOption("c", "conf", true, "DistributedLog Configuration File");
+        options.addOption("s", "serverset", true, "Proxy Server Set");
+        options.addOption("i", "interval", true, "Check interval");
+        options.addOption("d", "region", true, "Region ID");
+        options.addOption("p", "provider", true, "DistributedLog Stats Provider");
+        options.addOption("f", "filter", true, "Filter streams by regex");
+        options.addOption("w", "watch", false, "Watch stream changes under a given namespace");
+        options.addOption("n", "instance_id", true, "Instance ID");
+        options.addOption("t", "total_instances", true, "Total instances");
+        options.addOption("hck", "heartbeat-num-checks", true, "Send a heartbeat after num checks");
+        options.addOption("hsci", "handshake-with-client-info", false, "Enable handshaking with client info");
+    }
+
+    void printUsage() {
+        HelpFormatter helpFormatter = new HelpFormatter();
+        helpFormatter.printHelp(USAGE, options);
+    }
+
+    private void run() {
+        try {
+            logger.info("Running monitor service.");
+            BasicParser parser = new BasicParser();
+            CommandLine cmdline = parser.parse(options, args);
+            runCmd(cmdline);
+        } catch (ParseException pe) {
+            printUsage();
+            Runtime.getRuntime().exit(-1);
+        } catch (IOException ie) {
+            logger.error("Failed to start monitor service : ", ie);
+            Runtime.getRuntime().exit(-1);
+        }
+    }
+
+    void runCmd(CommandLine cmdline) throws IOException {
+        StatsProvider statsProvider = new NullStatsProvider();
+        if (cmdline.hasOption("p")) {
+            String providerClass = cmdline.getOptionValue("p");
+            statsProvider = ReflectionUtils.newInstance(providerClass, StatsProvider.class);
+        }
+        StatsReceiver statsReceiver = NullStatsReceiver.get();
+
+        final MonitorService monitorService = new MonitorService(
+                getOptionalStringArg(cmdline, "u"),
+                getOptionalStringArg(cmdline, "c"),
+                getOptionalStringArg(cmdline, "s"),
+                getOptionalIntegerArg(cmdline, "i"),
+                getOptionalIntegerArg(cmdline, "d"),
+                getOptionalStringArg(cmdline, "f"),
+                getOptionalIntegerArg(cmdline, "n"),
+                getOptionalIntegerArg(cmdline, "t"),
+                getOptionalIntegerArg(cmdline, "hck"),
+                getOptionalBooleanArg(cmdline, "hsci"),
+                getOptionalBooleanArg(cmdline, "w"),
+                getOptionalBooleanArg(cmdline, "mx"),
+                statsReceiver,
+                statsProvider);
+
+        monitorService.runServer();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                logger.info("Closing monitor service.");
+                monitorService.close();
+                logger.info("Closed monitor service.");
+            }
+        });
+        try {
+            monitorService.join();
+        } catch (InterruptedException ie) {
+            logger.warn("Interrupted when waiting monitor service to be finished : ", ie);
+        }
+    }
+
+    public static void main(String[] args) {
+        final MonitorServiceApp launcher = new MonitorServiceApp(args);
+        launcher.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
new file mode 100644
index 0000000..08f4b41
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
+import org.apache.distributedlog.thrift.service.BulkWriteResponse;
+import org.apache.distributedlog.thrift.service.ResponseHeader;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+
+/**
+ * Utility methods for building write proxy service responses.
+ */
+public class ResponseUtils {
+    public static ResponseHeader deniedHeader() {
+        return new ResponseHeader(StatusCode.REQUEST_DENIED);
+    }
+
+    public static ResponseHeader streamUnavailableHeader() {
+        return new ResponseHeader(StatusCode.STREAM_UNAVAILABLE);
+    }
+
+    public static ResponseHeader successHeader() {
+        return new ResponseHeader(StatusCode.SUCCESS);
+    }
+
+    public static ResponseHeader ownerToHeader(String owner) {
+        return new ResponseHeader(StatusCode.FOUND).setLocation(owner);
+    }
+
+    public static ResponseHeader exceptionToHeader(Throwable t) {
+        ResponseHeader response = new ResponseHeader();
+        if (t instanceof DLException) {
+            DLException dle = (DLException) t;
+            if (dle instanceof OwnershipAcquireFailedException) {
+                response.setLocation(((OwnershipAcquireFailedException) dle).getCurrentOwner());
+            }
+            response.setCode(StatusCode.findByValue(dle.getCode()));
+            response.setErrMsg(dle.getMessage());
+        } else {
+            response.setCode(StatusCode.INTERNAL_SERVER_ERROR);
+            response.setErrMsg("Internal server error : " + t.getMessage());
+        }
+        return response;
+    }
+
+    public static WriteResponse write(ResponseHeader responseHeader) {
+        return new WriteResponse(responseHeader);
+    }
+
+    public static WriteResponse writeSuccess() {
+        return new WriteResponse(successHeader());
+    }
+
+    public static WriteResponse writeDenied() {
+        return new WriteResponse(deniedHeader());
+    }
+
+    public static BulkWriteResponse bulkWrite(ResponseHeader responseHeader) {
+        return new BulkWriteResponse(responseHeader);
+    }
+
+    public static BulkWriteResponse bulkWriteSuccess() {
+        return new BulkWriteResponse(successHeader());
+    }
+
+    public static BulkWriteResponse bulkWriteDenied() {
+        return new BulkWriteResponse(deniedHeader());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java
new file mode 100644
index 0000000..436145d
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+/**
+ * List of feature keys used by distributedlog server.
+ */
+public enum ServerFeatureKeys {
+
+    REGION_STOP_ACCEPT_NEW_STREAM,
+    SERVICE_RATE_LIMIT_DISABLED,
+    SERVICE_CHECKSUM_DISABLED,
+    SERVICE_GLOBAL_LIMITER_DISABLED
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java
new file mode 100644
index 0000000..ee64580
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import com.google.common.base.Stopwatch;
+import com.twitter.finagle.Service;
+import com.twitter.finagle.SimpleFilter;
+import com.twitter.util.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Track distributedlog server finagle-service stats.
+ */
+class StatsFilter<Req, Rep> extends SimpleFilter<Req, Rep> {
+
+    private final StatsLogger stats;
+    private final Counter outstandingAsync;
+    private final OpStatsLogger serviceExec;
+
+    @Override
+    public Future<Rep> apply(Req req, Service<Req, Rep> service) {
+        Future<Rep> result = null;
+        outstandingAsync.inc();
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        try {
+            result = service.apply(req);
+            serviceExec.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+        } finally {
+            outstandingAsync.dec();
+            if (null == result) {
+                serviceExec.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+            }
+        }
+        return result;
+    }
+
+    public StatsFilter(StatsLogger stats) {
+        this.stats = stats;
+        this.outstandingAsync = stats.getCounter("outstandingAsync");
+        this.serviceExec = stats.getOpStatsLogger("serviceExec");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java
new file mode 100644
index 0000000..ee64fc7
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.announcer;
+
+import java.io.IOException;
+
+/**
+ * Announce service information.
+ */
+public interface Announcer {
+
+    /**
+     * Announce service info.
+     */
+    void announce() throws IOException;
+
+    /**
+     * Unannounce the service info.
+     */
+    void unannounce() throws IOException;
+
+    /**
+     * Close the announcer.
+     */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java
new file mode 100644
index 0000000..5a1277a
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.announcer;
+
+import java.io.IOException;
+
+/**
+ * A no-op implementation of {@link Announcer}.
+ */
+public class NOPAnnouncer implements Announcer {
+    @Override
+    public void announce() throws IOException {
+        // nop
+    }
+
+    @Override
+    public void unannounce() throws IOException {
+        // nop
+    }
+
+    @Override
+    public void close() {
+        // nop
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java
new file mode 100644
index 0000000..df4a8e2
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.announcer;
+
+import com.twitter.common.zookeeper.Group;
+import com.twitter.common.zookeeper.ServerSet;
+import org.apache.distributedlog.client.serverset.DLZkServerSet;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ServerSet based announcer.
+ */
+public class ServerSetAnnouncer implements Announcer {
+
+    private static final Logger logger = LoggerFactory.getLogger(ServerSetAnnouncer.class);
+
+    final String localAddr;
+    final InetSocketAddress serviceEndpoint;
+    final Map<String, InetSocketAddress> additionalEndpoints;
+    final int shardId;
+
+    // ServerSet
+    DLZkServerSet zkServerSet;
+
+    // Service Status
+    ServerSet.EndpointStatus serviceStatus = null;
+
+    /**
+     * Announce server infos.
+     *
+     * @param servicePort
+     *          service port
+     * @param statsPort
+     *          stats port
+     * @param shardId
+     *          shard id
+     */
+    public ServerSetAnnouncer(URI uri,
+                              int servicePort,
+                              int statsPort,
+                              int shardId) throws UnknownHostException {
+        this.shardId = shardId;
+        this.localAddr = InetAddress.getLocalHost().getHostAddress();
+        // service endpoint
+        this.serviceEndpoint = new InetSocketAddress(localAddr, servicePort);
+        // stats endpoint
+        InetSocketAddress statsEndpoint = new InetSocketAddress(localAddr, statsPort);
+        this.additionalEndpoints = new HashMap<String, InetSocketAddress>();
+        this.additionalEndpoints.put("aurora", statsEndpoint);
+        this.additionalEndpoints.put("stats", statsEndpoint);
+        this.additionalEndpoints.put("service", serviceEndpoint);
+        this.additionalEndpoints.put("thrift", serviceEndpoint);
+
+        // Create zookeeper and server set
+        this.zkServerSet = DLZkServerSet.of(uri, 60000);
+    }
+
+    @Override
+    public synchronized void announce() throws IOException {
+        try {
+            serviceStatus =
+                    zkServerSet.getServerSet().join(serviceEndpoint, additionalEndpoints, shardId);
+        } catch (Group.JoinException e) {
+            throw new IOException("Failed to announce service : ", e);
+        } catch (InterruptedException e) {
+            logger.warn("Interrupted on announcing service : ", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public synchronized void unannounce() throws IOException {
+        if (null == serviceStatus) {
+            logger.warn("No service to unannounce.");
+            return;
+        }
+        try {
+            serviceStatus.leave();
+        } catch (ServerSet.UpdateException e) {
+            throw new IOException("Failed to unannounce service : ", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        zkServerSet.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/package-info.java
new file mode 100644
index 0000000..6559bb3
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Announcers to announce servers to server set.
+ */
+package org.apache.distributedlog.service.announcer;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java
new file mode 100644
index 0000000..cdffaa3
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * Balancer Interface.
+ *
+ * <p>A balancer is used for balance the streams across the proxy cluster.
+ */
+public interface Balancer {
+
+    /**
+     * Rebalance all the streams from <i>source</i> to others.
+     *
+     * @param source
+     *          source target name.
+     * @param rebalanceConcurrency
+     *          the concurrency to move streams for re-balance.
+     * @param rebalanceRateLimiter
+     *          the rate limiting to move streams for re-balance.
+     */
+    void balanceAll(String source,
+                    int rebalanceConcurrency,
+                    Optional<RateLimiter> rebalanceRateLimiter);
+
+    /**
+     * Balance the streams across all targets.
+     *
+     * @param rebalanceWaterMark
+     *          rebalance water mark. if number of streams of a given target is less than
+     *          the water mark, no streams will be re-balanced from this target.
+     * @param rebalanceTolerancePercentage
+     *          tolerance percentage for the balancer. if number of streams of a given target is
+     *          less than average + average * <i>tolerancePercentage</i> / 100.0, no streams will
+     *          be re-balanced from that target.
+     * @param rebalanceConcurrency
+     *          the concurrency to move streams for re-balance.
+     * @param rebalanceRateLimiter
+     *          the rate limiting to move streams for re-balance.
+     */
+    void balance(int rebalanceWaterMark,
+                 double rebalanceTolerancePercentage,
+                 int rebalanceConcurrency,
+                 Optional<RateLimiter> rebalanceRateLimiter);
+
+    /**
+     * Close the balancer.
+     */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java
new file mode 100644
index 0000000..964c1cc
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.RateLimiter;
+import com.twitter.common.zookeeper.ServerSet;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.client.serverset.DLZkServerSet;
+import org.apache.distributedlog.impl.BKNamespaceDriver;
+import org.apache.distributedlog.service.ClientUtils;
+import org.apache.distributedlog.service.DLSocketAddress;
+import org.apache.distributedlog.service.DistributedLogClient;
+import org.apache.distributedlog.service.DistributedLogClientBuilder;
+import org.apache.distributedlog.tools.Tool;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tool to rebalance cluster.
+ */
+public class BalancerTool extends Tool {
+
+    private static final Logger logger = LoggerFactory.getLogger(BalancerTool.class);
+
+    static DistributedLogClientBuilder createDistributedLogClientBuilder(ServerSet serverSet) {
+        return DistributedLogClientBuilder.newBuilder()
+                        .name("rebalancer_tool")
+                        .clientId(ClientId$.MODULE$.apply("rebalancer_tool"))
+                        .maxRedirects(2)
+                        .serverSet(serverSet)
+                        .clientBuilder(ClientBuilder.get()
+                                .connectionTimeout(Duration.fromSeconds(2))
+                                .tcpConnectTimeout(Duration.fromSeconds(2))
+                                .requestTimeout(Duration.fromSeconds(10))
+                                .hostConnectionLimit(1)
+                                .hostConnectionCoresize(1)
+                                .keepAlive(true)
+                                .failFast(false));
+    }
+
+    /**
+     * Base Command to run balancer.
+     */
+    protected abstract static class BalancerCommand extends OptsCommand {
+
+        protected Options options = new Options();
+        protected int rebalanceWaterMark = 0;
+        protected double rebalanceTolerancePercentage = 0.0f;
+        protected int rebalanceConcurrency = 1;
+        protected Double rate = null;
+        protected Optional<RateLimiter> rateLimiter;
+
+        BalancerCommand(String name, String description) {
+            super(name, description);
+            options.addOption("rwm", "rebalance-water-mark", true, "Rebalance water mark per proxy");
+            options.addOption("rtp", "rebalance-tolerance-percentage", true,
+                "Rebalance tolerance percentage per proxy");
+            options.addOption("rc", "rebalance-concurrency", true, "Concurrency to rebalance stream distribution");
+            options.addOption("r", "rate", true, "Rebalance rate");
+        }
+
+        Optional<RateLimiter> getRateLimiter() {
+            return rateLimiter;
+        }
+
+        @Override
+        protected Options getOptions() {
+            return options;
+        }
+
+        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+            if (cmdline.hasOption("rwm")) {
+                this.rebalanceWaterMark = Integer.parseInt(cmdline.getOptionValue("rwm"));
+            }
+            if (cmdline.hasOption("rtp")) {
+                this.rebalanceTolerancePercentage = Double.parseDouble(cmdline.getOptionValue("rtp"));
+            }
+            if (cmdline.hasOption("rc")) {
+                this.rebalanceConcurrency = Integer.parseInt(cmdline.getOptionValue("rc"));
+            }
+            if (cmdline.hasOption("r")) {
+                this.rate = Double.parseDouble(cmdline.getOptionValue("r"));
+            }
+            checkArgument(rebalanceWaterMark >= 0,
+                    "Rebalance Water Mark should be a non-negative number");
+            checkArgument(rebalanceTolerancePercentage >= 0.0f,
+                    "Rebalance Tolerance Percentage should be a non-negative number");
+            checkArgument(rebalanceConcurrency > 0,
+                    "Rebalance Concurrency should be a positive number");
+            if (null == rate || rate <= 0.0f) {
+                rateLimiter = Optional.absent();
+            } else {
+                rateLimiter = Optional.of(RateLimiter.create(rate));
+            }
+        }
+
+        @Override
+        protected int runCmd(CommandLine cmdline) throws Exception {
+            try {
+                parseCommandLine(cmdline);
+            } catch (ParseException pe) {
+                println("ERROR: fail to parse commandline : '" + pe.getMessage() + "'");
+                printUsage();
+                return -1;
+            }
+            return executeCommand(cmdline);
+        }
+
+        protected abstract int executeCommand(CommandLine cmdline) throws Exception;
+    }
+
+    /**
+     * Command to balance streams within a cluster.
+     */
+    protected static class ClusterBalancerCommand extends BalancerCommand {
+
+        protected URI uri;
+        protected String source = null;
+
+        protected ClusterBalancerCommand() {
+            super("clusterbalancer", "Balance streams inside a cluster");
+            options.addOption("u", "uri", true, "DistributedLog URI");
+            options.addOption("sp", "source-proxy", true, "Source proxy to balance");
+        }
+
+        @Override
+        protected String getUsage() {
+            return "clusterbalancer [options]";
+        }
+
+        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+            super.parseCommandLine(cmdline);
+            if (!cmdline.hasOption("u")) {
+                throw new ParseException("No proxy serverset provided.");
+            }
+            uri = URI.create(cmdline.getOptionValue("u"));
+            if (cmdline.hasOption("sp")) {
+                String sourceProxyStr = cmdline.getOptionValue("sp");
+                try {
+                    DLSocketAddress.parseSocketAddress(sourceProxyStr);
+                } catch (IllegalArgumentException iae) {
+                    throw new ParseException("Invalid source proxy " + sourceProxyStr + " : " + iae.getMessage());
+                }
+                this.source = sourceProxyStr;
+            }
+        }
+
+        @Override
+        protected int executeCommand(CommandLine cmdline) throws Exception {
+            DLZkServerSet serverSet = DLZkServerSet.of(uri, 60000);
+            logger.info("Created serverset for {}", uri);
+            try {
+                DistributedLogClientBuilder clientBuilder =
+                        createDistributedLogClientBuilder(serverSet.getServerSet());
+                ClusterBalancer balancer = new ClusterBalancer(clientBuilder);
+                try {
+                    return runBalancer(clientBuilder, balancer);
+                } finally {
+                    balancer.close();
+                }
+            } finally {
+                serverSet.close();
+            }
+        }
+
+        protected int runBalancer(DistributedLogClientBuilder clientBuilder,
+                                  ClusterBalancer balancer)
+                throws Exception {
+            if (null == source) {
+                balancer.balance(
+                    rebalanceWaterMark,
+                    rebalanceTolerancePercentage,
+                    rebalanceConcurrency,
+                    getRateLimiter());
+            } else {
+                balanceFromSource(clientBuilder, balancer, source, getRateLimiter());
+            }
+            return 0;
+        }
+
+        protected void balanceFromSource(DistributedLogClientBuilder clientBuilder,
+                                         ClusterBalancer balancer,
+                                         String source,
+                                         Optional<RateLimiter> rateLimiter)
+                throws Exception {
+            InetSocketAddress sourceAddr = DLSocketAddress.parseSocketAddress(source);
+            DistributedLogClientBuilder sourceClientBuilder =
+                    DistributedLogClientBuilder.newBuilder(clientBuilder)
+                            .host(sourceAddr);
+
+            Pair<DistributedLogClient, MonitorServiceClient> clientPair =
+                    ClientUtils.buildClient(sourceClientBuilder);
+            try {
+                Await.result(clientPair.getRight().setAcceptNewStream(false));
+                logger.info("Disable accepting new stream on proxy {}.", source);
+                balancer.balanceAll(source, rebalanceConcurrency, rateLimiter);
+            } finally {
+                clientPair.getLeft().close();
+            }
+        }
+    }
+
+    /**
+     * Command to balance streams between regions.
+     */
+    protected static class RegionBalancerCommand extends BalancerCommand {
+
+        protected URI region1;
+        protected URI region2;
+        protected String source = null;
+
+        protected RegionBalancerCommand() {
+            super("regionbalancer", "Balance streams between regions");
+            options.addOption("rs", "regions", true, "DistributedLog Region URI: uri1[,uri2]");
+            options.addOption("s", "source", true, "DistributedLog Source Region to balance");
+        }
+
+        @Override
+        protected String getUsage() {
+            return "regionbalancer [options]";
+        }
+
+        @Override
+        protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+            super.parseCommandLine(cmdline);
+            if (!cmdline.hasOption("rs")) {
+                throw new ParseException("No regions provided.");
+            }
+            String regionsStr = cmdline.getOptionValue("rs");
+            String[] regions = regionsStr.split(",");
+            if (regions.length != 2) {
+                throw new ParseException("Invalid regions provided. Expected : serverset1[,serverset2]");
+            }
+            region1 = URI.create(regions[0]);
+            region2 = URI.create(regions[1]);
+            if (cmdline.hasOption("s")) {
+                source = cmdline.getOptionValue("s");
+            }
+        }
+
+        @Override
+        protected int executeCommand(CommandLine cmdline) throws Exception {
+            DLZkServerSet serverSet1 = DLZkServerSet.of(region1, 60000);
+            logger.info("Created serverset for {}", region1);
+            DLZkServerSet serverSet2 = DLZkServerSet.of(region2, 60000);
+            logger.info("Created serverset for {}", region2);
+            try {
+                DistributedLogClientBuilder builder1 =
+                        createDistributedLogClientBuilder(serverSet1.getServerSet());
+                Pair<DistributedLogClient, MonitorServiceClient> pair1 =
+                        ClientUtils.buildClient(builder1);
+                DistributedLogClientBuilder builder2 =
+                        createDistributedLogClientBuilder(serverSet2.getServerSet());
+                Pair<DistributedLogClient, MonitorServiceClient> pair2 =
+                        ClientUtils.buildClient(builder2);
+                try {
+                    SimpleBalancer balancer = new SimpleBalancer(
+                            BKNamespaceDriver.getZKServersFromDLUri(region1), pair1.getLeft(), pair1.getRight(),
+                            BKNamespaceDriver.getZKServersFromDLUri(region2), pair2.getLeft(), pair2.getRight());
+                    try {
+                        return runBalancer(balancer);
+                    } finally {
+                        balancer.close();
+                    }
+                } finally {
+                    pair1.getLeft().close();
+                    pair2.getLeft().close();
+                }
+            } finally {
+                serverSet1.close();
+                serverSet2.close();
+            }
+        }
+
+        protected int runBalancer(SimpleBalancer balancer) throws Exception {
+            if (null == source) {
+                balancer.balance(
+                    rebalanceWaterMark,
+                    rebalanceTolerancePercentage,
+                    rebalanceConcurrency,
+                    getRateLimiter());
+            } else {
+                balancer.balanceAll(source, rebalanceConcurrency, getRateLimiter());
+            }
+            return 0;
+        }
+    }
+
+    public BalancerTool() {
+        super();
+        addCommand(new ClusterBalancerCommand());
+        addCommand(new RegionBalancerCommand());
+    }
+
+    @Override
+    protected String getName() {
+        return "balancer";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java
new file mode 100644
index 0000000..4c9e075
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import java.util.Map;
+
+/**
+ * Utils for balancer.
+ */
+public class BalancerUtils {
+
+    /**
+     * Util function to calculate how many streams to balance for <i>nodeToRebalance</i>,
+     * based on the load distribution <i>loadDistribution</i>.
+     *
+     * @param nodeToRebalance
+     *          node to rebalance
+     * @param loadDistribution
+     *          load distribution map
+     * @param rebalanceWaterMark
+     *          if number of streams of <i>nodeToRebalance</i>
+     *          is less than <i>rebalanceWaterMark</i>, no streams will be re-balanced.
+     * @param tolerancePercentage
+     *          tolerance percentage for the balancer. if number of streams of <i>nodeToRebalance</i>
+     *          is less than average + average * <i>tolerancePercentage</i> / 100.0, no streams will
+     *          be re-balanced.
+     * @param <K>
+     * @return number of streams to rebalance
+     */
+    public static <K> int calculateNumStreamsToRebalance(K nodeToRebalance,
+                                                         Map<K, Integer> loadDistribution,
+                                                         int rebalanceWaterMark,
+                                                         double tolerancePercentage) {
+        Integer myLoad = loadDistribution.get(nodeToRebalance);
+        if (null == myLoad || myLoad <= rebalanceWaterMark) {
+            return 0;
+        }
+
+        long totalLoad = 0L;
+        int numNodes = loadDistribution.size();
+
+        for (Map.Entry<K, Integer> entry : loadDistribution.entrySet()) {
+            if (null == entry.getKey() || null == entry.getValue()) {
+                continue;
+            }
+            totalLoad += entry.getValue();
+        }
+
+        double averageLoad = ((double) totalLoad) / numNodes;
+        long permissibleLoad =
+                Math.max(1L, (long) Math.ceil(averageLoad + averageLoad * tolerancePercentage / 100.0f));
+
+        if (myLoad <= permissibleLoad) {
+            return 0;
+        }
+
+        return Math.max(0, myLoad - (int) Math.ceil(averageLoad));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java
new file mode 100644
index 0000000..5add339
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.service.ClientUtils;
+import org.apache.distributedlog.service.DLSocketAddress;
+import org.apache.distributedlog.service.DistributedLogClient;
+import org.apache.distributedlog.service.DistributedLogClientBuilder;
+import com.twitter.util.Await;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import java.io.Serializable;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A balancer balances ownerships with a cluster of targets.
+ */
+public class ClusterBalancer implements Balancer {
+
+    private static final Logger logger = LoggerFactory.getLogger(ClusterBalancer.class);
+
+    /**
+     * Represent a single host. Ordered by number of streams in desc order.
+     */
+    static class Host {
+
+        final SocketAddress address;
+        final Set<String> streams;
+        final DistributedLogClientBuilder clientBuilder;
+        DistributedLogClient client = null;
+        MonitorServiceClient monitor = null;
+
+        Host(SocketAddress address, Set<String> streams,
+             DistributedLogClientBuilder clientBuilder) {
+            this.address = address;
+            this.streams = streams;
+            this.clientBuilder = clientBuilder;
+        }
+
+        private void initializeClientsIfNeeded() {
+            if (null == client) {
+                Pair<DistributedLogClient, MonitorServiceClient> clientPair =
+                        createDistributedLogClient(address, clientBuilder);
+                client = clientPair.getLeft();
+                monitor = clientPair.getRight();
+            }
+        }
+
+        synchronized DistributedLogClient getClient() {
+            initializeClientsIfNeeded();
+            return client;
+        }
+
+        synchronized MonitorServiceClient getMonitor() {
+            initializeClientsIfNeeded();
+            return monitor;
+        }
+
+        synchronized void close() {
+            if (null != client) {
+                client.close();
+            }
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("Host(").append(address).append(")");
+            return sb.toString();
+        }
+    }
+
+    static class HostComparator implements Comparator<Host>, Serializable {
+        private static final long serialVersionUID = 7984973796525102538L;
+
+        @Override
+        public int compare(Host h1, Host h2) {
+            return h2.streams.size() - h1.streams.size();
+        }
+    }
+
+    protected final DistributedLogClientBuilder clientBuilder;
+    protected final DistributedLogClient client;
+    protected final MonitorServiceClient monitor;
+
+    public ClusterBalancer(DistributedLogClientBuilder clientBuilder) {
+        this(clientBuilder, ClientUtils.buildClient(clientBuilder));
+    }
+
+    ClusterBalancer(DistributedLogClientBuilder clientBuilder,
+                    Pair<DistributedLogClient, MonitorServiceClient> clientPair) {
+        this.clientBuilder = clientBuilder;
+        this.client = clientPair.getLeft();
+        this.monitor = clientPair.getRight();
+    }
+
+    /**
+     * Build a new distributedlog client to a single host <i>host</i>.
+     *
+     * @param host
+     *          host to access
+     * @return distributedlog clients
+     */
+    static Pair<DistributedLogClient, MonitorServiceClient> createDistributedLogClient(
+            SocketAddress host, DistributedLogClientBuilder clientBuilder) {
+        DistributedLogClientBuilder newBuilder =
+                DistributedLogClientBuilder.newBuilder(clientBuilder).host(host);
+        return ClientUtils.buildClient(newBuilder);
+    }
+
+    @Override
+    public void balanceAll(String source,
+                           int rebalanceConcurrency, /* unused */
+                           Optional<RateLimiter> rebalanceRateLimiter) {
+        balance(0, 0.0f, rebalanceConcurrency, Optional.of(source), rebalanceRateLimiter);
+    }
+
+    @Override
+    public void balance(int rebalanceWaterMark,
+                        double rebalanceTolerancePercentage,
+                        int rebalanceConcurrency, /* unused */
+                        Optional<RateLimiter> rebalanceRateLimiter) {
+        Optional<String> source = Optional.absent();
+        balance(rebalanceWaterMark, rebalanceTolerancePercentage, rebalanceConcurrency, source, rebalanceRateLimiter);
+    }
+
+    public void balance(int rebalanceWaterMark,
+                        double rebalanceTolerancePercentage,
+                        int rebalanceConcurrency,
+                        Optional<String> source,
+                        Optional<RateLimiter> rebalanceRateLimiter) {
+        Map<SocketAddress, Set<String>> distribution = monitor.getStreamOwnershipDistribution();
+        if (distribution.size() <= 1) {
+            return;
+        }
+        SocketAddress sourceAddr = null;
+        if (source.isPresent()) {
+            sourceAddr = DLSocketAddress.parseSocketAddress(source.get());
+            logger.info("Balancer source is {}", sourceAddr);
+            if (!distribution.containsKey(sourceAddr)) {
+                return;
+            }
+        }
+        // Get the list of hosts ordered by number of streams in DESC order
+        List<Host> hosts = new ArrayList<Host>(distribution.size());
+        for (Map.Entry<SocketAddress, Set<String>> entry : distribution.entrySet()) {
+            Host host = new Host(entry.getKey(), entry.getValue(), clientBuilder);
+            hosts.add(host);
+        }
+        Collections.sort(hosts, new HostComparator());
+        try {
+
+            // find the host to move streams from.
+            int hostIdxMoveFrom = -1;
+            if (null != sourceAddr) {
+                for (Host host : hosts) {
+                    ++hostIdxMoveFrom;
+                    if (sourceAddr.equals(host.address)) {
+                        break;
+                    }
+                }
+            }
+
+            // compute the average load.
+            int totalStream = 0;
+            for (Host host : hosts) {
+                totalStream += host.streams.size();
+            }
+            double averageLoad;
+            if (hostIdxMoveFrom >= 0) {
+                averageLoad = ((double) totalStream / (hosts.size() - 1));
+            } else {
+                averageLoad = ((double) totalStream / hosts.size());
+            }
+
+            int moveFromLowWaterMark;
+            int moveToHighWaterMark =
+                Math.max(1, (int) (averageLoad + averageLoad * rebalanceTolerancePercentage / 100.0f));
+
+            if (hostIdxMoveFrom >= 0) {
+                moveFromLowWaterMark = Math.max(0, rebalanceWaterMark);
+                moveStreams(
+                        hosts,
+                        new AtomicInteger(hostIdxMoveFrom), moveFromLowWaterMark,
+                        new AtomicInteger(hosts.size() - 1), moveToHighWaterMark,
+                        rebalanceRateLimiter);
+                moveRemainingStreamsFromSource(hosts.get(hostIdxMoveFrom), hosts, rebalanceRateLimiter);
+            } else {
+                moveFromLowWaterMark = Math.max((int) Math.ceil(averageLoad), rebalanceWaterMark);
+                AtomicInteger moveFrom = new AtomicInteger(0);
+                AtomicInteger moveTo = new AtomicInteger(hosts.size() - 1);
+                while (moveFrom.get() < moveTo.get()) {
+                    moveStreams(hosts, moveFrom, moveFromLowWaterMark,
+                        moveTo, moveToHighWaterMark, rebalanceRateLimiter);
+                    moveFrom.incrementAndGet();
+                }
+            }
+        } finally {
+            for (Host host : hosts) {
+                host.close();
+            }
+        }
+    }
+
+    void moveStreams(List<Host> hosts,
+                     AtomicInteger hostIdxMoveFrom,
+                     int moveFromLowWaterMark,
+                     AtomicInteger hostIdxMoveTo,
+                     int moveToHighWaterMark,
+                     Optional<RateLimiter> rateLimiter) {
+        if (hostIdxMoveFrom.get() < 0 || hostIdxMoveFrom.get() >= hosts.size()
+                || hostIdxMoveTo.get() < 0 || hostIdxMoveTo.get() >= hosts.size()
+                || hostIdxMoveFrom.get() >= hostIdxMoveTo.get()) {
+            return;
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Moving streams : hosts = {}, from = {}, to = {} :"
+                + " from_low_water_mark = {}, to_high_water_mark = {}",
+                new Object[] {
+                    hosts,
+                    hostIdxMoveFrom.get(),
+                    hostIdxMoveTo.get(),
+                    moveFromLowWaterMark,
+                    moveToHighWaterMark });
+        }
+
+        Host hostMoveFrom = hosts.get(hostIdxMoveFrom.get());
+        int numStreamsOnFromHost = hostMoveFrom.streams.size();
+        if (numStreamsOnFromHost <= moveFromLowWaterMark) {
+            // do nothing
+            return;
+        }
+
+        int numStreamsToMove = numStreamsOnFromHost - moveFromLowWaterMark;
+        LinkedList<String> streamsToMove = new LinkedList<String>(hostMoveFrom.streams);
+        Collections.shuffle(streamsToMove);
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Try to move {} streams from host {} : streams = {}",
+                         new Object[] { numStreamsToMove, hostMoveFrom.address, streamsToMove });
+        }
+
+        while (numStreamsToMove-- > 0 && !streamsToMove.isEmpty()) {
+            if (rateLimiter.isPresent()) {
+                rateLimiter.get().acquire();
+            }
+
+            // pick a host to move
+            Host hostMoveTo = hosts.get(hostIdxMoveTo.get());
+            while (hostMoveTo.streams.size() >= moveToHighWaterMark) {
+                int hostIdx = hostIdxMoveTo.decrementAndGet();
+                logger.info("move to host : {}, from {}", hostIdx, hostIdxMoveFrom.get());
+                if (hostIdx <= hostIdxMoveFrom.get()) {
+                    return;
+                } else {
+                    hostMoveTo = hosts.get(hostIdx);
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Target host to move moved to host {} @ {}",
+                                hostIdx, hostMoveTo);
+                    }
+                }
+            }
+
+            // pick a stream
+            String stream = streamsToMove.remove();
+
+            // move the stream
+            if (moveStream(stream, hostMoveFrom, hostMoveTo)) {
+                hostMoveFrom.streams.remove(stream);
+                hostMoveTo.streams.add(stream);
+            }
+        }
+
+    }
+
+    void moveRemainingStreamsFromSource(Host source,
+                                        List<Host> hosts,
+                                        Optional<RateLimiter> rateLimiter) {
+        LinkedList<String> streamsToMove = new LinkedList<String>(source.streams);
+        Collections.shuffle(streamsToMove);
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Try to move remaining streams from {} : {}", source, streamsToMove);
+        }
+
+        int hostIdx = hosts.size() - 1;
+
+        while (!streamsToMove.isEmpty()) {
+            if (rateLimiter.isPresent()) {
+                rateLimiter.get().acquire();
+            }
+
+            Host target = hosts.get(hostIdx);
+            if (!target.address.equals(source.address)) {
+                String stream = streamsToMove.remove();
+                // move the stream
+                if (moveStream(stream, source, target)) {
+                    source.streams.remove(stream);
+                    target.streams.add(stream);
+                }
+            }
+            --hostIdx;
+            if (hostIdx < 0) {
+                hostIdx = hosts.size() - 1;
+            }
+        }
+    }
+
+    private boolean moveStream(String stream, Host from, Host to) {
+        try {
+            doMoveStream(stream, from, to);
+            return true;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
+    private void doMoveStream(final String stream, final Host from, final Host to) throws Exception {
+        logger.info("Moving stream {} from {} to {}.",
+                    new Object[] { stream, from.address, to.address });
+        Await.result(from.getClient().release(stream).flatMap(new Function<Void, Future<Void>>() {
+            @Override
+            public Future<Void> apply(Void result) {
+                logger.info("Released stream {} from {}.", stream, from.address);
+                return to.getMonitor().check(stream).addEventListener(new FutureEventListener<Void>() {
+
+                    @Override
+                    public void onSuccess(Void value) {
+                        logger.info("Moved stream {} from {} to {}.",
+                                    new Object[] { stream, from.address, to.address });
+                    }
+
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        logger.info("Failed to move stream {} from {} to {} : ",
+                                    new Object[] { stream, from.address, to.address, cause });
+                    }
+                });
+            }
+        }));
+    }
+
+    @Override
+    public void close() {
+        client.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java
new file mode 100644
index 0000000..6a43179
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * A stream chooser based on number of streams.
+ */
+class CountBasedStreamChooser implements StreamChooser, Serializable,
+        Comparator<Pair<SocketAddress, LinkedList<String>>> {
+
+    private static final long serialVersionUID = 4664153397369979203L;
+
+    final List<Pair<SocketAddress, LinkedList<String>>> streamsDistribution;
+
+    // pivot index in the list of hosts. the chooser will remove streams from the hosts before
+    // pivot, which will reduce their stream counts to make them equal to the stream count of the pivot.
+    int pivot;
+    int pivotCount;
+
+    // next index in the list of hosts to choose stream from.
+    int next;
+
+    CountBasedStreamChooser(Map<SocketAddress, Set<String>> streams) {
+        checkArgument(streams.size() > 0, "Only support no-empty streams distribution");
+        streamsDistribution = new ArrayList<Pair<SocketAddress, LinkedList<String>>>(streams.size());
+        for (Map.Entry<SocketAddress, Set<String>> entry : streams.entrySet()) {
+            LinkedList<String> randomizedStreams = new LinkedList<String>(entry.getValue());
+            Collections.shuffle(randomizedStreams);
+            streamsDistribution.add(Pair.of(entry.getKey(), randomizedStreams));
+        }
+        // sort the hosts by the number of streams in descending order
+        Collections.sort(streamsDistribution, this);
+        pivot = 0;
+        pivotCount = streamsDistribution.get(0).getValue().size();
+        findNextPivot();
+        next = 0;
+    }
+
+    private void findNextPivot() {
+        int prevPivotCount = pivotCount;
+        while (++pivot < streamsDistribution.size()) {
+            pivotCount = streamsDistribution.get(pivot).getValue().size();
+            if (pivotCount < prevPivotCount) {
+                return;
+            }
+        }
+        pivot = streamsDistribution.size();
+        pivotCount = 0;
+    }
+
+    @Override
+    public synchronized String choose() {
+        // reach the pivot
+        if (next == pivot) {
+            if (streamsDistribution.get(next - 1).getRight().size() > pivotCount) {
+                next = 0;
+            } else if (pivotCount == 0) { // the streams are empty now
+                return null;
+            } else {
+                findNextPivot();
+                next = 0;
+            }
+        }
+
+        // get stream count that next host to choose from
+        LinkedList<String> nextStreams = streamsDistribution.get(next).getRight();
+        if (nextStreams.size() == 0) {
+            return null;
+        }
+
+        String chosenStream = nextStreams.remove();
+        ++next;
+        return chosenStream;
+    }
+
+    @Override
+    public int compare(Pair<SocketAddress, LinkedList<String>> o1,
+                       Pair<SocketAddress, LinkedList<String>> o2) {
+        return o2.getValue().size() - o1.getValue().size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java
new file mode 100644
index 0000000..4aefc5e
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+/**
+ * A stream chooser that can only choose limited number of streams.
+ */
+public class LimitedStreamChooser implements StreamChooser {
+
+  /**
+   * Create a limited stream chooser by {@code limit}.
+   *
+   * @param underlying the underlying stream chooser.
+   * @param limit the limit of number of streams to choose.
+   * @return the limited stream chooser.
+   */
+    public static LimitedStreamChooser of(StreamChooser underlying, int limit) {
+        return new LimitedStreamChooser(underlying, limit);
+    }
+
+    final StreamChooser underlying;
+    int limit;
+
+    LimitedStreamChooser(StreamChooser underlying, int limit) {
+        this.underlying = underlying;
+        this.limit = limit;
+    }
+
+    @Override
+    public synchronized String choose() {
+        if (limit <= 0) {
+            return null;
+        }
+        String s = underlying.choose();
+        if (s == null) {
+            limit = 0;
+            return null;
+        }
+        --limit;
+        return s;
+    }
+}