You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:24 UTC
[17/30] incubator-distributedlog git commit: DL-205: Remove
StatusCode dependency on DLException
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
new file mode 100644
index 0000000..b1e2879
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import com.twitter.common.zookeeper.ServerSet;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.callback.NamespaceListener;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.client.serverset.DLZkServerSet;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.stats.Stat;
+import com.twitter.finagle.stats.StatsReceiver;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Duration;
+import com.twitter.util.FutureEventListener;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Monitor Service.
+ */
+public class MonitorService implements NamespaceListener {
+
+ // Class-wide SLF4J logger.
+ private static final Logger logger = LoggerFactory.getLogger(MonitorService.class);
+
+ // Collaborators. Each stays null until runServer()/createServerSets()/runMonitor() assigns it;
+ // close() null-checks all three before closing them.
+ private DistributedLogNamespace dlNamespace = null;
+ private MonitorServiceClient dlClient = null;
+ private DLZkServerSet[] zkServerSets = null;
+ // Scheduler used by StreamChecker to re-run checks; sized to the number of available processors.
+ private final ScheduledExecutorService executorService =
+ Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
+ // Counted down at the end of close(); join() blocks on it.
+ private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
+ // Stream name -> checker. onStreamsChanged() mutates it while synchronized on the map itself.
+ private final Map<String, StreamChecker> knownStreams = new HashMap<String, StreamChecker>();
+
+ // Settings: effective values, initialised to defaults and overwritten in runServer() from the options below.
+ private int regionId = DistributedLogConstants.LOCAL_REGION_ID;
+ // Delay between checks; passed to the scheduler as TimeUnit.MILLISECONDS.
+ private int interval = 100;
+ // Null means "no filter": onStreamsChanged() then accepts every stream name.
+ private String streamRegex = null;
+ private boolean watchNamespaceChanges = false;
+ private boolean handshakeWithClientInfo = false;
+ // When > 0, every heartbeatEveryChecks-th check is sent as a heartbeat instead of a plain check.
+ private int heartbeatEveryChecks = 0;
+ // The -1 defaults are rejected by runServer(), so both values must be supplied as options.
+ private int instanceId = -1;
+ private int totalInstances = -1;
+ private boolean isThriftMux = false;
+
+ // Options: raw optional values exactly as handed to the constructor.
+ private final Optional<String> uriArg;
+ private final Optional<String> confFileArg;
+ private final Optional<String> serverSetArg;
+ private final Optional<Integer> intervalArg;
+ private final Optional<Integer> regionIdArg;
+ private final Optional<String> streamRegexArg;
+ private final Optional<Integer> instanceIdArg;
+ private final Optional<Integer> totalInstancesArg;
+ private final Optional<Integer> heartbeatEveryChecksArg;
+ private final Optional<Boolean> handshakeWithClientInfoArg;
+ private final Optional<Boolean> watchNamespaceChangesArg;
+ private final Optional<Boolean> isThriftMuxArg;
+
+ // Stats: monitorReceiver is the "monitor" scope of statsReceiver; successStat/failureStat record
+ // the elapsed time of each check in microseconds.
+ private final StatsProvider statsProvider;
+ private final StatsReceiver statsReceiver;
+ private final StatsReceiver monitorReceiver;
+ private final Stat successStat;
+ private final Stat failureStat;
+ // Samples knownStreams.size(); registered as "num_streams" in runMonitor().
+ private final Gauge<Number> numOfStreamsGauge;
+ // Hash function used by onStreamsChanged() to decide which instance owns a stream.
+ private final HashFunction hashFunction = Hashing.md5();
+
+ class StreamChecker implements Runnable, FutureEventListener<Void>, LogSegmentListener {
+ private final String name;
+ private volatile boolean closed = false;
+ private volatile boolean checking = false;
+ private final Stopwatch stopwatch = Stopwatch.createUnstarted();
+ private DistributedLogManager dlm = null;
+ private int numChecks = 0;
+
+ StreamChecker(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public void run() {
+ if (null == dlm) {
+ try {
+ dlm = dlNamespace.openLog(name);
+ dlm.registerListener(this);
+ } catch (IOException e) {
+ if (null != dlm) {
+ try {
+ dlm.close();
+ } catch (IOException e1) {
+ logger.error("Failed to close dlm for {} : ", name, e1);
+ }
+ dlm = null;
+ }
+ executorService.schedule(this, interval, TimeUnit.MILLISECONDS);
+ }
+ } else {
+ stopwatch.reset().start();
+ boolean sendHeartBeat;
+ if (heartbeatEveryChecks > 0) {
+ synchronized (this) {
+ ++numChecks;
+ if (numChecks >= Integer.MAX_VALUE) {
+ numChecks = 0;
+ }
+ sendHeartBeat = (numChecks % heartbeatEveryChecks) == 0;
+ }
+ } else {
+ sendHeartBeat = false;
+ }
+ if (sendHeartBeat) {
+ dlClient.heartbeat(name).addEventListener(this);
+ } else {
+ dlClient.check(name).addEventListener(this);
+ }
+ }
+ }
+
+ @Override
+ public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
+ if (segments.size() > 0 && segments.get(0).getRegionId() == regionId) {
+ if (!checking) {
+ logger.info("Start checking stream {}.", name);
+ checking = true;
+ run();
+ }
+ } else {
+ if (checking) {
+ logger.info("Stop checking stream {}.", name);
+ }
+ }
+ }
+
+ @Override
+ public void onLogStreamDeleted() {
+ logger.info("Stream {} is deleted", name);
+ }
+
+ @Override
+ public void onSuccess(Void value) {
+ successStat.add(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ scheduleCheck();
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ failureStat.add(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
+ scheduleCheck();
+ }
+
+ private void scheduleCheck() {
+ if (closed) {
+ return;
+ }
+ if (!checking) {
+ return;
+ }
+ try {
+ executorService.schedule(this, interval, TimeUnit.MILLISECONDS);
+ } catch (RejectedExecutionException ree) {
+ logger.error("Failed to schedule checking stream {} in {} ms : ",
+ new Object[] { name, interval, ree });
+ }
+ }
+
+ private void close() {
+ closed = true;
+ if (null != dlm) {
+ try {
+ dlm.close();
+ } catch (IOException e) {
+ logger.error("Failed to close dlm for {} : ", name, e);
+ }
+ }
+ }
+ }
+
+    /**
+     * Builds a monitor service from raw command-line options and the stats sinks.
+     *
+     * <p>The constructor only stores its arguments and derives the stats handles; option values
+     * are validated and applied later by {@link #runServer()}.
+     */
+    MonitorService(Optional<String> uriArg,
+                   Optional<String> confFileArg,
+                   Optional<String> serverSetArg,
+                   Optional<Integer> intervalArg,
+                   Optional<Integer> regionIdArg,
+                   Optional<String> streamRegexArg,
+                   Optional<Integer> instanceIdArg,
+                   Optional<Integer> totalInstancesArg,
+                   Optional<Integer> heartbeatEveryChecksArg,
+                   Optional<Boolean> handshakeWithClientInfoArg,
+                   Optional<Boolean> watchNamespaceChangesArg,
+                   Optional<Boolean> isThriftMuxArg,
+                   StatsReceiver statsReceiver,
+                   StatsProvider statsProvider) {
+        // Stats handles: everything hangs off the "monitor" scope of the supplied receiver.
+        this.statsProvider = statsProvider;
+        this.statsReceiver = statsReceiver;
+        this.monitorReceiver = statsReceiver.scope("monitor");
+        this.successStat = this.monitorReceiver.stat0("success");
+        this.failureStat = this.monitorReceiver.stat0("failure");
+        this.numOfStreamsGauge = new Gauge<Number>() {
+            @Override
+            public Number getSample() {
+                return knownStreams.size();
+            }
+
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+        };
+
+        // Where to connect.
+        this.uriArg = uriArg;
+        this.confFileArg = confFileArg;
+        this.serverSetArg = serverSetArg;
+
+        // Which streams this instance covers.
+        this.regionIdArg = regionIdArg;
+        this.streamRegexArg = streamRegexArg;
+        this.instanceIdArg = instanceIdArg;
+        this.totalInstancesArg = totalInstancesArg;
+        this.watchNamespaceChangesArg = watchNamespaceChangesArg;
+
+        // How checks are issued.
+        this.intervalArg = intervalArg;
+        this.heartbeatEveryChecksArg = heartbeatEveryChecksArg;
+        this.handshakeWithClientInfoArg = handshakeWithClientInfoArg;
+        this.isThriftMuxArg = isThriftMuxArg;
+    }
+
+    /**
+     * Applies the command-line options, builds the monitor client and starts monitoring.
+     *
+     * @throws IllegalArgumentException if the uri or server set option is missing, the instance
+     *         id / total instances pair is invalid, or the server set list is empty
+     * @throws IOException if the configuration file cannot be loaded (the underlying exception is
+     *         attached as the cause) or the namespace cannot be set up
+     */
+    public void runServer() throws IllegalArgumentException, IOException {
+        checkArgument(uriArg.isPresent(),
+                "No distributedlog uri provided.");
+        checkArgument(serverSetArg.isPresent(),
+                "No proxy server set provided.");
+        if (intervalArg.isPresent()) {
+            interval = intervalArg.get();
+        }
+        if (regionIdArg.isPresent()) {
+            regionId = regionIdArg.get();
+        }
+        if (streamRegexArg.isPresent()) {
+            streamRegex = streamRegexArg.get();
+        }
+        if (instanceIdArg.isPresent()) {
+            instanceId = instanceIdArg.get();
+        }
+        if (totalInstancesArg.isPresent()) {
+            totalInstances = totalInstancesArg.get();
+        }
+        if (heartbeatEveryChecksArg.isPresent()) {
+            heartbeatEveryChecks = heartbeatEveryChecksArg.get();
+        }
+        if (instanceId < 0 || totalInstances <= 0 || instanceId >= totalInstances) {
+            throw new IllegalArgumentException("Invalid instance id or total instances number.");
+        }
+        // Flag options: mere presence of the option turns the feature on.
+        handshakeWithClientInfo = handshakeWithClientInfoArg.isPresent();
+        watchNamespaceChanges = watchNamespaceChangesArg.isPresent();
+        isThriftMux = isThriftMuxArg.isPresent();
+        URI uri = URI.create(uriArg.get());
+        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
+        if (confFileArg.isPresent()) {
+            String configFile = confFileArg.get();
+            try {
+                dlConf.loadConf(new File(configFile).toURI().toURL());
+            } catch (ConfigurationException e) {
+                // Keep the cause so the real parse/IO problem shows up in the stack trace.
+                throw new IOException("Failed to load distributedlog configuration from " + configFile + ".", e);
+            } catch (MalformedURLException e) {
+                throw new IOException("Failed to load distributedlog configuration from malformed "
+                        + configFile + ".", e);
+            }
+        }
+        logger.info("Starting stats provider : {}.", statsProvider.getClass());
+        statsProvider.start(dlConf);
+        String[] serverSetPaths = StringUtils.split(serverSetArg.get(), ",");
+        if (serverSetPaths.length == 0) {
+            throw new IllegalArgumentException("Invalid serverset paths provided : " + serverSetArg.get());
+        }
+
+        // The first server set is passed as the local one, all others as remotes.
+        ServerSet[] serverSets = createServerSets(serverSetPaths);
+        ServerSet local = serverSets[0];
+        ServerSet[] remotes = new ServerSet[serverSets.length - 1];
+        System.arraycopy(serverSets, 1, remotes, 0, remotes.length);
+
+        ClientBuilder finagleClientBuilder = ClientBuilder.get()
+                .connectTimeout(Duration.fromSeconds(1))
+                .tcpConnectTimeout(Duration.fromSeconds(1))
+                .requestTimeout(Duration.fromSeconds(2))
+                .keepAlive(true)
+                .failFast(false);
+
+        // Connection limits are only applied when thriftmux is off.
+        if (!isThriftMux) {
+            finagleClientBuilder = finagleClientBuilder
+                    .hostConnectionLimit(2)
+                    .hostConnectionCoresize(2);
+        }
+
+        // NOTE(review): redirectBackoffMaxMs (50) is below redirectBackoffStartMs (100) - confirm this is intended.
+        dlClient = DistributedLogClientBuilder.newBuilder()
+                .name("monitor")
+                .thriftmux(isThriftMux)
+                .clientId(ClientId$.MODULE$.apply("monitor"))
+                .redirectBackoffMaxMs(50)
+                .redirectBackoffStartMs(100)
+                .requestTimeoutMs(2000)
+                .maxRedirects(2)
+                .serverSets(local, remotes)
+                .streamNameRegex(streamRegex)
+                .handshakeWithClientInfo(handshakeWithClientInfo)
+                .clientBuilder(finagleClientBuilder)
+                .statsReceiver(monitorReceiver.scope("client"))
+                .buildMonitorClient();
+        runMonitor(dlConf, uri);
+    }
+
+ ServerSet[] createServerSets(String[] serverSetPaths) {
+ ServerSet[] serverSets = new ServerSet[serverSetPaths.length];
+ zkServerSets = new DLZkServerSet[serverSetPaths.length];
+ for (int i = 0; i < serverSetPaths.length; i++) {
+ String serverSetPath = serverSetPaths[i];
+ zkServerSets[i] = parseServerSet(serverSetPath);
+ serverSets[i] = zkServerSets[i].getServerSet();
+ }
+ return serverSets;
+ }
+
+ protected DLZkServerSet parseServerSet(String serverSetPath) {
+ return DLZkServerSet.of(URI.create(serverSetPath), 60000);
+ }
+
+ @Override
+ public void onStreamsChanged(Iterator<String> streams) {
+ Set<String> newSet = new HashSet<String>();
+ while (streams.hasNext()) {
+ String s = streams.next();
+ if (null == streamRegex || s.matches(streamRegex)) {
+ if (Math.abs(hashFunction.hashUnencodedChars(s).asInt()) % totalInstances == instanceId) {
+ newSet.add(s);
+ }
+ }
+ }
+ List<StreamChecker> tasksToCancel = new ArrayList<StreamChecker>();
+ synchronized (knownStreams) {
+ Set<String> knownStreamSet = new HashSet<String>(knownStreams.keySet());
+ Set<String> removedStreams = Sets.difference(knownStreamSet, newSet).immutableCopy();
+ Set<String> addedStreams = Sets.difference(newSet, knownStreamSet).immutableCopy();
+ for (String s : removedStreams) {
+ StreamChecker task = knownStreams.remove(s);
+ if (null != task) {
+ logger.info("Removed stream {}", s);
+ tasksToCancel.add(task);
+ }
+ }
+ for (String s : addedStreams) {
+ if (!knownStreams.containsKey(s)) {
+ logger.info("Added stream {}", s);
+ StreamChecker sc = new StreamChecker(s);
+ knownStreams.put(s, sc);
+ sc.run();
+ }
+ }
+ }
+ for (StreamChecker sc : tasksToCancel) {
+ sc.close();
+ }
+ }
+
+    /**
+     * Registers the stream-count gauge, builds the namespace and loads the initial stream set.
+     *
+     * <p>With namespace watching enabled this service is registered as a namespace listener;
+     * otherwise the current list of logs is read once and passed to
+     * {@link #onStreamsChanged(Iterator)}.
+     *
+     * @param conf  configuration used to build the namespace
+     * @param dlUri uri of the namespace to monitor
+     * @throws IOException declared by the namespace calls used here
+     */
+    void runMonitor(DistributedLogConfiguration conf, URI dlUri) throws IOException {
+        statsProvider.getStatsLogger("monitor").registerGauge("num_streams", numOfStreamsGauge);
+
+        logger.info("Construct dl namespace @ {}", dlUri);
+        dlNamespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(dlUri).build();
+
+        if (!watchNamespaceChanges) {
+            // One-shot mode: take a single snapshot of the streams.
+            onStreamsChanged(dlNamespace.getLogs());
+            return;
+        }
+        dlNamespace.registerNamespaceListener(this);
+    }
+
+ /**
+ * Close the server.
+ */
+ public void close() {
+ logger.info("Closing monitor service.");
+ if (null != dlClient) {
+ dlClient.close();
+ }
+ if (null != zkServerSets) {
+ for (DLZkServerSet zkServerSet : zkServerSets) {
+ zkServerSet.close();
+ }
+ }
+ if (null != dlNamespace) {
+ dlNamespace.close();
+ }
+ executorService.shutdown();
+ try {
+ if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
+ executorService.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ logger.error("Interrupted on waiting shutting down monitor executor service : ", e);
+ }
+ if (null != statsProvider) {
+ // clean up the gauges
+ unregisterGauge();
+ statsProvider.stop();
+ }
+ keepAliveLatch.countDown();
+ logger.info("Closed monitor service.");
+ }
+
+    /**
+     * Blocks the caller until {@link #close()} has finished.
+     *
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    public void join() throws InterruptedException {
+        this.keepAliveLatch.await();
+    }
+
+    /**
+     * Removes the "num_streams" gauge from the "monitor" stats logger; called from
+     * {@link #close()} before the stats provider is stopped.
+     */
+    private void unregisterGauge() {
+        statsProvider
+                .getStatsLogger("monitor")
+                .unregisterGauge("num_streams", numOfStreamsGauge);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java
new file mode 100644
index 0000000..1f45b13
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static org.apache.distributedlog.util.CommandLineUtils.getOptionalBooleanArg;
+import static org.apache.distributedlog.util.CommandLineUtils.getOptionalIntegerArg;
+import static org.apache.distributedlog.util.CommandLineUtils.getOptionalStringArg;
+
+import com.twitter.finagle.stats.NullStatsReceiver;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.io.IOException;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The launcher to run monitor service.
+ *
+ * <p>Parses the command line, builds a {@link MonitorService} from the options, starts it and
+ * blocks until the service is closed (normally by the JVM shutdown hook registered here).
+ */
+public class MonitorServiceApp {
+
+    private static final Logger logger = LoggerFactory.getLogger(MonitorServiceApp.class);
+
+    static final String USAGE = "MonitorService [-u <uri>] [-c <conf>] [-s serverset]";
+
+    final String[] args;
+    final Options options = new Options();
+
+    private MonitorServiceApp(String[] args) {
+        this.args = args;
+        // prepare options
+        options.addOption("u", "uri", true, "DistributedLog URI");
+        options.addOption("c", "conf", true, "DistributedLog Configuration File");
+        options.addOption("s", "serverset", true, "Proxy Server Set");
+        options.addOption("i", "interval", true, "Check interval");
+        options.addOption("d", "region", true, "Region ID");
+        options.addOption("p", "provider", true, "DistributedLog Stats Provider");
+        options.addOption("f", "filter", true, "Filter streams by regex");
+        options.addOption("w", "watch", false, "Watch stream changes under a given namespace");
+        options.addOption("n", "instance_id", true, "Instance ID");
+        options.addOption("t", "total_instances", true, "Total instances");
+        options.addOption("hck", "heartbeat-num-checks", true, "Send a heartbeat after num checks");
+        options.addOption("hsci", "handshake-with-client-info", false, "Enable handshaking with client info");
+        // runCmd() reads "mx"; without registering it here the flag could never be set on the command line.
+        options.addOption("mx", "thriftmux", false, "Is thriftmux enabled");
+    }
+
+    void printUsage() {
+        HelpFormatter helpFormatter = new HelpFormatter();
+        helpFormatter.printHelp(USAGE, options);
+    }
+
+    /**
+     * Parses the arguments and runs the service; exits the JVM with -1 on bad arguments or
+     * start-up failure.
+     */
+    private void run() {
+        try {
+            logger.info("Running monitor service.");
+            BasicParser parser = new BasicParser();
+            CommandLine cmdline = parser.parse(options, args);
+            runCmd(cmdline);
+        } catch (ParseException pe) {
+            printUsage();
+            Runtime.getRuntime().exit(-1);
+        } catch (IllegalArgumentException iae) {
+            // Raised by MonitorService#runServer for missing or invalid options; previously escaped uncaught.
+            logger.error("Invalid arguments for monitor service : ", iae);
+            printUsage();
+            Runtime.getRuntime().exit(-1);
+        } catch (IOException ie) {
+            logger.error("Failed to start monitor service : ", ie);
+            Runtime.getRuntime().exit(-1);
+        }
+    }
+
+    /**
+     * Builds the monitor service from the parsed command line, starts it and waits for it to finish.
+     *
+     * @param cmdline parsed command line
+     * @throws IOException if the monitor service fails to start
+     */
+    void runCmd(CommandLine cmdline) throws IOException {
+        StatsProvider statsProvider = new NullStatsProvider();
+        if (cmdline.hasOption("p")) {
+            String providerClass = cmdline.getOptionValue("p");
+            statsProvider = ReflectionUtils.newInstance(providerClass, StatsProvider.class);
+        }
+        StatsReceiver statsReceiver = NullStatsReceiver.get();
+
+        final MonitorService monitorService = new MonitorService(
+                getOptionalStringArg(cmdline, "u"),
+                getOptionalStringArg(cmdline, "c"),
+                getOptionalStringArg(cmdline, "s"),
+                getOptionalIntegerArg(cmdline, "i"),
+                getOptionalIntegerArg(cmdline, "d"),
+                getOptionalStringArg(cmdline, "f"),
+                getOptionalIntegerArg(cmdline, "n"),
+                getOptionalIntegerArg(cmdline, "t"),
+                getOptionalIntegerArg(cmdline, "hck"),
+                getOptionalBooleanArg(cmdline, "hsci"),
+                getOptionalBooleanArg(cmdline, "w"),
+                getOptionalBooleanArg(cmdline, "mx"),
+                statsReceiver,
+                statsProvider);
+
+        monitorService.runServer();
+
+        // Close the service when the JVM shuts down; close() also releases the join() below.
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                logger.info("Closing monitor service.");
+                monitorService.close();
+                logger.info("Closed monitor service.");
+            }
+        });
+        try {
+            monitorService.join();
+        } catch (InterruptedException ie) {
+            logger.warn("Interrupted when waiting monitor service to be finished : ", ie);
+        }
+    }
+
+    public static void main(String[] args) {
+        final MonitorServiceApp launcher = new MonitorServiceApp(args);
+        launcher.run();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
new file mode 100644
index 0000000..08f4b41
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
+import org.apache.distributedlog.thrift.service.BulkWriteResponse;
+import org.apache.distributedlog.thrift.service.ResponseHeader;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+
+/**
+ * Utility methods for building write proxy service responses.
+ */
+public class ResponseUtils {
+    /** Returns a header carrying {@code REQUEST_DENIED}. */
+    public static ResponseHeader deniedHeader() {
+        return new ResponseHeader(StatusCode.REQUEST_DENIED);
+    }
+
+    /** Returns a header carrying {@code STREAM_UNAVAILABLE}. */
+    public static ResponseHeader streamUnavailableHeader() {
+        return new ResponseHeader(StatusCode.STREAM_UNAVAILABLE);
+    }
+
+    /** Returns a header carrying {@code SUCCESS}. */
+    public static ResponseHeader successHeader() {
+        return new ResponseHeader(StatusCode.SUCCESS);
+    }
+
+    /** Returns a {@code FOUND} header whose location is the given owner. */
+    public static ResponseHeader ownerToHeader(String owner) {
+        return new ResponseHeader(StatusCode.FOUND).setLocation(owner);
+    }
+
+    /**
+     * Translates an exception into a response header.
+     *
+     * <p>A {@link DLException} is mapped through its numeric code, and an
+     * {@link OwnershipAcquireFailedException} additionally sets the location to the current
+     * owner. Any other throwable, and any {@link DLException} whose code has no matching
+     * {@link StatusCode}, is reported as {@code INTERNAL_SERVER_ERROR}.
+     *
+     * @param t the failure to translate
+     * @return a header with code and error message set
+     */
+    public static ResponseHeader exceptionToHeader(Throwable t) {
+        ResponseHeader response = new ResponseHeader();
+        if (t instanceof DLException) {
+            DLException dle = (DLException) t;
+            if (dle instanceof OwnershipAcquireFailedException) {
+                response.setLocation(((OwnershipAcquireFailedException) dle).getCurrentOwner());
+            }
+            // findByValue yields null for a code this StatusCode enum does not know; never emit a null status.
+            StatusCode code = StatusCode.findByValue(dle.getCode());
+            response.setCode(null != code ? code : StatusCode.INTERNAL_SERVER_ERROR);
+            response.setErrMsg(dle.getMessage());
+        } else {
+            response.setCode(StatusCode.INTERNAL_SERVER_ERROR);
+            response.setErrMsg("Internal server error : " + t.getMessage());
+        }
+        return response;
+    }
+
+    /** Wraps the given header in a {@link WriteResponse}. */
+    public static WriteResponse write(ResponseHeader responseHeader) {
+        return new WriteResponse(responseHeader);
+    }
+
+    /** Returns a {@link WriteResponse} with a success header. */
+    public static WriteResponse writeSuccess() {
+        return new WriteResponse(successHeader());
+    }
+
+    /** Returns a {@link WriteResponse} with a denied header. */
+    public static WriteResponse writeDenied() {
+        return new WriteResponse(deniedHeader());
+    }
+
+    /** Wraps the given header in a {@link BulkWriteResponse}. */
+    public static BulkWriteResponse bulkWrite(ResponseHeader responseHeader) {
+        return new BulkWriteResponse(responseHeader);
+    }
+
+    /** Returns a {@link BulkWriteResponse} with a success header. */
+    public static BulkWriteResponse bulkWriteSuccess() {
+        return new BulkWriteResponse(successHeader());
+    }
+
+    /** Returns a {@link BulkWriteResponse} with a denied header. */
+    public static BulkWriteResponse bulkWriteDenied() {
+        return new BulkWriteResponse(deniedHeader());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java
new file mode 100644
index 0000000..436145d
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+/**
+ * Feature keys used by the distributedlog server.
+ *
+ * <p>The constant names are the contract; their declaration order is kept as is.
+ */
+public enum ServerFeatureKeys {
+    REGION_STOP_ACCEPT_NEW_STREAM,
+    SERVICE_RATE_LIMIT_DISABLED,
+    SERVICE_CHECKSUM_DISABLED,
+    SERVICE_GLOBAL_LIMITER_DISABLED;
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java
new file mode 100644
index 0000000..ee64580
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import com.google.common.base.Stopwatch;
+import com.twitter.finagle.Service;
+import com.twitter.finagle.SimpleFilter;
+import com.twitter.util.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * Track distributedlog server finagle-service stats.
+ *
+ * <p>The filter times the synchronous part of each call, i.e. until the wrapped service hands
+ * back its {@link Future}, and counts the calls currently inside that section.
+ */
+class StatsFilter<Req, Rep> extends SimpleFilter<Req, Rep> {
+
+    private final StatsLogger stats;
+    private final Counter outstandingAsync;
+    private final OpStatsLogger serviceExec;
+
+    /**
+     * Delegates to the wrapped service and records how long the dispatch took.
+     *
+     * <p>The call is recorded as failed when the service throws or returns {@code null}, and as
+     * successful otherwise. The stopwatch is stopped exactly once, so a {@code null} result no
+     * longer triggers a second {@code stop()} (which Guava rejects) or a double registration.
+     *
+     * @param req     the request
+     * @param service the service to invoke
+     * @return whatever the wrapped service returned
+     */
+    @Override
+    public Future<Rep> apply(Req req, Service<Req, Rep> service) {
+        Future<Rep> result = null;
+        outstandingAsync.inc();
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        try {
+            result = service.apply(req);
+        } finally {
+            outstandingAsync.dec();
+            final long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
+            if (null == result) {
+                serviceExec.registerFailedEvent(elapsedMicros);
+            } else {
+                serviceExec.registerSuccessfulEvent(elapsedMicros);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @param stats logger providing the "outstandingAsync" counter and the "serviceExec" op stats
+     */
+    public StatsFilter(StatsLogger stats) {
+        this.stats = stats;
+        this.outstandingAsync = stats.getCounter("outstandingAsync");
+        this.serviceExec = stats.getOpStatsLogger("serviceExec");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java
new file mode 100644
index 0000000..ee64fc7
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.announcer;
+
+import java.io.IOException;
+
+/**
+ * Contract for publishing and withdrawing a service's information.
+ */
+public interface Announcer {
+
+    /**
+     * Publishes the service information.
+     *
+     * @throws IOException if the announcement fails
+     */
+    void announce() throws IOException;
+
+    /**
+     * Withdraws previously published service information.
+     *
+     * @throws IOException if the withdrawal fails
+     */
+    void unannounce() throws IOException;
+
+    /**
+     * Closes the announcer.
+     */
+    void close();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java
new file mode 100644
index 0000000..5a1277a
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.announcer;
+
+import java.io.IOException;
+
+/**
+ * A no-op implementation of {@link Announcer}.
+ *
+ * <p>Every method has an empty body: nothing is announced, nothing is withdrawn and
+ * there is nothing to close.
+ */
+public class NOPAnnouncer implements Announcer {
+ /**
+ * Does nothing. The {@code throws} clause only mirrors the interface; the body never throws.
+ */
+ @Override
+ public void announce() throws IOException {
+ // nop
+ }
+
+ /**
+ * Does nothing. The {@code throws} clause only mirrors the interface; the body never throws.
+ */
+ @Override
+ public void unannounce() throws IOException {
+ // nop
+ }
+
+ /**
+ * Does nothing.
+ */
+ @Override
+ public void close() {
+ // nop
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java
new file mode 100644
index 0000000..df4a8e2
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.announcer;
+
+import com.twitter.common.zookeeper.Group;
+import com.twitter.common.zookeeper.ServerSet;
+import org.apache.distributedlog.client.serverset.DLZkServerSet;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ServerSet based announcer.
+ *
+ * <p>The announcer resolves the local host address once, at construction time, and derives a
+ * service endpoint and a stats endpoint from it. {@link #announce()} joins those endpoints to
+ * the server set and {@link #unannounce()} leaves it again.
+ */
+public class ServerSetAnnouncer implements Announcer {
+
+    private static final Logger logger = LoggerFactory.getLogger(ServerSetAnnouncer.class);
+
+    final String localAddr;
+    final InetSocketAddress serviceEndpoint;
+    final Map<String, InetSocketAddress> additionalEndpoints;
+    final int shardId;
+
+    // Server set the endpoints are joined to; created in the constructor.
+    DLZkServerSet zkServerSet;
+
+    // Handle returned by the last successful join; stays null until announce() succeeds.
+    ServerSet.EndpointStatus serviceStatus = null;
+
+    /**
+     * Announce server infos.
+     *
+     * @param uri
+     *          uri handed to {@code DLZkServerSet.of} to create the server set
+     * @param servicePort
+     *          service port
+     * @param statsPort
+     *          stats port
+     * @param shardId
+     *          shard id
+     * @throws UnknownHostException
+     *          if the local host address cannot be resolved
+     */
+    public ServerSetAnnouncer(URI uri,
+                              int servicePort,
+                              int statsPort,
+                              int shardId) throws UnknownHostException {
+        // Resolve the address first: nothing else is created if this fails.
+        final String hostAddress = InetAddress.getLocalHost().getHostAddress();
+        final InetSocketAddress serviceAddr = new InetSocketAddress(hostAddress, servicePort);
+        final InetSocketAddress statsAddr = new InetSocketAddress(hostAddress, statsPort);
+
+        // The stats endpoint is published under two names, and so is the service endpoint.
+        final Map<String, InetSocketAddress> endpoints = new HashMap<String, InetSocketAddress>();
+        endpoints.put("aurora", statsAddr);
+        endpoints.put("stats", statsAddr);
+        endpoints.put("service", serviceAddr);
+        endpoints.put("thrift", serviceAddr);
+
+        this.shardId = shardId;
+        this.localAddr = hostAddress;
+        this.serviceEndpoint = serviceAddr;
+        this.additionalEndpoints = endpoints;
+
+        // Create zookeeper and server set.
+        // NOTE(review): 60000 is presumably a timeout in milliseconds - confirm against DLZkServerSet.
+        this.zkServerSet = DLZkServerSet.of(uri, 60000);
+    }
+
+    /**
+     * Join the server set with the service endpoint, the additional endpoints and the shard id.
+     *
+     * <p>An interruption is logged and the thread's interrupt flag is restored; in that case the
+     * method returns normally and no join handle is recorded.
+     *
+     * @throws IOException
+     *          if joining the server set fails
+     */
+    @Override
+    public synchronized void announce() throws IOException {
+        try {
+            serviceStatus =
+                zkServerSet.getServerSet().join(serviceEndpoint, additionalEndpoints, shardId);
+        } catch (InterruptedException e) {
+            logger.warn("Interrupted on announcing service : ", e);
+            Thread.currentThread().interrupt();
+        } catch (Group.JoinException e) {
+            throw new IOException("Failed to announce service : ", e);
+        }
+    }
+
+    /**
+     * Leave the server set using the handle recorded by {@link #announce()}.
+     *
+     * <p>When no handle has been recorded a warning is logged and nothing else happens.
+     *
+     * @throws IOException
+     *          if leaving the server set fails
+     */
+    @Override
+    public synchronized void unannounce() throws IOException {
+        if (null != serviceStatus) {
+            try {
+                serviceStatus.leave();
+            } catch (ServerSet.UpdateException e) {
+                throw new IOException("Failed to unannounce service : ", e);
+            }
+        } else {
+            logger.warn("No service to unannounce.");
+        }
+    }
+
+    /**
+     * Close the underlying server set.
+     */
+    @Override
+    public void close() {
+        zkServerSet.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/package-info.java
new file mode 100644
index 0000000..6559bb3
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Announcers to announce servers to server set.
+ */
+package org.apache.distributedlog.service.announcer;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java
new file mode 100644
index 0000000..cdffaa3
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * Balancer Interface.
+ *
+ * <p>A balancer is used to balance the streams across the proxy cluster.
+ */
+public interface Balancer {
+
+ /**
+ * Rebalance all the streams from <i>source</i> to others.
+ *
+ * @param source
+ * name of the source target whose streams are moved away.
+ * @param rebalanceConcurrency
+ * the concurrency to move streams for re-balance.
+ * @param rebalanceRateLimiter
+ * the rate limiter used when moving streams for re-balance; may be absent.
+ */
+ void balanceAll(String source,
+ int rebalanceConcurrency,
+ Optional<RateLimiter> rebalanceRateLimiter);
+
+ /**
+ * Balance the streams across all targets.
+ *
+ * @param rebalanceWaterMark
+ * rebalance water mark. If the number of streams of a given target is less than
+ * the water mark, no streams will be re-balanced from this target.
+ * @param rebalanceTolerancePercentage
+ * tolerance percentage for the balancer. If the number of streams of a given target is
+ * less than average + average * <i>tolerancePercentage</i> / 100.0, no streams will
+ * be re-balanced from that target.
+ * @param rebalanceConcurrency
+ * the concurrency to move streams for re-balance.
+ * @param rebalanceRateLimiter
+ * the rate limiter used when moving streams for re-balance; may be absent.
+ */
+ void balance(int rebalanceWaterMark,
+ double rebalanceTolerancePercentage,
+ int rebalanceConcurrency,
+ Optional<RateLimiter> rebalanceRateLimiter);
+
+ /**
+ * Close the balancer.
+ */
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java
new file mode 100644
index 0000000..964c1cc
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.RateLimiter;
+import com.twitter.common.zookeeper.ServerSet;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.client.serverset.DLZkServerSet;
+import org.apache.distributedlog.impl.BKNamespaceDriver;
+import org.apache.distributedlog.service.ClientUtils;
+import org.apache.distributedlog.service.DLSocketAddress;
+import org.apache.distributedlog.service.DistributedLogClient;
+import org.apache.distributedlog.service.DistributedLogClientBuilder;
+import org.apache.distributedlog.tools.Tool;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tool to rebalance cluster.
+ */
+public class BalancerTool extends Tool {
+
+ private static final Logger logger = LoggerFactory.getLogger(BalancerTool.class);
+
+ static DistributedLogClientBuilder createDistributedLogClientBuilder(ServerSet serverSet) {
+ return DistributedLogClientBuilder.newBuilder()
+ .name("rebalancer_tool")
+ .clientId(ClientId$.MODULE$.apply("rebalancer_tool"))
+ .maxRedirects(2)
+ .serverSet(serverSet)
+ .clientBuilder(ClientBuilder.get()
+ .connectionTimeout(Duration.fromSeconds(2))
+ .tcpConnectTimeout(Duration.fromSeconds(2))
+ .requestTimeout(Duration.fromSeconds(10))
+ .hostConnectionLimit(1)
+ .hostConnectionCoresize(1)
+ .keepAlive(true)
+ .failFast(false));
+ }
+
+ /**
+ * Base Command to run balancer.
+ */
+ protected abstract static class BalancerCommand extends OptsCommand {
+
+ protected Options options = new Options();
+ protected int rebalanceWaterMark = 0;
+ protected double rebalanceTolerancePercentage = 0.0f;
+ protected int rebalanceConcurrency = 1;
+ protected Double rate = null;
+ protected Optional<RateLimiter> rateLimiter;
+
+ BalancerCommand(String name, String description) {
+ super(name, description);
+ options.addOption("rwm", "rebalance-water-mark", true, "Rebalance water mark per proxy");
+ options.addOption("rtp", "rebalance-tolerance-percentage", true,
+ "Rebalance tolerance percentage per proxy");
+ options.addOption("rc", "rebalance-concurrency", true, "Concurrency to rebalance stream distribution");
+ options.addOption("r", "rate", true, "Rebalance rate");
+ }
+
+ Optional<RateLimiter> getRateLimiter() {
+ return rateLimiter;
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+ if (cmdline.hasOption("rwm")) {
+ this.rebalanceWaterMark = Integer.parseInt(cmdline.getOptionValue("rwm"));
+ }
+ if (cmdline.hasOption("rtp")) {
+ this.rebalanceTolerancePercentage = Double.parseDouble(cmdline.getOptionValue("rtp"));
+ }
+ if (cmdline.hasOption("rc")) {
+ this.rebalanceConcurrency = Integer.parseInt(cmdline.getOptionValue("rc"));
+ }
+ if (cmdline.hasOption("r")) {
+ this.rate = Double.parseDouble(cmdline.getOptionValue("r"));
+ }
+ checkArgument(rebalanceWaterMark >= 0,
+ "Rebalance Water Mark should be a non-negative number");
+ checkArgument(rebalanceTolerancePercentage >= 0.0f,
+ "Rebalance Tolerance Percentage should be a non-negative number");
+ checkArgument(rebalanceConcurrency > 0,
+ "Rebalance Concurrency should be a positive number");
+ if (null == rate || rate <= 0.0f) {
+ rateLimiter = Optional.absent();
+ } else {
+ rateLimiter = Optional.of(RateLimiter.create(rate));
+ }
+ }
+
+ @Override
+ protected int runCmd(CommandLine cmdline) throws Exception {
+ try {
+ parseCommandLine(cmdline);
+ } catch (ParseException pe) {
+ println("ERROR: fail to parse commandline : '" + pe.getMessage() + "'");
+ printUsage();
+ return -1;
+ }
+ return executeCommand(cmdline);
+ }
+
+ protected abstract int executeCommand(CommandLine cmdline) throws Exception;
+ }
+
+ /**
+ * Command to balance streams within a cluster.
+ */
+ protected static class ClusterBalancerCommand extends BalancerCommand {
+
+ protected URI uri;
+ protected String source = null;
+
+ protected ClusterBalancerCommand() {
+ super("clusterbalancer", "Balance streams inside a cluster");
+ options.addOption("u", "uri", true, "DistributedLog URI");
+ options.addOption("sp", "source-proxy", true, "Source proxy to balance");
+ }
+
+ @Override
+ protected String getUsage() {
+ return "clusterbalancer [options]";
+ }
+
+ protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+ super.parseCommandLine(cmdline);
+ if (!cmdline.hasOption("u")) {
+ throw new ParseException("No proxy serverset provided.");
+ }
+ uri = URI.create(cmdline.getOptionValue("u"));
+ if (cmdline.hasOption("sp")) {
+ String sourceProxyStr = cmdline.getOptionValue("sp");
+ try {
+ DLSocketAddress.parseSocketAddress(sourceProxyStr);
+ } catch (IllegalArgumentException iae) {
+ throw new ParseException("Invalid source proxy " + sourceProxyStr + " : " + iae.getMessage());
+ }
+ this.source = sourceProxyStr;
+ }
+ }
+
+ @Override
+ protected int executeCommand(CommandLine cmdline) throws Exception {
+ DLZkServerSet serverSet = DLZkServerSet.of(uri, 60000);
+ logger.info("Created serverset for {}", uri);
+ try {
+ DistributedLogClientBuilder clientBuilder =
+ createDistributedLogClientBuilder(serverSet.getServerSet());
+ ClusterBalancer balancer = new ClusterBalancer(clientBuilder);
+ try {
+ return runBalancer(clientBuilder, balancer);
+ } finally {
+ balancer.close();
+ }
+ } finally {
+ serverSet.close();
+ }
+ }
+
+ protected int runBalancer(DistributedLogClientBuilder clientBuilder,
+ ClusterBalancer balancer)
+ throws Exception {
+ if (null == source) {
+ balancer.balance(
+ rebalanceWaterMark,
+ rebalanceTolerancePercentage,
+ rebalanceConcurrency,
+ getRateLimiter());
+ } else {
+ balanceFromSource(clientBuilder, balancer, source, getRateLimiter());
+ }
+ return 0;
+ }
+
+ protected void balanceFromSource(DistributedLogClientBuilder clientBuilder,
+ ClusterBalancer balancer,
+ String source,
+ Optional<RateLimiter> rateLimiter)
+ throws Exception {
+ InetSocketAddress sourceAddr = DLSocketAddress.parseSocketAddress(source);
+ DistributedLogClientBuilder sourceClientBuilder =
+ DistributedLogClientBuilder.newBuilder(clientBuilder)
+ .host(sourceAddr);
+
+ Pair<DistributedLogClient, MonitorServiceClient> clientPair =
+ ClientUtils.buildClient(sourceClientBuilder);
+ try {
+ Await.result(clientPair.getRight().setAcceptNewStream(false));
+ logger.info("Disable accepting new stream on proxy {}.", source);
+ balancer.balanceAll(source, rebalanceConcurrency, rateLimiter);
+ } finally {
+ clientPair.getLeft().close();
+ }
+ }
+ }
+
+ /**
+ * Command to balance streams between regions.
+ */
+ protected static class RegionBalancerCommand extends BalancerCommand {
+
+ protected URI region1;
+ protected URI region2;
+ protected String source = null;
+
+ protected RegionBalancerCommand() {
+ super("regionbalancer", "Balance streams between regions");
+ options.addOption("rs", "regions", true, "DistributedLog Region URI: uri1[,uri2]");
+ options.addOption("s", "source", true, "DistributedLog Source Region to balance");
+ }
+
+ @Override
+ protected String getUsage() {
+ return "regionbalancer [options]";
+ }
+
+ @Override
+ protected void parseCommandLine(CommandLine cmdline) throws ParseException {
+ super.parseCommandLine(cmdline);
+ if (!cmdline.hasOption("rs")) {
+ throw new ParseException("No regions provided.");
+ }
+ String regionsStr = cmdline.getOptionValue("rs");
+ String[] regions = regionsStr.split(",");
+ if (regions.length != 2) {
+ throw new ParseException("Invalid regions provided. Expected : serverset1[,serverset2]");
+ }
+ region1 = URI.create(regions[0]);
+ region2 = URI.create(regions[1]);
+ if (cmdline.hasOption("s")) {
+ source = cmdline.getOptionValue("s");
+ }
+ }
+
+ @Override
+ protected int executeCommand(CommandLine cmdline) throws Exception {
+ DLZkServerSet serverSet1 = DLZkServerSet.of(region1, 60000);
+ logger.info("Created serverset for {}", region1);
+ DLZkServerSet serverSet2 = DLZkServerSet.of(region2, 60000);
+ logger.info("Created serverset for {}", region2);
+ try {
+ DistributedLogClientBuilder builder1 =
+ createDistributedLogClientBuilder(serverSet1.getServerSet());
+ Pair<DistributedLogClient, MonitorServiceClient> pair1 =
+ ClientUtils.buildClient(builder1);
+ DistributedLogClientBuilder builder2 =
+ createDistributedLogClientBuilder(serverSet2.getServerSet());
+ Pair<DistributedLogClient, MonitorServiceClient> pair2 =
+ ClientUtils.buildClient(builder2);
+ try {
+ SimpleBalancer balancer = new SimpleBalancer(
+ BKNamespaceDriver.getZKServersFromDLUri(region1), pair1.getLeft(), pair1.getRight(),
+ BKNamespaceDriver.getZKServersFromDLUri(region2), pair2.getLeft(), pair2.getRight());
+ try {
+ return runBalancer(balancer);
+ } finally {
+ balancer.close();
+ }
+ } finally {
+ pair1.getLeft().close();
+ pair2.getLeft().close();
+ }
+ } finally {
+ serverSet1.close();
+ serverSet2.close();
+ }
+ }
+
+ protected int runBalancer(SimpleBalancer balancer) throws Exception {
+ if (null == source) {
+ balancer.balance(
+ rebalanceWaterMark,
+ rebalanceTolerancePercentage,
+ rebalanceConcurrency,
+ getRateLimiter());
+ } else {
+ balancer.balanceAll(source, rebalanceConcurrency, getRateLimiter());
+ }
+ return 0;
+ }
+ }
+
+ public BalancerTool() {
+ super();
+ addCommand(new ClusterBalancerCommand());
+ addCommand(new RegionBalancerCommand());
+ }
+
+ @Override
+ protected String getName() {
+ return "balancer";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java
new file mode 100644
index 0000000..4c9e075
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import java.util.Map;
+
+/**
+ * Utils for balancer.
+ */
+public class BalancerUtils {
+
+    /**
+     * Work out how many streams should be moved away from <i>nodeToRebalance</i>, given the
+     * number of streams every node currently owns.
+     *
+     * <p>The average is the sum of all loads divided by the number of entries in
+     * <i>loadDistribution</i>. Entries with a null key or a null value add nothing to the sum
+     * but are still counted as nodes.
+     *
+     * @param nodeToRebalance
+     *          node to rebalance
+     * @param loadDistribution
+     *          number of streams per node
+     * @param rebalanceWaterMark
+     *          if the number of streams of <i>nodeToRebalance</i> is not more than
+     *          <i>rebalanceWaterMark</i>, no streams will be re-balanced.
+     * @param tolerancePercentage
+     *          tolerance percentage for the balancer. If the number of streams of
+     *          <i>nodeToRebalance</i> is not more than
+     *          average + average * <i>tolerancePercentage</i> / 100.0 (rounded up, at least 1),
+     *          no streams will be re-balanced.
+     * @param <K>
+     *          type used to identify a node
+     * @return number of streams to rebalance: the node's load minus the average rounded up,
+     *         or 0 when the node is unknown or within the limits above
+     */
+    public static <K> int calculateNumStreamsToRebalance(K nodeToRebalance,
+                                                         Map<K, Integer> loadDistribution,
+                                                         int rebalanceWaterMark,
+                                                         double tolerancePercentage) {
+        final Integer nodeLoad = loadDistribution.get(nodeToRebalance);
+        final boolean belowWaterMark = null == nodeLoad || nodeLoad <= rebalanceWaterMark;
+        if (belowWaterMark) {
+            return 0;
+        }
+
+        final double mean = ((double) sumOfLoads(loadDistribution)) / loadDistribution.size();
+        final double toleratedLoad = Math.ceil(mean + mean * tolerancePercentage / 100.0f);
+        final long ceiling = Math.max(1L, (long) toleratedLoad);
+        if (nodeLoad <= ceiling) {
+            return 0;
+        }
+
+        final int surplus = nodeLoad - (int) Math.ceil(mean);
+        return Math.max(0, surplus);
+    }
+
+    /** Add up the loads of all entries that have both a key and a value. */
+    private static <K> long sumOfLoads(Map<K, Integer> loadDistribution) {
+        long sum = 0L;
+        for (Map.Entry<K, Integer> node : loadDistribution.entrySet()) {
+            if (null != node.getKey() && null != node.getValue()) {
+                sum += node.getValue();
+            }
+        }
+        return sum;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java
new file mode 100644
index 0000000..5add339
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.RateLimiter;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.service.ClientUtils;
+import org.apache.distributedlog.service.DLSocketAddress;
+import org.apache.distributedlog.service.DistributedLogClient;
+import org.apache.distributedlog.service.DistributedLogClientBuilder;
+import com.twitter.util.Await;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import java.io.Serializable;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A balancer balances ownerships with a cluster of targets.
+ */
+public class ClusterBalancer implements Balancer {
+
+ private static final Logger logger = LoggerFactory.getLogger(ClusterBalancer.class);
+
+ /**
+  * Represent a single host together with the streams it owns.
+  *
+  * <p>Hosts are ordered by {@link HostComparator}, i.e. by number of streams in desc order.
+  * The clients for the host are only built the first time one of them is requested.
+  */
+ static class Host {
+
+     final SocketAddress address;
+     final Set<String> streams;
+     final DistributedLogClientBuilder clientBuilder;
+     // Both clients stay null until getClient() or getMonitor() is called.
+     DistributedLogClient client = null;
+     MonitorServiceClient monitor = null;
+
+     Host(SocketAddress address, Set<String> streams,
+          DistributedLogClientBuilder clientBuilder) {
+         this.address = address;
+         this.streams = streams;
+         this.clientBuilder = clientBuilder;
+     }
+
+     /** Build both clients in one go unless that has already happened. */
+     private void initializeClientsIfNeeded() {
+         if (null != client) {
+             return;
+         }
+         final Pair<DistributedLogClient, MonitorServiceClient> built =
+                 createDistributedLogClient(address, clientBuilder);
+         client = built.getLeft();
+         monitor = built.getRight();
+     }
+
+     /**
+      * Client for this host, built on first use.
+      *
+      * @return the distributedlog client
+      */
+     synchronized DistributedLogClient getClient() {
+         initializeClientsIfNeeded();
+         return client;
+     }
+
+     /**
+      * Monitor client for this host, built on first use.
+      *
+      * @return the monitor service client
+      */
+     synchronized MonitorServiceClient getMonitor() {
+         initializeClientsIfNeeded();
+         return monitor;
+     }
+
+     /** Close the client if one was ever built; otherwise do nothing. */
+     synchronized void close() {
+         if (null == client) {
+             return;
+         }
+         client.close();
+     }
+
+     @Override
+     public String toString() {
+         return "Host(" + address + ")";
+     }
+ }
+
+ /**
+  * Orders hosts by the number of streams they own, the host with the most streams first.
+  */
+ static class HostComparator implements Comparator<Host>, Serializable {
+     private static final long serialVersionUID = 7984973796525102538L;
+
+     @Override
+     public int compare(Host h1, Host h2) {
+         final int first = h1.streams.size();
+         final int second = h2.streams.size();
+         // Collection sizes are never negative, so this subtraction cannot overflow.
+         return second - first;
+     }
+ }
+
+ protected final DistributedLogClientBuilder clientBuilder;
+ protected final DistributedLogClient client;
+ protected final MonitorServiceClient monitor;
+
+ /**
+ * Create a balancer whose clients are built from <i>clientBuilder</i>.
+ *
+ * @param clientBuilder
+ * client builder used to build the balancer's own clients; it is also kept for
+ * building clients to single hosts
+ */
+ public ClusterBalancer(DistributedLogClientBuilder clientBuilder) {
+ this(clientBuilder, ClientUtils.buildClient(clientBuilder));
+ }
+
+ ClusterBalancer(DistributedLogClientBuilder clientBuilder,
+ Pair<DistributedLogClient, MonitorServiceClient> clientPair) {
+ this.clientBuilder = clientBuilder;
+ this.client = clientPair.getLeft();
+ this.monitor = clientPair.getRight();
+ }
+
+ /**
+  * Build a new distributedlog client to a single host <i>host</i>.
+  *
+  * <p>The given builder is copied first, so it is not modified by this call.
+  *
+  * @param host
+  *          host to access
+  * @param clientBuilder
+  *          builder whose settings are copied for the new client
+  * @return distributedlog clients
+  */
+ static Pair<DistributedLogClient, MonitorServiceClient> createDistributedLogClient(
+         SocketAddress host, DistributedLogClientBuilder clientBuilder) {
+     return ClientUtils.buildClient(
+             DistributedLogClientBuilder.newBuilder(clientBuilder).host(host));
+ }
+
+ /**
+  * Move all streams away from <i>source</i>: the water mark and the tolerance are both zero.
+  *
+  * @param source
+  *          socket address of the host to move streams from
+  * @param rebalanceConcurrency
+  *          passed on unchanged; the five-argument balance never reads it
+  * @param rebalanceRateLimiter
+  *          rate limiter passed on for moving the streams
+  */
+ @Override
+ public void balanceAll(String source,
+                        int rebalanceConcurrency,
+                        Optional<RateLimiter> rebalanceRateLimiter) {
+     final Optional<String> sourceHost = Optional.of(source);
+     balance(0, 0.0f, rebalanceConcurrency, sourceHost, rebalanceRateLimiter);
+ }
+
+ /**
+  * Balance the streams across all hosts, without singling out a source host.
+  *
+  * @param rebalanceWaterMark
+  *          rebalance water mark
+  * @param rebalanceTolerancePercentage
+  *          tolerance percentage for the balancer
+  * @param rebalanceConcurrency
+  *          passed on unchanged; the five-argument balance never reads it
+  * @param rebalanceRateLimiter
+  *          rate limiter passed on for moving the streams
+  */
+ @Override
+ public void balance(int rebalanceWaterMark,
+                     double rebalanceTolerancePercentage,
+                     int rebalanceConcurrency,
+                     Optional<RateLimiter> rebalanceRateLimiter) {
+     balance(
+             rebalanceWaterMark,
+             rebalanceTolerancePercentage,
+             rebalanceConcurrency,
+             Optional.<String>absent(),
+             rebalanceRateLimiter);
+ }
+
+ /**
+  * Balance the streams, either across all hosts or away from one source host.
+  *
+  * <p>Nothing happens when the ownership distribution has at most one host, or when a source
+  * is given that is not part of the distribution. Every {@link Host} created here is closed
+  * before the method returns.
+  *
+  * @param rebalanceWaterMark
+  *          lower bound for the number of streams left on a host that streams are moved from
+  * @param rebalanceTolerancePercentage
+  *          percentage added on top of the average load to get the high water mark of the
+  *          hosts that streams are moved to
+  * @param rebalanceConcurrency
+  *          not read by this method
+  * @param source
+  *          socket address of the only host to move streams from, if present
+  * @param rebalanceRateLimiter
+  *          rate limiter passed on for moving the streams
+  */
+ public void balance(int rebalanceWaterMark,
+                     double rebalanceTolerancePercentage,
+                     int rebalanceConcurrency,
+                     Optional<String> source,
+                     Optional<RateLimiter> rebalanceRateLimiter) {
+     final Map<SocketAddress, Set<String>> ownership = monitor.getStreamOwnershipDistribution();
+     if (ownership.size() <= 1) {
+         return;
+     }
+
+     SocketAddress sourceAddr = null;
+     if (source.isPresent()) {
+         sourceAddr = DLSocketAddress.parseSocketAddress(source.get());
+         logger.info("Balancer source is {}", sourceAddr);
+         if (!ownership.containsKey(sourceAddr)) {
+             return;
+         }
+     }
+
+     // Get the list of hosts ordered by number of streams in DESC order
+     final List<Host> hosts = new ArrayList<Host>(ownership.size());
+     for (Map.Entry<SocketAddress, Set<String>> owned : ownership.entrySet()) {
+         hosts.add(new Host(owned.getKey(), owned.getValue(), clientBuilder));
+     }
+     Collections.sort(hosts, new HostComparator());
+
+     try {
+         // find the host to move streams from; stays -1 when no source was given.
+         int hostIdxMoveFrom = -1;
+         if (null != sourceAddr) {
+             for (int idx = 0; idx < hosts.size(); idx++) {
+                 hostIdxMoveFrom = idx;
+                 if (sourceAddr.equals(hosts.get(idx).address)) {
+                     break;
+                 }
+             }
+         }
+         final boolean drainSource = hostIdxMoveFrom >= 0;
+
+         // compute the average load; a source host is left out of the divisor.
+         int totalStream = 0;
+         for (Host host : hosts) {
+             totalStream += host.streams.size();
+         }
+         final int numTargets = drainSource ? hosts.size() - 1 : hosts.size();
+         final double averageLoad = (double) totalStream / numTargets;
+
+         final int moveToHighWaterMark =
+                 Math.max(1, (int) (averageLoad + averageLoad * rebalanceTolerancePercentage / 100.0f));
+         final int lastIdx = hosts.size() - 1;
+
+         if (drainSource) {
+             final int moveFromLowWaterMark = Math.max(0, rebalanceWaterMark);
+             moveStreams(
+                     hosts,
+                     new AtomicInteger(hostIdxMoveFrom), moveFromLowWaterMark,
+                     new AtomicInteger(lastIdx), moveToHighWaterMark,
+                     rebalanceRateLimiter);
+             moveRemainingStreamsFromSource(hosts.get(hostIdxMoveFrom), hosts, rebalanceRateLimiter);
+         } else {
+             final int moveFromLowWaterMark =
+                     Math.max((int) Math.ceil(averageLoad), rebalanceWaterMark);
+             final AtomicInteger moveFrom = new AtomicInteger(0);
+             final AtomicInteger moveTo = new AtomicInteger(lastIdx);
+             // NOTE(review): moveStreams receives moveTo as an AtomicInteger and presumably
+             // moves it towards moveFrom; its body is not fully visible here - confirm.
+             for (; moveFrom.get() < moveTo.get(); moveFrom.incrementAndGet()) {
+                 moveStreams(hosts, moveFrom, moveFromLowWaterMark,
+                         moveTo, moveToHighWaterMark, rebalanceRateLimiter);
+             }
+         }
+     } finally {
+         for (Host host : hosts) {
+             host.close();
+         }
+     }
+ }
+
+ void moveStreams(List<Host> hosts,
+ AtomicInteger hostIdxMoveFrom,
+ int moveFromLowWaterMark,
+ AtomicInteger hostIdxMoveTo,
+ int moveToHighWaterMark,
+ Optional<RateLimiter> rateLimiter) {
+ if (hostIdxMoveFrom.get() < 0 || hostIdxMoveFrom.get() >= hosts.size()
+ || hostIdxMoveTo.get() < 0 || hostIdxMoveTo.get() >= hosts.size()
+ || hostIdxMoveFrom.get() >= hostIdxMoveTo.get()) {
+ return;
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Moving streams : hosts = {}, from = {}, to = {} :"
+ + " from_low_water_mark = {}, to_high_water_mark = {}",
+ new Object[] {
+ hosts,
+ hostIdxMoveFrom.get(),
+ hostIdxMoveTo.get(),
+ moveFromLowWaterMark,
+ moveToHighWaterMark });
+ }
+
+ Host hostMoveFrom = hosts.get(hostIdxMoveFrom.get());
+ int numStreamsOnFromHost = hostMoveFrom.streams.size();
+ if (numStreamsOnFromHost <= moveFromLowWaterMark) {
+ // do nothing
+ return;
+ }
+
+ int numStreamsToMove = numStreamsOnFromHost - moveFromLowWaterMark;
+ LinkedList<String> streamsToMove = new LinkedList<String>(hostMoveFrom.streams);
+ Collections.shuffle(streamsToMove);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Try to move {} streams from host {} : streams = {}",
+ new Object[] { numStreamsToMove, hostMoveFrom.address, streamsToMove });
+ }
+
+ while (numStreamsToMove-- > 0 && !streamsToMove.isEmpty()) {
+ if (rateLimiter.isPresent()) {
+ rateLimiter.get().acquire();
+ }
+
+ // pick a host to move
+ Host hostMoveTo = hosts.get(hostIdxMoveTo.get());
+ while (hostMoveTo.streams.size() >= moveToHighWaterMark) {
+ int hostIdx = hostIdxMoveTo.decrementAndGet();
+ logger.info("move to host : {}, from {}", hostIdx, hostIdxMoveFrom.get());
+ if (hostIdx <= hostIdxMoveFrom.get()) {
+ return;
+ } else {
+ hostMoveTo = hosts.get(hostIdx);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Target host to move moved to host {} @ {}",
+ hostIdx, hostMoveTo);
+ }
+ }
+ }
+
+ // pick a stream
+ String stream = streamsToMove.remove();
+
+ // move the stream
+ if (moveStream(stream, hostMoveFrom, hostMoveTo)) {
+ hostMoveFrom.streams.remove(stream);
+ hostMoveTo.streams.add(stream);
+ }
+ }
+
+ }
+
+    /**
+     * Moves every stream still owned by {@code source} to the other hosts, handing them
+     * out round-robin starting from the last host in {@code hosts} and walking backwards.
+     *
+     * <p>Streams are taken in random order. A stream whose move fails is not retried; the
+     * in-memory stream sets are only updated for successful moves.
+     *
+     * <p>A rate-limiter permit is acquired only for an actual move attempt, never for the
+     * round-robin slot occupied by {@code source} itself. If {@code hosts} contains no host
+     * other than {@code source}, the method logs a warning and returns instead of looping
+     * forever.
+     *
+     * @param source the host to drain
+     * @param hosts all hosts, including {@code source}
+     * @param rateLimiter optional rate limiter acquired once per attempted move
+     */
+    void moveRemainingStreamsFromSource(Host source,
+                                        List<Host> hosts,
+                                        Optional<RateLimiter> rateLimiter) {
+        LinkedList<String> streamsToMove = new LinkedList<String>(source.streams);
+        Collections.shuffle(streamsToMove);
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Try to move remaining streams from {} : {}", source, streamsToMove);
+        }
+
+        // Eligible targets in visiting order: last host first, the source left out.
+        List<Host> targets = new ArrayList<Host>(hosts.size());
+        for (int i = hosts.size() - 1; i >= 0; i--) {
+            Host candidate = hosts.get(i);
+            if (!candidate.address.equals(source.address)) {
+                targets.add(candidate);
+            }
+        }
+
+        if (targets.isEmpty()) {
+            // Without this guard the round-robin below would never terminate.
+            if (!streamsToMove.isEmpty()) {
+                logger.warn("No target host available to move {} remaining streams from {}",
+                        streamsToMove.size(), source.address);
+            }
+            return;
+        }
+
+        int targetIdx = 0;
+        while (!streamsToMove.isEmpty()) {
+            if (rateLimiter.isPresent()) {
+                rateLimiter.get().acquire();
+            }
+
+            Host target = targets.get(targetIdx);
+            targetIdx = (targetIdx + 1) % targets.size();
+
+            String stream = streamsToMove.remove();
+            // move the stream
+            if (moveStream(stream, source, target)) {
+                source.streams.remove(stream);
+                target.streams.add(stream);
+            }
+        }
+    }
+
+    /**
+     * Attempts to move {@code stream} from one host to another and reports whether it worked.
+     *
+     * <p>Failures are logged rather than propagated, so callers can carry on with the
+     * remaining streams. If the calling thread is interrupted while waiting for the move,
+     * its interrupt status is restored before returning.
+     *
+     * @param stream name of the stream to move
+     * @param from host currently owning the stream
+     * @param to host that should take the stream over
+     * @return {@code true} if the move completed without an exception, {@code false} otherwise
+     */
+    private boolean moveStream(String stream, Host from, Host to) {
+        try {
+            doMoveStream(stream, from, to);
+            return true;
+        } catch (InterruptedException ie) {
+            // Keep the interrupt visible to the caller instead of silently dropping it.
+            Thread.currentThread().interrupt();
+            logger.warn("Interrupted while moving stream {} from {} to {}.",
+                    new Object[] { stream, from.address, to.address });
+            return false;
+        } catch (Exception e) {
+            // Best effort: record the failure (with its cause) and let the caller continue.
+            logger.warn("Failed to move stream {} from {} to {} : ",
+                    new Object[] { stream, from.address, to.address, e });
+            return false;
+        }
+    }
+
+    /**
+     * Moves {@code stream} by asking {@code from}'s client to release it and then, once the
+     * release has completed, asking {@code to}'s monitor to check the stream. Blocks until
+     * the whole chain has finished.
+     *
+     * @param stream name of the stream to move
+     * @param from host releasing the stream
+     * @param to host whose monitor is asked to check the stream after the release
+     * @throws Exception whatever {@code Await.result} throws for the release/check chain
+     */
+    private void doMoveStream(final String stream, final Host from, final Host to) throws Exception {
+        logger.info("Moving stream {} from {} to {}.",
+                new Object[] { stream, from.address, to.address });
+
+        // Logs the outcome of the check issued against the target host.
+        final FutureEventListener<Void> outcomeLogger = new FutureEventListener<Void>() {
+
+            @Override
+            public void onSuccess(Void value) {
+                logger.info("Moved stream {} from {} to {}.",
+                        new Object[] { stream, from.address, to.address });
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                logger.info("Failed to move stream {} from {} to {} : ",
+                        new Object[] { stream, from.address, to.address, cause });
+            }
+        };
+
+        // Runs once the release has completed: issue the check on the target host.
+        final Function<Void, Future<Void>> checkOnTarget = new Function<Void, Future<Void>>() {
+            @Override
+            public Future<Void> apply(Void released) {
+                logger.info("Released stream {} from {}.", stream, from.address);
+                return to.getMonitor().check(stream).addEventListener(outcomeLogger);
+            }
+        };
+
+        Await.result(from.getClient().release(stream).flatMap(checkOnTarget));
+    }
+
+    /**
+     * Closes {@code client}.
+     */
+    @Override
+    public void close() {
+        this.client.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java
new file mode 100644
index 0000000..6a43179
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * A stream chooser based on number of streams.
+ *
+ * <p>Hosts are sorted by stream count in descending order. Streams are then handed out
+ * one at a time, cycling over the hosts in front of a "pivot" host until their counts
+ * have dropped to the pivot's count, at which point the pivot advances to the next host
+ * with a strictly smaller count. Each host's streams are shuffled up front, so the
+ * stream taken from a given host is random.
+ *
+ * <p>The instance also serves as the comparator used for that sort.
+ */
+class CountBasedStreamChooser implements StreamChooser, Serializable,
+    Comparator<Pair<SocketAddress, LinkedList<String>>> {
+
+    private static final long serialVersionUID = 4664153397369979203L;
+
+    // hosts with their remaining (shuffled) streams, sorted by stream count descending.
+    final List<Pair<SocketAddress, LinkedList<String>>> streamsDistribution;
+
+    // pivot: index of the first host that is not yet being drained; hosts before it give
+    // up streams until their counts match pivotCount, the stream count seen at the pivot.
+    int pivot;
+    int pivotCount;
+
+    // index of the host the next stream will be taken from.
+    int next;
+
+    /**
+     * Builds a chooser over the given ownership distribution.
+     *
+     * @param streams stream names per host; must contain at least one host
+     */
+    CountBasedStreamChooser(Map<SocketAddress, Set<String>> streams) {
+        checkArgument(streams.size() > 0, "Only support no-empty streams distribution");
+        streamsDistribution = new ArrayList<Pair<SocketAddress, LinkedList<String>>>(streams.size());
+        for (Map.Entry<SocketAddress, Set<String>> owned : streams.entrySet()) {
+            LinkedList<String> shuffled = new LinkedList<String>(owned.getValue());
+            Collections.shuffle(shuffled);
+            streamsDistribution.add(Pair.of(owned.getKey(), shuffled));
+        }
+        // most loaded host first
+        Collections.sort(streamsDistribution, this);
+        pivot = 0;
+        pivotCount = countAt(0);
+        findNextPivot();
+        next = 0;
+    }
+
+    /** Returns the number of streams currently left on the host at {@code index}. */
+    private int countAt(int index) {
+        return streamsDistribution.get(index).getRight().size();
+    }
+
+    /**
+     * Advances the pivot to the next host whose stream count is strictly smaller than the
+     * current pivot count. When there is no such host, the pivot is parked one past the
+     * last host with a pivot count of zero.
+     */
+    private void findNextPivot() {
+        final int previousCount = pivotCount;
+        final int hostCount = streamsDistribution.size();
+        for (pivot = pivot + 1; pivot < hostCount; pivot++) {
+            pivotCount = countAt(pivot);
+            if (pivotCount < previousCount) {
+                return;
+            }
+        }
+        pivot = hostCount;
+        pivotCount = 0;
+    }
+
+    /**
+     * Picks the next stream.
+     *
+     * @return the chosen stream name, or {@code null} when no stream is left to choose
+     */
+    @Override
+    public synchronized String choose() {
+        if (next == pivot) {
+            // Completed one pass over the hosts in front of the pivot.
+            final boolean levelledWithPivot = countAt(next - 1) <= pivotCount;
+            if (levelledWithPivot) {
+                if (pivotCount == 0) {
+                    // everything in front of the pivot is drained
+                    return null;
+                }
+                findNextPivot();
+            }
+            next = 0;
+        }
+
+        final LinkedList<String> candidates = streamsDistribution.get(next).getRight();
+        if (candidates.isEmpty()) {
+            return null;
+        }
+
+        final String chosen = candidates.removeFirst();
+        ++next;
+        return chosen;
+    }
+
+    /**
+     * Orders hosts by stream count, larger counts first.
+     */
+    @Override
+    public int compare(Pair<SocketAddress, LinkedList<String>> o1,
+                       Pair<SocketAddress, LinkedList<String>> o2) {
+        final int firstCount = o1.getRight().size();
+        final int secondCount = o2.getRight().size();
+        return secondCount - firstCount;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java
new file mode 100644
index 0000000..4aefc5e
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service.balancer;
+
+/**
+ * A stream chooser that can only choose limited number of streams.
+ *
+ * <p>Wraps another {@link StreamChooser} and stops handing out streams once the limit has
+ * been used up, or as soon as the wrapped chooser returns {@code null}.
+ */
+public class LimitedStreamChooser implements StreamChooser {
+
+    final StreamChooser underlying;
+
+    // number of streams that may still be chosen.
+    int limit;
+
+    LimitedStreamChooser(StreamChooser underlying, int limit) {
+        this.underlying = underlying;
+        this.limit = limit;
+    }
+
+    /**
+     * Create a limited stream chooser by {@code limit}.
+     *
+     * @param underlying the underlying stream chooser.
+     * @param limit the limit of number of streams to choose.
+     * @return the limited stream chooser.
+     */
+    public static LimitedStreamChooser of(StreamChooser underlying, int limit) {
+        return new LimitedStreamChooser(underlying, limit);
+    }
+
+    /**
+     * Chooses the next stream from the underlying chooser while the limit allows it.
+     *
+     * @return the chosen stream, or {@code null} once the limit is exhausted or the
+     *         underlying chooser has nothing left; after the latter the limit is set to
+     *         zero, so the underlying chooser is not consulted again
+     */
+    @Override
+    public synchronized String choose() {
+        if (limit > 0) {
+            final String chosen = underlying.choose();
+            if (chosen != null) {
+                limit--;
+                return chosen;
+            }
+            // underlying chooser is exhausted
+            limit = 0;
+        }
+        return null;
+    }
+}