You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/04 08:44:03 UTC
[4/4] incubator-distributedlog git commit: DL-132: Enable check style
for distributedlog service module.
DL-132: Enable check style for distributedlog service module.
Author: Xi Liu <xi...@gmail.com>
Reviewers: Sijie Guo <si...@apache.org>
Closes #89 from xiliuant/xi/checkstyle_service
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/1a30b0ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/1a30b0ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/1a30b0ce
Branch: refs/heads/master
Commit: 1a30b0ceb76f33eda08b611d97c150f45f239a95
Parents: 32a52a9
Author: Xi Liu <xi...@gmail.com>
Authored: Wed Jan 4 00:43:56 2017 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Wed Jan 4 00:43:56 2017 -0800
----------------------------------------------------------------------
.../resources/distributedlog/checkstyle.xml | 2 +-
distributedlog-service/pom.xml | 33 ++
.../distributedlog/service/ClientUtils.java | 3 +
.../service/DistributedLogCluster.java | 78 ++---
.../service/DistributedLogServer.java | 64 ++--
.../service/DistributedLogServerApp.java | 38 ++-
.../service/DistributedLogServiceImpl.java | 48 +--
.../service/FatalErrorHandler.java | 7 -
.../distributedlog/service/MonitorService.java | 29 +-
.../service/MonitorServiceApp.java | 16 +-
.../service/ServerFeatureKeys.java | 2 +-
.../distributedlog/service/StatsFilter.java | 8 +-
.../service/announcer/Announcer.java | 2 +-
.../service/announcer/NOPAnnouncer.java | 3 +
.../service/announcer/ServerSetAnnouncer.java | 10 +-
.../service/announcer/package-info.java | 21 ++
.../service/balancer/Balancer.java | 5 +
.../service/balancer/BalancerTool.java | 42 ++-
.../service/balancer/ClusterBalancer.java | 27 +-
.../balancer/CountBasedStreamChooser.java | 9 +-
.../service/balancer/LimitedStreamChooser.java | 10 +
.../service/balancer/SimpleBalancer.java | 17 +-
.../service/balancer/StreamChooser.java | 2 +-
.../service/balancer/StreamMover.java | 5 +-
.../service/balancer/StreamMoverImpl.java | 2 +-
.../service/balancer/package-info.java | 21 ++
.../config/DefaultStreamConfigProvider.java | 29 +-
.../service/config/ServerConfiguration.java | 106 ++++---
.../config/ServiceStreamConfigProvider.java | 19 +-
.../service/config/package-info.java | 21 ++
.../distributedlog/service/package-info.java | 21 ++
.../service/placement/EqualLoadAppraiser.java | 26 +-
.../placement/LeastLoadPlacementPolicy.java | 311 ++++++++++---------
.../service/placement/LoadAppraiser.java | 18 +-
.../service/placement/PlacementPolicy.java | 194 ++++++------
.../placement/PlacementStateManager.java | 80 +++--
.../service/placement/ServerLoad.java | 227 +++++++-------
.../service/placement/StreamLoad.java | 141 +++++----
.../placement/ZKPlacementStateManager.java | 236 +++++++-------
.../service/placement/package-info.java | 21 ++
.../service/stream/AbstractStreamOp.java | 23 +-
.../service/stream/AbstractWriteOp.java | 9 +-
.../service/stream/BulkWriteOp.java | 29 +-
.../distributedlog/service/stream/DeleteOp.java | 7 +-
.../service/stream/HeartbeatOp.java | 11 +-
.../service/stream/ReleaseOp.java | 7 +-
.../distributedlog/service/stream/Stream.java | 7 +-
.../service/stream/StreamFactory.java | 3 +
.../service/stream/StreamFactoryImpl.java | 3 +
.../service/stream/StreamImpl.java | 34 +-
.../service/stream/StreamManager.java | 4 +-
.../service/stream/StreamManagerImpl.java | 5 +-
.../distributedlog/service/stream/StreamOp.java | 2 -
.../service/stream/StreamOpStats.java | 2 +-
.../service/stream/TruncateOp.java | 11 +-
.../distributedlog/service/stream/WriteOp.java | 17 +-
.../service/stream/WriteOpWithPayload.java | 3 +
.../service/stream/admin/AdminOp.java | 6 +-
.../service/stream/admin/CreateOp.java | 7 +-
.../service/stream/admin/StreamAdminOp.java | 5 +-
.../service/stream/admin/package-info.java | 21 ++
.../stream/limiter/DynamicRequestLimiter.java | 21 +-
.../stream/limiter/RequestLimiterBuilder.java | 24 +-
.../stream/limiter/ServiceRequestLimiter.java | 8 +-
.../stream/limiter/StreamAcquireLimiter.java | 6 +-
.../stream/limiter/StreamRequestLimiter.java | 5 +-
.../service/stream/limiter/package-info.java | 21 ++
.../service/stream/package-info.java | 21 ++
.../CacheableStreamPartitionConverter.java | 3 +
.../DelimiterStreamPartitionConverter.java | 2 +-
.../service/streamset/Partition.java | 1 +
.../service/streamset/PartitionMap.java | 3 +
.../service/streamset/package-info.java | 21 ++
.../distributedlog/service/tools/ProxyTool.java | 30 +-
.../service/tools/package-info.java | 21 ++
.../service/utils/package-info.java | 21 ++
.../stats/CodahaleMetricsServletProvider.java | 4 +-
.../HealthCheckServletContextListener.java | 2 +-
.../stats/MetricsServletContextListener.java | 3 +
.../bookkeeper/stats/ServletReporter.java | 2 +-
.../apache/bookkeeper/stats/package-info.java | 21 ++
.../client/routing/LocalRoutingService.java | 7 +-
.../service/DistributedLogServerTestCase.java | 33 +-
.../service/TestDistributedLogServerBase.java | 62 ++--
.../TestDistributedLogServerClientRouting.java | 9 +-
.../service/TestDistributedLogService.java | 88 +++---
.../service/TestRegionUnavailable.java | 22 +-
.../distributedlog/service/TestStatsFilter.java | 11 +-
.../service/balancer/TestBalancerUtils.java | 8 +-
.../service/balancer/TestClusterBalancer.java | 26 +-
.../balancer/TestCountBasedStreamChooser.java | 16 +-
.../service/balancer/TestSimpleBalancer.java | 22 +-
.../service/balancer/TestStreamMover.java | 12 +-
.../service/config/TestServerConfiguration.java | 8 +-
.../config/TestStreamConfigProvider.java | 14 +-
.../placement/TestLeastLoadPlacementPolicy.java | 247 ++++++++-------
.../service/placement/TestServerLoad.java | 50 +--
.../service/placement/TestStreamLoad.java | 28 +-
.../placement/TestZKPlacementStateManager.java | 197 ++++++------
.../service/stream/TestStreamManager.java | 24 +-
.../service/stream/TestStreamOp.java | 27 +-
.../limiter/TestServiceRequestLimiter.java | 32 +-
.../TestDelimiterStreamPartitionConverter.java | 6 +-
.../TestIdentityStreamPartitionConverter.java | 8 +-
.../service/streamset/TestPartitionMap.java | 7 +-
105 files changed, 1974 insertions(+), 1400 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
----------------------------------------------------------------------
diff --git a/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml b/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
index e1117c8..db3549f 100644
--- a/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
+++ b/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
@@ -260,7 +260,7 @@ page at http://checkstyle.sourceforge.net/config.html -->
T, K, V, W, X or else be capital-case terminated with a T,
such as MyGenericParameterT -->
<module name="ClassTypeParameterName">
- <property name="format" value="^(((T|K|V|W|X)[0-9]*)|([A-Z][a-z][a-zA-Z]*T))$"/>
+ <property name="format" value="^(((T|K|V|W|X|R)[0-9]*)|([A-Z][a-z][a-zA-Z]*))$"/>
<property name="severity" value="error"/>
</module>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-service/pom.xml b/distributedlog-service/pom.xml
index 052ce15..154dedb 100644
--- a/distributedlog-service/pom.xml
+++ b/distributedlog-service/pom.xml
@@ -197,6 +197,39 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>6.19</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>distributedlog-build-tools</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <configLocation>distributedlog/checkstyle.xml</configLocation>
+ <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation>
+ <consoleOutput>true</consoleOutput>
+ <failOnViolation>true</failOnViolation>
+ <includeResources>false</includeResources>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
<profiles>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java
index dd9961d..da36014 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java
@@ -21,6 +21,9 @@ import com.twitter.distributedlog.client.DistributedLogClientImpl;
import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
import org.apache.commons.lang3.tuple.Pair;
+/**
+ * DistributedLog Client Related Utils.
+ */
public class ClientUtils {
public static Pair<DistributedLogClient, MonitorServiceClient> buildClient(DistributedLogClientBuilder builder) {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
index a2a0ca6..029c822 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
@@ -25,6 +25,13 @@ import com.twitter.distributedlog.metadata.DLMetadata;
import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
import com.twitter.finagle.builder.Server;
+import java.io.File;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
import org.apache.bookkeeper.stats.NullStatsProvider;
@@ -35,14 +42,6 @@ import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.net.BindException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
/**
* DistributedLog Cluster is an emulator to run distributedlog components.
*/
@@ -59,18 +58,18 @@ public class DistributedLogCluster {
*/
public static class Builder {
- int _numBookies = 3;
- boolean _shouldStartZK = true;
- String _zkHost = "127.0.0.1";
- int _zkPort = 0;
- boolean _shouldStartProxy = true;
- int _proxyPort = 7000;
- boolean _thriftmux = false;
- DistributedLogConfiguration _dlConf = new DistributedLogConfiguration()
+ int numBookies = 3;
+ boolean shouldStartZK = true;
+ String zkHost = "127.0.0.1";
+ int zkPort = 0;
+ boolean shouldStartProxy = true;
+ int proxyPort = 7000;
+ boolean thriftmux = false;
+ DistributedLogConfiguration dlConf = new DistributedLogConfiguration()
.setLockTimeout(10)
.setOutputBufferSize(0)
.setImmediateFlushEnabled(true);
- ServerConfiguration _bkConf = new ServerConfiguration();
+ ServerConfiguration bkConf = new ServerConfiguration();
private Builder() {}
@@ -80,7 +79,7 @@ public class DistributedLogCluster {
* @return builder
*/
public Builder numBookies(int numBookies) {
- this._numBookies = numBookies;
+ this.numBookies = numBookies;
return this;
}
@@ -92,7 +91,7 @@ public class DistributedLogCluster {
* @return builder
*/
public Builder shouldStartZK(boolean startZK) {
- this._shouldStartZK = startZK;
+ this.shouldStartZK = startZK;
return this;
}
@@ -104,7 +103,7 @@ public class DistributedLogCluster {
* @return builder
*/
public Builder zkServers(String zkServers) {
- this._zkHost = zkServers;
+ this.zkHost = zkServers;
return this;
}
@@ -116,7 +115,7 @@ public class DistributedLogCluster {
* @return builder.
*/
public Builder zkPort(int zkPort) {
- this._zkPort = zkPort;
+ this.zkPort = zkPort;
return this;
}
@@ -128,7 +127,7 @@ public class DistributedLogCluster {
* @return builder
*/
public Builder shouldStartProxy(boolean startProxy) {
- this._shouldStartProxy = startProxy;
+ this.shouldStartProxy = startProxy;
return this;
}
@@ -140,60 +139,63 @@ public class DistributedLogCluster {
* @return builder
*/
public Builder proxyPort(int proxyPort) {
- this._proxyPort = proxyPort;
+ this.proxyPort = proxyPort;
return this;
}
/**
- * DistributedLog Configuration
+ * Set the distributedlog configuration.
*
* @param dlConf
* distributedlog configuration
* @return builder
*/
public Builder dlConf(DistributedLogConfiguration dlConf) {
- this._dlConf = dlConf;
+ this.dlConf = dlConf;
return this;
}
/**
- * Bookkeeper server configuration
+ * Set the Bookkeeper server configuration.
*
* @param bkConf
* bookkeeper server configuration
* @return builder
*/
public Builder bkConf(ServerConfiguration bkConf) {
- this._bkConf = bkConf;
+ this.bkConf = bkConf;
return this;
}
/**
- * Enable thriftmux for the dl server
+ * Enable thriftmux for the dl server.
*
* @param enabled flag to enable thriftmux
* @return builder
*/
public Builder thriftmux(boolean enabled) {
- this._thriftmux = enabled;
+ this.thriftmux = enabled;
return this;
}
public DistributedLogCluster build() throws Exception {
// build the cluster
return new DistributedLogCluster(
- _dlConf,
- _bkConf,
- _numBookies,
- _shouldStartZK,
- _zkHost,
- _zkPort,
- _shouldStartProxy,
- _proxyPort,
- _thriftmux);
+ dlConf,
+ bkConf,
+ numBookies,
+ shouldStartZK,
+ zkHost,
+ zkPort,
+ shouldStartProxy,
+ proxyPort,
+ thriftmux);
}
}
+ /**
+ * Run a distributedlog proxy server.
+ */
public static class DLServer {
static final int MAX_RETRIES = 20;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
index a9ba125..248bcf7 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
@@ -17,32 +17,8 @@
*/
package com.twitter.distributedlog.service;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.Tuple2;
-
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.client.routing.RoutingService;
import com.twitter.distributedlog.config.DynamicConfigurationFactory;
@@ -72,10 +48,33 @@ import com.twitter.finagle.thrift.ClientIdRequiredFilter;
import com.twitter.finagle.thrift.ThriftServerFramedCodec;
import com.twitter.finagle.transport.Transport;
import com.twitter.util.Duration;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+/**
+ * Running the distributedlog proxy server.
+ */
public class DistributedLogServer {
- static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
private static final String DEFAULT_LOAD_APPRIASER = EqualLoadAppraiser.class.getCanonicalName();
private DistributedLogServiceImpl dlService = null;
@@ -124,7 +123,8 @@ public class DistributedLogServer {
this.loadAppraiserClassStr = loadAppraiserClass;
}
- public void runServer() throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
+ public void runServer()
+ throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
if (!uri.isPresent()) {
throw new IllegalArgumentException("No distributedlog uri provided.");
}
@@ -135,7 +135,8 @@ public class DistributedLogServer {
try {
dlConf.loadConf(new File(configFile).toURI().toURL());
} catch (ConfigurationException e) {
- throw new IllegalArgumentException("Failed to load distributedlog configuration from " + configFile + ".");
+ throw new IllegalArgumentException("Failed to load distributedlog configuration from "
+ + configFile + ".");
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
+ configFile + ".");
@@ -185,7 +186,8 @@ public class DistributedLogServer {
}
Class loadAppraiserClass = Class.forName(loadAppraiserClassStr.or(DEFAULT_LOAD_APPRIASER));
LoadAppraiser loadAppraiser = (LoadAppraiser) ReflectionUtils.newInstance(loadAppraiserClass);
- logger.info("Supplied load appraiser class is " + loadAppraiserClassStr.get() + " Instantiated " + loadAppraiser.getClass().getCanonicalName());
+ logger.info("Supplied load appraiser class is " + loadAppraiserClassStr.get()
+ + " Instantiated " + loadAppraiser.getClass().getCanonicalName());
StreamConfigProvider streamConfProvider =
getStreamConfigProvider(dlConf, converter);
@@ -227,7 +229,8 @@ public class DistributedLogServer {
}
}
- private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration dlConf) throws ConfigurationException {
+ private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration dlConf)
+ throws ConfigurationException {
Optional<DynamicDistributedLogConfiguration> dynConf = Optional.absent();
if (conf.isPresent()) {
DynamicConfigurationFactory configFactory = new DynamicConfigurationFactory(
@@ -341,7 +344,8 @@ public class DistributedLogServer {
logger.info("Using thriftmux.");
Tuple2<Transport.Liveness, Stack.Param<Transport.Liveness>> livenessParam = new Transport.Liveness(
Duration.Top(), Duration.Top(), Option.apply((Object) Boolean.valueOf(true))).mk();
- serverBuilder = serverBuilder.stack(ThriftMuxServer$.MODULE$.configured(livenessParam._1(), livenessParam._2()));
+ serverBuilder = serverBuilder.stack(
+ ThriftMuxServer$.MODULE$.configured(livenessParam._1(), livenessParam._2()));
}
logger.info("DistributedLogServer running with the following configuration : \n{}", dlConf.getPropsAsString());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
index 1c3d8d4..55ed84f 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
@@ -17,15 +17,26 @@
*/
package com.twitter.distributedlog.service;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalBooleanArg;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalIntegerArg;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalStringArg;
+
import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.client.routing.RoutingService;
import com.twitter.distributedlog.client.routing.RoutingUtils;
import com.twitter.distributedlog.client.serverset.DLZkServerSet;
import com.twitter.finagle.stats.NullStatsReceiver;
import com.twitter.finagle.stats.StatsReceiver;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.util.ReflectionUtils;
@@ -38,21 +49,14 @@ import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import static com.twitter.distributedlog.util.CommandLineUtils.*;
-
+/**
+ * The launcher of the distributedlog proxy server.
+ */
public class DistributedLogServerApp {
- static final Logger logger = LoggerFactory.getLogger(DistributedLogServerApp.class);
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLogServerApp.class);
- private final static String USAGE = "DistributedLogServerApp [-u <uri>] [-c <conf>]";
+ private static final String USAGE = "DistributedLogServerApp [-u <uri>] [-c <conf>]";
private final String[] args;
private final Options options = new Options();
@@ -104,7 +108,8 @@ public class DistributedLogServerApp {
}
}
- private void runCmd(CommandLine cmdline) throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException {
+ private void runCmd(CommandLine cmdline)
+ throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException {
final StatsReceiver statsReceiver = NullStatsReceiver.get();
Optional<String> confOptional = getOptionalStringArg(cmdline, "c");
DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
@@ -113,7 +118,8 @@ public class DistributedLogServerApp {
try {
dlConf.loadConf(new File(configFile).toURI().toURL());
} catch (ConfigurationException e) {
- throw new IllegalArgumentException("Failed to load distributedlog configuration from " + configFile + ".");
+ throw new IllegalArgumentException("Failed to load distributedlog configuration from "
+ + configFile + ".");
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
+ configFile + ".");
@@ -130,7 +136,7 @@ public class DistributedLogServerApp {
}).or(new NullStatsProvider());
final Optional<String> uriOption = getOptionalStringArg(cmdline, "u");
- Preconditions.checkArgument(uriOption.isPresent(), "No distributedlog uri provided.");
+ checkArgument(uriOption.isPresent(), "No distributedlog uri provided.");
URI dlUri = URI.create(uriOption.get());
DLZkServerSet serverSet = DLZkServerSet.of(dlUri, (int) TimeUnit.SECONDS.toMillis(60));
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 5dee7fd..db1346e 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -47,7 +47,6 @@ import com.twitter.distributedlog.service.placement.PlacementPolicy;
import com.twitter.distributedlog.service.placement.ZKPlacementStateManager;
import com.twitter.distributedlog.service.stream.BulkWriteOp;
import com.twitter.distributedlog.service.stream.DeleteOp;
-import com.twitter.distributedlog.service.stream.admin.CreateOp;
import com.twitter.distributedlog.service.stream.HeartbeatOp;
import com.twitter.distributedlog.service.stream.ReleaseOp;
import com.twitter.distributedlog.service.stream.Stream;
@@ -59,8 +58,8 @@ import com.twitter.distributedlog.service.stream.StreamOp;
import com.twitter.distributedlog.service.stream.StreamOpStats;
import com.twitter.distributedlog.service.stream.TruncateOp;
import com.twitter.distributedlog.service.stream.WriteOp;
-import com.twitter.distributedlog.service.stream.admin.StreamAdminOp;
import com.twitter.distributedlog.service.stream.WriteOpWithPayload;
+import com.twitter.distributedlog.service.stream.admin.CreateOp;
import com.twitter.distributedlog.service.stream.admin.StreamAdminOp;
import com.twitter.distributedlog.service.stream.limiter.ServiceRequestLimiter;
import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
@@ -86,7 +85,6 @@ import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.ScheduledThreadPoolTimer;
import com.twitter.util.Timer;
-
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -96,7 +94,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.stats.Counter;
@@ -105,15 +102,17 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.runtime.BoxedUnit;
+/**
+ * Implementation of distributedlog thrift service.
+ */
public class DistributedLogServiceImpl implements DistributedLogService.ServiceIface,
FatalErrorHandler {
- static final Logger logger = LoggerFactory.getLogger(DistributedLogServiceImpl.class);
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLogServiceImpl.class);
- private final int MOVING_AVERAGE_WINDOW_SECS = 60;
+ private static final int MOVING_AVERAGE_WINDOW_SECS = 60;
private final ServerConfiguration serverConfig;
private final DistributedLogConfiguration dlConfig;
@@ -294,8 +293,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
@Override
public Number getSample() {
- return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable() ?
- 3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2));
+ return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable()
+ ? 3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2));
}
};
this.movingAvgRpsGauge = new Gauge<Number>() {
@@ -364,8 +363,9 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
streamsStatsLogger.registerGauge("cached", this.streamCachedGauge);
// Setup complete
- logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {}, dlsn version {}.",
- new Object[] { clientId, allocatorPoolName, serverConf.isPerStreamStatEnabled(), dlsnVersion });
+ logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {},"
+ + " dlsn version {}.",
+ new Object[] { clientId, allocatorPoolName, serverConf.isPerStreamStatEnabled(), dlsnVersion });
}
private void countStatusCode(StatusCode code) {
@@ -440,7 +440,9 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
}
@Override
- public Future<BulkWriteResponse> writeBulkWithContext(final String stream, List<ByteBuffer> data, WriteContext ctx) {
+ public Future<BulkWriteResponse> writeBulkWithContext(final String stream,
+ List<ByteBuffer> data,
+ WriteContext ctx) {
bulkWritePendingStat.inc();
receivedRecordCounter.add(data.size());
BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
@@ -480,8 +482,14 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
@Override
public Future<WriteResponse> truncate(String stream, String dlsn, WriteContext ctx) {
- TruncateOp op = new TruncateOp(stream, DLSN.deserialize(dlsn), statsLogger, perStreamStatsLogger, getChecksum(ctx),
- featureChecksumDisabled, accessControlManager);
+ TruncateOp op = new TruncateOp(
+ stream,
+ DLSN.deserialize(dlsn),
+ statsLogger,
+ perStreamStatsLogger,
+ getChecksum(ctx),
+ featureChecksumDisabled,
+ accessControlManager);
executeStreamOp(op);
return op.result();
}
@@ -730,14 +738,14 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
}
/**
- * clean up the gauge before we close to help GC
+ * clean up the gauge before we close to help GC.
*/
private void unregisterGauge(){
- this.statsLogger.unregisterGauge("proxy_status",this.proxyStatusGauge);
- this.statsLogger.unregisterGauge("moving_avg_rps",this.movingAvgRpsGauge);
- this.statsLogger.unregisterGauge("moving_avg_bps",this.movingAvgBpsGauge);
- this.statsLogger.unregisterGauge("acquired",this.streamAcquiredGauge);
- this.statsLogger.unregisterGauge("cached",this.streamCachedGauge);
+ this.statsLogger.unregisterGauge("proxy_status", this.proxyStatusGauge);
+ this.statsLogger.unregisterGauge("moving_avg_rps", this.movingAvgRpsGauge);
+ this.statsLogger.unregisterGauge("moving_avg_bps", this.movingAvgBpsGauge);
+ this.statsLogger.unregisterGauge("acquired", this.streamAcquiredGauge);
+ this.statsLogger.unregisterGauge("cached", this.streamCachedGauge);
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java
index e0a15e6..d6922b9 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java
@@ -17,13 +17,6 @@
*/
package com.twitter.distributedlog.service;
-import com.google.common.base.Optional;
-import com.twitter.util.Future;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* Implement handling for an unrecoverable error.
*/
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
index 7edb778..4ff5b87 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
@@ -17,9 +17,9 @@
*/
package com.twitter.distributedlog.service;
+import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
@@ -41,14 +41,6 @@ import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Duration;
import com.twitter.util.FutureEventListener;
-
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
@@ -65,10 +57,19 @@ import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Monitor Service.
+ */
public class MonitorService implements NamespaceListener {
- static final Logger logger = LoggerFactory.getLogger(MonitorService.class);
+ private static final Logger logger = LoggerFactory.getLogger(MonitorService.class);
private DistributedLogNamespace dlNamespace = null;
private MonitorServiceClient dlClient = null;
@@ -271,9 +272,9 @@ public class MonitorService implements NamespaceListener {
}
public void runServer() throws IllegalArgumentException, IOException {
- Preconditions.checkArgument(uriArg.isPresent(),
+ checkArgument(uriArg.isPresent(),
"No distributedlog uri provided.");
- Preconditions.checkArgument(serverSetArg.isPresent(),
+ checkArgument(serverSetArg.isPresent(),
"No proxy server set provided.");
if (intervalArg.isPresent()) {
interval = intervalArg.get();
@@ -422,7 +423,7 @@ public class MonitorService implements NamespaceListener {
}
/**
- * Close the server
+ * Close the server.
*/
public void close() {
logger.info("Closing monitor service.");
@@ -459,7 +460,7 @@ public class MonitorService implements NamespaceListener {
}
/**
- * clean up the gauge before we close to help GC
+ * clean up the gauge before we close to help GC.
*/
private void unregisterGauge(){
statsProvider.getStatsLogger("monitor").unregisterGauge("num_streams", numOfStreamsGauge);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
index a51a6a9..b5b4ca8 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
@@ -17,8 +17,13 @@
*/
package com.twitter.distributedlog.service;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalBooleanArg;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalIntegerArg;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalStringArg;
+
import com.twitter.finagle.stats.NullStatsReceiver;
import com.twitter.finagle.stats.StatsReceiver;
+import java.io.IOException;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.util.ReflectionUtils;
@@ -30,15 +35,16 @@ import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
-import static com.twitter.distributedlog.util.CommandLineUtils.*;
+/**
+ * The launcher to run monitor service.
+ */
public class MonitorServiceApp {
- static final Logger logger = LoggerFactory.getLogger(MonitorServiceApp.class);
+ private static final Logger logger = LoggerFactory.getLogger(MonitorServiceApp.class);
+
+ static final String USAGE = "MonitorService [-u <uri>] [-c <conf>] [-s serverset]";
- final static String USAGE = "MonitorService [-u <uri>] [-c <conf>] [-s serverset]";
final String[] args;
final Options options = new Options();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java
index 798dcf5..d779cd0 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java
@@ -18,7 +18,7 @@
package com.twitter.distributedlog.service;
/**
- * List of feature keys used by distributedlog server
+ * List of feature keys used by distributedlog server.
*/
public enum ServerFeatureKeys {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java
index 6c570f6..bd0a992 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java
@@ -18,17 +18,13 @@
package com.twitter.distributedlog.service;
import com.google.common.base.Stopwatch;
-
import com.twitter.finagle.Service;
import com.twitter.finagle.SimpleFilter;
import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-
-import org.apache.bookkeeper.stats.StatsLogger;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
-
-import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.StatsLogger;
/**
* Track distributedlog server finagle-service stats.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java
index 89e0665..cb37088 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java
@@ -20,7 +20,7 @@ package com.twitter.distributedlog.service.announcer;
import java.io.IOException;
/**
- * Announce service information
+ * Announce service information.
*/
public interface Announcer {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java
index c686408..471f954 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java
@@ -19,6 +19,9 @@ package com.twitter.distributedlog.service.announcer;
import java.io.IOException;
+/**
+ * A no-op implementation of {@link Announcer}.
+ */
public class NOPAnnouncer implements Announcer {
@Override
public void announce() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java
index ada8710..eaf4c26 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java
@@ -20,9 +20,6 @@ package com.twitter.distributedlog.service.announcer;
import com.twitter.common.zookeeper.Group;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.distributedlog.client.serverset.DLZkServerSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -30,10 +27,15 @@ import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * ServerSet based announcer.
+ */
public class ServerSetAnnouncer implements Announcer {
- static final Logger logger = LoggerFactory.getLogger(ServerSetAnnouncer.class);
+ private static final Logger logger = LoggerFactory.getLogger(ServerSetAnnouncer.class);
final String localAddr;
final InetSocketAddress serviceEndpoint;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java
new file mode 100644
index 0000000..bca36df
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Announcers to announce servers to server set.
+ */
+package com.twitter.distributedlog.service.announcer;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java
index 9dd84d0..3ffe54b 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java
@@ -20,6 +20,11 @@ package com.twitter.distributedlog.service.balancer;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.RateLimiter;
+/**
+ * Balancer Interface.
+ *
+ * <p>A balancer is used for balance the streams across the proxy cluster.
+ */
public interface Balancer {
/**
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
index 2e49d92..48430df 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
@@ -17,8 +17,9 @@
*/
package com.twitter.distributedlog.service.balancer;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
@@ -33,6 +34,8 @@ import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Await;
import com.twitter.util.Duration;
+import java.net.InetSocketAddress;
+import java.net.URI;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
@@ -40,15 +43,12 @@ import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.net.URI;
-
/**
- * Tool to rebalance cluster
+ * Tool to rebalance cluster.
*/
public class BalancerTool extends Tool {
- static final Logger logger = LoggerFactory.getLogger(BalancerTool.class);
+ private static final Logger logger = LoggerFactory.getLogger(BalancerTool.class);
static DistributedLogClientBuilder createDistributedLogClientBuilder(ServerSet serverSet) {
return DistributedLogClientBuilder.newBuilder()
@@ -66,6 +66,9 @@ public class BalancerTool extends Tool {
.failFast(false));
}
+ /**
+ * Base Command to run balancer.
+ */
protected abstract static class BalancerCommand extends OptsCommand {
protected Options options = new Options();
@@ -78,7 +81,8 @@ public class BalancerTool extends Tool {
BalancerCommand(String name, String description) {
super(name, description);
options.addOption("rwm", "rebalance-water-mark", true, "Rebalance water mark per proxy");
- options.addOption("rtp", "rebalance-tolerance-percentage", true, "Rebalance tolerance percentage per proxy");
+ options.addOption("rtp", "rebalance-tolerance-percentage", true,
+ "Rebalance tolerance percentage per proxy");
options.addOption("rc", "rebalance-concurrency", true, "Concurrency to rebalance stream distribution");
options.addOption("r", "rate", true, "Rebalance rate");
}
@@ -105,11 +109,11 @@ public class BalancerTool extends Tool {
if (cmdline.hasOption("r")) {
this.rate = Double.parseDouble(cmdline.getOptionValue("r"));
}
- Preconditions.checkArgument(rebalanceWaterMark >= 0,
+ checkArgument(rebalanceWaterMark >= 0,
"Rebalance Water Mark should be a non-negative number");
- Preconditions.checkArgument(rebalanceTolerancePercentage >= 0.0f,
+ checkArgument(rebalanceTolerancePercentage >= 0.0f,
"Rebalance Tolerance Percentage should be a non-negative number");
- Preconditions.checkArgument(rebalanceConcurrency > 0,
+ checkArgument(rebalanceConcurrency > 0,
"Rebalance Concurrency should be a positive number");
if (null == rate || rate <= 0.0f) {
rateLimiter = Optional.absent();
@@ -133,6 +137,9 @@ public class BalancerTool extends Tool {
protected abstract int executeCommand(CommandLine cmdline) throws Exception;
}
+ /**
+ * Command to balance streams within a cluster.
+ */
protected static class ClusterBalancerCommand extends BalancerCommand {
protected URI uri;
@@ -188,7 +195,11 @@ public class BalancerTool extends Tool {
ClusterBalancer balancer)
throws Exception {
if (null == source) {
- balancer.balance(rebalanceWaterMark, rebalanceTolerancePercentage, rebalanceConcurrency, getRateLimiter());
+ balancer.balance(
+ rebalanceWaterMark,
+ rebalanceTolerancePercentage,
+ rebalanceConcurrency,
+ getRateLimiter());
} else {
balanceFromSource(clientBuilder, balancer, source, getRateLimiter());
}
@@ -217,6 +228,9 @@ public class BalancerTool extends Tool {
}
}
+ /**
+ * Command to balance streams between regions.
+ */
protected static class RegionBalancerCommand extends BalancerCommand {
protected URI region1;
@@ -288,7 +302,11 @@ public class BalancerTool extends Tool {
protected int runBalancer(SimpleBalancer balancer) throws Exception {
if (null == source) {
- balancer.balance(rebalanceWaterMark, rebalanceTolerancePercentage, rebalanceConcurrency, getRateLimiter());
+ balancer.balance(
+ rebalanceWaterMark,
+ rebalanceTolerancePercentage,
+ rebalanceConcurrency,
+ getRateLimiter());
} else {
balancer.balanceAll(source, rebalanceConcurrency, getRateLimiter());
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java
index 6fef648..3a3dc1f 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java
@@ -28,10 +28,6 @@ import com.twitter.util.Await;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.Serializable;
import java.net.SocketAddress;
import java.util.ArrayList;
@@ -42,13 +38,16 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * A balancer balances ownerships with a cluster of targets
+ * A balancer balances ownerships with a cluster of targets.
*/
public class ClusterBalancer implements Balancer {
- static final Logger logger = LoggerFactory.getLogger(ClusterBalancer.class);
+ private static final Logger logger = LoggerFactory.getLogger(ClusterBalancer.class);
/**
* Represent a single host. Ordered by number of streams in desc order.
@@ -205,7 +204,8 @@ public class ClusterBalancer implements Balancer {
}
int moveFromLowWaterMark;
- int moveToHighWaterMark = Math.max(1, (int) (averageLoad + averageLoad * rebalanceTolerancePercentage / 100.0f));
+ int moveToHighWaterMark =
+ Math.max(1, (int) (averageLoad + averageLoad * rebalanceTolerancePercentage / 100.0f));
if (hostIdxMoveFrom >= 0) {
moveFromLowWaterMark = Math.max(0, rebalanceWaterMark);
@@ -220,7 +220,8 @@ public class ClusterBalancer implements Balancer {
AtomicInteger moveFrom = new AtomicInteger(0);
AtomicInteger moveTo = new AtomicInteger(hosts.size() - 1);
while (moveFrom.get() < moveTo.get()) {
- moveStreams(hosts, moveFrom, moveFromLowWaterMark, moveTo, moveToHighWaterMark, rebalanceRateLimiter);
+ moveStreams(hosts, moveFrom, moveFromLowWaterMark,
+ moveTo, moveToHighWaterMark, rebalanceRateLimiter);
moveFrom.incrementAndGet();
}
}
@@ -244,8 +245,14 @@ public class ClusterBalancer implements Balancer {
}
if (logger.isDebugEnabled()) {
- logger.debug("Moving streams : hosts = {}, from = {}, to = {} : from_low_water_mark = {}, to_high_water_mark = {}",
- new Object[] { hosts, hostIdxMoveFrom.get(), hostIdxMoveTo.get(), moveFromLowWaterMark, moveToHighWaterMark });
+ logger.debug("Moving streams : hosts = {}, from = {}, to = {} :"
+ + " from_low_water_mark = {}, to_high_water_mark = {}",
+ new Object[] {
+ hosts,
+ hostIdxMoveFrom.get(),
+ hostIdxMoveTo.get(),
+ moveFromLowWaterMark,
+ moveToHighWaterMark });
}
Host hostMoveFrom = hosts.get(hostIdxMoveFrom.get());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java
index 0a267ea..fab37b3 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java
@@ -17,8 +17,7 @@
*/
package com.twitter.distributedlog.service.balancer;
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.tuple.Pair;
+import static com.google.common.base.Preconditions.checkArgument;
import java.io.Serializable;
import java.net.SocketAddress;
@@ -29,7 +28,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
+/**
+ * A stream chooser based on number of streams.
+ */
class CountBasedStreamChooser implements StreamChooser, Serializable,
Comparator<Pair<SocketAddress, LinkedList<String>>> {
@@ -46,7 +49,7 @@ class CountBasedStreamChooser implements StreamChooser, Serializable,
int next;
CountBasedStreamChooser(Map<SocketAddress, Set<String>> streams) {
- Preconditions.checkArgument(streams.size() > 0, "Only support no-empty streams distribution");
+ checkArgument(streams.size() > 0, "Only support no-empty streams distribution");
streamsDistribution = new ArrayList<Pair<SocketAddress, LinkedList<String>>>(streams.size());
for (Map.Entry<SocketAddress, Set<String>> entry : streams.entrySet()) {
LinkedList<String> randomizedStreams = new LinkedList<String>(entry.getValue());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java
index d0e294d..069e596 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java
@@ -17,8 +17,18 @@
*/
package com.twitter.distributedlog.service.balancer;
+/**
+ * A stream chooser that can only choose limited number of streams.
+ */
public class LimitedStreamChooser implements StreamChooser {
+ /**
+ * Create a limited stream chooser by {@code limit}.
+ *
+ * @param underlying the underlying stream chooser.
+ * @param limit the limit of number of streams to choose.
+ * @return the limited stream chooser.
+ */
public static LimitedStreamChooser of(StreamChooser underlying, int limit) {
return new LimitedStreamChooser(underlying, limit);
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java
index 6913e4a..b205d5f 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java
@@ -21,9 +21,6 @@ import com.google.common.base.Optional;
import com.google.common.util.concurrent.RateLimiter;
import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
import com.twitter.distributedlog.service.DistributedLogClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
@@ -31,13 +28,15 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A balancer balances ownerships between two targets.
*/
public class SimpleBalancer implements Balancer {
- static final Logger logger = LoggerFactory.getLogger(SimpleBalancer.class);
+ private static final Logger logger = LoggerFactory.getLogger(SimpleBalancer.class);
protected final String target1;
protected final String target2;
@@ -120,8 +119,8 @@ public class SimpleBalancer implements Balancer {
loadDistribution.put(target, targetStreamCount);
// Calculate how many streams to be rebalanced from src region to target region
- int numStreamsToRebalance =
- BalancerUtils.calculateNumStreamsToRebalance(source, loadDistribution, rebalanceWaterMark, rebalanceTolerancePercentage);
+ int numStreamsToRebalance = BalancerUtils.calculateNumStreamsToRebalance(
+ source, loadDistribution, rebalanceWaterMark, rebalanceTolerancePercentage);
if (numStreamsToRebalance <= 0) {
logger.info("No streams need to be rebalanced from '{}' to '{}'.", source, target);
@@ -130,7 +129,8 @@ public class SimpleBalancer implements Balancer {
StreamChooser streamChooser =
LimitedStreamChooser.of(new CountBasedStreamChooser(srcDistribution), numStreamsToRebalance);
- StreamMover streamMover = new StreamMoverImpl(source, srcClient, srcMonitor, target, targetClient, targetMonitor);
+ StreamMover streamMover =
+ new StreamMoverImpl(source, srcClient, srcMonitor, target, targetClient, targetMonitor);
moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter);
}
@@ -166,7 +166,8 @@ public class SimpleBalancer implements Balancer {
}
StreamChooser streamChooser = new CountBasedStreamChooser(distribution);
- StreamMover streamMover = new StreamMoverImpl(source, sourceClient, sourceMonitor, target, targetClient, targetMonitor);
+ StreamMover streamMover =
+ new StreamMoverImpl(source, sourceClient, sourceMonitor, target, targetClient, targetMonitor);
moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter);
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java
index e0fcaf1..d92aef0 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java
@@ -18,7 +18,7 @@
package com.twitter.distributedlog.service.balancer;
/**
- * Choose a stream to rebalance
+ * Choose a stream to rebalance.
*/
public interface StreamChooser {
/**
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java
index 6857560..6e4205b 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java
@@ -17,10 +17,13 @@
*/
package com.twitter.distributedlog.service.balancer;
+/**
+ * A stream mover to move streams between proxies.
+ */
public interface StreamMover {
/**
- * Move given stream <i>streamName</i>
+ * Move given stream <i>streamName</i>.
*
* @param streamName
* stream name to move
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java
index 75df4ea..fc67fb2 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java
@@ -48,7 +48,7 @@ public class StreamMoverImpl implements StreamMover {
}
/**
- * Move given stream <i>streamName</i>
+ * Move given stream <i>streamName</i>.
*
* @param streamName
* stream name to move
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java
new file mode 100644
index 0000000..4ae8d44
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Balancer to move streams around to balance the traffic.
+ */
+package com.twitter.distributedlog.service.balancer;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java
index d612195..b45b798 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java
@@ -25,34 +25,41 @@ import com.twitter.distributedlog.config.ConfigurationSubscription;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.config.FileConfigurationBuilder;
import com.twitter.distributedlog.config.PropertiesConfigurationBuilder;
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.net.MalformedURLException;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.configuration.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* For all streams return the same dynamic config based on configFile.
*/
public class DefaultStreamConfigProvider implements StreamConfigProvider {
- static final Logger LOG = LoggerFactory.getLogger(DefaultStreamConfigProvider.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamConfigProvider.class);
private final Optional<DynamicDistributedLogConfiguration> dynConf;
private final ConfigurationSubscription confSub;
- public DefaultStreamConfigProvider(String configFilePath, ScheduledExecutorService executorService, int reloadPeriod,
- TimeUnit reloadUnit) throws ConfigurationException {
+ public DefaultStreamConfigProvider(String configFilePath,
+ ScheduledExecutorService executorService,
+ int reloadPeriod,
+ TimeUnit reloadUnit)
+ throws ConfigurationException {
try {
File configFile = new File(configFilePath);
- FileConfigurationBuilder properties = new PropertiesConfigurationBuilder(configFile.toURI().toURL());
- ConcurrentConstConfiguration defaultConf = new ConcurrentConstConfiguration(new DistributedLogConfiguration());
- DynamicDistributedLogConfiguration conf = new DynamicDistributedLogConfiguration(defaultConf);
+ FileConfigurationBuilder properties =
+ new PropertiesConfigurationBuilder(configFile.toURI().toURL());
+ ConcurrentConstConfiguration defaultConf =
+ new ConcurrentConstConfiguration(new DistributedLogConfiguration());
+ DynamicDistributedLogConfiguration conf =
+ new DynamicDistributedLogConfiguration(defaultConf);
List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(properties);
- confSub = new ConfigurationSubscription(conf, fileConfigBuilders, executorService, reloadPeriod, reloadUnit);
+ confSub = new ConfigurationSubscription(
+ conf, fileConfigBuilders, executorService, reloadPeriod, reloadUnit);
this.dynConf = Optional.of(conf);
} catch (MalformedURLException ex) {
throw new ConfigurationException(ex);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
index 5b19f6c..b3b4c4e 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
@@ -17,7 +17,8 @@
*/
package com.twitter.distributedlog.service.config;
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogConstants;
@@ -29,7 +30,7 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.SystemConfiguration;
/**
- * Configuration for DistributedLog Server
+ * Configuration for DistributedLog Server.
*/
public class ServerConfiguration extends CompositeConfiguration {
@@ -43,36 +44,36 @@ public class ServerConfiguration extends CompositeConfiguration {
}
// Server DLSN version
- protected final static String SERVER_DLSN_VERSION = "server_dlsn_version";
- protected final static byte SERVER_DLSN_VERSION_DEFAULT = DLSN.VERSION1;
+ protected static final String SERVER_DLSN_VERSION = "server_dlsn_version";
+ protected static final byte SERVER_DLSN_VERSION_DEFAULT = DLSN.VERSION1;
// Server Durable Write Enable/Disable Flag
- protected final static String SERVER_DURABLE_WRITE_ENABLED = "server_durable_write_enabled";
- protected final static boolean SERVER_DURABLE_WRITE_ENABLED_DEFAULT = true;
+ protected static final String SERVER_DURABLE_WRITE_ENABLED = "server_durable_write_enabled";
+ protected static final boolean SERVER_DURABLE_WRITE_ENABLED_DEFAULT = true;
// Server Region Id
- protected final static String SERVER_REGION_ID = "server_region_id";
- protected final static int SERVER_REGION_ID_DEFAULT = DistributedLogConstants.LOCAL_REGION_ID;
+ protected static final String SERVER_REGION_ID = "server_region_id";
+ protected static final int SERVER_REGION_ID_DEFAULT = DistributedLogConstants.LOCAL_REGION_ID;
// Server Port
- protected final static String SERVER_PORT = "server_port";
- protected final static int SERVER_PORT_DEFAULT = 0;
+ protected static final String SERVER_PORT = "server_port";
+ protected static final int SERVER_PORT_DEFAULT = 0;
// Server Shard Id
- protected final static String SERVER_SHARD_ID = "server_shard";
- protected final static int SERVER_SHARD_ID_DEFAULT = -1;
+ protected static final String SERVER_SHARD_ID = "server_shard";
+ protected static final int SERVER_SHARD_ID_DEFAULT = -1;
// Server Threads
- protected final static String SERVER_NUM_THREADS = "server_threads";
- protected final static int SERVER_NUM_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors();
+ protected static final String SERVER_NUM_THREADS = "server_threads";
+ protected static final int SERVER_NUM_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors();
// Server enable per stream stat
- protected final static String SERVER_ENABLE_PERSTREAM_STAT = "server_enable_perstream_stat";
- protected final static boolean SERVER_ENABLE_PERSTREAM_STAT_DEFAULT = true;
+ protected static final String SERVER_ENABLE_PERSTREAM_STAT = "server_enable_perstream_stat";
+ protected static final boolean SERVER_ENABLE_PERSTREAM_STAT_DEFAULT = true;
// Server graceful shutdown period (in millis)
- protected final static String SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS = "server_graceful_shutdown_period_ms";
- protected final static long SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT = 0L;
+ protected static final String SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS = "server_graceful_shutdown_period_ms";
+ protected static final long SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT = 0L;
// Server service timeout
public static final String SERVER_SERVICE_TIMEOUT_MS = "server_service_timeout_ms";
@@ -86,17 +87,18 @@ public class ServerConfiguration extends CompositeConfiguration {
// Server stream probation timeout
public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS = "server_stream_probation_timeout_ms";
public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD = "streamProbationTimeoutMs";
- public static final long SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT = 60*1000*5;
+ public static final long SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT = 60 * 1000 * 5;
// Server stream to partition converter
- protected final static String SERVER_STREAM_PARTITION_CONVERTER_CLASS = "stream_partition_converter_class";
+ protected static final String SERVER_STREAM_PARTITION_CONVERTER_CLASS = "stream_partition_converter_class";
// Use hostname as the allocator pool name
- protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME
- = "server_use_hostname_as_allocator_pool_name";
+ protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME =
+ "server_use_hostname_as_allocator_pool_name";
protected static final boolean SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT = false;
//Configure refresh interval for calculating resource placement in seconds
- public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S = "server_resource_placement_refresh_interval_sec";
+ public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S =
+ "server_resource_placement_refresh_interval_sec";
public static final int SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT = 120;
public ServerConfiguration() {
@@ -105,7 +107,7 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Load configurations from {@link DistributedLogConfiguration}
+ * Load configurations from {@link DistributedLogConfiguration}.
*
* @param dlConf
* distributedlog configuration
@@ -137,7 +139,7 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Set the flag to enable/disable durable write
+ * Set the flag to enable/disable durable write.
*
* @param enabled
* flag to enable/disable durable write
@@ -149,7 +151,7 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Is durable write enabled?
+ * Is durable write enabled.
*
* @return true if waiting writes to be durable. otherwise false.
*/
@@ -158,7 +160,7 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Set the region id used to instantiate DistributedLogNamespace
+ * Set the region id used to instantiate DistributedLogNamespace.
*
* @param regionId
* region id
@@ -170,8 +172,7 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Get the region id used to instantiate
- * {@link com.twitter.distributedlog.namespace.DistributedLogNamespace}
+ * Get the region id used to instantiate {@link com.twitter.distributedlog.namespace.DistributedLogNamespace}.
*
* @return region id used to instantiate DistributedLogNamespace
*/
@@ -213,8 +214,9 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Get the shard id of this server. It would be used to instantiate the client id
- * used for DistributedLogNamespace.
+ * Get the shard id of this server.
+ *
+ * <p>It would be used to instantiate the client id used for DistributedLogNamespace.
*
* @return shard id of this server.
*/
@@ -286,7 +288,9 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Get timeout for stream op execution in proxy layer. 0 disables timeout.
+ * Get timeout for stream op execution in proxy layer.
+ *
+ * <p>0 disables timeout.
*
* @return timeout for stream operation in proxy layer.
*/
@@ -296,7 +300,9 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Set timeout for stream op execution in proxy layer. 0 disables timeout.
+ * Set timeout for stream op execution in proxy layer.
+ *
+ * <p>0 disables timeout.
*
* @param timeoutMs
* timeout for stream operation in proxy layer.
@@ -308,7 +314,9 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Get timeout for closing writer in proxy layer. 0 disables timeout.
+ * Get timeout for closing writer in proxy layer.
+ *
+ * <p>0 disables timeout.
*
* @return timeout for closing writer in proxy layer.
*/
@@ -317,7 +325,9 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Set timeout for closing writer in proxy layer. 0 disables timeout.
+ * Set timeout for closing writer in proxy layer.
+ *
+ * <p>0 disables timeout.
*
* @param timeoutMs
* timeout for closing writer in proxy layer.
@@ -329,8 +339,9 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * After service timeout, how long should stream be kept in cache in probationary state in order
- * to prevent reacquire. In millisec.
+ * How long should stream be kept in cache in probationary state after service timeout.
+ *
+ * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds.
*
* @return stream probation timeout in ms.
*/
@@ -340,10 +351,12 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * After service timeout, how long should stream be kept in cache in probationary state in order
- * to prevent reacquire. In millisec.
+ * How long should stream be kept in cache in probationary state after service timeout.
+ *
+ * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds.
*
* @param timeoutMs probation timeout in ms.
+ * @return server configuration
*/
public ServerConfiguration setStreamProbationTimeoutMs(long timeoutMs) {
setProperty(SERVER_STREAM_PROBATION_TIMEOUT_MS, timeoutMs);
@@ -357,7 +370,8 @@ public class ServerConfiguration extends CompositeConfiguration {
* stream partition converter class
* @return server configuration
*/
- public ServerConfiguration setStreamPartitionConverterClass(Class<? extends StreamPartitionConverter> converterClass) {
+ public ServerConfiguration setStreamPartitionConverterClass(
+ Class<? extends StreamPartitionConverter> converterClass) {
setProperty(SERVER_STREAM_PARTITION_CONVERTER_CLASS, converterClass.getName());
return this;
}
@@ -391,7 +405,7 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Get if use hostname as the allocator pool name
+ * Get if use hostname as the allocator pool name.
*
* @return true if use hostname as the allocator pool name. otherwise, use
* {@link #getServerShardId()} as the allocator pool name.
@@ -412,15 +426,17 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Validate the configuration
+ * Validate the configuration.
+ *
+ * @throws IllegalStateException when there are any invalid settings.
*/
public void validate() {
byte dlsnVersion = getDlsnVersion();
- Preconditions.checkArgument(dlsnVersion >= DLSN.VERSION0 && dlsnVersion <= DLSN.VERSION1,
+ checkArgument(dlsnVersion >= DLSN.VERSION0 && dlsnVersion <= DLSN.VERSION1,
"Unknown dlsn version " + dlsnVersion);
- Preconditions.checkArgument(getServerThreads() > 0,
+ checkArgument(getServerThreads() > 0,
"Invalid number of server threads : " + getServerThreads());
- Preconditions.checkArgument(getServerShardId() >= 0,
+ checkArgument(getServerShardId() >= 0,
"Invalid server shard id : " + getServerShardId());
}