You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:24 UTC
[10/31] incubator-distributedlog git commit: DL-157: resource placement for write proxy
DL-157: resource placement for write proxy
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/0591d067
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/0591d067
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/0591d067
Branch: refs/heads/master
Commit: 0591d067f05617bc534a662b6f9a014192cbe3a5
Parents: 34fa16b
Author: Jordan Bull <jb...@twitter.com>
Authored: Tue Dec 13 11:11:03 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Thu Dec 29 02:10:32 2016 -0800
----------------------------------------------------------------------
.../distributedlog/benchmark/Benchmarker.java | 16 +-
.../distributedlog/benchmark/WriterWorker.java | 6 +-
.../client/DistributedLogClientImpl.java | 25 +--
.../service/DistributedLogClientBuilder.java | 1 +
.../BKDistributedLogNamespace.java | 8 +-
distributedlog-service/pom.xml | 39 ++++
.../service/DistributedLogCluster.java | 7 +-
.../service/DistributedLogServer.java | 96 ++++++----
.../service/DistributedLogServerApp.java | 7 +-
.../service/DistributedLogServiceImpl.java | 90 ++++-----
.../service/config/ServerConfiguration.java | 12 ++
.../service/placement/EqualLoadAppraiser.java | 37 ++++
.../placement/LeastLoadPlacementPolicy.java | 192 +++++++++++++++++++
.../service/placement/LoadAppraiser.java | 25 +++
.../service/placement/PlacementPolicy.java | 148 ++++++++++++++
.../placement/PlacementStateManager.java | 65 +++++++
.../service/placement/ServerLoad.java | 152 +++++++++++++++
.../service/placement/StreamLoad.java | 109 +++++++++++
.../placement/ZKPlacementStateManager.java | 172 +++++++++++++++++
.../src/main/thrift/metadata.thrift | 29 +++
.../service/TestDistributedLogService.java | 48 ++---
.../placement/TestLeastLoadPlacementPolicy.java | 160 ++++++++++++++++
.../service/placement/TestServerLoad.java | 48 +++++
.../service/placement/TestStreamLoad.java | 35 ++++
.../placement/TestZKPlacementStateManager.java | 123 ++++++++++++
25 files changed, 1516 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
index 87d3b53..5b04a05 100644
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
+++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
@@ -85,6 +85,7 @@ public class Benchmarker {
boolean enableBatching = false;
int batchBufferSize = 256 * 1024;
int batchFlushIntervalMicros = 2000;
+ String routingServiceFinagleNameString;
final DistributedLogConfiguration conf = new DistributedLogConfiguration();
final StatsReceiver statsReceiver = new OstrichStatsReceiver();
@@ -125,6 +126,7 @@ public class Benchmarker {
options.addOption("bt", "enable-batch", false, "Enable batching on writers");
options.addOption("bbs", "batch-buffer-size", true, "The batch buffer size in bytes");
options.addOption("bfi", "batch-flush-interval", true, "The batch buffer flush interval in micros");
+ options.addOption("rs", "routing-service", true, "The routing service finagle name for server-side routing");
options.addOption("h", "help", false, "Print usage.");
}
@@ -221,6 +223,9 @@ public class Benchmarker {
if (cmdline.hasOption("rb")) {
recvBufferSize = Integer.parseInt(cmdline.getOptionValue("rb"));
}
+ if (cmdline.hasOption("rs")) {
+ routingServiceFinagleNameString = cmdline.getOptionValue("rs");
+ }
thriftmux = cmdline.hasOption("mx");
handshakeWithClientInfo = cmdline.hasOption("hsci");
readFromHead = cmdline.hasOption("rfh");
@@ -311,7 +316,8 @@ public class Benchmarker {
recvBufferSize,
enableBatching,
batchBufferSize,
- batchFlushIntervalMicros);
+ batchFlushIntervalMicros,
+ routingServiceFinagleNameString);
}
protected WriterWorker createWriteWorker(
@@ -335,7 +341,8 @@ public class Benchmarker {
int recvBufferSize,
boolean enableBatching,
int batchBufferSize,
- int batchFlushIntervalMicros) {
+ int batchFlushIntervalMicros,
+ String routingServiceFinagleNameString) {
return new WriterWorker(
streamPrefix,
uri,
@@ -357,7 +364,8 @@ public class Benchmarker {
recvBufferSize,
enableBatching,
batchBufferSize,
- batchFlushIntervalMicros);
+ batchFlushIntervalMicros,
+ routingServiceFinagleNameString);
}
Worker runDLWriter() throws IOException {
@@ -453,7 +461,7 @@ public class Benchmarker {
try {
benchmarker.run();
} catch (Exception e) {
- logger.info("Benchmark quitted due to : ", e);
+ logger.info("Benchmark quit due to : ", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
index 46229b3..dc5a6e2 100644
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
+++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
@@ -81,6 +81,7 @@ public class WriterWorker implements Worker {
final boolean enableBatching;
final int batchBufferSize;
final int batchFlushIntervalMicros;
+ private final String routingServiceFinagleName;
volatile boolean running = true;
@@ -113,7 +114,8 @@ public class WriterWorker implements Worker {
int recvBufferSize,
boolean enableBatching,
int batchBufferSize,
- int batchFlushIntervalMicros) {
+ int batchFlushIntervalMicros,
+ String routingServiceFinagleName) {
checkArgument(startStreamId <= endStreamId);
checkArgument(!finagleNames.isEmpty() || !serverSetPaths.isEmpty());
this.streamPrefix = streamPrefix;
@@ -143,6 +145,7 @@ public class WriterWorker implements Worker {
this.finagleNames = finagleNames;
this.serverSets = createServerSets(serverSetPaths);
this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ this.routingServiceFinagleName = routingServiceFinagleName;
// Streams
streamNames = new ArrayList<String>(endStreamId - startStreamId);
@@ -197,6 +200,7 @@ public class WriterWorker implements Worker {
.periodicOwnershipSyncIntervalMs(TimeUnit.MINUTES.toMillis(5))
.periodicDumpOwnershipCache(true)
.handshakeTracing(true)
+ .serverRoutingServiceFinagleNameStr(routingServiceFinagleName)
.name("writer");
if (!finagleNames.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
index 634afe1..1077cd0 100644
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
@@ -73,6 +73,7 @@ import com.twitter.util.Promise;
import com.twitter.util.Return;
import com.twitter.util.Throw;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -852,18 +853,18 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
}
}
- private void retryGetOwnerFromRoutingServer(final StreamOp op,
+ private void retryGetOwnerFromResourcePlacementServer(final StreamOp op,
final Promise<SocketAddress> getOwnerPromise,
final Throwable cause) {
if (op.shouldTimeout()) {
op.fail(null, cause);
return;
}
- getOwnerFromRoutingServer(op, getOwnerPromise);
+ getOwnerFromResourcePlacementServer(op, getOwnerPromise);
}
- private void getOwnerFromRoutingServer(final StreamOp op,
- final Promise<SocketAddress> getOwnerPromise) {
+ private void getOwnerFromResourcePlacementServer(final StreamOp op,
+ final Promise<SocketAddress> getOwnerPromise) {
clusterClient.get().getService().getOwner(op.stream, op.ctx)
.addEventListener(new FutureEventListener<WriteResponse>() {
@Override
@@ -875,18 +876,20 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
public void onSuccess(WriteResponse value) {
if (StatusCode.FOUND == value.getHeader().getCode()
&& null != value.getHeader().getLocation()) {
- SocketAddress addr;
try {
- addr = DLSocketAddress.deserialize(value.getHeader().getLocation()).getSocketAddress();
+ InetSocketAddress addr = DLSocketAddress.deserialize(
+ value.getHeader().getLocation()
+ ).getSocketAddress();
+ getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr));
} catch (IOException e) {
// retry from the routing server again
- retryGetOwnerFromRoutingServer(op, getOwnerPromise, e);
+ logger.error("ERROR in getOwner", e);
+ retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise, e);
return;
}
- getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr));
} else {
// retry from the routing server again
- retryGetOwnerFromRoutingServer(op, getOwnerPromise,
+ retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise,
new StreamUnavailableException("Stream " + op.stream + "'s owner is unknown"));
}
}
@@ -896,7 +899,7 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
private Future<SocketAddress> getOwner(final StreamOp op) {
if (clusterClient.isPresent()) {
final Promise<SocketAddress> getOwnerPromise = new Promise<SocketAddress>();
- getOwnerFromRoutingServer(op, getOwnerPromise);
+ getOwnerFromResourcePlacementServer(op, getOwnerPromise);
return getOwnerPromise;
}
// pickup host by hashing
@@ -1190,7 +1193,7 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
ownershipCache.updateOwner(stream, ownerAddr);
} catch (IOException e) {
logger.warn("Invalid ownership {} found for stream {} : ",
- new Object[] { location, stream, e });
+ new Object[] { location, stream, e });
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
index 44d93ee..3f65aff 100644
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
@@ -96,6 +96,7 @@ public final class DistributedLogClientBuilder {
newBuilder.statsReceiver = builder.statsReceiver;
newBuilder.streamStatsReceiver = builder.streamStatsReceiver;
newBuilder.enableRegionStats = builder.enableRegionStats;
+ newBuilder.serverRoutingServiceFinagleName = builder.serverRoutingServiceFinagleName;
newBuilder.clientConfig = ClientConfig.newConfig(builder.clientConfig);
return newBuilder;
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 2c9fe44..e7f29cc 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -609,10 +609,10 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
return rootPath;
}
- private static ZooKeeperClientBuilder createDLZKClientBuilder(String zkcName,
- DistributedLogConfiguration conf,
- String zkServers,
- StatsLogger statsLogger) {
+ public static ZooKeeperClientBuilder createDLZKClientBuilder(String zkcName,
+ DistributedLogConfiguration conf,
+ String zkServers,
+ StatsLogger statsLogger) {
RetryPolicy retryPolicy = null;
if (conf.getZKNumRetries() > 0) {
retryPolicy = new BoundExponentialBackoffRetryPolicy(
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-service/pom.xml b/distributedlog-service/pom.xml
index b7b6ff8..e74d486 100644
--- a/distributedlog-service/pom.xml
+++ b/distributedlog-service/pom.xml
@@ -117,10 +117,49 @@
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.5.0-1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>scrooge-core_2.11</artifactId>
+ <version>${scrooge.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>2.2.0-incubating</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
<plugins>
<plugin>
+ <groupId>com.twitter</groupId>
+ <artifactId>scrooge-maven-plugin</artifactId>
+ <version>${scrooge-maven-plugin.version}</version>
+ <configuration>
+ <language>java</language>
+ </configuration>
+ <executions>
+ <execution>
+ <id>thrift-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
index 0ce335b..3225ced 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
@@ -22,6 +22,7 @@ import com.twitter.distributedlog.LocalDLMEmulator;
import com.twitter.distributedlog.client.routing.SingleHostRoutingService;
import com.twitter.distributedlog.metadata.BKDLConfig;
import com.twitter.distributedlog.metadata.DLMetadata;
+import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
import com.twitter.finagle.builder.Server;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -228,9 +229,11 @@ public class DistributedLogCluster {
routingService,
new NullStatsProvider(),
proxyPort,
- thriftmux);
+ thriftmux,
+ new EqualLoadAppraiser());
routingService.setAddress(DLSocketAddress.getSocketAddress(proxyPort));
routingService.startService();
+ serverPair.getLeft().startPlacementPolicy();
success = true;
} catch (BindException be) {
retries++;
@@ -244,7 +247,7 @@ public class DistributedLogCluster {
}
}
- LOG.info("Runnning DL on port {}", proxyPort);
+ LOG.info("Running DL on port {}", proxyPort);
dlServer = serverPair;
address = DLSocketAddress.getSocketAddress(proxyPort);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
index 185ea82..a9ba125 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
@@ -17,8 +17,32 @@
*/
package com.twitter.distributedlog.service;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.Tuple2;
+
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.client.routing.RoutingService;
import com.twitter.distributedlog.config.DynamicConfigurationFactory;
@@ -31,6 +55,8 @@ import com.twitter.distributedlog.service.config.NullStreamConfigProvider;
import com.twitter.distributedlog.service.config.ServerConfiguration;
import com.twitter.distributedlog.service.config.ServiceStreamConfigProvider;
import com.twitter.distributedlog.service.config.StreamConfigProvider;
+import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
+import com.twitter.distributedlog.service.placement.LoadAppraiser;
import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
import com.twitter.distributedlog.thrift.service.DistributedLogService;
@@ -46,31 +72,11 @@ import com.twitter.finagle.thrift.ClientIdRequiredFilter;
import com.twitter.finagle.thrift.ThriftServerFramedCodec;
import com.twitter.finagle.transport.Transport;
import com.twitter.util.Duration;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Tuple2;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
public class DistributedLogServer {
static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
+ private static final String DEFAULT_LOAD_APPRIASER = EqualLoadAppraiser.class.getCanonicalName();
private DistributedLogServiceImpl dlService = null;
private Server server = null;
@@ -89,6 +95,7 @@ public class DistributedLogServer {
private final Optional<Integer> statsPort;
private final Optional<Integer> shardId;
private final Optional<Boolean> announceServerSet;
+ private final Optional<String> loadAppraiserClassStr;
private final Optional<Boolean> thriftmux;
DistributedLogServer(Optional<String> uri,
@@ -98,6 +105,7 @@ public class DistributedLogServer {
Optional<Integer> statsPort,
Optional<Integer> shardId,
Optional<Boolean> announceServerSet,
+ Optional<String> loadAppraiserClass,
Optional<Boolean> thriftmux,
RoutingService routingService,
StatsReceiver statsReceiver,
@@ -113,9 +121,10 @@ public class DistributedLogServer {
this.routingService = routingService;
this.statsReceiver = statsReceiver;
this.statsProvider = statsProvider;
+ this.loadAppraiserClassStr = loadAppraiserClass;
}
- public void runServer() throws ConfigurationException, IllegalArgumentException, IOException {
+ public void runServer() throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
if (!uri.isPresent()) {
throw new IllegalArgumentException("No distributedlog uri provided.");
}
@@ -174,6 +183,9 @@ public class DistributedLogServer {
IdentityStreamPartitionConverter.class.getName());
converter = new IdentityStreamPartitionConverter();
}
+ Class loadAppraiserClass = Class.forName(loadAppraiserClassStr.or(DEFAULT_LOAD_APPRIASER));
+ LoadAppraiser loadAppraiser = (LoadAppraiser) ReflectionUtils.newInstance(loadAppraiserClass);
+ logger.info("Supplied load appraiser class is " + loadAppraiserClassStr.get() + " Instantiated " + loadAppraiser.getClass().getCanonicalName());
StreamConfigProvider streamConfProvider =
getStreamConfigProvider(dlConf, converter);
@@ -193,7 +205,8 @@ public class DistributedLogServer {
keepAliveLatch,
statsReceiver,
thriftmux.isPresent(),
- streamConfProvider);
+ streamConfProvider,
+ loadAppraiser);
this.dlService = serverPair.getLeft();
this.server = serverPair.getRight();
@@ -203,6 +216,8 @@ public class DistributedLogServer {
// start the routing service after announced
routingService.startService();
logger.info("Started the routing service.");
+ dlService.startPlacementPolicy();
+ logger.info("Started the placement policy.");
}
protected void preRun(DistributedLogConfiguration conf, ServerConfiguration serverConf) {
@@ -256,7 +271,8 @@ public class DistributedLogServer {
RoutingService routingService,
StatsProvider provider,
int port,
- boolean thriftmux) throws IOException {
+ boolean thriftmux,
+ LoadAppraiser loadAppraiser) throws IOException {
return runServer(serverConf,
dlConf,
@@ -269,7 +285,8 @@ public class DistributedLogServer {
new CountDownLatch(0),
new NullStatsReceiver(),
thriftmux,
- new NullStreamConfigProvider());
+ new NullStreamConfigProvider(),
+ loadAppraiser);
}
static Pair<DistributedLogServiceImpl, Server> runServer(
@@ -284,7 +301,8 @@ public class DistributedLogServer {
CountDownLatch keepAliveLatch,
StatsReceiver statsReceiver,
boolean thriftmux,
- StreamConfigProvider streamConfProvider) throws IOException {
+ StreamConfigProvider streamConfProvider,
+ LoadAppraiser loadAppraiser) throws IOException {
logger.info("Running server @ uri {}.", dlUri);
boolean perStreamStatsEnabled = serverConf.isPerStreamStatEnabled();
@@ -297,16 +315,17 @@ public class DistributedLogServer {
// dl service
DistributedLogServiceImpl dlService = new DistributedLogServiceImpl(
- serverConf,
- dlConf,
- dynDlConf,
- streamConfProvider,
- dlUri,
- partitionConverter,
- routingService,
- provider.getStatsLogger(""),
- perStreamStatsLogger,
- keepAliveLatch);
+ serverConf,
+ dlConf,
+ dynDlConf,
+ streamConfProvider,
+ dlUri,
+ partitionConverter,
+ routingService,
+ provider.getStatsLogger(""),
+ perStreamStatsLogger,
+ keepAliveLatch,
+ loadAppraiser);
StatsReceiver serviceStatsReceiver = statsReceiver.scope("service");
StatsLogger serviceStatsLogger = provider.getStatsLogger("service");
@@ -400,6 +419,7 @@ public class DistributedLogServer {
* @throws ConfigurationException
* @throws IllegalArgumentException
* @throws IOException
+ * @throws ClassNotFoundException
*/
public static DistributedLogServer runServer(
Optional<String> uri,
@@ -409,11 +429,12 @@ public class DistributedLogServer {
Optional<Integer> statsPort,
Optional<Integer> shardId,
Optional<Boolean> announceServerSet,
+ Optional<String> loadAppraiserClass,
Optional<Boolean> thriftmux,
RoutingService routingService,
StatsReceiver statsReceiver,
StatsProvider statsProvider)
- throws ConfigurationException, IllegalArgumentException, IOException {
+ throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
final DistributedLogServer server = new DistributedLogServer(
uri,
@@ -423,6 +444,7 @@ public class DistributedLogServer {
statsPort,
shardId,
announceServerSet,
+ loadAppraiserClass,
thriftmux,
routingService,
statsReceiver,
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
index af36307..1c3d8d4 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
@@ -68,6 +68,7 @@ public class DistributedLogServerApp {
options.addOption("pd", "stats-provider", true, "DistributedLog Stats Provider");
options.addOption("si", "shard-id", true, "DistributedLog Shard ID");
options.addOption("a", "announce", false, "ServerSet Path to Announce");
+ options.addOption("la", "load-appraiser", true, "LoadAppraiser Implementation to Use");
options.addOption("mx", "thriftmux", false, "Is thriftmux enabled");
}
@@ -97,10 +98,13 @@ public class DistributedLogServerApp {
} catch (IOException ie) {
logger.error("Failed to start distributedlog server : ", ie);
Runtime.getRuntime().exit(-1);
+ } catch (ClassNotFoundException cnf) {
+ logger.error("Failed to start distributedlog server : ", cnf);
+ Runtime.getRuntime().exit(-1);
}
}
- private void runCmd(CommandLine cmdline) throws IllegalArgumentException, IOException, ConfigurationException {
+ private void runCmd(CommandLine cmdline) throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException {
final StatsReceiver statsReceiver = NullStatsReceiver.get();
Optional<String> confOptional = getOptionalStringArg(cmdline, "c");
DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
@@ -142,6 +146,7 @@ public class DistributedLogServerApp {
getOptionalIntegerArg(cmdline, "sp"),
getOptionalIntegerArg(cmdline, "si"),
getOptionalBooleanArg(cmdline, "a"),
+ getOptionalStringArg(cmdline, "la"),
getOptionalBooleanArg(cmdline, "mx"),
routingService,
statsReceiver,
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 5c5b5af..e7974c7 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.common.net.InetSocketAddressHelper;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.acl.AccessControlManager;
@@ -36,8 +37,14 @@ import com.twitter.distributedlog.exceptions.TooManyStreamsException;
import com.twitter.distributedlog.feature.AbstractFeatureProvider;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.distributedlog.rate.MovingAverageRate;
+import com.twitter.distributedlog.rate.MovingAverageRateFactory;
import com.twitter.distributedlog.service.config.ServerConfiguration;
import com.twitter.distributedlog.service.config.StreamConfigProvider;
+import com.twitter.distributedlog.service.placement.LeastLoadPlacementPolicy;
+import com.twitter.distributedlog.service.placement.LoadAppraiser;
+import com.twitter.distributedlog.service.placement.PlacementPolicy;
+import com.twitter.distributedlog.service.placement.ZKPlacementStateManager;
import com.twitter.distributedlog.service.stream.BulkWriteOp;
import com.twitter.distributedlog.service.stream.DeleteOp;
import com.twitter.distributedlog.service.stream.admin.CreateOp;
@@ -67,32 +74,19 @@ import com.twitter.distributedlog.thrift.service.ServerStatus;
import com.twitter.distributedlog.thrift.service.StatusCode;
import com.twitter.distributedlog.thrift.service.WriteContext;
import com.twitter.distributedlog.thrift.service.WriteResponse;
-import com.twitter.distributedlog.rate.MovingAverageRateFactory;
-import com.twitter.distributedlog.rate.MovingAverageRate;
import com.twitter.distributedlog.util.ConfUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.SchedulerUtils;
-import com.twitter.finagle.NoBrokersAvailableException;
import com.twitter.util.Await;
import com.twitter.util.Duration;
+import com.twitter.util.Function;
import com.twitter.util.Function0;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
-import com.twitter.util.Timer;
import com.twitter.util.ScheduledThreadPoolTimer;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
+import com.twitter.util.Timer;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
@@ -102,6 +96,17 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.runtime.BoxedUnit;
+
public class DistributedLogServiceImpl implements DistributedLogService.ServiceIface,
FatalErrorHandler {
@@ -113,6 +118,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
private final DistributedLogConfiguration dlConfig;
private final DistributedLogNamespace dlNamespace;
private final int serverRegionId;
+ private final PlacementPolicy placementPolicy;
private ServerStatus serverStatus = ServerStatus.WRITE_AND_ACCEPT;
private final ReentrantReadWriteLock closeLock =
new ReentrantReadWriteLock();
@@ -157,6 +163,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
private final Gauge<Number> movingAvgBpsGauge;
private final Gauge<Number> streamAcquiredGauge;
private final Gauge<Number> streamCachedGauge;
+ private final int shard;
DistributedLogServiceImpl(ServerConfiguration serverConf,
DistributedLogConfiguration dlConf,
@@ -167,7 +174,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
RoutingService routingService,
StatsLogger statsLogger,
StatsLogger perStreamStatsLogger,
- CountDownLatch keepAliveLatch)
+ CountDownLatch keepAliveLatch,
+ LoadAppraiser loadAppraiser)
throws IOException {
// Configuration.
this.serverConfig = serverConf;
@@ -177,7 +185,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
this.serverRegionId = serverConf.getRegionId();
this.streamPartitionConverter = converter;
int serverPort = serverConf.getServerPort();
- int shard = serverConf.getServerShardId();
+ this.shard = serverConf.getServerShardId();
int numThreads = serverConf.getServerThreads();
this.clientId = DLSocketAddress.toLockId(DLSocketAddress.getSocketAddress(serverPort), shard);
String allocatorPoolName = ServerUtils.getLedgerAllocatorPoolName(
@@ -264,6 +272,15 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
streamManager,
limiterDisabledFeature);
+ this.placementPolicy = new LeastLoadPlacementPolicy(
+ loadAppraiser,
+ routingService,
+ dlNamespace,
+ new ZKPlacementStateManager(uri, dlConf, statsLogger),
+ Duration.fromSeconds(serverConf.getResourcePlacementRefreshInterval()),
+ statsLogger);
+ logger.info("placement started");
+
// Stats
this.statsLogger = statsLogger;
@@ -501,35 +518,13 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(clientId)));
}
- Stream stream = streamManager.getStream(streamName);
- String owner;
- if (null != stream && null != (owner = stream.getOwner())) {
- return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(owner)));
- }
-
- RoutingService.RoutingContext routingContext = RoutingService.RoutingContext.of(regionResolver);
-
- if (ctx.isSetTriedHosts()) {
- for (String triedHost : ctx.getTriedHosts()) {
- routingContext.addTriedHost(
- DLSocketAddress.parseSocketAddress(triedHost), StatusCode.STREAM_UNAVAILABLE);
- }
- }
-
- try {
- SocketAddress host = routingService.getHost(streamName, routingContext);
- if (host instanceof InetSocketAddress) {
- // use shard id '-1' as the shard id here won't be used for redirection
- return Future.value(new WriteResponse(
- ResponseUtils.ownerToHeader(DLSocketAddress.toLockId((InetSocketAddress) host, -1))));
- } else {
- return Future.value(new WriteResponse(
- ResponseUtils.streamUnavailableHeader()));
+ return placementPolicy.placeStream(streamName).map(new Function<String, WriteResponse>() {
+ @Override
+ public WriteResponse apply(String server) {
+ String host = DLSocketAddress.toLockId(InetSocketAddressHelper.parse(server), -1);
+ return new WriteResponse(ResponseUtils.ownerToHeader(host));
}
- } catch (NoBrokersAvailableException e) {
- return Future.value(new WriteResponse(
- ResponseUtils.streamUnavailableHeader()));
- }
+ });
}
@@ -689,6 +684,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
// Stop the timer.
timer.stop();
+ placementPolicy.close();
// clean up gauge
unregisterGauge();
@@ -704,6 +700,10 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
}
}
+ protected void startPlacementPolicy() {
+ this.placementPolicy.start(shard == 0);
+ }
+
@Override
public void notifyFatalError() {
triggerShutdown();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
index 9a9e83c..5b19f6c 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
@@ -95,6 +95,9 @@ public class ServerConfiguration extends CompositeConfiguration {
protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME
= "server_use_hostname_as_allocator_pool_name";
protected static final boolean SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT = false;
+ //Configure refresh interval for calculating resource placement in seconds
+ public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S = "server_resource_placement_refresh_interval_sec";
+ public static final int SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT = 120;
public ServerConfiguration() {
super();
@@ -399,6 +402,15 @@ public class ServerConfiguration extends CompositeConfiguration {
SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT);
}
+ public ServerConfiguration setResourcePlacementRefreshInterval(int refreshIntervalSecs) {
+ setProperty(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, refreshIntervalSecs);
+ return this;
+ }
+
+ public int getResourcePlacementRefreshInterval() {
+ return getInt(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT);
+ }
+
/**
* Validate the configuration
*/
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
new file mode 100644
index 0000000..144e358
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import com.twitter.util.Future;
+
+/**
+ * Created for those who hold these truths to be self-evident, that all streams are created equal,
+ * that they are endowed by their creator with certain unalienable loads, that among these are
+ * Uno, Eins, and One.
+ */
+public class EqualLoadAppraiser implements LoadAppraiser {
+ @Override
+ public Future<StreamLoad> getStreamLoad(String stream) {
+ return Future.value(new StreamLoad(stream, 1));
+ }
+
+ @Override
+ public Future<Void> refreshCache() {
+ return Future.value(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
new file mode 100644
index 0000000..e4c8128
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import scala.Function1;
+import scala.runtime.BoxedUnit;
+
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.Stats;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.util.Duration;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.Futures;
+
+/**
+ * A LoadPlacementPolicy that attempts to place streams in such a way that the load is balanced as
+ * evenly as possible across all shards. The LoadAppraiser remains responsible for determining what
+ * the load of a server would be. This placement policy then distributes these streams across the
+ * servers.
+ */
+public class LeastLoadPlacementPolicy extends PlacementPolicy {
+ private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+ private Map<String, String> streamToServer = new HashMap<String, String>();
+
+ public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
+ DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+ Duration refreshInterval, StatsLogger statsLogger) {
+ super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger);
+ statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ if (serverLoads.size() > 0) {
+ return serverLoads.last().getLoad() - serverLoads.first().getLoad();
+ } else {
+ return getDefaultValue();
+ }
+ }
+ });
+ }
+
+ @Override
+ public Future<String> placeStream(String stream) {
+ if (streamToServer.containsKey(stream)) {
+ return Future.value(streamToServer.get(stream));
+ }
+ Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream);
+ return streamLoadFuture.map(new Function<StreamLoad, String>() {
+ @Override
+ public String apply(StreamLoad streamLoad) {
+ return placeStreamSynchronized(streamLoad);
+ }
+ });
+ }
+
+ /**
+ * Assigns the given stream to the least-loaded server and returns that server.
+ * The server is polled out of the sorted set before its load changes and re-added
+ * afterwards, so the set never holds an element whose sort key was mutated in place.
+ *
+ * @param streamLoad the stream and its appraised load
+ * @return the server the stream was placed on
+ * @throws IllegalStateException if there are no servers to place the stream on
+ */
+ synchronized private String placeStreamSynchronized(StreamLoad streamLoad) {
+ ServerLoad serverLoad = serverLoads.pollFirst();
+ if (serverLoad == null) {
+ // pollFirst() returns null on an empty set; fail with a descriptive error instead of an NPE
+ throw new IllegalStateException("No servers available to place stream " + streamLoad.getStream());
+ }
+ serverLoad.addStream(streamLoad);
+ serverLoads.add(serverLoad);
+ return serverLoad.getServer();
+ }
+
+ @Override
+ public void refresh() {
+ logger.info("Refreshing server loads.");
+ Future<Void> refresh = loadAppraiser.refreshCache();
+ final Set<String> servers = getServers();
+ final Set<String> allStreams = getStreams();
+ Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap(new Function<Void, Future<TreeSet<ServerLoad>>>() {
+ @Override
+ public Future<TreeSet<ServerLoad>> apply(Void v1) {
+ return calculate(servers, allStreams);
+ }
+ });
+ serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
+ try {
+ updateServerLoads(serverLoads);
+ } catch (PlacementStateManager.StateManagerSaveException e) {
+ logger.error("The refreshed mapping could not be persisted and will not be used.", e);
+ }
+ return BoxedUnit.UNIT;
+ }
+ });
+ }
+
+ synchronized private void updateServerLoads(TreeSet<ServerLoad> serverLoads) throws PlacementStateManager.StateManagerSaveException {
+ this.placementStateManager.saveOwnership(serverLoads);
+ this.streamToServer = serverLoadsToMap(serverLoads);
+ this.serverLoads = serverLoads;
+ }
+
+ @Override
+ synchronized public void load(TreeSet<ServerLoad> serverLoads) {
+ this.serverLoads = serverLoads;
+ this.streamToServer = serverLoadsToMap(serverLoads);
+ }
+
+ public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) {
+ logger.info("Calculating server loads");
+ final long startTime = System.currentTimeMillis();
+ ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size());
+
+ for (String stream: streams) {
+ Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream);
+ futures.add(streamLoad);
+ }
+
+ return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() {
+ @Override
+ public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) {
+ /* Sort streamLoads so largest streams are placed first for better balance */
+ TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>();
+ for (StreamLoad streamLoad: streamLoads) {
+ streamQueue.add(streamLoad);
+ }
+
+ TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+ for (String server: servers) {
+ ServerLoad serverLoad = new ServerLoad(server);
+ if (!streamQueue.isEmpty()) {
+ serverLoad.addStream(streamQueue.pollFirst());
+ }
+ serverLoads.add(serverLoad);
+ }
+
+ while (!streamQueue.isEmpty()) {
+ ServerLoad serverLoad = serverLoads.pollFirst();
+ serverLoad.addStream(streamQueue.pollFirst());
+ serverLoads.add(serverLoad);
+ }
+ return serverLoads;
+ }
+ }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
+ placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime);
+ return BoxedUnit.UNIT;
+ }
+ }).onFailure(new Function<Throwable, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(Throwable t) {
+ logger.error("Failure calculating loads", t);
+ placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime);
+ return BoxedUnit.UNIT;
+ }
+ });
+ }
+
+ private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) {
+ HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size());
+ for (ServerLoad serverLoad: serverLoads) {
+ for (StreamLoad streamLoad: serverLoad.getStreamLoads()) {
+ streamToServer.put(streamLoad.getStream(), serverLoad.getServer());
+ }
+ }
+ return streamToServer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
new file mode 100644
index 0000000..784f106
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import com.twitter.util.Future;
+
+/**
+ * Supplies the appraised load of individual streams. A PlacementPolicy holds a LoadAppraiser
+ * and queries it when placing streams and when recalculating server loads.
+ */
+public interface LoadAppraiser {
+ /**
+ * Returns a future holding the appraised load of the named stream.
+ */
+ Future<StreamLoad> getStreamLoad(String stream);
+ /**
+ * Asks the appraiser to refresh its cached data; the returned future completes when done.
+ * NOTE(review): what is cached, if anything, is implementation-defined; confirm per implementation.
+ */
+ Future<Void> refreshCache();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
new file mode 100644
index 0000000..2044428
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+
+import scala.runtime.BoxedUnit;
+
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.service.DLSocketAddress;
+import com.twitter.util.Duration;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.ScheduledThreadPoolTimer;
+import com.twitter.util.Time;
+import com.twitter.util.Timer;
+
+/**
+ * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream
+ * contains. The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will
+ * then distribute these StreamLoads to the available servers in a manner defined by the
+ * implementation creating ServerLoad objects. It then saves this assignment via the
+ * PlacementStateManager.
+ */
+public abstract class PlacementPolicy {
+ protected final LoadAppraiser loadAppraiser;
+ protected final RoutingService routingService;
+ protected final DistributedLogNamespace namespace;
+ protected final PlacementStateManager placementStateManager;
+ private final Duration refreshInterval;
+
+ protected static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class);
+ protected final OpStatsLogger placementCalcStats;
+ private Timer placementRefreshTimer;
+
+ public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
+ DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+ Duration refreshInterval, StatsLogger statsLogger) {
+ this.loadAppraiser = loadAppraiser;
+ this.routingService = routingService;
+ this.namespace = namespace;
+ this.placementStateManager = placementStateManager;
+ this.refreshInterval = refreshInterval;
+ placementCalcStats = statsLogger.getOpStatsLogger("placement");
+ }
+
+ /**
+ * Returns the string form of every host currently reported by the routing service.
+ * Hosts that are not InetSocketAddress instances cannot be converted and are skipped
+ * with a warning rather than aborting the caller with a ClassCastException.
+ *
+ * @return the set of server identifiers; empty if the routing service reports no usable hosts
+ */
+ public Set<String> getServers() {
+ Set<SocketAddress> hosts = routingService.getHosts();
+ Set<String> servers = new HashSet<String>(hosts.size());
+ for (SocketAddress address: hosts) {
+ if (address instanceof InetSocketAddress) {
+ servers.add(DLSocketAddress.toString((InetSocketAddress) address));
+ } else {
+ logger.warn("Skipping host {} for placement: not an InetSocketAddress.", address);
+ }
+ }
+ return servers;
+ }
+
+ public Set<String> getStreams() {
+ Set<String> streams = new HashSet<String>();
+ try {
+ Iterator<String> logs = namespace.getLogs();
+ while (logs.hasNext()) {
+ streams.add(logs.next());
+ }
+ } catch (IOException e) {
+ logger.error("Could not get streams for placement policy.", e);
+ }
+ return streams;
+ }
+
+ public void start(boolean leader) {
+ logger.info("Starting placement policy");
+
+ TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>();
+ for (String server: getServers()) {
+ emptyServerLoads.add(new ServerLoad(server));
+ }
+ load(emptyServerLoads); //Pre-Load so streams don't NPE
+ if (leader) { //this is the leader shard
+ logger.info("Shard is leader. Scheduling timed refresh.");
+ placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true);
+ placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() {
+ @Override
+ public BoxedUnit apply() {
+ refresh();
+ return BoxedUnit.UNIT;
+ }
+ });
+ } else {
+ logger.info("Shard is not leader. Watching for server load changes.");
+ placementStateManager.watch(new PlacementStateManager.PlacementCallback() {
+ @Override
+ public void callback(TreeSet<ServerLoad> serverLoads) {
+ if (!serverLoads.isEmpty()) {
+ load(serverLoads);
+ }
+ }
+ });
+ }
+ }
+
+ public void close() {
+ if (placementRefreshTimer != null) {
+ placementRefreshTimer.stop();
+ }
+ }
+
+ /**
+ * Places the stream on a server according to the policy and returns a future containing the
+ * host that owns the stream upon completion
+ */
+ public abstract Future<String> placeStream(String stream);
+
+ /**
+ * Recalculates the entire placement mapping and stores it using the PlacementStateManager
+ */
+ public abstract void refresh();
+
+ /**
+ * Loads the placement mapping into the node from a TreeSet of ServerLoads
+ */
+ public abstract void load(TreeSet<ServerLoad> serverLoads);
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
new file mode 100644
index 0000000..cd0d906
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.util.TreeSet;
+
+/**
+ * The PlacementStateManager handles persistence of calculated resource placements, including
+ * their storage once they have been calculated and their retrieval by the other shards.
+ */
+public interface PlacementStateManager {
+
+ /**
+ * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage
+ */
+ void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException;
+
+ /**
+ * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage
+ */
+ TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException;
+
+ /**
+ * Watch the persistent storage for changes to the ownership mapping and calls placementCallback
+ * with the new mapping when a change occurs
+ */
+ void watch(PlacementCallback placementCallback);
+
+ interface PlacementCallback {
+ void callback(TreeSet<ServerLoad> serverLoads);
+ }
+
+ abstract class StateManagerException extends Exception {
+ public StateManagerException(String message, Exception e) {
+ super(message, e);
+ }
+ }
+
+ class StateManagerLoadException extends StateManagerException {
+ public StateManagerLoadException(Exception e) {
+ super("Load of Ownership failed", e);
+ }
+ }
+
+ class StateManagerSaveException extends StateManagerException {
+ public StateManagerSaveException(Exception e) {
+ super("Save of Ownership failed", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
new file mode 100644
index 0000000..d7fbcf2
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import com.twitter.distributedlog.service.placement.thrift.*;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TMemoryInputTransport;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * A comparable data object containing the identifier of the server, total appraised load on the
+ * server, and all streams assigned to the server by the resource placement mapping. This is
+ * comparable first by load and then by server so that a sorted data structure of these will be
+ * consistent across multiple calculations.
+ */
+public class ServerLoad implements Comparable {
+ private static final int BUFFER_SIZE = 4096000;
+ private final String server;
+ private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>();
+ private long load = 0l;
+
+ public ServerLoad(String server) {
+ this.server = server;
+ }
+
+ synchronized public long addStream(StreamLoad stream) {
+ this.load += stream.getLoad();
+ streamLoads.add(stream);
+ return this.load;
+ }
+
+ /**
+ * Removes the named stream from this server, if it is assigned here, and subtracts that
+ * stream's load from the server's total.
+ *
+ * @param stream name of the stream to remove
+ * @return the server's total load after the removal; unchanged if the stream was not found
+ */
+ synchronized public long removeStream(String stream) {
+ for (StreamLoad streamLoad : streamLoads) {
+ if (streamLoad.stream.equals(stream)) {
+ // Subtract the removed stream's load; subtracting this.load would always zero the total.
+ this.load -= streamLoad.getLoad();
+ // Removing during iteration is safe only because we return before the iterator advances.
+ streamLoads.remove(streamLoad);
+ return this.load;
+ }
+ }
+ return this.load; //Throwing an exception wouldn't help us as our logic should never reach here
+ }
+
+ public long getLoad() {
+ return load;
+ }
+
+ public Set<StreamLoad> getStreamLoads() {
+ return streamLoads;
+ }
+
+ public String getServer() {
+ return server;
+ }
+
+ protected com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() {
+ com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad
+ = new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
+ tServerLoad.setServer(server);
+ tServerLoad.setLoad(load);
+ ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads
+ = new ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad>();
+ for (StreamLoad streamLoad: streamLoads) {
+ tStreamLoads.add(streamLoad.toThrift());
+ }
+ tServerLoad.setStreams(tStreamLoads);
+ return tServerLoad;
+ }
+
+ public byte[] serialize() throws IOException {
+ TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+ TJSONProtocol protocol = new TJSONProtocol(transport);
+ try {
+ toThrift().write(protocol);
+ transport.flush();
+ return transport.toString(UTF_8.name()).getBytes(UTF_8);
+ } catch (TException e) {
+ throw new IOException("Failed to serialize server load : ", e);
+ } catch (UnsupportedEncodingException uee) {
+ throw new IOException("Failed to serialize server load : ", uee);
+ }
+ }
+
+ public static ServerLoad deserialize(byte[] data) throws IOException {
+ com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad
+ = new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
+ TMemoryInputTransport transport = new TMemoryInputTransport(data);
+ TJSONProtocol protocol = new TJSONProtocol(transport);
+ try {
+ tServerLoad.read(protocol);
+ ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer());
+ if (tServerLoad.isSetStreams()) {
+ for (com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad : tServerLoad.getStreams()) {
+ serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()));
+ }
+ }
+ return serverLoad;
+ } catch (TException e) {
+ throw new IOException("Failed to deserialize server load : ", e);
+ }
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ ServerLoad other = (ServerLoad) o;
+ if (load == other.load) {
+ return server.compareTo(other.getServer());
+ } else {
+ return Long.compare(load, other.getLoad());
+ }
+ }
+
+ /**
+ * Two ServerLoads are equal when they name the same server, carry the same total load
+ * and hold equal sets of stream loads. Returns false for null or for any object that
+ * is not a ServerLoad, as the Object.equals contract requires.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ServerLoad)) {
+ // also covers null, which previously caused a NullPointerException
+ return false;
+ }
+ ServerLoad other = (ServerLoad) o;
+ return server.equals(other.getServer()) && load == other.getLoad() && streamLoads.equals(other.getStreamLoads());
+ }
+
+ @Override
+ public String toString() {
+ return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads);
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().append(server).append(load).append(streamLoads).build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
new file mode 100644
index 0000000..4f3dc71
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TMemoryInputTransport;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * A comparable data object containing the identifier of the stream and the appraised load produced
+ * by the stream.
+ */
+public class StreamLoad implements Comparable {
+ private static final int BUFFER_SIZE = 4096;
+ public final String stream;
+ private final int load;
+
+ public StreamLoad(String stream, int load) {
+ this.stream = stream;
+ this.load = load;
+ }
+
+ public int getLoad() {
+ return load;
+ }
+
+ public String getStream() {
+ return stream;
+ }
+
+ protected com.twitter.distributedlog.service.placement.thrift.StreamLoad toThrift() {
+ com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
+ return tStreamLoad.setStream(stream).setLoad(load);
+ }
+
+ public byte[] serialize() throws IOException {
+ TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+ TJSONProtocol protocol = new TJSONProtocol(transport);
+ try {
+ toThrift().write(protocol);
+ transport.flush();
+ return transport.toString(UTF_8.name()).getBytes(UTF_8);
+ } catch (TException e) {
+ throw new IOException("Failed to serialize stream load : ", e);
+ } catch (UnsupportedEncodingException uee) {
+ throw new IOException("Failed to serialize stream load : ", uee);
+ }
+ }
+
+ public static StreamLoad deserialize(byte[] data) throws IOException {
+ com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
+ TMemoryInputTransport transport = new TMemoryInputTransport(data);
+ TJSONProtocol protocol = new TJSONProtocol(transport);
+ try {
+ tStreamLoad.read(protocol);
+ return new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad());
+ } catch (TException e) {
+ throw new IOException("Failed to deserialize stream load : ", e);
+ }
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ StreamLoad other = (StreamLoad) o;
+ if (load == other.getLoad()) {
+ return stream.compareTo(other.getStream());
+ } else {
+ return Long.compare(load, other.getLoad());
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ StreamLoad other = (StreamLoad) o;
+ return stream.equals(other.getStream()) && load == other.getLoad();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load);
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().append(stream).append(load).build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
new file mode 100644
index 0000000..18b9d1f
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Transaction;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.distributedlog.BKDistributedLogNamespace;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.util.DLUtils;
+import com.twitter.distributedlog.util.Utils;
+
+/**
+ * An implementation of the PlacementStateManager that saves data to and loads from Zookeeper to
+ * avoid necessitating an additional system for the resource placement.
+ */
+public class ZKPlacementStateManager implements PlacementStateManager {
+ static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class);
+ private static final String SERVER_LOAD_DIR = "/.server-load";
+
+ private final String serverLoadPath;
+ private final ZooKeeperClient zkClient;
+
+ private boolean watching = false;
+
+ public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) {
+ zkClient = BKDistributedLogNamespace.createDLZKClientBuilder(
+ String.format("dlzk:%s:factory_writer_shared", uri),
+ conf,
+ DLUtils.getZKServersFromDLUri(uri),
+ statsLogger.scope("dlzk_factory_writer_shared")).build();
+ serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
+ }
+
+ private void createServerLoadPathIfNoExists(byte[] data)
+ throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
+ try {
+ Utils.zkCreateFullPathOptimistic(zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+ } catch (KeeperException.NodeExistsException nee) {
+ logger.debug("the server load path {} is already created by others", serverLoadPath, nee);
+ }
+ }
+
+ @Override
+ public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException {
+ logger.info("saving ownership");
+ try {
+ ZooKeeper zk = zkClient.get();
+ // use timestamp as data so watchers will see any changes
+ byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
+
+ if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist
+ createServerLoadPathIfNoExists(timestamp);
+ }
+
+ Transaction tx = zk.transaction();
+ List<String> children = zk.getChildren(serverLoadPath, false);
+ HashSet<String> servers = new HashSet<String>(children);
+ tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated
+ for (ServerLoad serverLoad : serverLoads) {
+ String server = serverToZkFormat(serverLoad.getServer());
+ String serverPath = serverPath(server);
+ if (servers.contains(server)) {
+ servers.remove(server);
+ tx.setData(serverPath, serverLoad.serialize(), -1);
+ } else {
+ tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+ }
+ }
+ for (String server : servers) {
+ tx.delete(serverPath(server), -1);
+ }
+ tx.commit();
+ } catch (InterruptedException | IOException | KeeperException e) {
+ throw new StateManagerSaveException(e);
+ }
+ }
+
+ @Override
+ public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException {
+ TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+ try {
+ ZooKeeper zk = zkClient.get();
+ List<String> children = zk.getChildren(serverLoadPath, false);
+ for (String server : children) {
+ ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat())));
+ }
+ return ownerships;
+ } catch (InterruptedException | IOException | KeeperException e) {
+ throw new StateManagerLoadException(e);
+ }
+ }
+
+ @Override
+ synchronized public void watch(final PlacementCallback callback) {
+ if (watching) {
+ return; // do not double watch
+ }
+ watching = true;
+
+ try {
+ ZooKeeper zk = zkClient.get();
+ try {
+ zk.getData(serverLoadPath, new Watcher() {
+ @Override
+ public void process(WatchedEvent watchedEvent) {
+ try {
+ callback.callback(loadOwnership());
+ } catch (StateManagerLoadException e) {
+ logger.error("Watch of Ownership failed", e);
+ } finally {
+ watching = false;
+ watch(callback);
+ }
+ }
+ }, new Stat());
+ } catch (KeeperException.NoNodeException nee) {
+ byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
+ createServerLoadPathIfNoExists(timestamp);
+ watching = false;
+ watch(callback);
+ }
+ } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) {
+ logger.error("Watch of Ownership failed", e);
+ watching = false;
+ watch(callback);
+ }
+ }
+
+ public String serverPath(String server) {
+ return String.format("%s/%s", serverLoadPath, server);
+ }
+
+ protected String serverToZkFormat(String server) {
+ return server.replaceAll("/", "--");
+ }
+
+ protected String zkFormatToServer(String zkFormattedServer) {
+ return zkFormattedServer.replaceAll("--", "/");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/thrift/metadata.thrift
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/thrift/metadata.thrift b/distributedlog-service/src/main/thrift/metadata.thrift
new file mode 100644
index 0000000..8f7b6ec
--- /dev/null
+++ b/distributedlog-service/src/main/thrift/metadata.thrift
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+namespace java com.twitter.distributedlog.service.placement.thrift
+
+// Load of a single stream; also the element type of ServerLoad.streams.
+struct StreamLoad {
+ // Identifier of the stream.
+ 1: optional string stream
+ // Load attributed to the stream (32-bit).
+ 2: optional i32 load
+}
+
+// Load of a single server together with the loads of its streams.
+struct ServerLoad {
+ // Identifier of the server.
+ 1: optional string server
+ // Load of the server (64-bit); presumably the aggregate of its stream loads. TODO confirm.
+ 2: optional i64 load
+ // Per-stream loads listed for this server.
+ 3: optional list<StreamLoad> streams
+}