You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:15 UTC
[08/30] incubator-distributedlog git commit: DL-205: Remove
StatusCode dependency on DLException
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorService.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorService.java
deleted file mode 100644
index b1e2879..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorService.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import com.twitter.common.zookeeper.ServerSet;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogConstants;
-import org.apache.distributedlog.DistributedLogManager;
-import org.apache.distributedlog.LogSegmentMetadata;
-import org.apache.distributedlog.callback.LogSegmentListener;
-import org.apache.distributedlog.callback.NamespaceListener;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.client.serverset.DLZkServerSet;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.stats.Stat;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Duration;
-import com.twitter.util.FutureEventListener;
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Monitor Service.
- */
-public class MonitorService implements NamespaceListener {
-
- // Shared slf4j logger, also used by the nested StreamChecker.
- private static final Logger logger = LoggerFactory.getLogger(MonitorService.class);
-
- // Runtime handles. They start as null, are assigned by runMonitor(),
- // runServer() and createServerSets() respectively, and are closed by
- // close() when non-null.
- private DistributedLogNamespace dlNamespace = null;
- private MonitorServiceClient dlClient = null;
- private DLZkServerSet[] zkServerSets = null;
- // Scheduler for the periodic stream checks; one thread per available processor.
- private final ScheduledExecutorService executorService =
- Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
- // Released by close(); join() blocks on it.
- private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
- // Stream name -> checker. onStreamsChanged() mutates it while holding the
- // map's own monitor; the num_streams gauge reads size() without that lock.
- private final Map<String, StreamChecker> knownStreams = new HashMap<String, StreamChecker>();
-
- // Settings: effective values. runServer() overwrites these defaults from
- // the *Arg options below.
- private int regionId = DistributedLogConstants.LOCAL_REGION_ID;
- // Delay between checks, in milliseconds (passed to schedule() with TimeUnit.MILLISECONDS).
- private int interval = 100;
- // Null means "no filter": every stream name is accepted by onStreamsChanged().
- private String streamRegex = null;
- private boolean watchNamespaceChanges = false;
- private boolean handshakeWithClientInfo = false;
- // A heartbeat replaces the check on every N-th probe; values <= 0 disable heartbeats.
- private int heartbeatEveryChecks = 0;
- // The -1 defaults fail the validation in runServer(), so both must be supplied.
- private int instanceId = -1;
- private int totalInstances = -1;
- private boolean isThriftMux = false;
-
- // Options: raw values handed to the constructor; read once by runServer().
- private final Optional<String> uriArg;
- private final Optional<String> confFileArg;
- private final Optional<String> serverSetArg;
- private final Optional<Integer> intervalArg;
- private final Optional<Integer> regionIdArg;
- private final Optional<String> streamRegexArg;
- private final Optional<Integer> instanceIdArg;
- private final Optional<Integer> totalInstancesArg;
- private final Optional<Integer> heartbeatEveryChecksArg;
- private final Optional<Boolean> handshakeWithClientInfoArg;
- private final Optional<Boolean> watchNamespaceChangesArg;
- private final Optional<Boolean> isThriftMuxArg;
-
- // Stats: monitorReceiver is the "monitor" scope of statsReceiver;
- // successStat/failureStat record check latency in microseconds.
- private final StatsProvider statsProvider;
- private final StatsReceiver statsReceiver;
- private final StatsReceiver monitorReceiver;
- private final Stat successStat;
- private final Stat failureStat;
- private final Gauge<Number> numOfStreamsGauge;
- // Hash function used by onStreamsChanged() to assign stream names to instances.
- private final HashFunction hashFunction = Hashing.md5();
-
- /**
-  * Probes one stream through the monitor client and reschedules itself on the
-  * shared executor after every completed probe.
-  *
-  * <p>The first {@link #run()} opens the log and registers this object as a
-  * segment listener. Probing starts when {@link #onSegmentsUpdated(List)}
-  * reports that the first segment belongs to the configured region, and stops
-  * when it no longer does or when the checker is closed.
-  */
- class StreamChecker implements Runnable, FutureEventListener<Void>, LogSegmentListener {
-     private final String name;
-     private volatile boolean closed = false;
-     private volatile boolean checking = false;
-     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
-     private DistributedLogManager dlm = null;
-     private int numChecks = 0;
-
-     StreamChecker(String name) {
-         this.name = name;
-     }
-
-     /**
-      * Opens the log on the first call; issues one check or heartbeat on later calls.
-      */
-     @Override
-     public void run() {
-         if (null == dlm) {
-             openLog();
-         } else {
-             probe();
-         }
-     }
-
-     /** Opens the log and registers the segment listener, retrying later on failure. */
-     private void openLog() {
-         try {
-             dlm = dlNamespace.openLog(name);
-             dlm.registerListener(this);
-         } catch (IOException e) {
-             // Previously swallowed silently; log it so repeated open failures are visible.
-             logger.warn("Failed to open stream {}, retrying in {} ms : ",
-                 new Object[] { name, interval, e });
-             if (null != dlm) {
-                 try {
-                     dlm.close();
-                 } catch (IOException e1) {
-                     logger.error("Failed to close dlm for {} : ", name, e1);
-                 }
-                 dlm = null;
-             }
-             if (closed) {
-                 // Do not reopen a checker that has already been closed.
-                 return;
-             }
-             try {
-                 executorService.schedule(this, interval, TimeUnit.MILLISECONDS);
-             } catch (RejectedExecutionException ree) {
-                 logger.error("Failed to schedule checking stream {} in {} ms : ",
-                     new Object[] { name, interval, ree });
-             }
-         }
-     }
-
-     /** Sends a heartbeat on every heartbeatEveryChecks-th call, a plain check otherwise. */
-     private void probe() {
-         stopwatch.reset().start();
-         boolean sendHeartBeat;
-         if (heartbeatEveryChecks > 0) {
-             synchronized (this) {
-                 ++numChecks;
-                 // Wrap before the counter can overflow.
-                 if (numChecks >= Integer.MAX_VALUE) {
-                     numChecks = 0;
-                 }
-                 sendHeartBeat = (numChecks % heartbeatEveryChecks) == 0;
-             }
-         } else {
-             sendHeartBeat = false;
-         }
-         if (sendHeartBeat) {
-             dlClient.heartbeat(name).addEventListener(this);
-         } else {
-             dlClient.check(name).addEventListener(this);
-         }
-     }
-
-     /**
-      * Starts probing when the first segment belongs to the configured region
-      * and stops probing otherwise.
-      */
-     @Override
-     public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
-         if (segments.size() > 0 && segments.get(0).getRegionId() == regionId) {
-             if (!checking) {
-                 logger.info("Start checking stream {}.", name);
-                 checking = true;
-                 run();
-             }
-         } else {
-             if (checking) {
-                 logger.info("Stop checking stream {}.", name);
-                 // Clear the flag so scheduleCheck() stops rescheduling; without
-                 // this the "stop" was only logged and checks continued forever.
-                 checking = false;
-             }
-         }
-     }
-
-     @Override
-     public void onLogStreamDeleted() {
-         logger.info("Stream {} is deleted", name);
-     }
-
-     /** Records the latency of a successful probe and schedules the next one. */
-     @Override
-     public void onSuccess(Void value) {
-         successStat.add(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-         scheduleCheck();
-     }
-
-     /** Records the latency of a failed probe and schedules the next one. */
-     @Override
-     public void onFailure(Throwable cause) {
-         failureStat.add(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
-         scheduleCheck();
-     }
-
-     /** Schedules the next probe unless the checker is closed or probing is stopped. */
-     private void scheduleCheck() {
-         if (closed) {
-             return;
-         }
-         if (!checking) {
-             return;
-         }
-         try {
-             executorService.schedule(this, interval, TimeUnit.MILLISECONDS);
-         } catch (RejectedExecutionException ree) {
-             logger.error("Failed to schedule checking stream {} in {} ms : ",
-                 new Object[] { name, interval, ree });
-         }
-     }
-
-     /** Marks the checker closed and closes the log manager if one was opened. */
-     private void close() {
-         closed = true;
-         if (null != dlm) {
-             try {
-                 dlm.close();
-             } catch (IOException e) {
-                 logger.error("Failed to close dlm for {} : ", name, e);
-             }
-         }
-     }
- }
-
- /**
-  * Stores the raw options and prepares the stats objects. Nothing is
-  * validated or started here; that happens in {@link #runServer()}.
-  *
-  * @param uriArg distributedlog uri (required by runServer)
-  * @param confFileArg path of a distributedlog configuration file to load
-  * @param serverSetArg comma-separated proxy server set paths (required by runServer)
-  * @param intervalArg delay between checks, in milliseconds
-  * @param regionIdArg region id compared against a stream's first log segment
-  * @param streamRegexArg regex a stream name must match to be monitored
-  * @param instanceIdArg index of this monitor instance
-  * @param totalInstancesArg total number of monitor instances
-  * @param heartbeatEveryChecksArg send a heartbeat instead of a check every N probes
-  * @param handshakeWithClientInfoArg presence enables handshaking with client info
-  * @param watchNamespaceChangesArg presence enables watching the namespace for changes
-  * @param isThriftMuxArg presence enables thriftmux on the client
-  * @param statsReceiver finagle stats receiver; a "monitor" scope is derived from it
-  * @param statsProvider bookkeeper stats provider that hosts the num_streams gauge
-  */
- MonitorService(Optional<String> uriArg,
-                Optional<String> confFileArg,
-                Optional<String> serverSetArg,
-                Optional<Integer> intervalArg,
-                Optional<Integer> regionIdArg,
-                Optional<String> streamRegexArg,
-                Optional<Integer> instanceIdArg,
-                Optional<Integer> totalInstancesArg,
-                Optional<Integer> heartbeatEveryChecksArg,
-                Optional<Boolean> handshakeWithClientInfoArg,
-                Optional<Boolean> watchNamespaceChangesArg,
-                Optional<Boolean> isThriftMuxArg,
-                StatsReceiver statsReceiver,
-                StatsProvider statsProvider) {
-     // Stats plumbing.
-     this.statsProvider = statsProvider;
-     this.statsReceiver = statsReceiver;
-     this.monitorReceiver = statsReceiver.scope("monitor");
-     this.successStat = monitorReceiver.stat0("success");
-     this.failureStat = monitorReceiver.stat0("failure");
-     this.numOfStreamsGauge = new Gauge<Number>() {
-         @Override
-         public Number getDefaultValue() {
-             return 0;
-         }
-
-         @Override
-         public Number getSample() {
-             return knownStreams.size();
-         }
-     };
-
-     // String-valued options.
-     this.uriArg = uriArg;
-     this.confFileArg = confFileArg;
-     this.serverSetArg = serverSetArg;
-     this.streamRegexArg = streamRegexArg;
-
-     // Integer-valued options.
-     this.intervalArg = intervalArg;
-     this.regionIdArg = regionIdArg;
-     this.instanceIdArg = instanceIdArg;
-     this.totalInstancesArg = totalInstancesArg;
-     this.heartbeatEveryChecksArg = heartbeatEveryChecksArg;
-
-     // Flag options.
-     this.handshakeWithClientInfoArg = handshakeWithClientInfoArg;
-     this.watchNamespaceChangesArg = watchNamespaceChangesArg;
-     this.isThriftMuxArg = isThriftMuxArg;
- }
-
- /**
-  * Validates the options, loads the configuration, builds the monitor client
-  * and starts monitoring via {@link #runMonitor(DistributedLogConfiguration, URI)}.
-  *
-  * @throws IllegalArgumentException if the uri or server set is missing, the
-  *         instance id / total instances pair is invalid, or no server set path is given
-  * @throws IOException if the configuration file cannot be loaded
-  */
- public void runServer() throws IllegalArgumentException, IOException {
-     checkArgument(uriArg.isPresent(),
-             "No distributedlog uri provided.");
-     checkArgument(serverSetArg.isPresent(),
-             "No proxy server set provided.");
-
-     // An absent option keeps the field's current default.
-     interval = intervalArg.or(interval);
-     regionId = regionIdArg.or(regionId);
-     instanceId = instanceIdArg.or(instanceId);
-     totalInstances = totalInstancesArg.or(totalInstances);
-     heartbeatEveryChecks = heartbeatEveryChecksArg.or(heartbeatEveryChecks);
-     // Optional.or(T) rejects a null default, so the regex keeps the explicit form.
-     if (streamRegexArg.isPresent()) {
-         streamRegex = streamRegexArg.get();
-     }
-     if (instanceId < 0 || totalInstances <= 0 || instanceId >= totalInstances) {
-         throw new IllegalArgumentException("Invalid instance id or total instances number.");
-     }
-     // NOTE(review): the flags are driven by presence, not by the wrapped value;
-     // an Optional.of(false) would enable them. Confirm against the callers.
-     handshakeWithClientInfo = handshakeWithClientInfoArg.isPresent();
-     watchNamespaceChanges = watchNamespaceChangesArg.isPresent();
-     isThriftMux = isThriftMuxArg.isPresent();
-
-     URI uri = URI.create(uriArg.get());
-
-     // Fail fast on an unusable server set before anything is started.
-     String[] serverSetPaths = StringUtils.split(serverSetArg.get(), ",");
-     if (serverSetPaths.length == 0) {
-         throw new IllegalArgumentException("Invalid serverset paths provided : " + serverSetArg.get());
-     }
-
-     DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
-     if (confFileArg.isPresent()) {
-         String configFile = confFileArg.get();
-         try {
-             dlConf.loadConf(new File(configFile).toURI().toURL());
-         } catch (ConfigurationException e) {
-             // Keep the cause so the real parsing problem is not lost.
-             throw new IOException("Failed to load distributedlog configuration from " + configFile + ".", e);
-         } catch (MalformedURLException e) {
-             throw new IOException("Failed to load distributedlog configuration from malformed "
-                     + configFile + ".", e);
-         }
-     }
-     logger.info("Starting stats provider : {}.", statsProvider.getClass());
-     statsProvider.start(dlConf);
-
-     // The first path is the local server set; the rest are remotes.
-     ServerSet[] serverSets = createServerSets(serverSetPaths);
-     ServerSet local = serverSets[0];
-     ServerSet[] remotes = new ServerSet[serverSets.length - 1];
-     System.arraycopy(serverSets, 1, remotes, 0, remotes.length);
-
-     ClientBuilder finagleClientBuilder = ClientBuilder.get()
-         .connectTimeout(Duration.fromSeconds(1))
-         .tcpConnectTimeout(Duration.fromSeconds(1))
-         .requestTimeout(Duration.fromSeconds(2))
-         .keepAlive(true)
-         .failFast(false);
-
-     if (!isThriftMux) {
-         finagleClientBuilder = finagleClientBuilder
-             .hostConnectionLimit(2)
-             .hostConnectionCoresize(2);
-     }
-
-     // NOTE(review): the redirect backoff start (100) exceeds the max (50);
-     // the two values look swapped. Left unchanged pending confirmation.
-     dlClient = DistributedLogClientBuilder.newBuilder()
-             .name("monitor")
-             .thriftmux(isThriftMux)
-             .clientId(ClientId$.MODULE$.apply("monitor"))
-             .redirectBackoffMaxMs(50)
-             .redirectBackoffStartMs(100)
-             .requestTimeoutMs(2000)
-             .maxRedirects(2)
-             .serverSets(local, remotes)
-             .streamNameRegex(streamRegex)
-             .handshakeWithClientInfo(handshakeWithClientInfo)
-             .clientBuilder(finagleClientBuilder)
-             .statsReceiver(monitorReceiver.scope("client"))
-             .buildMonitorClient();
-     runMonitor(dlConf, uri);
- }
-
- /**
-  * Parses every path into a {@link DLZkServerSet}, remembers the parsed sets
-  * in {@code zkServerSets} and returns their server sets in the same order.
-  *
-  * @param serverSetPaths server set paths, one per server set
-  * @return the server set of each parsed path, index-aligned with the input
-  */
- ServerSet[] createServerSets(String[] serverSetPaths) {
-     final int count = serverSetPaths.length;
-     ServerSet[] result = new ServerSet[count];
-     zkServerSets = new DLZkServerSet[count];
-     int idx = 0;
-     for (String path : serverSetPaths) {
-         zkServerSets[idx] = parseServerSet(path);
-         result[idx] = zkServerSets[idx].getServerSet();
-         idx++;
-     }
-     return result;
- }
-
- /**
-  * Builds a {@link DLZkServerSet} from a server set path given as a URI string.
-  *
-  * @param serverSetPath server set path, parsed with {@link URI#create(String)}
-  * @return the server set for that uri
-  */
- protected DLZkServerSet parseServerSet(String serverSetPath) {
-     final URI serverSetUri = URI.create(serverSetPath);
-     // NOTE(review): 60000 is presumably a timeout in milliseconds; confirm against DLZkServerSet.of.
-     return DLZkServerSet.of(serverSetUri, 60000);
- }
-
- /**
-  * Reconciles the monitored streams with the latest namespace listing.
-  *
-  * <p>A stream is monitored when it matches the optional regex and hashes to
-  * this instance. Newly owned streams get a {@link StreamChecker} that is
-  * started immediately; checkers of streams no longer owned are closed after
-  * the map lock is released.
-  *
-  * @param streams names of all streams currently in the namespace
-  */
- @Override
- public void onStreamsChanged(Iterator<String> streams) {
-     Set<String> newSet = new HashSet<String>();
-     while (streams.hasNext()) {
-         String s = streams.next();
-         if ((null == streamRegex || s.matches(streamRegex)) && isAssignedToThisInstance(s)) {
-             newSet.add(s);
-         }
-     }
-     List<StreamChecker> tasksToCancel = new ArrayList<StreamChecker>();
-     synchronized (knownStreams) {
-         Set<String> knownStreamSet = new HashSet<String>(knownStreams.keySet());
-         Set<String> removedStreams = Sets.difference(knownStreamSet, newSet).immutableCopy();
-         Set<String> addedStreams = Sets.difference(newSet, knownStreamSet).immutableCopy();
-         for (String s : removedStreams) {
-             StreamChecker task = knownStreams.remove(s);
-             if (null != task) {
-                 logger.info("Removed stream {}", s);
-                 tasksToCancel.add(task);
-             }
-         }
-         for (String s : addedStreams) {
-             if (!knownStreams.containsKey(s)) {
-                 logger.info("Added stream {}", s);
-                 StreamChecker sc = new StreamChecker(s);
-                 knownStreams.put(s, sc);
-                 sc.run();
-             }
-         }
-     }
-     // Close outside the lock so slow closes do not block other updates.
-     for (StreamChecker sc : tasksToCancel) {
-         sc.close();
-     }
- }
-
- /**
-  * Tells whether the stream name hashes to this instance's bucket.
-  *
-  * @param stream stream name
-  * @return true if this instance is responsible for the stream
-  */
- private boolean isAssignedToThisInstance(String stream) {
-     int hash = hashFunction.hashUnencodedChars(stream).asInt();
-     // Math.abs(Integer.MIN_VALUE) is still negative, which made the bucket
-     // negative and left such a stream unowned by every instance. Pin that one
-     // value to bucket 0; all other hashes keep their previous bucket.
-     int bucket = (Integer.MIN_VALUE == hash) ? 0 : Math.abs(hash) % totalInstances;
-     return bucket == instanceId;
- }
-
- /**
-  * Registers the num_streams gauge, builds the namespace and begins monitoring.
-  * With namespace watching enabled this service is registered as a namespace
-  * listener; otherwise the current stream list is processed exactly once.
-  *
-  * @param conf distributedlog configuration used to build the namespace
-  * @param dlUri uri of the namespace to monitor
-  * @throws IOException declared by the namespace calls made here
-  */
- void runMonitor(DistributedLogConfiguration conf, URI dlUri) throws IOException {
-     statsProvider.getStatsLogger("monitor").registerGauge("num_streams", numOfStreamsGauge);
-
-     logger.info("Construct dl namespace @ {}", dlUri);
-     dlNamespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(dlUri).build();
-
-     if (!watchNamespaceChanges) {
-         // One-shot mode: take a single snapshot of the stream list.
-         onStreamsChanged(dlNamespace.getLogs());
-         return;
-     }
-     dlNamespace.registerNamespaceListener(this);
- }
-
- /**
-  * Close the server.
-  *
-  * <p>Closes the client, the zookeeper server sets and the namespace, shuts
-  * the executor down (waiting up to one minute), stops the stats provider and
-  * finally releases any thread blocked in {@link #join()}. The latch is
-  * released even if one of the close steps throws.
-  */
- public void close() {
-     logger.info("Closing monitor service.");
-     try {
-         if (null != dlClient) {
-             dlClient.close();
-         }
-         if (null != zkServerSets) {
-             for (DLZkServerSet zkServerSet : zkServerSets) {
-                 zkServerSet.close();
-             }
-         }
-         if (null != dlNamespace) {
-             dlNamespace.close();
-         }
-         executorService.shutdown();
-         try {
-             if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
-                 executorService.shutdownNow();
-             }
-         } catch (InterruptedException e) {
-             logger.error("Interrupted on waiting shutting down monitor executor service : ", e);
-             // Stop waiting, force the executor down and keep the interrupt
-             // visible to the caller instead of swallowing it.
-             executorService.shutdownNow();
-             Thread.currentThread().interrupt();
-         }
-         if (null != statsProvider) {
-             // clean up the gauges
-             unregisterGauge();
-             statsProvider.stop();
-         }
-     } finally {
-         // Always release join(), otherwise a failed close step would leave
-         // the launcher thread blocked forever.
-         keepAliveLatch.countDown();
-     }
-     logger.info("Closed monitor service.");
- }
-
- /**
-  * Blocks the calling thread until {@link #close()} releases the keep-alive latch.
-  *
-  * @throws InterruptedException if the thread is interrupted while waiting
-  */
- public void join() throws InterruptedException {
-     keepAliveLatch.await();
- }
-
- /**
-  * Removes the num_streams gauge from the "monitor" stats logger; called
-  * from close() before the stats provider is stopped, to help GC.
-  */
- private void unregisterGauge() {
-     statsProvider
-         .getStatsLogger("monitor")
-         .unregisterGauge("num_streams", numOfStreamsGauge);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java
deleted file mode 100644
index 1f45b13..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static org.apache.distributedlog.util.CommandLineUtils.getOptionalBooleanArg;
-import static org.apache.distributedlog.util.CommandLineUtils.getOptionalIntegerArg;
-import static org.apache.distributedlog.util.CommandLineUtils.getOptionalStringArg;
-
-import com.twitter.finagle.stats.NullStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.io.IOException;
-import org.apache.bookkeeper.stats.NullStatsProvider;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The launcher to run monitor service.
- *
- * <p>Parses the command line, builds a {@link MonitorService} from the parsed
- * options, starts it and blocks until the service is closed by the shutdown hook.
- */
-public class MonitorServiceApp {
-
-    private static final Logger logger = LoggerFactory.getLogger(MonitorServiceApp.class);
-
-    static final String USAGE = "MonitorService [-u <uri>] [-c <conf>] [-s serverset]";
-
-    final String[] args;
-    final Options options = new Options();
-
-    private MonitorServiceApp(String[] args) {
-        this.args = args;
-        // prepare options
-        options.addOption("u", "uri", true, "DistributedLog URI");
-        options.addOption("c", "conf", true, "DistributedLog Configuration File");
-        options.addOption("s", "serverset", true, "Proxy Server Set");
-        options.addOption("i", "interval", true, "Check interval");
-        options.addOption("d", "region", true, "Region ID");
-        options.addOption("p", "provider", true, "DistributedLog Stats Provider");
-        options.addOption("f", "filter", true, "Filter streams by regex");
-        options.addOption("w", "watch", false, "Watch stream changes under a given namespace");
-        options.addOption("n", "instance_id", true, "Instance ID");
-        options.addOption("t", "total_instances", true, "Total instances");
-        options.addOption("hck", "heartbeat-num-checks", true, "Send a heartbeat after num checks");
-        options.addOption("hsci", "handshake-with-client-info", false, "Enable handshaking with client info");
-        // runCmd() reads "mx", but it was never registered, so the parser
-        // rejected the flag and thriftmux could not be enabled.
-        options.addOption("mx", "thriftmux", false, "Enable thriftmux for the monitor client");
-    }
-
-    /** Prints the usage line followed by the registered options. */
-    void printUsage() {
-        HelpFormatter helpFormatter = new HelpFormatter();
-        helpFormatter.printHelp(USAGE, options);
-    }
-
-    /**
-     * Parses the arguments and runs the service; exits the JVM with -1 on
-     * unparsable or invalid arguments and on start-up I/O failures.
-     */
-    private void run() {
-        try {
-            logger.info("Running monitor service.");
-            BasicParser parser = new BasicParser();
-            CommandLine cmdline = parser.parse(options, args);
-            runCmd(cmdline);
-        } catch (ParseException pe) {
-            logger.error("Failed to parse command line arguments : ", pe);
-            printUsage();
-            Runtime.getRuntime().exit(-1);
-        } catch (IllegalArgumentException iae) {
-            // MonitorService.runServer() reports missing or invalid options this way;
-            // previously this escaped as an uncaught exception.
-            logger.error("Invalid arguments for monitor service : ", iae);
-            printUsage();
-            Runtime.getRuntime().exit(-1);
-        } catch (IOException ie) {
-            logger.error("Failed to start monitor service : ", ie);
-            Runtime.getRuntime().exit(-1);
-        }
-    }
-
-    /**
-     * Builds the monitor service from the parsed command line, starts it,
-     * installs a shutdown hook that closes it and waits for it to finish.
-     *
-     * @param cmdline parsed command line
-     * @throws IOException if the service fails to start
-     */
-    void runCmd(CommandLine cmdline) throws IOException {
-        StatsProvider statsProvider = new NullStatsProvider();
-        if (cmdline.hasOption("p")) {
-            String providerClass = cmdline.getOptionValue("p");
-            statsProvider = ReflectionUtils.newInstance(providerClass, StatsProvider.class);
-        }
-        StatsReceiver statsReceiver = NullStatsReceiver.get();
-
-        final MonitorService monitorService = new MonitorService(
-                getOptionalStringArg(cmdline, "u"),
-                getOptionalStringArg(cmdline, "c"),
-                getOptionalStringArg(cmdline, "s"),
-                getOptionalIntegerArg(cmdline, "i"),
-                getOptionalIntegerArg(cmdline, "d"),
-                getOptionalStringArg(cmdline, "f"),
-                getOptionalIntegerArg(cmdline, "n"),
-                getOptionalIntegerArg(cmdline, "t"),
-                getOptionalIntegerArg(cmdline, "hck"),
-                getOptionalBooleanArg(cmdline, "hsci"),
-                getOptionalBooleanArg(cmdline, "w"),
-                getOptionalBooleanArg(cmdline, "mx"),
-                statsReceiver,
-                statsProvider);
-
-        monitorService.runServer();
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                logger.info("Closing monitor service.");
-                monitorService.close();
-                logger.info("Closed monitor service.");
-            }
-        });
-        try {
-            monitorService.join();
-        } catch (InterruptedException ie) {
-            logger.warn("Interrupted when waiting monitor service to be finished : ", ie);
-            // Preserve the interrupt status for whoever called us.
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    public static void main(String[] args) {
-        final MonitorServiceApp launcher = new MonitorServiceApp(args);
-        launcher.run();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ResponseUtils.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
deleted file mode 100644
index a2691d3..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
-import org.apache.distributedlog.thrift.service.BulkWriteResponse;
-import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-
-/**
- * Static factories for the response headers and write responses returned by
- * the write proxy service.
- */
-public class ResponseUtils {
-
-    // ---- response headers ----
-
-    /** Header carrying {@code StatusCode.REQUEST_DENIED}. */
-    public static ResponseHeader deniedHeader() {
-        return new ResponseHeader(StatusCode.REQUEST_DENIED);
-    }
-
-    /** Header carrying {@code StatusCode.STREAM_UNAVAILABLE}. */
-    public static ResponseHeader streamUnavailableHeader() {
-        return new ResponseHeader(StatusCode.STREAM_UNAVAILABLE);
-    }
-
-    /** Header carrying {@code StatusCode.SUCCESS}. */
-    public static ResponseHeader successHeader() {
-        return new ResponseHeader(StatusCode.SUCCESS);
-    }
-
-    /** Header carrying {@code StatusCode.FOUND} with the owner as its location. */
-    public static ResponseHeader ownerToHeader(String owner) {
-        final ResponseHeader found = new ResponseHeader(StatusCode.FOUND);
-        return found.setLocation(owner);
-    }
-
-    /**
-     * Translates a failure into a response header.
-     *
-     * <p>A {@link DLException} contributes its own code and message, and an
-     * {@link OwnershipAcquireFailedException} additionally sets the location to
-     * the current owner. Any other throwable becomes an internal server error.
-     *
-     * @param t the failure to translate
-     * @return a header describing the failure
-     */
-    public static ResponseHeader exceptionToHeader(Throwable t) {
-        final ResponseHeader header = new ResponseHeader();
-        if (!(t instanceof DLException)) {
-            header.setCode(StatusCode.INTERNAL_SERVER_ERROR);
-            header.setErrMsg("Internal server error : " + t.getMessage());
-            return header;
-        }
-        final DLException dle = (DLException) t;
-        if (dle instanceof OwnershipAcquireFailedException) {
-            final OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) dle;
-            header.setLocation(oafe.getCurrentOwner());
-        }
-        header.setCode(dle.getCode());
-        header.setErrMsg(dle.getMessage());
-        return header;
-    }
-
-    // ---- single write responses ----
-
-    /** Wraps the given header in a write response. */
-    public static WriteResponse write(ResponseHeader responseHeader) {
-        return new WriteResponse(responseHeader);
-    }
-
-    /** Write response with a success header. */
-    public static WriteResponse writeSuccess() {
-        return write(successHeader());
-    }
-
-    /** Write response with a request-denied header. */
-    public static WriteResponse writeDenied() {
-        return write(deniedHeader());
-    }
-
-    // ---- bulk write responses ----
-
-    /** Wraps the given header in a bulk write response. */
-    public static BulkWriteResponse bulkWrite(ResponseHeader responseHeader) {
-        return new BulkWriteResponse(responseHeader);
-    }
-
-    /** Bulk write response with a success header. */
-    public static BulkWriteResponse bulkWriteSuccess() {
-        return bulkWrite(successHeader());
-    }
-
-    /** Bulk write response with a request-denied header. */
-    public static BulkWriteResponse bulkWriteDenied() {
-        return bulkWrite(deniedHeader());
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java
deleted file mode 100644
index 436145d..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-/**
- * Feature keys used by the distributedlog server.
- *
- * <p>Only the key names are defined here; what each feature switches is
- * decided by the server code that looks the key up.
- */
-public enum ServerFeatureKeys {
-    REGION_STOP_ACCEPT_NEW_STREAM,
-    SERVICE_RATE_LIMIT_DISABLED,
-    SERVICE_CHECKSUM_DISABLED,
-    SERVICE_GLOBAL_LIMITER_DISABLED
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/StatsFilter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/StatsFilter.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/StatsFilter.java
deleted file mode 100644
index ee64580..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/StatsFilter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import com.google.common.base.Stopwatch;
-import com.twitter.finagle.Service;
-import com.twitter.finagle.SimpleFilter;
-import com.twitter.util.Future;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-/**
- * Track distributedlog server finagle-service stats.
- *
- * <p>The filter measures the synchronous {@code service.apply(req)} call only:
- * the outstanding counter and the latency are settled as soon as the call
- * returns its future, not when that future completes.
- */
-class StatsFilter<Req, Rep> extends SimpleFilter<Req, Rep> {
-
-    private final StatsLogger stats;
-    private final Counter outstandingAsync;
-    private final OpStatsLogger serviceExec;
-
-    /**
-     * @param stats stats logger providing the "outstandingAsync" counter and
-     *              the "serviceExec" op stats
-     */
-    public StatsFilter(StatsLogger stats) {
-        this.stats = stats;
-        this.outstandingAsync = stats.getCounter("outstandingAsync");
-        this.serviceExec = stats.getOpStatsLogger("serviceExec");
-    }
-
-    /**
-     * Applies the service and records the call as successful when it returns
-     * and as failed when it throws.
-     *
-     * @param req the request
-     * @param service the downstream service
-     * @return whatever the service returned
-     */
-    @Override
-    public Future<Rep> apply(Req req, Service<Req, Rep> service) {
-        outstandingAsync.inc();
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        // Track completion explicitly instead of testing the result for null:
-        // a service returning null used to be recorded as a success and then
-        // hit a second stopwatch.stop() in the finally block, which throws.
-        boolean returned = false;
-        try {
-            Future<Rep> result = service.apply(req);
-            returned = true;
-            return result;
-        } finally {
-            // Stop exactly once, whichever way the call ended.
-            final long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
-            outstandingAsync.dec();
-            if (returned) {
-                serviceExec.registerSuccessfulEvent(elapsedMicros);
-            } else {
-                serviceExec.registerFailedEvent(elapsedMicros);
-            }
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java
deleted file mode 100644
index ee64fc7..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.announcer;
-
-import java.io.IOException;
-
-/**
- * Contract for publishing and withdrawing service information.
- */
-public interface Announcer {
-
-    /**
-     * Publishes the service info.
-     *
-     * @throws IOException declared for implementations whose announcement can fail
-     */
-    void announce() throws IOException;
-
-    /**
-     * Withdraws the previously published service info.
-     *
-     * @throws IOException declared for implementations whose withdrawal can fail
-     */
-    void unannounce() throws IOException;
-
-    /**
-     * Closes the announcer.
-     */
-    void close();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java
deleted file mode 100644
index 5a1277a..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.announcer;
-
-import java.io.IOException;
-
-/**
- * A no-op implementation of {@link Announcer}.
- *
- * <p>Every method has an empty body: nothing is announced, unannounced or released.
- */
-public class NOPAnnouncer implements Announcer {
- @Override
- public void announce() throws IOException {
- // nop: intentionally empty, never throws
- }
-
- @Override
- public void unannounce() throws IOException {
- // nop: intentionally empty, never throws
- }
-
- @Override
- public void close() {
- // nop: no resources are held
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java
deleted file mode 100644
index df4a8e2..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.announcer;
-
-import com.twitter.common.zookeeper.Group;
-import com.twitter.common.zookeeper.ServerSet;
-import org.apache.distributedlog.client.serverset.DLZkServerSet;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * ServerSet based announcer.
- *
- * <p>Joins the server set obtained from a {@link DLZkServerSet} with the local host's
- * service endpoint plus a fixed set of named additional endpoints, and leaves it again
- * on {@link #unannounce()}.
- */
-public class ServerSetAnnouncer implements Announcer {
-
-    private static final Logger logger = LoggerFactory.getLogger(ServerSetAnnouncer.class);
-
-    final String localAddr;
-    final InetSocketAddress serviceEndpoint;
-    final Map<String, InetSocketAddress> additionalEndpoints;
-    final int shardId;
-
-    // ServerSet
-    DLZkServerSet zkServerSet;
-
-    // Service Status: null until announce() has joined the server set successfully
-    ServerSet.EndpointStatus serviceStatus = null;
-
-    /**
-     * Announce server infos.
-     *
-     * @param uri
-     *          uri handed to {@link DLZkServerSet#of} to create the server set
-     * @param servicePort
-     *          service port
-     * @param statsPort
-     *          stats port
-     * @param shardId
-     *          shard id
-     * @throws UnknownHostException
-     *          if the address of the local host cannot be determined
-     */
-    public ServerSetAnnouncer(URI uri,
-                              int servicePort,
-                              int statsPort,
-                              int shardId) throws UnknownHostException {
-        this.shardId = shardId;
-        this.localAddr = InetAddress.getLocalHost().getHostAddress();
-        // service endpoint
-        this.serviceEndpoint = new InetSocketAddress(localAddr, servicePort);
-        // stats endpoint
-        InetSocketAddress statsEndpoint = new InetSocketAddress(localAddr, statsPort);
-        // the stats endpoint is published under "aurora" and "stats",
-        // the service endpoint under "service" and "thrift"
-        this.additionalEndpoints = new HashMap<String, InetSocketAddress>();
-        this.additionalEndpoints.put("aurora", statsEndpoint);
-        this.additionalEndpoints.put("stats", statsEndpoint);
-        this.additionalEndpoints.put("service", serviceEndpoint);
-        this.additionalEndpoints.put("thrift", serviceEndpoint);
-
-        // Create zookeeper and server set
-        this.zkServerSet = DLZkServerSet.of(uri, 60000);
-    }
-
-    /**
-     * Join the server set with the service endpoint, the additional endpoints and the shard id.
-     *
-     * @throws IOException
-     *          if joining the server set fails, or if the calling thread is interrupted
-     *          while joining (the thread's interrupt flag is restored before throwing)
-     */
-    @Override
-    public synchronized void announce() throws IOException {
-        try {
-            serviceStatus =
-                    zkServerSet.getServerSet().join(serviceEndpoint, additionalEndpoints, shardId);
-        } catch (Group.JoinException e) {
-            throw new IOException("Failed to announce service : ", e);
-        } catch (InterruptedException e) {
-            logger.warn("Interrupted on announcing service : ", e);
-            Thread.currentThread().interrupt();
-            // Returning normally here would make the caller believe the service is announced
-            // although serviceStatus was never set; surface the failure instead.
-            throw new IOException("Interrupted on announcing service : ", e);
-        }
-    }
-
-    /**
-     * Leave the server set joined by {@link #announce()}.
-     *
-     * <p>Logs a warning and returns if the service has not been announced.
-     *
-     * @throws IOException
-     *          if leaving the server set fails
-     */
-    @Override
-    public synchronized void unannounce() throws IOException {
-        if (null == serviceStatus) {
-            logger.warn("No service to unannounce.");
-            return;
-        }
-        try {
-            serviceStatus.leave();
-        } catch (ServerSet.UpdateException e) {
-            throw new IOException("Failed to unannounce service : ", e);
-        }
-    }
-
-    /**
-     * Close the underlying {@link DLZkServerSet}.
-     */
-    @Override
-    public void close() {
-        zkServerSet.close();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/package-info.java
deleted file mode 100644
index 6559bb3..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Announcers to announce servers to server set.
- */
-package org.apache.distributedlog.service.announcer;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java
deleted file mode 100644
index cdffaa3..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-
-/**
- * Balancer Interface.
- *
- * <p>A balancer is used to balance the streams across the proxy cluster.
- */
-public interface Balancer {
-
- /**
- * Rebalance all the streams from <i>source</i> to others.
- *
- * @param source
- * name of the source target whose streams are moved away.
- * @param rebalanceConcurrency
- * the concurrency to move streams for re-balance.
- * @param rebalanceRateLimiter
- * the rate limiting to move streams for re-balance. Passed as an
- * {@link Optional}, so a rate limiter may be absent.
- */
- void balanceAll(String source,
- int rebalanceConcurrency,
- Optional<RateLimiter> rebalanceRateLimiter);
-
- /**
- * Balance the streams across all targets.
- *
- * @param rebalanceWaterMark
- * rebalance water mark. If the number of streams of a given target is less than
- * the water mark, no streams will be re-balanced from this target.
- * @param rebalanceTolerancePercentage
- * tolerance percentage for the balancer. If the number of streams of a given target is
- * less than average + average * <i>tolerancePercentage</i> / 100.0, no streams will
- * be re-balanced from that target.
- * @param rebalanceConcurrency
- * the concurrency to move streams for re-balance.
- * @param rebalanceRateLimiter
- * the rate limiting to move streams for re-balance. Passed as an
- * {@link Optional}, so a rate limiter may be absent.
- */
- void balance(int rebalanceWaterMark,
- double rebalanceTolerancePercentage,
- int rebalanceConcurrency,
- Optional<RateLimiter> rebalanceRateLimiter);
-
- /**
- * Close the balancer.
- */
- void close();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java
deleted file mode 100644
index 964c1cc..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-import com.twitter.common.zookeeper.ServerSet;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.client.serverset.DLZkServerSet;
-import org.apache.distributedlog.impl.BKNamespaceDriver;
-import org.apache.distributedlog.service.ClientUtils;
-import org.apache.distributedlog.service.DLSocketAddress;
-import org.apache.distributedlog.service.DistributedLogClient;
-import org.apache.distributedlog.service.DistributedLogClientBuilder;
-import org.apache.distributedlog.tools.Tool;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Tool to rebalance cluster.
- */
-public class BalancerTool extends Tool {
-
- private static final Logger logger = LoggerFactory.getLogger(BalancerTool.class);
-
- static DistributedLogClientBuilder createDistributedLogClientBuilder(ServerSet serverSet) {
-     // the same label is used both as the client name and as the thrift client id
-     final String toolName = "rebalancer_tool";
-     return DistributedLogClientBuilder.newBuilder()
-             .name(toolName)
-             .clientId(ClientId$.MODULE$.apply(toolName))
-             .maxRedirects(2)
-             .serverSet(serverSet)
-             .clientBuilder(
-                 ClientBuilder.get()
-                     .connectionTimeout(Duration.fromSeconds(2))
-                     .tcpConnectTimeout(Duration.fromSeconds(2))
-                     .requestTimeout(Duration.fromSeconds(10))
-                     .hostConnectionLimit(1)
-                     .hostConnectionCoresize(1)
-                     .keepAlive(true)
-                     .failFast(false));
- }
-
- /**
-  * Base Command to run balancer.
-  *
-  * <p>Registers and parses the options shared by all balancer commands
-  * (<code>rwm</code>, <code>rtp</code>, <code>rc</code>, <code>r</code>) and then
-  * delegates to {@link #executeCommand(CommandLine)}.
-  */
- protected abstract static class BalancerCommand extends OptsCommand {
-
-     protected Options options = new Options();
-     protected int rebalanceWaterMark = 0;
-     protected double rebalanceTolerancePercentage = 0.0f;
-     protected int rebalanceConcurrency = 1;
-     protected Double rate = null;
-     // absent until a positive rate has been parsed, so getRateLimiter() never returns null
-     protected Optional<RateLimiter> rateLimiter = Optional.absent();
-
-     BalancerCommand(String name, String description) {
-         super(name, description);
-         options.addOption("rwm", "rebalance-water-mark", true, "Rebalance water mark per proxy");
-         options.addOption("rtp", "rebalance-tolerance-percentage", true,
-             "Rebalance tolerance percentage per proxy");
-         options.addOption("rc", "rebalance-concurrency", true, "Concurrency to rebalance stream distribution");
-         options.addOption("r", "rate", true, "Rebalance rate");
-     }
-
-     /**
-      * Rate limiter built from the <code>r</code> option.
-      *
-      * @return the rate limiter, absent when no positive rate was given
-      */
-     Optional<RateLimiter> getRateLimiter() {
-         return rateLimiter;
-     }
-
-     @Override
-     protected Options getOptions() {
-         return options;
-     }
-
-     /**
-      * Parse the shared balancer options into the fields of this command.
-      *
-      * @param cmdline
-      *          parsed command line
-      * @throws ParseException
-      *          if a numeric option value is not a valid number
-      * @throws IllegalArgumentException
-      *          if a parsed value is out of range (negative water mark or tolerance,
-      *          non-positive concurrency)
-      */
-     protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-         try {
-             if (cmdline.hasOption("rwm")) {
-                 this.rebalanceWaterMark = Integer.parseInt(cmdline.getOptionValue("rwm"));
-             }
-             if (cmdline.hasOption("rtp")) {
-                 this.rebalanceTolerancePercentage = Double.parseDouble(cmdline.getOptionValue("rtp"));
-             }
-             if (cmdline.hasOption("rc")) {
-                 this.rebalanceConcurrency = Integer.parseInt(cmdline.getOptionValue("rc"));
-             }
-             if (cmdline.hasOption("r")) {
-                 this.rate = Double.parseDouble(cmdline.getOptionValue("r"));
-             }
-         } catch (NumberFormatException nfe) {
-             // report malformed numbers through the same path as any other bad command line,
-             // so that runCmd prints the error and the usage instead of propagating the exception
-             ParseException pe = new ParseException("Invalid numeric option value : " + nfe.getMessage());
-             pe.initCause(nfe);
-             throw pe;
-         }
-         checkArgument(rebalanceWaterMark >= 0,
-             "Rebalance Water Mark should be a non-negative number");
-         checkArgument(rebalanceTolerancePercentage >= 0.0f,
-             "Rebalance Tolerance Percentage should be a non-negative number");
-         checkArgument(rebalanceConcurrency > 0,
-             "Rebalance Concurrency should be a positive number");
-         // a missing or non-positive rate means "no rate limiting"
-         if (null == rate || rate <= 0.0f) {
-             rateLimiter = Optional.absent();
-         } else {
-             rateLimiter = Optional.of(RateLimiter.create(rate));
-         }
-     }
-
-     /**
-      * Parse the command line and execute the command.
-      *
-      * @return -1 if the command line cannot be parsed, otherwise the result of
-      *         {@link #executeCommand(CommandLine)}
-      */
-     @Override
-     protected int runCmd(CommandLine cmdline) throws Exception {
-         try {
-             parseCommandLine(cmdline);
-         } catch (ParseException pe) {
-             println("ERROR: fail to parse commandline : '" + pe.getMessage() + "'");
-             printUsage();
-             return -1;
-         }
-         return executeCommand(cmdline);
-     }
-
-     /**
-      * Execute the command after the command line has been parsed successfully.
-      */
-     protected abstract int executeCommand(CommandLine cmdline) throws Exception;
- }
-
- /**
-  * Command to balance streams within a cluster.
-  *
-  * <p>Without a source proxy (<code>sp</code>) the streams are balanced across the cluster;
-  * with a source proxy all streams are moved away from that proxy.
-  */
- protected static class ClusterBalancerCommand extends BalancerCommand {
-
-     protected URI uri;
-     protected String source = null;
-
-     protected ClusterBalancerCommand() {
-         super("clusterbalancer", "Balance streams inside a cluster");
-         options.addOption("u", "uri", true, "DistributedLog URI");
-         options.addOption("sp", "source-proxy", true, "Source proxy to balance");
-     }
-
-     @Override
-     protected String getUsage() {
-         return "clusterbalancer [options]";
-     }
-
-     /**
-      * Parse the shared options plus the mandatory <code>u</code> and optional <code>sp</code> options.
-      *
-      * @throws ParseException
-      *          if no uri is given, the uri is malformed, or the source proxy
-      *          is not a valid socket address
-      */
-     @Override
-     protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-         super.parseCommandLine(cmdline);
-         if (!cmdline.hasOption("u")) {
-             throw new ParseException("No proxy serverset provided.");
-         }
-         String uriStr = cmdline.getOptionValue("u");
-         try {
-             uri = URI.create(uriStr);
-         } catch (IllegalArgumentException iae) {
-             // same treatment as an invalid source proxy below: report it as a parse error
-             throw new ParseException("Invalid uri " + uriStr + " : " + iae.getMessage());
-         }
-         if (cmdline.hasOption("sp")) {
-             String sourceProxyStr = cmdline.getOptionValue("sp");
-             try {
-                 // parsed only for validation; the string form is what gets stored
-                 DLSocketAddress.parseSocketAddress(sourceProxyStr);
-             } catch (IllegalArgumentException iae) {
-                 throw new ParseException("Invalid source proxy " + sourceProxyStr + " : " + iae.getMessage());
-             }
-             this.source = sourceProxyStr;
-         }
-     }
-
-     @Override
-     protected int executeCommand(CommandLine cmdline) throws Exception {
-         DLZkServerSet serverSet = DLZkServerSet.of(uri, 60000);
-         logger.info("Created serverset for {}", uri);
-         try {
-             DistributedLogClientBuilder clientBuilder =
-                     createDistributedLogClientBuilder(serverSet.getServerSet());
-             ClusterBalancer balancer = new ClusterBalancer(clientBuilder);
-             try {
-                 return runBalancer(clientBuilder, balancer);
-             } finally {
-                 balancer.close();
-             }
-         } finally {
-             serverSet.close();
-         }
-     }
-
-     /**
-      * Balance the whole cluster, or only drain <i>source</i> when a source proxy was given.
-      *
-      * @return always 0
-      */
-     protected int runBalancer(DistributedLogClientBuilder clientBuilder,
-                               ClusterBalancer balancer)
-             throws Exception {
-         if (null == source) {
-             balancer.balance(
-                 rebalanceWaterMark,
-                 rebalanceTolerancePercentage,
-                 rebalanceConcurrency,
-                 getRateLimiter());
-         } else {
-             balanceFromSource(clientBuilder, balancer, source, getRateLimiter());
-         }
-         return 0;
-     }
-
-     /**
-      * Stop the source proxy from accepting new streams, then move all its streams away.
-      *
-      * @param clientBuilder
-      *          builder used to derive a client that talks to the source proxy only
-      * @param balancer
-      *          balancer that moves the streams
-      * @param source
-      *          socket address of the source proxy, in string form
-      * @param rateLimiter
-      *          rate limiter for moving the streams
-      */
-     protected void balanceFromSource(DistributedLogClientBuilder clientBuilder,
-                                      ClusterBalancer balancer,
-                                      String source,
-                                      Optional<RateLimiter> rateLimiter)
-             throws Exception {
-         InetSocketAddress sourceAddr = DLSocketAddress.parseSocketAddress(source);
-         DistributedLogClientBuilder sourceClientBuilder =
-                 DistributedLogClientBuilder.newBuilder(clientBuilder)
-                     .host(sourceAddr);
-
-         Pair<DistributedLogClient, MonitorServiceClient> clientPair =
-                 ClientUtils.buildClient(sourceClientBuilder);
-         try {
-             Await.result(clientPair.getRight().setAcceptNewStream(false));
-             logger.info("Disable accepting new stream on proxy {}.", source);
-             balancer.balanceAll(source, rebalanceConcurrency, rateLimiter);
-         } finally {
-             clientPair.getLeft().close();
-         }
-     }
- }
-
- /**
-  * Command to balance streams between regions.
-  *
-  * <p>Requires exactly two region uris in the <code>rs</code> option. Without a source
-  * region (<code>s</code>) the streams are balanced between both regions; with a source
-  * all streams are moved away from it.
-  */
- protected static class RegionBalancerCommand extends BalancerCommand {
-
-     protected URI region1;
-     protected URI region2;
-     protected String source = null;
-
-     protected RegionBalancerCommand() {
-         super("regionbalancer", "Balance streams between regions");
-         options.addOption("rs", "regions", true, "DistributedLog Region URI: uri1[,uri2]");
-         options.addOption("s", "source", true, "DistributedLog Source Region to balance");
-     }
-
-     @Override
-     protected String getUsage() {
-         return "regionbalancer [options]";
-     }
-
-     /**
-      * Parse the shared options plus the mandatory <code>rs</code> and optional <code>s</code> options.
-      *
-      * @throws ParseException
-      *          if no regions are given, the number of regions is not two,
-      *          or a region uri is malformed
-      */
-     @Override
-     protected void parseCommandLine(CommandLine cmdline) throws ParseException {
-         super.parseCommandLine(cmdline);
-         if (!cmdline.hasOption("rs")) {
-             throw new ParseException("No regions provided.");
-         }
-         String regionsStr = cmdline.getOptionValue("rs");
-         String[] regions = regionsStr.split(",");
-         if (regions.length != 2) {
-             throw new ParseException("Invalid regions provided. Expected : serverset1[,serverset2]");
-         }
-         try {
-             region1 = URI.create(regions[0]);
-             region2 = URI.create(regions[1]);
-         } catch (IllegalArgumentException iae) {
-             // a malformed uri is a bad command line, not an internal error
-             throw new ParseException("Invalid regions " + regionsStr + " : " + iae.getMessage());
-         }
-         if (cmdline.hasOption("s")) {
-             source = cmdline.getOptionValue("s");
-         }
-     }
-
-     /**
-      * Create the server sets of both regions and run the balancer between them.
-      *
-      * <p>Each resource is closed in its own <code>finally</code> block, so a failure while
-      * creating or closing one resource does not leak the resources created before it.
-      */
-     @Override
-     protected int executeCommand(CommandLine cmdline) throws Exception {
-         DLZkServerSet serverSet1 = DLZkServerSet.of(region1, 60000);
-         try {
-             logger.info("Created serverset for {}", region1);
-             DLZkServerSet serverSet2 = DLZkServerSet.of(region2, 60000);
-             try {
-                 logger.info("Created serverset for {}", region2);
-                 return balanceBetween(serverSet1, serverSet2);
-             } finally {
-                 serverSet2.close();
-             }
-         } finally {
-             serverSet1.close();
-         }
-     }
-
-     /**
-      * Build one client pair per region, run the balancer and close the clients afterwards.
-      */
-     private int balanceBetween(DLZkServerSet serverSet1, DLZkServerSet serverSet2) throws Exception {
-         DistributedLogClientBuilder builder1 =
-                 createDistributedLogClientBuilder(serverSet1.getServerSet());
-         Pair<DistributedLogClient, MonitorServiceClient> pair1 =
-                 ClientUtils.buildClient(builder1);
-         try {
-             DistributedLogClientBuilder builder2 =
-                     createDistributedLogClientBuilder(serverSet2.getServerSet());
-             Pair<DistributedLogClient, MonitorServiceClient> pair2 =
-                     ClientUtils.buildClient(builder2);
-             try {
-                 SimpleBalancer balancer = new SimpleBalancer(
-                     BKNamespaceDriver.getZKServersFromDLUri(region1), pair1.getLeft(), pair1.getRight(),
-                     BKNamespaceDriver.getZKServersFromDLUri(region2), pair2.getLeft(), pair2.getRight());
-                 try {
-                     return runBalancer(balancer);
-                 } finally {
-                     balancer.close();
-                 }
-             } finally {
-                 pair2.getLeft().close();
-             }
-         } finally {
-             pair1.getLeft().close();
-         }
-     }
-
-     /**
-      * Balance both regions, or only drain <i>source</i> when a source region was given.
-      *
-      * @return always 0
-      */
-     protected int runBalancer(SimpleBalancer balancer) throws Exception {
-         if (null == source) {
-             balancer.balance(
-                 rebalanceWaterMark,
-                 rebalanceTolerancePercentage,
-                 rebalanceConcurrency,
-                 getRateLimiter());
-         } else {
-             balancer.balanceAll(source, rebalanceConcurrency, getRateLimiter());
-         }
-         return 0;
-     }
- }
-
- /**
-  * Create the tool and register the cluster and region balancer commands.
-  */
- public BalancerTool() {
-     // the superclass constructor is invoked implicitly
-     addCommand(new ClusterBalancerCommand());
-     addCommand(new RegionBalancerCommand());
- }
-
- /**
- * Name of this tool.
- *
- * @return the constant string "balancer"
- */
- @Override
- protected String getName() {
- return "balancer";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java
deleted file mode 100644
index 4c9e075..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerUtils.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import java.util.Map;
-
-/**
- * Utils for balancer.
- */
-public class BalancerUtils {
-
-    /**
-     * Compute how many streams should be moved away from <i>nodeToRebalance</i>, given the
-     * number of streams currently owned by each node in <i>loadDistribution</i>.
-     *
-     * <p>The average load is the sum of all loads (entries with a null key or null value
-     * contribute nothing to the sum) divided by the number of entries in the map.
-     *
-     * @param nodeToRebalance
-     *          node to rebalance
-     * @param loadDistribution
-     *          load distribution map
-     * @param rebalanceWaterMark
-     *          if the number of streams of <i>nodeToRebalance</i> is not greater than
-     *          <i>rebalanceWaterMark</i>, no streams will be re-balanced.
-     * @param tolerancePercentage
-     *          tolerance percentage for the balancer. if the number of streams of
-     *          <i>nodeToRebalance</i> is not greater than
-     *          average + average * <i>tolerancePercentage</i> / 100.0 (rounded up, at least 1),
-     *          no streams will be re-balanced.
-     * @param <K>
-     *          type of the node identifier
-     * @return number of streams to rebalance; 0 if the node is unknown or within the limits
-     */
-    public static <K> int calculateNumStreamsToRebalance(K nodeToRebalance,
-                                                         Map<K, Integer> loadDistribution,
-                                                         int rebalanceWaterMark,
-                                                         double tolerancePercentage) {
-        final Integer nodeLoad = loadDistribution.get(nodeToRebalance);
-        if (null == nodeLoad || nodeLoad <= rebalanceWaterMark) {
-            return 0;
-        }
-
-        // sum up the load of every fully specified entry
-        long loadSum = 0L;
-        for (Map.Entry<K, Integer> node : loadDistribution.entrySet()) {
-            if (null != node.getKey() && null != node.getValue()) {
-                loadSum += node.getValue();
-            }
-        }
-
-        final double mean = ((double) loadSum) / loadDistribution.size();
-        final double toleratedLoad = mean + mean * tolerancePercentage / 100.0f;
-        final long loadCeiling = Math.max(1L, (long) Math.ceil(toleratedLoad));
-        if (nodeLoad <= loadCeiling) {
-            return 0;
-        }
-
-        // shed everything above the (rounded up) average
-        final int roundedMean = (int) Math.ceil(mean);
-        return Math.max(0, nodeLoad - roundedMean);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java
deleted file mode 100644
index 5add339..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.service.ClientUtils;
-import org.apache.distributedlog.service.DLSocketAddress;
-import org.apache.distributedlog.service.DistributedLogClient;
-import org.apache.distributedlog.service.DistributedLogClientBuilder;
-import com.twitter.util.Await;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import java.io.Serializable;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A balancer balances ownerships with a cluster of targets.
- */
-public class ClusterBalancer implements Balancer {
-
- private static final Logger logger = LoggerFactory.getLogger(ClusterBalancer.class);
-
- /**
-  * Represent a single host. Ordered by number of streams in desc order.
-  *
-  * <p>The clients talking to the host are created on first use.
-  */
- static class Host {
-
-     final SocketAddress address;
-     final Set<String> streams;
-     final DistributedLogClientBuilder clientBuilder;
-     DistributedLogClient client = null;
-     MonitorServiceClient monitor = null;
-
-     Host(SocketAddress address, Set<String> streams,
-          DistributedLogClientBuilder clientBuilder) {
-         this.address = address;
-         this.streams = streams;
-         this.clientBuilder = clientBuilder;
-     }
-
-     // builds both clients at once; callers hold the monitor of this host
-     private void initializeClientsIfNeeded() {
-         if (null != client) {
-             return;
-         }
-         Pair<DistributedLogClient, MonitorServiceClient> built =
-                 createDistributedLogClient(address, clientBuilder);
-         client = built.getLeft();
-         monitor = built.getRight();
-     }
-
-     synchronized DistributedLogClient getClient() {
-         initializeClientsIfNeeded();
-         return client;
-     }
-
-     synchronized MonitorServiceClient getMonitor() {
-         initializeClientsIfNeeded();
-         return monitor;
-     }
-
-     synchronized void close() {
-         // nothing to release if the clients were never created
-         if (null == client) {
-             return;
-         }
-         client.close();
-     }
-
-     @Override
-     public String toString() {
-         return "Host(" + address + ")";
-     }
- }
-
- /**
-  * Orders hosts by their number of streams, in descending order.
-  */
- static class HostComparator implements Comparator<Host>, Serializable {
-     private static final long serialVersionUID = 7984973796525102538L;
-
-     /**
-      * Compare two hosts by stream count.
-      *
-      * @return a negative value if <i>h1</i> owns more streams than <i>h2</i>, zero if both
-      *         own the same number, a positive value otherwise
-      */
-     @Override
-     public int compare(Host h1, Host h2) {
-         // arguments are swapped on purpose to get descending order
-         return Integer.compare(h2.streams.size(), h1.streams.size());
-     }
- }
-
- // builder handed to every Host so that per-host clients can be derived from it
- protected final DistributedLogClientBuilder clientBuilder;
- protected final DistributedLogClient client;
- // used by balance(...) to fetch the stream ownership distribution
- protected final MonitorServiceClient monitor;
-
- /**
- * Create a balancer whose client pair is built from <i>clientBuilder</i>.
- *
- * @param clientBuilder
- * builder used for the balancer's own clients and for the per-host clients
- */
- public ClusterBalancer(DistributedLogClientBuilder clientBuilder) {
- this(clientBuilder, ClientUtils.buildClient(clientBuilder));
- }
-
- /**
- * Create a balancer from a builder and an already built client pair.
- *
- * @param clientBuilder
- * builder used for the per-host clients
- * @param clientPair
- * pair whose left element becomes the client and whose right element becomes the monitor
- */
- ClusterBalancer(DistributedLogClientBuilder clientBuilder,
- Pair<DistributedLogClient, MonitorServiceClient> clientPair) {
- this.clientBuilder = clientBuilder;
- this.client = clientPair.getLeft();
- this.monitor = clientPair.getRight();
- }
-
- /**
-  * Build the distributedlog clients that talk to the single host <i>host</i>.
-  *
-  * @param host
-  *          host to access
-  * @param clientBuilder
-  *          builder from which the single-host builder is derived
-  * @return distributedlog clients
-  */
- static Pair<DistributedLogClient, MonitorServiceClient> createDistributedLogClient(
-         SocketAddress host, DistributedLogClientBuilder clientBuilder) {
-     // derive a new builder from the given one and pin it to the host
-     return ClientUtils.buildClient(
-             DistributedLogClientBuilder.newBuilder(clientBuilder).host(host));
- }
-
- @Override
- public void balanceAll(String source,
-                        int rebalanceConcurrency, /* unused */
-                        Optional<RateLimiter> rebalanceRateLimiter) {
-     // draining a source: no water mark and no tolerance
-     final Optional<String> sourceHost = Optional.of(source);
-     balance(0, 0.0d, rebalanceConcurrency, sourceHost, rebalanceRateLimiter);
- }
-
- @Override
- public void balance(int rebalanceWaterMark,
-                     double rebalanceTolerancePercentage,
-                     int rebalanceConcurrency, /* unused */
-                     Optional<RateLimiter> rebalanceRateLimiter) {
-     // no source host: balance across all hosts
-     balance(
-         rebalanceWaterMark,
-         rebalanceTolerancePercentage,
-         rebalanceConcurrency,
-         Optional.<String>absent(),
-         rebalanceRateLimiter);
- }
-
- /**
- * Balance the streams across the hosts reported by the monitor, or move the streams
- * away from <i>source</i> when a source is given.
- *
- * <p>Returns without doing anything when the ownership distribution has at most one
- * host, or when the given source is not part of the distribution.
- *
- * @param rebalanceWaterMark
- * lower bound of the number of streams kept on a host that streams are moved from
- * @param rebalanceTolerancePercentage
- * percentage added on top of the average load to compute the high water mark
- * of the hosts that streams are moved to
- * @param rebalanceConcurrency
- * not referenced in this method body
- * @param source
- * socket address (in string form) of the host to move streams from, if present
- * @param rebalanceRateLimiter
- * rate limiter handed to the stream moving helpers
- */
- public void balance(int rebalanceWaterMark,
- double rebalanceTolerancePercentage,
- int rebalanceConcurrency,
- Optional<String> source,
- Optional<RateLimiter> rebalanceRateLimiter) {
- Map<SocketAddress, Set<String>> distribution = monitor.getStreamOwnershipDistribution();
- // nothing to balance with fewer than two hosts
- if (distribution.size() <= 1) {
- return;
- }
- SocketAddress sourceAddr = null;
- if (source.isPresent()) {
- sourceAddr = DLSocketAddress.parseSocketAddress(source.get());
- logger.info("Balancer source is {}", sourceAddr);
- // an unknown source owns no streams in this distribution
- if (!distribution.containsKey(sourceAddr)) {
- return;
- }
- }
- // Get the list of hosts ordered by number of streams in DESC order
- List<Host> hosts = new ArrayList<Host>(distribution.size());
- for (Map.Entry<SocketAddress, Set<String>> entry : distribution.entrySet()) {
- Host host = new Host(entry.getKey(), entry.getValue(), clientBuilder);
- hosts.add(host);
- }
- Collections.sort(hosts, new HostComparator());
- try {
-
- // find the host to move streams from.
- // stays -1 when no source is given; otherwise the index of the source in the sorted list
- int hostIdxMoveFrom = -1;
- if (null != sourceAddr) {
- for (Host host : hosts) {
- ++hostIdxMoveFrom;
- if (sourceAddr.equals(host.address)) {
- break;
- }
- }
- }
-
- // compute the average load.
- int totalStream = 0;
- for (Host host : hosts) {
- totalStream += host.streams.size();
- }
- double averageLoad;
- if (hostIdxMoveFrom >= 0) {
- // the source host is excluded from the divisor: its streams go to the other hosts
- averageLoad = ((double) totalStream / (hosts.size() - 1));
- } else {
- averageLoad = ((double) totalStream / hosts.size());
- }
-
- int moveFromLowWaterMark;
- // average plus tolerance, truncated by the int cast, but at least 1
- int moveToHighWaterMark =
- Math.max(1, (int) (averageLoad + averageLoad * rebalanceTolerancePercentage / 100.0f));
-
- if (hostIdxMoveFrom >= 0) {
- // source mode: move from the source host only, starting with the last host of the list as target
- moveFromLowWaterMark = Math.max(0, rebalanceWaterMark);
- moveStreams(
- hosts,
- new AtomicInteger(hostIdxMoveFrom), moveFromLowWaterMark,
- new AtomicInteger(hosts.size() - 1), moveToHighWaterMark,
- rebalanceRateLimiter);
- moveRemainingStreamsFromSource(hosts.get(hostIdxMoveFrom), hosts, rebalanceRateLimiter);
- } else {
- // cluster mode: walk the move-from index up from the most loaded host
- // until it reaches the move-to index
- moveFromLowWaterMark = Math.max((int) Math.ceil(averageLoad), rebalanceWaterMark);
- AtomicInteger moveFrom = new AtomicInteger(0);
- AtomicInteger moveTo = new AtomicInteger(hosts.size() - 1);
- while (moveFrom.get() < moveTo.get()) {
- moveStreams(hosts, moveFrom, moveFromLowWaterMark,
- moveTo, moveToHighWaterMark, rebalanceRateLimiter);
- moveFrom.incrementAndGet();
- }
- }
- } finally {
- // close every host; Host.close() only closes a client that was actually created
- for (Host host : hosts) {
- host.close();
- }
- }
- }
-
- void moveStreams(List<Host> hosts,
- AtomicInteger hostIdxMoveFrom,
- int moveFromLowWaterMark,
- AtomicInteger hostIdxMoveTo,
- int moveToHighWaterMark,
- Optional<RateLimiter> rateLimiter) {
- if (hostIdxMoveFrom.get() < 0 || hostIdxMoveFrom.get() >= hosts.size()
- || hostIdxMoveTo.get() < 0 || hostIdxMoveTo.get() >= hosts.size()
- || hostIdxMoveFrom.get() >= hostIdxMoveTo.get()) {
- return;
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug("Moving streams : hosts = {}, from = {}, to = {} :"
- + " from_low_water_mark = {}, to_high_water_mark = {}",
- new Object[] {
- hosts,
- hostIdxMoveFrom.get(),
- hostIdxMoveTo.get(),
- moveFromLowWaterMark,
- moveToHighWaterMark });
- }
-
- Host hostMoveFrom = hosts.get(hostIdxMoveFrom.get());
- int numStreamsOnFromHost = hostMoveFrom.streams.size();
- if (numStreamsOnFromHost <= moveFromLowWaterMark) {
- // do nothing
- return;
- }
-
- int numStreamsToMove = numStreamsOnFromHost - moveFromLowWaterMark;
- LinkedList<String> streamsToMove = new LinkedList<String>(hostMoveFrom.streams);
- Collections.shuffle(streamsToMove);
-
- if (logger.isDebugEnabled()) {
- logger.debug("Try to move {} streams from host {} : streams = {}",
- new Object[] { numStreamsToMove, hostMoveFrom.address, streamsToMove });
- }
-
- while (numStreamsToMove-- > 0 && !streamsToMove.isEmpty()) {
- if (rateLimiter.isPresent()) {
- rateLimiter.get().acquire();
- }
-
- // pick a host to move
- Host hostMoveTo = hosts.get(hostIdxMoveTo.get());
- while (hostMoveTo.streams.size() >= moveToHighWaterMark) {
- int hostIdx = hostIdxMoveTo.decrementAndGet();
- logger.info("move to host : {}, from {}", hostIdx, hostIdxMoveFrom.get());
- if (hostIdx <= hostIdxMoveFrom.get()) {
- return;
- } else {
- hostMoveTo = hosts.get(hostIdx);
- if (logger.isDebugEnabled()) {
- logger.debug("Target host to move moved to host {} @ {}",
- hostIdx, hostMoveTo);
- }
- }
- }
-
- // pick a stream
- String stream = streamsToMove.remove();
-
- // move the stream
- if (moveStream(stream, hostMoveFrom, hostMoveTo)) {
- hostMoveFrom.streams.remove(stream);
- hostMoveTo.streams.add(stream);
- }
- }
-
- }
-
- /**
-  * Move every stream still owned by {@code source} onto the other hosts, round-robin.
-  *
-  * <p>Targets are the entries of {@code hosts} whose address differs from the source's,
-  * visited from the end of the list towards the front and then wrapping around. Streams
-  * are attempted in shuffled order, one attempt per stream; a stream whose move fails is
-  * not retried and stays in {@code source.streams}.
-  *
-  * <p>If {@code hosts} contains no host other than the source there is nowhere to move
-  * anything, so the method logs a warning and returns rather than looping.
-  *
-  * @param source the host to move streams away from
-  * @param hosts all known hosts; may include {@code source}, which is never used as a target
-  * @param rateLimiter when present, one permit is acquired before each move attempt
-  */
- void moveRemainingStreamsFromSource(Host source,
-                                     List<Host> hosts,
-                                     Optional<RateLimiter> rateLimiter) {
-     LinkedList<String> streamsToMove = new LinkedList<String>(source.streams);
-     Collections.shuffle(streamsToMove);
-
-     if (logger.isDebugEnabled()) {
-         logger.debug("Try to move remaining streams from {} : {}", source, streamsToMove);
-     }
-
-     // Eligible targets, last host first, so the visiting order is a plain round-robin.
-     List<Host> targets = new ArrayList<Host>(hosts.size());
-     for (int idx = hosts.size() - 1; idx >= 0; idx--) {
-         Host candidate = hosts.get(idx);
-         if (!candidate.address.equals(source.address)) {
-             targets.add(candidate);
-         }
-     }
-     if (targets.isEmpty()) {
-         // Without this guard the loop below could never drain streamsToMove.
-         if (!streamsToMove.isEmpty()) {
-             logger.warn("No host other than {} is available to take over its {} remaining streams",
-                     source.address, streamsToMove.size());
-         }
-         return;
-     }
-
-     int nextTarget = 0;
-     while (!streamsToMove.isEmpty()) {
-         // A permit is only spent when a move is actually attempted.
-         if (rateLimiter.isPresent()) {
-             rateLimiter.get().acquire();
-         }
-
-         Host target = targets.get(nextTarget);
-         nextTarget = (nextTarget + 1) % targets.size();
-
-         String stream = streamsToMove.remove();
-         // move the stream
-         if (moveStream(stream, source, target)) {
-             source.streams.remove(stream);
-             target.streams.add(stream);
-         }
-     }
- }
-
- /**
-  * Attempt to move one stream between two hosts, reporting the outcome as a boolean.
-  *
-  * <p>Any exception raised by {@code doMoveStream} is logged together with its cause and
-  * turned into a {@code false} result, so callers can carry on with the next stream.
-  *
-  * @param stream name of the stream to move
-  * @param from host the stream is moved away from
-  * @param to host the stream is moved to
-  * @return {@code true} if {@code doMoveStream} completed without throwing,
-  *         {@code false} otherwise
-  */
- private boolean moveStream(String stream, Host from, Host to) {
-     try {
-         doMoveStream(stream, from, to);
-         return true;
-     } catch (Exception e) {
-         // Trailing throwable in the argument array is logged as the exception.
-         logger.warn("Giving up moving stream {} from {} to {} : ",
-                 new Object[] { stream, from.address, to.address, e });
-         return false;
-     }
- }
-
- /**
-  * Move one stream from {@code from} to {@code to} and block until the move has finished.
-  *
-  * <p>Calls {@code release(stream)} on the client of {@code from}; when that future yields
-  * its result, calls {@code check(stream)} on the monitor of {@code to}. The outcome of the
-  * check is logged, and the combined future is awaited.
-  *
-  * @param stream name of the stream to move
-  * @param from host whose client is asked to release the stream
-  * @param to host whose monitor is asked to check the stream
-  * @throws Exception whatever {@code Await.result} raises for the combined future
-  */
- private void doMoveStream(final String stream, final Host from, final Host to) throws Exception {
-     logger.info("Moving stream {} from {} to {}.",
-             new Object[] { stream, from.address, to.address });
-
-     // Logs how the check against the target host turned out.
-     final FutureEventListener<Void> moveOutcomeLogger = new FutureEventListener<Void>() {
-
-         @Override
-         public void onSuccess(Void value) {
-             logger.info("Moved stream {} from {} to {}.",
-                     new Object[] { stream, from.address, to.address });
-         }
-
-         @Override
-         public void onFailure(Throwable cause) {
-             logger.info("Failed to move stream {} from {} to {} : ",
-                     new Object[] { stream, from.address, to.address, cause });
-         }
-     };
-
-     // Second step of the move, chained onto the release of the stream.
-     final Function<Void, Future<Void>> checkOnTarget = new Function<Void, Future<Void>>() {
-         @Override
-         public Future<Void> apply(Void result) {
-             logger.info("Released stream {} from {}.", stream, from.address);
-             return to.getMonitor().check(stream).addEventListener(moveOutcomeLogger);
-         }
-     };
-
-     Await.result(from.getClient().release(stream).flatMap(checkOnTarget));
- }
-
- /**
-  * Closes the {@code client} held by this instance.
-  */
- @Override
- public void close() {
- client.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java
deleted file mode 100644
index 6a43179..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.Serializable;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.lang3.tuple.Pair;
-
-/**
- * A stream chooser based on number of streams.
- *
- * <p>Hosts are kept in a list sorted by stream count, largest first. Streams are taken one
- * at a time from the hosts in front of a moving pivot, so the most loaded hosts are drained
- * towards the count of the pivot host before the pivot advances.
- */
-class CountBasedStreamChooser implements StreamChooser, Serializable,
-        Comparator<Pair<SocketAddress, LinkedList<String>>> {
-
-    private static final long serialVersionUID = 4664153397369979203L;
-
-    // hosts paired with their (shuffled) streams, sorted by stream count in descending order.
-    final List<Pair<SocketAddress, LinkedList<String>>> streamsDistribution;
-
-    // pivot index in the list of hosts. streams are removed from the hosts before the pivot
-    // until their stream counts come down to the stream count recorded for the pivot.
-    int pivot;
-    int pivotCount;
-
-    // index of the host the next stream is chosen from.
-    int next;
-
-    /**
-     * Build a chooser over the given ownership distribution.
-     *
-     * @param streams streams owned by each host; must contain at least one host
-     */
-    CountBasedStreamChooser(Map<SocketAddress, Set<String>> streams) {
-        checkArgument(streams.size() > 0, "Only support no-empty streams distribution");
-        streamsDistribution = new ArrayList<Pair<SocketAddress, LinkedList<String>>>(streams.size());
-        for (Map.Entry<SocketAddress, Set<String>> hostStreams : streams.entrySet()) {
-            LinkedList<String> shuffled = new LinkedList<String>(hostStreams.getValue());
-            Collections.shuffle(shuffled);
-            streamsDistribution.add(Pair.of(hostStreams.getKey(), shuffled));
-        }
-        // most loaded host first; this object doubles as the comparator
-        Collections.sort(streamsDistribution, this);
-        pivot = 0;
-        pivotCount = streamsDistribution.get(0).getValue().size();
-        findNextPivot();
-        next = 0;
-    }
-
-    /**
-     * Advance the pivot to the next host holding fewer streams than the current pivot
-     * count. When there is no such host the pivot moves past the end of the list and the
-     * pivot count becomes zero.
-     */
-    private void findNextPivot() {
-        final int previousCount = pivotCount;
-        for (pivot = pivot + 1; pivot < streamsDistribution.size(); pivot++) {
-            pivotCount = streamsDistribution.get(pivot).getValue().size();
-            if (pivotCount < previousCount) {
-                return;
-            }
-        }
-        pivot = streamsDistribution.size();
-        pivotCount = 0;
-    }
-
-    /**
-     * Choose the next stream.
-     *
-     * @return the chosen stream, or {@code null} when there is nothing left to choose
-     */
-    @Override
-    public synchronized String choose() {
-        if (next == pivot) {
-            // a full pass over the hosts before the pivot has been made
-            final boolean stillAbovePivot =
-                    streamsDistribution.get(next - 1).getRight().size() > pivotCount;
-            if (!stillAbovePivot) {
-                if (pivotCount == 0) { // the streams are empty now
-                    return null;
-                }
-                findNextPivot();
-            }
-            next = 0;
-        }
-
-        // streams of the host to choose from
-        final LinkedList<String> candidates = streamsDistribution.get(next).getRight();
-        if (candidates.isEmpty()) {
-            return null;
-        }
-
-        final String chosenStream = candidates.remove();
-        ++next;
-        return chosenStream;
-    }
-
-    /**
-     * Order hosts by the number of streams they hold, largest first.
-     */
-    @Override
-    public int compare(Pair<SocketAddress, LinkedList<String>> o1,
-                       Pair<SocketAddress, LinkedList<String>> o2) {
-        final int firstCount = o1.getValue().size();
-        final int secondCount = o2.getValue().size();
-        return secondCount - firstCount;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java
deleted file mode 100644
index 4aefc5e..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-/**
- * A stream chooser that hands out at most a fixed number of streams from another chooser.
- */
-public class LimitedStreamChooser implements StreamChooser {
-
-    final StreamChooser underlying;
-    // number of streams that may still be chosen.
-    int limit;
-
-    LimitedStreamChooser(StreamChooser underlying, int limit) {
-        this.underlying = underlying;
-        this.limit = limit;
-    }
-
-    /**
-     * Create a limited stream chooser by {@code limit}.
-     *
-     * @param underlying the underlying stream chooser.
-     * @param limit the limit of number of streams to choose.
-     * @return the limited stream chooser.
-     */
-    public static LimitedStreamChooser of(StreamChooser underlying, int limit) {
-        return new LimitedStreamChooser(underlying, limit);
-    }
-
-    /**
-     * Choose the next stream from the underlying chooser while the limit allows it.
-     *
-     * @return the chosen stream, or {@code null} once the limit is used up or the underlying
-     *         chooser has returned {@code null}
-     */
-    @Override
-    public synchronized String choose() {
-        String chosen = null;
-        if (limit > 0) {
-            chosen = underlying.choose();
-            // a null from the underlying chooser exhausts this chooser as well
-            limit = (chosen == null) ? 0 : limit - 1;
-        }
-        return chosen;
-    }
-}