You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:24 UTC

[10/31] incubator-distributedlog git commit: DL-157: resource placement for write proxy

DL-157: resource placement for write proxy


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/0591d067
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/0591d067
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/0591d067

Branch: refs/heads/master
Commit: 0591d067f05617bc534a662b6f9a014192cbe3a5
Parents: 34fa16b
Author: Jordan Bull <jb...@twitter.com>
Authored: Tue Dec 13 11:11:03 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Thu Dec 29 02:10:32 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/benchmark/Benchmarker.java   |  16 +-
 .../distributedlog/benchmark/WriterWorker.java  |   6 +-
 .../client/DistributedLogClientImpl.java        |  25 +--
 .../service/DistributedLogClientBuilder.java    |   1 +
 .../BKDistributedLogNamespace.java              |   8 +-
 distributedlog-service/pom.xml                  |  39 ++++
 .../service/DistributedLogCluster.java          |   7 +-
 .../service/DistributedLogServer.java           |  96 ++++++----
 .../service/DistributedLogServerApp.java        |   7 +-
 .../service/DistributedLogServiceImpl.java      |  90 ++++-----
 .../service/config/ServerConfiguration.java     |  12 ++
 .../service/placement/EqualLoadAppraiser.java   |  37 ++++
 .../placement/LeastLoadPlacementPolicy.java     | 192 +++++++++++++++++++
 .../service/placement/LoadAppraiser.java        |  25 +++
 .../service/placement/PlacementPolicy.java      | 148 ++++++++++++++
 .../placement/PlacementStateManager.java        |  65 +++++++
 .../service/placement/ServerLoad.java           | 152 +++++++++++++++
 .../service/placement/StreamLoad.java           | 109 +++++++++++
 .../placement/ZKPlacementStateManager.java      | 172 +++++++++++++++++
 .../src/main/thrift/metadata.thrift             |  29 +++
 .../service/TestDistributedLogService.java      |  48 ++---
 .../placement/TestLeastLoadPlacementPolicy.java | 160 ++++++++++++++++
 .../service/placement/TestServerLoad.java       |  48 +++++
 .../service/placement/TestStreamLoad.java       |  35 ++++
 .../placement/TestZKPlacementStateManager.java  | 123 ++++++++++++
 25 files changed, 1516 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
index 87d3b53..5b04a05 100644
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
+++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
@@ -85,6 +85,7 @@ public class Benchmarker {
     boolean enableBatching = false;
     int batchBufferSize = 256 * 1024;
     int batchFlushIntervalMicros = 2000;
+    String routingServiceFinagleNameString;
 
     final DistributedLogConfiguration conf = new DistributedLogConfiguration();
     final StatsReceiver statsReceiver = new OstrichStatsReceiver();
@@ -125,6 +126,7 @@ public class Benchmarker {
         options.addOption("bt", "enable-batch", false, "Enable batching on writers");
         options.addOption("bbs", "batch-buffer-size", true, "The batch buffer size in bytes");
         options.addOption("bfi", "batch-flush-interval", true, "The batch buffer flush interval in micros");
+        options.addOption("rs", "routing-service", true, "The routing service finagle name for server-side routing");
         options.addOption("h", "help", false, "Print usage.");
     }
 
@@ -221,6 +223,9 @@ public class Benchmarker {
         if (cmdline.hasOption("rb")) {
             recvBufferSize = Integer.parseInt(cmdline.getOptionValue("rb"));
         }
+        if (cmdline.hasOption("rs")) {
+            routingServiceFinagleNameString = cmdline.getOptionValue("rs");
+        }
         thriftmux = cmdline.hasOption("mx");
         handshakeWithClientInfo = cmdline.hasOption("hsci");
         readFromHead = cmdline.hasOption("rfh");
@@ -311,7 +316,8 @@ public class Benchmarker {
                 recvBufferSize,
                 enableBatching,
                 batchBufferSize,
-                batchFlushIntervalMicros);
+                batchFlushIntervalMicros,
+                routingServiceFinagleNameString);
     }
 
     protected WriterWorker createWriteWorker(
@@ -335,7 +341,8 @@ public class Benchmarker {
             int recvBufferSize,
             boolean enableBatching,
             int batchBufferSize,
-            int batchFlushIntervalMicros) {
+            int batchFlushIntervalMicros,
+            String routingServiceFinagleNameString) {
         return new WriterWorker(
                 streamPrefix,
                 uri,
@@ -357,7 +364,8 @@ public class Benchmarker {
                 recvBufferSize,
                 enableBatching,
                 batchBufferSize,
-                batchFlushIntervalMicros);
+                batchFlushIntervalMicros,
+                routingServiceFinagleNameString);
     }
 
     Worker runDLWriter() throws IOException {
@@ -453,7 +461,7 @@ public class Benchmarker {
         try {
             benchmarker.run();
         } catch (Exception e) {
-            logger.info("Benchmark quitted due to : ", e);
+            logger.info("Benchmark quit due to : ", e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
index 46229b3..dc5a6e2 100644
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
+++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
@@ -81,6 +81,7 @@ public class WriterWorker implements Worker {
     final boolean enableBatching;
     final int batchBufferSize;
     final int batchFlushIntervalMicros;
+    private final String routingServiceFinagleName;
 
     volatile boolean running = true;
 
@@ -113,7 +114,8 @@ public class WriterWorker implements Worker {
                         int recvBufferSize,
                         boolean enableBatching,
                         int batchBufferSize,
-                        int batchFlushIntervalMicros) {
+                        int batchFlushIntervalMicros,
+                        String routingServiceFinagleName) {
         checkArgument(startStreamId <= endStreamId);
         checkArgument(!finagleNames.isEmpty() || !serverSetPaths.isEmpty());
         this.streamPrefix = streamPrefix;
@@ -143,6 +145,7 @@ public class WriterWorker implements Worker {
         this.finagleNames = finagleNames;
         this.serverSets = createServerSets(serverSetPaths);
         this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+        this.routingServiceFinagleName = routingServiceFinagleName;
 
         // Streams
         streamNames = new ArrayList<String>(endStreamId - startStreamId);
@@ -197,6 +200,7 @@ public class WriterWorker implements Worker {
             .periodicOwnershipSyncIntervalMs(TimeUnit.MINUTES.toMillis(5))
             .periodicDumpOwnershipCache(true)
             .handshakeTracing(true)
+            .serverRoutingServiceFinagleNameStr(routingServiceFinagleName)
             .name("writer");
 
         if (!finagleNames.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
index 634afe1..1077cd0 100644
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java
@@ -73,6 +73,7 @@ import com.twitter.util.Promise;
 import com.twitter.util.Return;
 import com.twitter.util.Throw;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -852,18 +853,18 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
         }
     }
 
-    private void retryGetOwnerFromRoutingServer(final StreamOp op,
+    private void retryGetOwnerFromResourcePlacementServer(final StreamOp op,
                                                 final Promise<SocketAddress> getOwnerPromise,
                                                 final Throwable cause) {
         if (op.shouldTimeout()) {
             op.fail(null, cause);
             return;
         }
-        getOwnerFromRoutingServer(op, getOwnerPromise);
+        getOwnerFromResourcePlacementServer(op, getOwnerPromise);
     }
 
-    private void getOwnerFromRoutingServer(final StreamOp op,
-                                           final Promise<SocketAddress> getOwnerPromise) {
+    private void getOwnerFromResourcePlacementServer(final StreamOp op,
+                                                     final Promise<SocketAddress> getOwnerPromise) {
         clusterClient.get().getService().getOwner(op.stream, op.ctx)
             .addEventListener(new FutureEventListener<WriteResponse>() {
                 @Override
@@ -875,18 +876,20 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
                 public void onSuccess(WriteResponse value) {
                     if (StatusCode.FOUND == value.getHeader().getCode()
                           && null != value.getHeader().getLocation()) {
-                        SocketAddress addr;
                         try {
-                             addr = DLSocketAddress.deserialize(value.getHeader().getLocation()).getSocketAddress();
+                            InetSocketAddress addr = DLSocketAddress.deserialize(
+                                value.getHeader().getLocation()
+                            ).getSocketAddress();
+                            getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr));
                         } catch (IOException e) {
                             // retry from the routing server again
-                            retryGetOwnerFromRoutingServer(op, getOwnerPromise, e);
+                            logger.error("ERROR in getOwner", e);
+                            retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise, e);
                             return;
                         }
-                        getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr));
                     } else {
                         // retry from the routing server again
-                        retryGetOwnerFromRoutingServer(op, getOwnerPromise,
+                        retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise,
                                 new StreamUnavailableException("Stream " + op.stream + "'s owner is unknown"));
                     }
                 }
@@ -896,7 +899,7 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
     private Future<SocketAddress> getOwner(final StreamOp op) {
         if (clusterClient.isPresent()) {
             final Promise<SocketAddress> getOwnerPromise = new Promise<SocketAddress>();
-            getOwnerFromRoutingServer(op, getOwnerPromise);
+            getOwnerFromResourcePlacementServer(op, getOwnerPromise);
             return getOwnerPromise;
         }
         // pickup host by hashing
@@ -1190,7 +1193,7 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe
             ownershipCache.updateOwner(stream, ownerAddr);
         } catch (IOException e) {
             logger.warn("Invalid ownership {} found for stream {} : ",
-                        new Object[] { location, stream, e });
+                new Object[] { location, stream, e });
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
index 44d93ee..3f65aff 100644
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
@@ -96,6 +96,7 @@ public final class DistributedLogClientBuilder {
         newBuilder.statsReceiver = builder.statsReceiver;
         newBuilder.streamStatsReceiver = builder.streamStatsReceiver;
         newBuilder.enableRegionStats = builder.enableRegionStats;
+        newBuilder.serverRoutingServiceFinagleName = builder.serverRoutingServiceFinagleName;
         newBuilder.clientConfig = ClientConfig.newConfig(builder.clientConfig);
         return newBuilder;
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 2c9fe44..e7f29cc 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -609,10 +609,10 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
         return rootPath;
     }
 
-    private static ZooKeeperClientBuilder createDLZKClientBuilder(String zkcName,
-                                                                  DistributedLogConfiguration conf,
-                                                                  String zkServers,
-                                                                  StatsLogger statsLogger) {
+    public static ZooKeeperClientBuilder createDLZKClientBuilder(String zkcName,
+                                                                 DistributedLogConfiguration conf,
+                                                                 String zkServers,
+                                                                 StatsLogger statsLogger) {
         RetryPolicy retryPolicy = null;
         if (conf.getZKNumRetries() > 0) {
             retryPolicy = new BoundExponentialBackoffRetryPolicy(

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-service/pom.xml b/distributedlog-service/pom.xml
index b7b6ff8..e74d486 100644
--- a/distributedlog-service/pom.xml
+++ b/distributedlog-service/pom.xml
@@ -117,10 +117,49 @@
       <artifactId>jetty-servlet</artifactId>
       <version>${jetty.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>0.5.0-1</version>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>scrooge-core_2.11</artifactId>
+      <version>${scrooge.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <version>2.2.0-incubating</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
       <plugin>
+        <groupId>com.twitter</groupId>
+        <artifactId>scrooge-maven-plugin</artifactId>
+        <version>${scrooge-maven-plugin.version}</version>
+        <configuration>
+          <language>java</language>
+        </configuration>
+        <executions>
+          <execution>
+            <id>thrift-sources</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <artifactId>maven-assembly-plugin</artifactId>
         <version>2.2.1</version>
         <configuration>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
index 0ce335b..3225ced 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
@@ -22,6 +22,7 @@ import com.twitter.distributedlog.LocalDLMEmulator;
 import com.twitter.distributedlog.client.routing.SingleHostRoutingService;
 import com.twitter.distributedlog.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.DLMetadata;
+import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
 import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
 import com.twitter.finagle.builder.Server;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -228,9 +229,11 @@ public class DistributedLogCluster {
                             routingService,
                             new NullStatsProvider(),
                             proxyPort,
-                            thriftmux);
+                            thriftmux,
+                            new EqualLoadAppraiser());
                     routingService.setAddress(DLSocketAddress.getSocketAddress(proxyPort));
                     routingService.startService();
+                    serverPair.getLeft().startPlacementPolicy();
                     success = true;
                 } catch (BindException be) {
                     retries++;
@@ -244,7 +247,7 @@ public class DistributedLogCluster {
                 }
             }
 
-            LOG.info("Runnning DL on port {}", proxyPort);
+            LOG.info("Running DL on port {}", proxyPort);
 
             dlServer = serverPair;
             address = DLSocketAddress.getSocketAddress(proxyPort);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
index 185ea82..a9ba125 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
@@ -17,8 +17,32 @@
  */
 package com.twitter.distributedlog.service;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.Tuple2;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.client.routing.RoutingService;
 import com.twitter.distributedlog.config.DynamicConfigurationFactory;
@@ -31,6 +55,8 @@ import com.twitter.distributedlog.service.config.NullStreamConfigProvider;
 import com.twitter.distributedlog.service.config.ServerConfiguration;
 import com.twitter.distributedlog.service.config.ServiceStreamConfigProvider;
 import com.twitter.distributedlog.service.config.StreamConfigProvider;
+import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
+import com.twitter.distributedlog.service.placement.LoadAppraiser;
 import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
 import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
 import com.twitter.distributedlog.thrift.service.DistributedLogService;
@@ -46,31 +72,11 @@ import com.twitter.finagle.thrift.ClientIdRequiredFilter;
 import com.twitter.finagle.thrift.ThriftServerFramedCodec;
 import com.twitter.finagle.transport.Transport;
 import com.twitter.util.Duration;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Tuple2;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 public class DistributedLogServer {
 
     static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
+    private static final String DEFAULT_LOAD_APPRIASER = EqualLoadAppraiser.class.getCanonicalName();
 
     private DistributedLogServiceImpl dlService = null;
     private Server server = null;
@@ -89,6 +95,7 @@ public class DistributedLogServer {
     private final Optional<Integer> statsPort;
     private final Optional<Integer> shardId;
     private final Optional<Boolean> announceServerSet;
+    private final Optional<String> loadAppraiserClassStr;
     private final Optional<Boolean> thriftmux;
 
     DistributedLogServer(Optional<String> uri,
@@ -98,6 +105,7 @@ public class DistributedLogServer {
                          Optional<Integer> statsPort,
                          Optional<Integer> shardId,
                          Optional<Boolean> announceServerSet,
+                         Optional<String> loadAppraiserClass,
                          Optional<Boolean> thriftmux,
                          RoutingService routingService,
                          StatsReceiver statsReceiver,
@@ -113,9 +121,10 @@ public class DistributedLogServer {
         this.routingService = routingService;
         this.statsReceiver = statsReceiver;
         this.statsProvider = statsProvider;
+        this.loadAppraiserClassStr = loadAppraiserClass;
     }
 
-    public void runServer() throws ConfigurationException, IllegalArgumentException, IOException {
+    public void runServer() throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
         if (!uri.isPresent()) {
             throw new IllegalArgumentException("No distributedlog uri provided.");
         }
@@ -174,6 +183,9 @@ public class DistributedLogServer {
                     IdentityStreamPartitionConverter.class.getName());
             converter = new IdentityStreamPartitionConverter();
         }
+        Class loadAppraiserClass = Class.forName(loadAppraiserClassStr.or(DEFAULT_LOAD_APPRIASER));
+        LoadAppraiser loadAppraiser = (LoadAppraiser) ReflectionUtils.newInstance(loadAppraiserClass);
+        logger.info("Supplied load appraiser class is " + loadAppraiserClassStr.get() + " Instantiated " + loadAppraiser.getClass().getCanonicalName());
 
         StreamConfigProvider streamConfProvider =
                 getStreamConfigProvider(dlConf, converter);
@@ -193,7 +205,8 @@ public class DistributedLogServer {
                 keepAliveLatch,
                 statsReceiver,
                 thriftmux.isPresent(),
-                streamConfProvider);
+                streamConfProvider,
+                loadAppraiser);
 
         this.dlService = serverPair.getLeft();
         this.server = serverPair.getRight();
@@ -203,6 +216,8 @@ public class DistributedLogServer {
         // start the routing service after announced
         routingService.startService();
         logger.info("Started the routing service.");
+        dlService.startPlacementPolicy();
+        logger.info("Started the placement policy.");
     }
 
     protected void preRun(DistributedLogConfiguration conf, ServerConfiguration serverConf) {
@@ -256,7 +271,8 @@ public class DistributedLogServer {
             RoutingService routingService,
             StatsProvider provider,
             int port,
-            boolean thriftmux) throws IOException {
+            boolean thriftmux,
+            LoadAppraiser loadAppraiser) throws IOException {
 
         return runServer(serverConf,
                 dlConf,
@@ -269,7 +285,8 @@ public class DistributedLogServer {
                 new CountDownLatch(0),
                 new NullStatsReceiver(),
                 thriftmux,
-                new NullStreamConfigProvider());
+                new NullStreamConfigProvider(),
+                loadAppraiser);
     }
 
     static Pair<DistributedLogServiceImpl, Server> runServer(
@@ -284,7 +301,8 @@ public class DistributedLogServer {
             CountDownLatch keepAliveLatch,
             StatsReceiver statsReceiver,
             boolean thriftmux,
-            StreamConfigProvider streamConfProvider) throws IOException {
+            StreamConfigProvider streamConfProvider,
+            LoadAppraiser loadAppraiser) throws IOException {
         logger.info("Running server @ uri {}.", dlUri);
 
         boolean perStreamStatsEnabled = serverConf.isPerStreamStatEnabled();
@@ -297,16 +315,17 @@ public class DistributedLogServer {
 
         // dl service
         DistributedLogServiceImpl dlService = new DistributedLogServiceImpl(
-                serverConf,
-                dlConf,
-                dynDlConf,
-                streamConfProvider,
-                dlUri,
-                partitionConverter,
-                routingService,
-                provider.getStatsLogger(""),
-                perStreamStatsLogger,
-                keepAliveLatch);
+            serverConf,
+            dlConf,
+            dynDlConf,
+            streamConfProvider,
+            dlUri,
+            partitionConverter,
+            routingService,
+            provider.getStatsLogger(""),
+            perStreamStatsLogger,
+            keepAliveLatch,
+            loadAppraiser);
 
         StatsReceiver serviceStatsReceiver = statsReceiver.scope("service");
         StatsLogger serviceStatsLogger = provider.getStatsLogger("service");
@@ -400,6 +419,7 @@ public class DistributedLogServer {
      * @throws ConfigurationException
      * @throws IllegalArgumentException
      * @throws IOException
+     * @throws ClassNotFoundException
      */
     public static DistributedLogServer runServer(
                Optional<String> uri,
@@ -409,11 +429,12 @@ public class DistributedLogServer {
                Optional<Integer> statsPort,
                Optional<Integer> shardId,
                Optional<Boolean> announceServerSet,
+               Optional<String> loadAppraiserClass,
                Optional<Boolean> thriftmux,
                RoutingService routingService,
                StatsReceiver statsReceiver,
                StatsProvider statsProvider)
-            throws ConfigurationException, IllegalArgumentException, IOException {
+        throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
 
         final DistributedLogServer server = new DistributedLogServer(
                 uri,
@@ -423,6 +444,7 @@ public class DistributedLogServer {
                 statsPort,
                 shardId,
                 announceServerSet,
+                loadAppraiserClass,
                 thriftmux,
                 routingService,
                 statsReceiver,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
index af36307..1c3d8d4 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
@@ -68,6 +68,7 @@ public class DistributedLogServerApp {
         options.addOption("pd", "stats-provider", true, "DistributedLog Stats Provider");
         options.addOption("si", "shard-id", true, "DistributedLog Shard ID");
         options.addOption("a", "announce", false, "ServerSet Path to Announce");
+        options.addOption("la", "load-appraiser", true, "LoadAppraiser Implementation to Use");
         options.addOption("mx", "thriftmux", false, "Is thriftmux enabled");
     }
 
@@ -97,10 +98,13 @@ public class DistributedLogServerApp {
         } catch (IOException ie) {
             logger.error("Failed to start distributedlog server : ", ie);
             Runtime.getRuntime().exit(-1);
+        } catch (ClassNotFoundException cnf) {
+          logger.error("Failed to start distributedlog server : ", cnf);
+          Runtime.getRuntime().exit(-1);
         }
     }
 
-    private void runCmd(CommandLine cmdline) throws IllegalArgumentException, IOException, ConfigurationException {
+    private void runCmd(CommandLine cmdline) throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException {
         final StatsReceiver statsReceiver = NullStatsReceiver.get();
         Optional<String> confOptional = getOptionalStringArg(cmdline, "c");
         DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
@@ -142,6 +146,7 @@ public class DistributedLogServerApp {
                 getOptionalIntegerArg(cmdline, "sp"),
                 getOptionalIntegerArg(cmdline, "si"),
                 getOptionalBooleanArg(cmdline, "a"),
+                getOptionalStringArg(cmdline, "la"),
                 getOptionalBooleanArg(cmdline, "mx"),
                 routingService,
                 statsReceiver,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 5c5b5af..e7974c7 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.common.net.InetSocketAddressHelper;
 import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.acl.AccessControlManager;
@@ -36,8 +37,14 @@ import com.twitter.distributedlog.exceptions.TooManyStreamsException;
 import com.twitter.distributedlog.feature.AbstractFeatureProvider;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
 import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.distributedlog.rate.MovingAverageRate;
+import com.twitter.distributedlog.rate.MovingAverageRateFactory;
 import com.twitter.distributedlog.service.config.ServerConfiguration;
 import com.twitter.distributedlog.service.config.StreamConfigProvider;
+import com.twitter.distributedlog.service.placement.LeastLoadPlacementPolicy;
+import com.twitter.distributedlog.service.placement.LoadAppraiser;
+import com.twitter.distributedlog.service.placement.PlacementPolicy;
+import com.twitter.distributedlog.service.placement.ZKPlacementStateManager;
 import com.twitter.distributedlog.service.stream.BulkWriteOp;
 import com.twitter.distributedlog.service.stream.DeleteOp;
 import com.twitter.distributedlog.service.stream.admin.CreateOp;
@@ -67,32 +74,19 @@ import com.twitter.distributedlog.thrift.service.ServerStatus;
 import com.twitter.distributedlog.thrift.service.StatusCode;
 import com.twitter.distributedlog.thrift.service.WriteContext;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
-import com.twitter.distributedlog.rate.MovingAverageRateFactory;
-import com.twitter.distributedlog.rate.MovingAverageRate;
 import com.twitter.distributedlog.util.ConfUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.SchedulerUtils;
-import com.twitter.finagle.NoBrokersAvailableException;
 import com.twitter.util.Await;
 import com.twitter.util.Duration;
+import com.twitter.util.Function;
 import com.twitter.util.Function0;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
-import com.twitter.util.Timer;
 import com.twitter.util.ScheduledThreadPoolTimer;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
+import com.twitter.util.Timer;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -102,6 +96,17 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.runtime.BoxedUnit;
+
 public class DistributedLogServiceImpl implements DistributedLogService.ServiceIface,
                                                   FatalErrorHandler {
 
@@ -113,6 +118,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
     private final DistributedLogConfiguration dlConfig;
     private final DistributedLogNamespace dlNamespace;
     private final int serverRegionId;
+    private final PlacementPolicy placementPolicy;
     private ServerStatus serverStatus = ServerStatus.WRITE_AND_ACCEPT;
     private final ReentrantReadWriteLock closeLock =
             new ReentrantReadWriteLock();
@@ -157,6 +163,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
     private final Gauge<Number> movingAvgBpsGauge;
     private final Gauge<Number> streamAcquiredGauge;
     private final Gauge<Number> streamCachedGauge;
+    private final int shard;
 
     DistributedLogServiceImpl(ServerConfiguration serverConf,
                               DistributedLogConfiguration dlConf,
@@ -167,7 +174,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
                               RoutingService routingService,
                               StatsLogger statsLogger,
                               StatsLogger perStreamStatsLogger,
-                              CountDownLatch keepAliveLatch)
+                              CountDownLatch keepAliveLatch,
+                              LoadAppraiser loadAppraiser)
             throws IOException {
         // Configuration.
         this.serverConfig = serverConf;
@@ -177,7 +185,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
         this.serverRegionId = serverConf.getRegionId();
         this.streamPartitionConverter = converter;
         int serverPort = serverConf.getServerPort();
-        int shard = serverConf.getServerShardId();
+        this.shard = serverConf.getServerShardId();
         int numThreads = serverConf.getServerThreads();
         this.clientId = DLSocketAddress.toLockId(DLSocketAddress.getSocketAddress(serverPort), shard);
         String allocatorPoolName = ServerUtils.getLedgerAllocatorPoolName(
@@ -264,6 +272,15 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
                 streamManager,
                 limiterDisabledFeature);
 
+        this.placementPolicy = new LeastLoadPlacementPolicy(
+            loadAppraiser,
+            routingService,
+            dlNamespace,
+            new ZKPlacementStateManager(uri, dlConf, statsLogger),
+            Duration.fromSeconds(serverConf.getResourcePlacementRefreshInterval()),
+            statsLogger);
+        logger.info("placement started");
+
         // Stats
         this.statsLogger = statsLogger;
 
@@ -501,35 +518,13 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
             return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(clientId)));
         }
 
-        Stream stream = streamManager.getStream(streamName);
-        String owner;
-        if (null != stream && null != (owner = stream.getOwner())) {
-            return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(owner)));
-        }
-
-        RoutingService.RoutingContext routingContext = RoutingService.RoutingContext.of(regionResolver);
-
-        if (ctx.isSetTriedHosts()) {
-            for (String triedHost : ctx.getTriedHosts()) {
-                routingContext.addTriedHost(
-                        DLSocketAddress.parseSocketAddress(triedHost), StatusCode.STREAM_UNAVAILABLE);
-            }
-        }
-
-        try {
-            SocketAddress host = routingService.getHost(streamName, routingContext);
-            if (host instanceof InetSocketAddress) {
-                // use shard id '-1' as the shard id here won't be used for redirection
-                return Future.value(new WriteResponse(
-                        ResponseUtils.ownerToHeader(DLSocketAddress.toLockId((InetSocketAddress) host, -1))));
-            } else {
-                return Future.value(new WriteResponse(
-                        ResponseUtils.streamUnavailableHeader()));
+        return placementPolicy.placeStream(streamName).map(new Function<String, WriteResponse>() {
+            @Override
+            public WriteResponse apply(String server) {
+                String host = DLSocketAddress.toLockId(InetSocketAddressHelper.parse(server), -1);
+                return new WriteResponse(ResponseUtils.ownerToHeader(host));
             }
-        } catch (NoBrokersAvailableException e) {
-            return Future.value(new WriteResponse(
-                    ResponseUtils.streamUnavailableHeader()));
-        }
+        });
     }
 
 
@@ -689,6 +684,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
 
             // Stop the timer.
             timer.stop();
+            placementPolicy.close();
 
             // clean up gauge
             unregisterGauge();
@@ -704,6 +700,10 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
         }
     }
 
+    protected void startPlacementPolicy() {
+        this.placementPolicy.start(shard == 0);
+    }
+
     @Override
     public void notifyFatalError() {
         triggerShutdown();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
index 9a9e83c..5b19f6c 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
@@ -95,6 +95,9 @@ public class ServerConfiguration extends CompositeConfiguration {
     protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME
         = "server_use_hostname_as_allocator_pool_name";
     protected static final boolean SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT = false;
+    //Configure refresh interval for calculating resource placement in seconds
+    public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S = "server_resource_placement_refresh_interval_sec";
+    public static final int  SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT = 120;
 
     public ServerConfiguration() {
         super();
@@ -399,6 +402,15 @@ public class ServerConfiguration extends CompositeConfiguration {
             SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT);
     }
 
+    public ServerConfiguration setResourcePlacementRefreshInterval(int refreshIntervalSecs) {
+        setProperty(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, refreshIntervalSecs);
+        return this;
+    }
+
+    public int getResourcePlacementRefreshInterval() {
+        return getInt(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT);
+    }
+
     /**
      * Validate the configuration
      */

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
new file mode 100644
index 0000000..144e358
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import com.twitter.util.Future;
+
+/**
+ * Created for those who hold these truths to be self-evident, that all streams are created equal,
+ * that they are endowed by their creator with certain unalienable loads, that among these are
+ * Uno, Eins, and One.
+ */
+public class EqualLoadAppraiser implements LoadAppraiser {
+  @Override
+  public Future<StreamLoad> getStreamLoad(String stream) {
+    return Future.value(new StreamLoad(stream, 1));
+  }
+
+  @Override
+  public Future<Void> refreshCache() {
+    return Future.value(null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
new file mode 100644
index 0000000..e4c8128
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import scala.Function1;
+import scala.runtime.BoxedUnit;
+
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.Stats;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.util.Duration;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.Futures;
+
+/**
+ * A PlacementPolicy that attempts to place streams in such a way that the load is balanced as
+ * evenly as possible across all shards. The LoadAppraiser remains responsible for determining what
+ * the load of a stream would be. This placement policy then distributes these streams across the
+ * servers.
+ */
+public class LeastLoadPlacementPolicy extends PlacementPolicy {
+  private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+  private Map<String, String> streamToServer = new HashMap<String, String>();
+
+  public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
+                                  DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+                                  Duration refreshInterval, StatsLogger statsLogger) {
+    super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger);
+    statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {
+      @Override
+      public Number getDefaultValue() {
+        return 0;
+      }
+
+      @Override
+      public Number getSample() {
+        if (serverLoads.size() > 0) {
+          return serverLoads.last().getLoad() - serverLoads.first().getLoad();
+        } else {
+          return getDefaultValue();
+        }
+      }
+    });
+  }
+
+  @Override
+  public Future<String> placeStream(String stream) {
+    if (streamToServer.containsKey(stream)) {
+      return Future.value(streamToServer.get(stream));
+    }
+    Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream);
+    return streamLoadFuture.map(new Function<StreamLoad, String>() {
+      @Override
+      public String apply(StreamLoad streamLoad) {
+        return placeStreamSynchronized(streamLoad);
+      }
+    });
+  }
+
+  synchronized private String placeStreamSynchronized(StreamLoad streamLoad) {
+    ServerLoad serverLoad = serverLoads.pollFirst();
+    serverLoad.addStream(streamLoad);
+    serverLoads.add(serverLoad);
+    return serverLoad.getServer();
+  }
+
+  @Override
+  public void refresh() {
+    logger.info("Refreshing server loads.");
+    Future<Void> refresh = loadAppraiser.refreshCache();
+    final Set<String> servers = getServers();
+    final Set<String> allStreams = getStreams();
+    Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap(new Function<Void, Future<TreeSet<ServerLoad>>>() {
+      @Override
+      public Future<TreeSet<ServerLoad>> apply(Void v1) {
+        return calculate(servers, allStreams);
+      }
+    });
+    serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+      @Override
+      public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
+        try {
+          updateServerLoads(serverLoads);
+        } catch (PlacementStateManager.StateManagerSaveException e) {
+          logger.error("The refreshed mapping could not be persisted and will not be used.", e);
+        }
+        return BoxedUnit.UNIT;
+      }
+    });
+  }
+
+  synchronized private void updateServerLoads(TreeSet<ServerLoad> serverLoads) throws PlacementStateManager.StateManagerSaveException {
+    this.placementStateManager.saveOwnership(serverLoads);
+    this.streamToServer = serverLoadsToMap(serverLoads);
+    this.serverLoads = serverLoads;
+  }
+
+  @Override
+  synchronized public void load(TreeSet<ServerLoad> serverLoads) {
+    this.serverLoads = serverLoads;
+    this.streamToServer = serverLoadsToMap(serverLoads);
+  }
+
+  public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) {
+    logger.info("Calculating server loads");
+    final long startTime = System.currentTimeMillis();
+    ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size());
+
+    for (String stream: streams) {
+      Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream);
+      futures.add(streamLoad);
+    }
+
+    return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() {
+      @Override
+      public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) {
+        /* Sort streamLoads so largest streams are placed first for better balance */
+        TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>();
+        for (StreamLoad streamLoad: streamLoads) {
+          streamQueue.add(streamLoad);
+        }
+
+        TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+        for (String server: servers) {
+          ServerLoad serverLoad = new ServerLoad(server);
+          if (!streamQueue.isEmpty()) {
+            serverLoad.addStream(streamQueue.pollFirst());
+          }
+          serverLoads.add(serverLoad);
+        }
+
+        while (!streamQueue.isEmpty()) {
+          ServerLoad serverLoad = serverLoads.pollFirst();
+          serverLoad.addStream(streamQueue.pollFirst());
+          serverLoads.add(serverLoad);
+        }
+        return serverLoads;
+      }
+    }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+      @Override
+      public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
+        placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime);
+        return BoxedUnit.UNIT;
+      }
+    }).onFailure(new Function<Throwable, BoxedUnit>() {
+      @Override
+      public BoxedUnit apply(Throwable t) {
+        logger.error("Failure calculating loads", t);
+        placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime);
+        return BoxedUnit.UNIT;
+      }
+    });
+  }
+
+  private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) {
+    HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size());
+    for (ServerLoad serverLoad: serverLoads) {
+      for (StreamLoad streamLoad: serverLoad.getStreamLoads()) {
+        streamToServer.put(streamLoad.getStream(), serverLoad.getServer());
+      }
+    }
+    return streamToServer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
new file mode 100644
index 0000000..784f106
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import com.twitter.util.Future;
+
+public interface LoadAppraiser {
+  Future<StreamLoad> getStreamLoad(String stream);
+  Future<Void> refreshCache();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
new file mode 100644
index 0000000..2044428
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+
+import scala.runtime.BoxedUnit;
+
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.service.DLSocketAddress;
+import com.twitter.util.Duration;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.ScheduledThreadPoolTimer;
+import com.twitter.util.Time;
+import com.twitter.util.Timer;
+
+/**
+ * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream
+ * contains. The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will
+ * then distribute these StreamLoads to the available servers in a manner defined by the
+ * implementation creating ServerLoad objects. It then saves this assignment via the
+ * PlacementStateManager.
+ */
+public abstract class PlacementPolicy {
+  protected final LoadAppraiser loadAppraiser;
+  protected final RoutingService routingService;
+  protected final DistributedLogNamespace namespace;
+  protected final PlacementStateManager placementStateManager;
+  private final Duration refreshInterval;
+
+  protected static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class);
+  protected final OpStatsLogger placementCalcStats;
+  private Timer placementRefreshTimer;
+
+  public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
+                         DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+                         Duration refreshInterval, StatsLogger statsLogger) {
+    this.loadAppraiser = loadAppraiser;
+    this.routingService = routingService;
+    this.namespace = namespace;
+    this.placementStateManager = placementStateManager;
+    this.refreshInterval = refreshInterval;
+    placementCalcStats = statsLogger.getOpStatsLogger("placement");
+  }
+
+  public Set<String> getServers() {
+    Set<SocketAddress> hosts = routingService.getHosts();
+    Set<String> servers = new HashSet<String>(hosts.size());
+    for (SocketAddress address: hosts) {
+      servers.add(DLSocketAddress.toString((InetSocketAddress) address));
+    }
+    return servers;
+  }
+
+  public Set<String> getStreams() {
+    Set<String> streams = new HashSet<String>();
+    try {
+      Iterator<String> logs = namespace.getLogs();
+      while (logs.hasNext()) {
+        streams.add(logs.next());
+      }
+    } catch (IOException e) {
+      logger.error("Could not get streams for placement policy.", e);
+    }
+    return streams;
+  }
+
+  public void start(boolean leader) {
+    logger.info("Starting placement policy");
+
+    TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>();
+    for (String server: getServers()) {
+      emptyServerLoads.add(new ServerLoad(server));
+    }
+    load(emptyServerLoads); //Pre-Load so streams don't NPE
+    if (leader) { //this is the leader shard
+      logger.info("Shard is leader. Scheduling timed refresh.");
+      placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true);
+      placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() {
+        @Override
+        public BoxedUnit apply() {
+          refresh();
+          return BoxedUnit.UNIT;
+        }
+      });
+    } else {
+      logger.info("Shard is not leader. Watching for server load changes.");
+      placementStateManager.watch(new PlacementStateManager.PlacementCallback() {
+        @Override
+        public void callback(TreeSet<ServerLoad> serverLoads) {
+          if (!serverLoads.isEmpty()) {
+            load(serverLoads);
+          }
+        }
+      });
+    }
+  }
+
+  public void close() {
+    if (placementRefreshTimer != null) {
+      placementRefreshTimer.stop();
+    }
+  }
+
+  /**
+   * Places the stream on a server according to the policy and returns a future containing the
+   * host that owns the stream upon completion.
+   */
+  public abstract Future<String> placeStream(String stream);
+
+  /**
+   * Recalculates the entire placement mapping and stores it using the PlacementStateManager.
+   */
+  public abstract void refresh();
+
+  /**
+   * Loads the placement mapping into the node from a TreeSet of ServerLoads
+   */
+  public abstract void load(TreeSet<ServerLoad> serverLoads);
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
new file mode 100644
index 0000000..cd0d906
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.util.TreeSet;
+
+/**
+ * The PlacementStateManager handles persistence of calculated resource placements, including
+ * their storage once calculated and their retrieval by the other shards.
+ */
+public interface PlacementStateManager {
+
+  /**
+   * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage
+   */
+  void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException;
+
+  /**
+   * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage
+   */
+  TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException;
+
+  /**
+   * Watch the persistent storage for changes to the ownership mapping and calls placementCallback
+   * with the new mapping when a change occurs
+   */
+  void watch(PlacementCallback placementCallback);
+
+  interface PlacementCallback {
+    void callback(TreeSet<ServerLoad> serverLoads);
+  }
+
+  abstract class StateManagerException extends Exception {
+    public StateManagerException(String message, Exception e) {
+      super(message, e);
+    }
+  }
+
+  class StateManagerLoadException extends StateManagerException {
+    public StateManagerLoadException(Exception e) {
+      super("Load of Ownership failed", e);
+    }
+  }
+
+  class StateManagerSaveException extends StateManagerException {
+    public StateManagerSaveException(Exception e) {
+      super("Save of Ownership failed", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
new file mode 100644
index 0000000..d7fbcf2
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import com.twitter.distributedlog.service.placement.thrift.*;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TMemoryInputTransport;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * A comparable data object containing the identifier of the server, total appraised load on the
+ * server, and all streams assigned to the server by the resource placement mapping. This is
+ * comparable first by load and then by server so that a sorted data structure of these will be
+ * consistent across multiple calculations.
+ */
+public class ServerLoad implements Comparable {
+  private static final int BUFFER_SIZE = 4096000;
+  private final String server;
+  private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>();
+  private long load = 0l;
+
+  public ServerLoad(String server) {
+    this.server = server;
+  }
+
+  synchronized public long addStream(StreamLoad stream) {
+    this.load += stream.getLoad();
+    streamLoads.add(stream);
+    return this.load;
+  }
+
+  synchronized public long removeStream(String stream) {
+    for (StreamLoad streamLoad : streamLoads) {
+      if (streamLoad.stream.equals(stream)) {
+        this.load -= streamLoad.getLoad(); // subtract only the removed stream's load, not the whole server total
+        streamLoads.remove(streamLoad); // safe during iteration: we return before the iterator is advanced again
+        return this.load;
+      }
+    }
+    return this.load; // stream not assigned to this server: load is unchanged (callers are not expected to hit this)
+  }
+
+  public long getLoad() {
+    return load;
+  }
+
+  public Set<StreamLoad> getStreamLoads() {
+    return streamLoads;
+  }
+
+  public String getServer() {
+    return server;
+  }
+
+  protected com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() {
+    com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad
+        = new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
+    tServerLoad.setServer(server);
+    tServerLoad.setLoad(load);
+    ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads
+        = new ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad>();
+    for (StreamLoad streamLoad: streamLoads) {
+      tStreamLoads.add(streamLoad.toThrift());
+    }
+    tServerLoad.setStreams(tStreamLoads);
+    return tServerLoad;
+  }
+
+  public byte[] serialize() throws IOException {
+    TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      toThrift().write(protocol);
+      transport.flush();
+      return transport.toString(UTF_8.name()).getBytes(UTF_8);
+    } catch (TException e) {
+      throw new IOException("Failed to serialize server load : ", e);
+    } catch (UnsupportedEncodingException uee) {
+      throw new IOException("Failed to serialize server load : ", uee);
+    }
+  }
+
+  public static ServerLoad deserialize(byte[] data) throws IOException {
+    com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad
+        = new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
+    TMemoryInputTransport transport = new TMemoryInputTransport(data);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      tServerLoad.read(protocol);
+      ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer());
+      if (tServerLoad.isSetStreams()) {
+        for (com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad : tServerLoad.getStreams()) {
+          serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()));
+        }
+      }
+      return serverLoad;
+    } catch (TException e) {
+      throw new IOException("Failed to deserialize server load : ", e);
+    }
+  }
+
+  @Override
+  public int compareTo(Object o) {
+    ServerLoad other = (ServerLoad) o;
+    if (load == other.load) {
+      return server.compareTo(other.getServer());
+    } else {
+      return Long.compare(load, other.getLoad());
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    ServerLoad other = (o instanceof ServerLoad) ? (ServerLoad) o : null; // null or a foreign type is never equal
+    return other != null && server.equals(other.getServer()) && load == other.getLoad() && streamLoads.equals(other.getStreamLoads());
+  }
+
+  @Override
+  public String toString() {
+    return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(server).append(load).append(streamLoads).build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
new file mode 100644
index 0000000..4f3dc71
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TMemoryInputTransport;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * A comparable data object containing the identifier of the stream and the appraised load produced
+ * by the stream.
+ */
+public class StreamLoad implements Comparable {
+  private static final int BUFFER_SIZE = 4096;
+  public final String stream;
+  private final int load;
+
+  public StreamLoad(String stream, int load) {
+    this.stream = stream;
+    this.load = load;
+  }
+
+  public int getLoad() {
+    return load;
+  }
+
+  public String getStream() {
+    return stream;
+  }
+
+  protected com.twitter.distributedlog.service.placement.thrift.StreamLoad toThrift() {
+    com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
+    return tStreamLoad.setStream(stream).setLoad(load);
+  }
+
+  public byte[] serialize() throws IOException {
+    TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      toThrift().write(protocol);
+      transport.flush();
+      return transport.toString(UTF_8.name()).getBytes(UTF_8);
+    } catch (TException e) {
+      throw new IOException("Failed to serialize stream load : ", e);
+    } catch (UnsupportedEncodingException uee) {
+      throw new IOException("Failed to serialize stream load : ", uee);
+    }
+  }
+
+  public static StreamLoad deserialize(byte[] data) throws IOException {
+    com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
+    TMemoryInputTransport transport = new TMemoryInputTransport(data);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      tStreamLoad.read(protocol);
+      return new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad());
+    } catch (TException e) {
+      throw new IOException("Failed to deserialize stream load : ", e);
+    }
+  }
+
+  @Override
+  public int compareTo(Object o) {
+    StreamLoad other = (StreamLoad) o;
+    if (load == other.getLoad()) {
+      return stream.compareTo(other.getStream());
+    } else {
+      return Long.compare(load, other.getLoad());
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    StreamLoad other = (o instanceof StreamLoad) ? (StreamLoad) o : null; // null or a foreign type is never equal
+    return other != null && stream.equals(other.getStream()) && load == other.getLoad();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(stream).append(load).build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
new file mode 100644
index 0000000..18b9d1f
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Transaction;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.distributedlog.BKDistributedLogNamespace;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.util.DLUtils;
+import com.twitter.distributedlog.util.Utils;
+
+/**
+ * An implementation of the PlacementStateManager that saves data to and loads from Zookeeper to
+ * avoid necessitating an additional system for the resource placement.
+ */
+public class ZKPlacementStateManager implements PlacementStateManager {
+  static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class);
+  private static final String SERVER_LOAD_DIR = "/.server-load";
+
+  private final String serverLoadPath;
+  private final ZooKeeperClient zkClient;
+
+  private boolean watching = false;
+
+  public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) {
+    zkClient = BKDistributedLogNamespace.createDLZKClientBuilder(
+        String.format("dlzk:%s:factory_writer_shared", uri),
+        conf,
+        DLUtils.getZKServersFromDLUri(uri),
+        statsLogger.scope("dlzk_factory_writer_shared")).build();
+    serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
+  }
+
+  private void createServerLoadPathIfNoExists(byte[] data)
+        throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
+    try {
+      Utils.zkCreateFullPathOptimistic(zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+    } catch (KeeperException.NodeExistsException nee) {
+      logger.debug("the server load path {} is already created by others", serverLoadPath, nee);
+    }
+  }
+
+  @Override
+  public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException {
+    logger.info("saving ownership");
+    try {
+      ZooKeeper zk = zkClient.get();
+      // use timestamp as data so watchers will see any changes
+      byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
+
+      if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist
+        createServerLoadPathIfNoExists(timestamp);
+      }
+
+      Transaction tx = zk.transaction();
+      List<String> children = zk.getChildren(serverLoadPath, false);
+      HashSet<String> servers = new HashSet<String>(children);
+      tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated
+      for (ServerLoad serverLoad : serverLoads) {
+        String server = serverToZkFormat(serverLoad.getServer());
+        String serverPath = serverPath(server);
+        if (servers.contains(server)) {
+          servers.remove(server);
+          tx.setData(serverPath, serverLoad.serialize(), -1);
+        } else {
+          tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+        }
+      }
+      for (String server : servers) {
+        tx.delete(serverPath(server), -1);
+      }
+      tx.commit();
+    } catch (InterruptedException | IOException | KeeperException e) {
+      throw new StateManagerSaveException(e);
+    }
+  }
+
+  @Override
+  public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException {
+    TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+    try {
+      ZooKeeper zk = zkClient.get();
+      List<String> children = zk.getChildren(serverLoadPath, false);
+      for (String server : children) {
+        ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat())));
+      }
+      return ownerships;
+    } catch (InterruptedException | IOException | KeeperException e) {
+      throw new StateManagerLoadException(e);
+    }
+  }
+
+  @Override
+  synchronized public void watch(final PlacementCallback callback) {
+    if (watching) {
+      return; // do not double watch
+    }
+    watching = true;
+
+    try {
+      ZooKeeper zk = zkClient.get();
+      try {
+        zk.getData(serverLoadPath, new Watcher() { // registers a watcher on the root server-load node
+          @Override
+          public void process(WatchedEvent watchedEvent) {
+            try {
+              callback.callback(loadOwnership()); // reload the full mapping and hand it to the caller
+            } catch (StateManagerLoadException e) {
+              logger.error("Watch of Ownership failed", e);
+            } finally {
+              watching = false; // NOTE(review): written outside this object's monitor (process() is not synchronized) — confirm visibility
+              watch(callback); // re-register a watcher whether or not the reload succeeded
+            }
+          }
+        }, new Stat());
+      } catch (KeeperException.NoNodeException nee) {
+        byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
+        createServerLoadPathIfNoExists(timestamp); // node is missing: create it, then retry the registration
+        watching = false;
+        watch(callback);
+      }
+    } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) {
+      logger.error("Watch of Ownership failed", e); // NOTE(review): InterruptedException is retried without restoring the interrupt flag
+      watching = false;
+      watch(callback); // NOTE(review): immediate recursive retry with no delay or attempt limit; a persistent failure keeps deepening the stack — consider a scheduled retry
+    }
+  }
+
+  public String serverPath(String server) {
+    return String.format("%s/%s", serverLoadPath, server);
+  }
+
+  protected String serverToZkFormat(String server) {
+    return server.replaceAll("/", "--");
+  }
+
+  protected String zkFormatToServer(String zkFormattedServer) {
+    return zkFormattedServer.replaceAll("--", "/");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/thrift/metadata.thrift
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/thrift/metadata.thrift b/distributedlog-service/src/main/thrift/metadata.thrift
new file mode 100644
index 0000000..8f7b6ec
--- /dev/null
+++ b/distributedlog-service/src/main/thrift/metadata.thrift
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+namespace java com.twitter.distributedlog.service.placement.thrift
+
+struct StreamLoad {
+    1: optional string stream
+    2: optional i32 load
+}
+
+struct ServerLoad {
+    1: optional string server
+    2: optional i64 load
+    3: optional list<StreamLoad> streams
+}