You are viewing a plain-text version of this content. The canonical (hyperlinked) version is available in the original mailing-list archive.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/05/07 08:50:42 UTC

[GitHub] sijie closed pull request #1392: [table service] a zk based storage container controller

sijie closed pull request #1392: [table service] a zk based storage container controller 
URL: https://github.com/apache/bookkeeper/pull/1392
 
 
   

This is a pull request (PR) merged from a forked repository.
Because GitHub hides a PR's original diff once it has been merged, the diff
is reproduced below for the sake of provenance:

Because this pull request comes from a fork, GitHub would not otherwise
display its diff, so the diff is supplied below:

diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
index 938bf49f2..b62c8ae7b 100644
--- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
@@ -23,6 +23,9 @@
 import com.google.common.collect.Sets;
 import com.google.common.collect.Sets.SetView;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
 
 /**
  * Assertion utils.
@@ -42,4 +45,10 @@ private MoreAsserts() {}
             diff.isEmpty());
     }
 
+    public static <T> void assertUtil(Predicate<T> predicate, Supplier<T> supplier) throws InterruptedException {
+        while (!predicate.test(supplier.get())) {
+            TimeUnit.MILLISECONDS.sleep(100);
+        }
+    }
+
 }
diff --git a/pom.xml b/pom.xml
index 0fef32aa5..973140c68 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,6 +115,7 @@
     <commons-lang.version>2.6</commons-lang.version>
     <commons-lang3.version>3.3.2</commons-lang3.version>
     <commons-io.version>2.4</commons-io.version>
+    <curator.version>4.0.1</curator.version>
     <dropwizard.version>3.1.0</dropwizard.version>
     <finagle.version>6.44.0</finagle.version>
     <freebuilder.version>1.12.3</freebuilder.version>
@@ -413,6 +414,19 @@
         </exclusions>
       </dependency>
 
+      <!-- curator dependencies -->
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-recipes</artifactId>
+        <version>${curator.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
       <!-- http server dependencies -->
       <dependency>
         <groupId>com.twitter</groupId>
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java
index d83d5a746..ba2c2479a 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient;
 import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -37,6 +38,7 @@
 /**
  * A root range client wrapper with retries.
  */
+@Slf4j
 class RootRangeClientImplWithRetries implements RootRangeClient {
 
     @VisibleForTesting
@@ -63,7 +65,8 @@ private static boolean shouldRetryOnException(Throwable cause) {
         } else if (cause instanceof RuntimeException) {
             return false;
         } else {
-            return true;
+            // storage level exceptions
+            return false;
         }
     }
 
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/NetUtils.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/NetUtils.java
index 23b9ca152..28fdd4102 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/NetUtils.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/NetUtils.java
@@ -119,4 +119,16 @@ public static Endpoint createEndpoint(String hostname, int port) {
             .build();
     }
 
+    /**
+     * Convert an endpoint to string.
+     *
+     * @param ep endpoint
+     * @return endpoint string representation
+     */
+    public static String endpointToString(Endpoint ep) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(ep.getHostname()).append(":").append(ep.getPort());
+        return sb.toString();
+    }
+
 }
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/ZooKeeperClusterTestCase.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/ZooKeeperClusterTestCase.java
index 1f7ac607d..7fb8bef67 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/ZooKeeperClusterTestCase.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/ZooKeeperClusterTestCase.java
@@ -18,6 +18,7 @@
 package org.apache.distributedlog;
 
 import java.io.File;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.commons.io.FileUtils;
@@ -29,8 +30,15 @@
 /**
  * ZooKeeperClusterTestCase.
  */
+@Slf4j
 public class ZooKeeperClusterTestCase {
 
+    static {
+        // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words
+        // are disabled by default due to security reasons
+        System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+    }
+
     protected static File zkDir;
     protected static ZooKeeperServerShim zks;
     protected static String zkServers;
@@ -43,10 +51,13 @@ public static void setupZooKeeper() throws Exception {
         zks = serverAndPort.getLeft();
         zkPort = serverAndPort.getRight();
         zkServers = "127.0.0.1:" + zkPort;
+
+        log.info("--- Setup zookeeper at {} ---", zkServers);
     }
 
     @AfterClass
     public static void shutdownZooKeeper() throws Exception {
+        log.info("--- Shutdown zookeeper at {} ---", zkServers);
         zks.stop();
         if (null != zkDir) {
             FileUtils.forceDeleteOnExit(zkDir);
diff --git a/stream/proto/src/main/proto/cluster.proto b/stream/proto/src/main/proto/cluster.proto
new file mode 100644
index 000000000..1454071ab
--- /dev/null
+++ b/stream/proto/src/main/proto/cluster.proto
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+import "common.proto";
+
+package bookkeeper.proto.cluster;
+
+option java_multiple_files = true;
+option java_package = "org.apache.bookkeeper.stream.proto.cluster";
+
+message ServerAssignmentData {
+    // assigned containers
+    repeated int64 containers = 1;
+}
+
+message ClusterAssignmentData {
+    map<string, ServerAssignmentData> servers = 1;
+}
+
+message ClusterMetadata {
+    int64 num_storage_containers = 1;
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index 72a41e508..63e26791d 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -16,17 +16,20 @@
 
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getSegmentsRootPath;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getStoragePath;
 
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
+import java.net.URI;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
@@ -35,25 +38,25 @@
 import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.StorageServer;
-import org.apache.bookkeeper.stream.storage.api.controller.StorageController;
+import org.apache.bookkeeper.stream.storage.StorageConstants;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.sc.helix.HelixStorageController;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.LocalDLMEmulator;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.Transaction;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
 
 /**
  * A Cluster that runs a few storage nodes.
@@ -74,17 +77,10 @@ public static StreamCluster build(StreamClusterSpec spec) {
         return new StreamCluster(spec);
     }
 
-    //
-    // DL Settings
-    //
-    private static final String ROOT_PATH = "/stream";
-    private static final String LEDGERS_PATH = "/stream/ledgers";
-    private static final String LEDGERS_AVAILABLE_PATH = "/stream/ledgers/available";
-    private static final String NAMESPACE = "/stream/storage";
-
-    private static ServerConfiguration newServerConfiguration(String zkEnsemble) {
+    private static ServerConfiguration newBookieConfiguration(String zkEnsemble) {
         ServerConfiguration serverConf = new ServerConfiguration();
-        serverConf.setMetadataServiceUri("zk://" + zkEnsemble + LEDGERS_PATH);
+        serverConf.setMetadataServiceUri(
+            "zk://" + zkEnsemble + getSegmentsRootPath(StorageConstants.ZK_METADATA_ROOT_PATH));
         serverConf.setAllowLoopback(true);
         serverConf.setGcWaitTime(300000);
         serverConf.setDiskUsageWarnThreshold(0.9999f);
@@ -117,8 +113,8 @@ private StreamCluster(StreamClusterSpec spec) {
         return rpcEndpoints;
     }
 
-    public String getZkServers() {
-        return zkEnsemble;
+    public URI getDefaultBackendUri() {
+        return URI.create("distributedlog://" + zkEnsemble + getStoragePath(ZK_METADATA_ROOT_PATH));
     }
 
     private void startZooKeeper() throws Exception {
@@ -128,7 +124,7 @@ private void startZooKeeper() throws Exception {
             return;
         }
 
-        File zkDir = new File(spec.storageRootDir, "zookeeper");
+        File zkDir = new File(spec.storageRootDir(), "zookeeper");
         Pair<ZooKeeperServerShim, Integer> zkServerAndPort =
             LocalDLMEmulator.runZookeeperOnAnyPort(zkDir);
         zks = zkServerAndPort.getLeft();
@@ -145,39 +141,43 @@ private void stopZooKeeper() {
     }
 
     private void initializeCluster() throws Exception {
-        log.info("Initializing the stream cluster.");
-        ZooKeeper zkc = null;
-        try (StorageController controller = new HelixStorageController(zkEnsemble)) {
-            zkc = ZooKeeperClient.newBuilder()
-                .connectString(zkEnsemble)
-                .sessionTimeoutMs(60000)
-                .build();
-            Transaction txn = zkc.transaction();
-            txn.create(
-                ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            txn.create(
-                LEDGERS_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            txn.create(
-                LEDGERS_AVAILABLE_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(
+            zkEnsemble,
+            new ExponentialBackoffRetry(100, Integer.MAX_VALUE, 10000)
+        )) {
+            client.start();
+
+            ZkClusterMetadataStore store = new ZkClusterMetadataStore(client, zkEnsemble, ZK_METADATA_ROOT_PATH);
 
+            ClusterMetadata metadata;
             try {
-                txn.commit();
-            } catch (KeeperException ke) {
-                if (Code.NODEEXISTS != ke.code()) {
-                    throw ke;
+                metadata = store.getClusterMetadata();
+                log.info("Loaded cluster metadata : \n{}", metadata);
+            } catch (StorageRuntimeException sre) {
+                if (sre.getCause() instanceof KeeperException.NoNodeException) {
+                    log.info("Initializing the stream cluster.");
+                    store.initializeCluster(spec.numServers() * 2);
+                    log.info("Successfully initialized the stream cluster : \n{}", store.getClusterMetadata());
+                } else {
+                    throw sre;
                 }
             }
+        }
 
-            log.info("Initialize the bookkeeper metadata.");
-
-            // initialize the storage
-            controller.createCluster("stream/helix", spec.numServers() * 2, 1);
-            log.info("Initialized the helix metadata with {} storage containers.", spec.numServers() * 2);
-        } finally {
-            if (null != zkc) {
-                zkc.close();
+        // format the bookkeeper cluster
+        MetadataDrivers.runFunctionWithMetadataBookieDriver(newBookieConfiguration(zkEnsemble), driver -> {
+            try {
+                boolean initialized = driver.getRegistrationManager().initNewCluster();
+                if (initialized) {
+                    log.info("Successfully initialized the segment storage");
+                } else {
+                    log.info("The segment storage was already initialized");
+                }
+            } catch (Exception e) {
+                throw new StorageRuntimeException("Failed to initialize the segment storage", e);
             }
-        }
+            return null;
+        });
     }
 
     private LifecycleComponent startServer() throws Exception {
@@ -193,7 +193,7 @@ private LifecycleComponent startServer() throws Exception {
             }
             LifecycleComponent server = null;
             try {
-                ServerConfiguration serverConf = newServerConfiguration(zkEnsemble);
+                ServerConfiguration serverConf = newBookieConfiguration(zkEnsemble);
                 serverConf.setBookiePort(bookiePort);
                 File bkDir = new File(spec.storageRootDir(), "bookie_" + bookiePort);
                 serverConf.setJournalDirName(bkDir.getPath());
@@ -300,9 +300,6 @@ protected void doStart() {
 
             // create default namespaces
             createDefaultNamespaces();
-
-            // wait for 10 seconds
-            TimeUnit.SECONDS.sleep(10);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index b48ea0d5c..d5ca39772 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -13,6 +13,8 @@
  */
 package org.apache.bookkeeper.stream.server;
 
+import static org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH;
+
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import java.io.File;
@@ -35,14 +37,22 @@
 import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
 import org.apache.bookkeeper.stream.server.grpc.GrpcServerSpec;
 import org.apache.bookkeeper.stream.server.service.BookieService;
+import org.apache.bookkeeper.stream.server.service.ClusterControllerService;
+import org.apache.bookkeeper.stream.server.service.CuratorProviderService;
 import org.apache.bookkeeper.stream.server.service.DLNamespaceProviderService;
 import org.apache.bookkeeper.stream.server.service.GrpcService;
+import org.apache.bookkeeper.stream.server.service.RegistrationServiceProvider;
+import org.apache.bookkeeper.stream.server.service.RegistrationStateService;
 import org.apache.bookkeeper.stream.server.service.StatsProviderService;
 import org.apache.bookkeeper.stream.server.service.StorageService;
 import org.apache.bookkeeper.stream.storage.RangeStoreBuilder;
 import org.apache.bookkeeper.stream.storage.StorageResources;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.sc.helix.HelixStorageContainerManager;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
+import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactoryImpl;
 import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.Configuration;
@@ -184,11 +194,35 @@ public static LifecycleComponent startStorageServer(CompositeConfiguration conf,
         // Create the bookie service
         BookieService bookieService = new BookieService(bkConf, rootStatsLogger);
 
+        // Create the curator provider service
+        CuratorProviderService curatorProviderService = new CuratorProviderService(
+            bookieService.serverConf(), dlConf, rootStatsLogger.scope("curator"));
+
         // Create the distributedlog namespace service
         DLNamespaceProviderService dlNamespaceProvider = new DLNamespaceProviderService(
             bookieService.serverConf(),
             dlConf,
-            rootStatsLogger.scope("dl"));
+            rootStatsLogger.scope("dlog"));
+
+        // Create a registration service provider
+        RegistrationServiceProvider regService = new RegistrationServiceProvider(
+            bookieService.serverConf(),
+            dlConf,
+            rootStatsLogger.scope("registration").scope("provider"));
+
+        // Create a cluster controller service
+        ClusterControllerService clusterControllerService = new ClusterControllerService(
+            storageConf,
+            () -> new ClusterControllerImpl(
+                new ZkClusterMetadataStore(
+                    curatorProviderService.get(),
+                    ZKMetadataDriverBase.resolveZkServers(bookieService.serverConf()),
+                    ZK_METADATA_ROOT_PATH),
+                regService.get(),
+                new DefaultStorageContainerController(),
+                new ZkClusterControllerLeaderSelector(curatorProviderService.get(), ZK_METADATA_ROOT_PATH),
+                storageConf),
+            rootStatsLogger.scope("cluster_controller"));
 
         // Create range (stream) store
         RangeStoreBuilder rangeStoreBuilder = RangeStoreBuilder.newBuilder()
@@ -200,16 +234,17 @@ public static LifecycleComponent startStorageServer(CompositeConfiguration conf,
             .withNumStorageContainers(numStorageContainers)
             // the default log backend uri
             .withDefaultBackendUri(dlNamespaceProvider.getDlogUri())
-            // with the storage container manager (currently it is helix)
+            // with zk-based storage container manager
             .withStorageContainerManagerFactory((ignored, storeConf, registry) ->
-                new HelixStorageContainerManager(
-                    ZKMetadataDriverBase.resolveZkServers(bookieService.serverConf()),
-                    "stream/helix",
-                    storeConf,
-                    registry,
+                new ZkStorageContainerManager(
                     myEndpoint,
-                    instanceName,
-                    rootStatsLogger.scope("helix")))
+                    storageConf,
+                    new ZkClusterMetadataStore(
+                        curatorProviderService.get(),
+                        ZKMetadataDriverBase.resolveZkServers(bookieService.serverConf()),
+                        ZK_METADATA_ROOT_PATH),
+                    registry,
+                    rootStatsLogger.scope("sc").scope("manager")))
             // with the inter storage container client manager
             .withRangeStoreFactory(
                 new MVCCStoreFactoryImpl(
@@ -232,15 +267,26 @@ public static LifecycleComponent startStorageServer(CompositeConfiguration conf,
         GrpcService grpcService = new GrpcService(
             serverConf, serverSpec, rpcStatsLogger);
 
+        // Create a registration state service only when service is ready.
+        RegistrationStateService regStateService = new RegistrationStateService(
+            myEndpoint,
+            bookieService.serverConf(),
+            bkConf,
+            regService,
+            rootStatsLogger.scope("registration"));
 
         // Create all the service stack
         return LifecycleComponentStack.newBuilder()
             .withName("storage-server")
             .addComponent(statsProviderService)     // stats provider
             .addComponent(bookieService)            // bookie server
+            .addComponent(curatorProviderService)   // service that provides curator client
             .addComponent(dlNamespaceProvider)      // service that provides dl namespace
+            .addComponent(regService)               // service that provides registration client
+            .addComponent(clusterControllerService) // service that run cluster controller service
             .addComponent(storageService)           // range (stream) store
             .addComponent(grpcService)              // range (stream) server (gRPC)
+            .addComponent(regStateService)          // service that manages server state
             .build();
     }
 
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/ClusterControllerService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/ClusterControllerService.java
new file mode 100644
index 000000000..8e4512704
--- /dev/null
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/ClusterControllerService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.server.service;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterController;
+import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+
+/**
+ * A service that runs cluster controller.
+ */
+@Slf4j
+public class ClusterControllerService
+    extends AbstractLifecycleComponent<StorageConfiguration> {
+
+    private final Supplier<ClusterController> controllerSupplier;
+    private ClusterController controller;
+
+    public ClusterControllerService(StorageConfiguration conf,
+                                    Supplier<ClusterController> controllerSupplier,
+                                    StatsLogger statsLogger) {
+        super("cluster-controller", conf, statsLogger);
+        this.controllerSupplier = controllerSupplier;
+    }
+
+    @Override
+    protected void doStart() {
+        if (null == controller) {
+            controller = controllerSupplier.get();
+            controller.start();
+            log.info("Successfully started the cluster controller.");
+        }
+    }
+
+    @Override
+    protected void doStop() {
+        if (null != controller) {
+            controller.stop();
+        }
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        // no-op
+    }
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
new file mode 100644
index 000000000..7d404f0d9
--- /dev/null
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.server.service;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.server.conf.DLConfiguration;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+/**
+ * A service to provide a curator client.
+ */
+public class CuratorProviderService
+    extends AbstractLifecycleComponent<DLConfiguration>
+    implements Supplier<CuratorFramework> {
+
+    private final String zkServers;
+    private final RetryPolicy curatorRetryPolicy;
+    private final CuratorFramework curatorClient;
+
+    public CuratorProviderService(ServerConfiguration bkServerConf,
+                                  DLConfiguration conf,
+                                  StatsLogger statsLogger) {
+        super("curator-provider", conf, statsLogger);
+        this.zkServers = ZKMetadataDriverBase.resolveZkServers(bkServerConf);
+        this.curatorRetryPolicy = new ExponentialBackoffRetry(
+            bkServerConf.getZkRetryBackoffStartMs(),
+            Integer.MAX_VALUE,
+            bkServerConf.getZkRetryBackoffMaxMs());
+        this.curatorClient = CuratorFrameworkFactory
+            .newClient(zkServers, curatorRetryPolicy);
+    }
+
+    @Override
+    protected void doStart() {
+        curatorClient.start();
+    }
+
+    @Override
+    protected void doStop() {
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        curatorClient.close();
+    }
+
+    @Override
+    public CuratorFramework get() {
+        return curatorClient;
+    }
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
index 04d030dd2..e0359ce8d 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
@@ -23,6 +23,7 @@
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stream.server.conf.DLConfiguration;
+import org.apache.bookkeeper.stream.storage.StorageConstants;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
@@ -79,8 +80,9 @@ public DLNamespaceProviderService(ServerConfiguration bkServerConf,
                                       StatsLogger statsLogger) {
         super("namespace-provider", conf, statsLogger);
 
-        this.dlogUri = URI.create(String.format("distributedlog://%s/stream/storage",
-            ZKMetadataDriverBase.resolveZkServers(bkServerConf)));
+        this.dlogUri = URI.create(String.format("distributedlog://%s%s",
+            ZKMetadataDriverBase.resolveZkServers(bkServerConf),
+            StorageConstants.getStoragePath(StorageConstants.ZK_METADATA_ROOT_PATH)));
         this.bkServerConf = bkServerConf;
         this.dlConf = new DistributedLogConfiguration();
         this.dlConf.loadConf(conf);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
new file mode 100644
index 000000000..8db59fdea
--- /dev/null
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.server.service;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getServersPath;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.discover.ZKRegistrationClient;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.server.conf.DLConfiguration;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.RetryPolicy;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+
+/**
+ * A service that is responsible for registration using bookkeeper registration api.
+ *
+ * <p>The zookeeper client and the {@link RegistrationClient} are created in {@link #doStart()}
+ * and released in {@link #doClose()}.
+ */
+@Slf4j
+public class RegistrationServiceProvider
+    extends AbstractLifecycleComponent<DLConfiguration>
+    implements Supplier<RegistrationClient> {
+
+    private final String zkServers;
+    private final RetryPolicy bkZkRetryPolicy;
+    private final String regPath;
+    private final ScheduledExecutorService regExecutor;
+    private ZooKeeperClient zkClient;
+    private RegistrationClient client;
+
+    public RegistrationServiceProvider(ServerConfiguration bkServerConf,
+                                       DLConfiguration conf,
+                                       StatsLogger statsLogger) {
+        super("registration-service-provider", conf, statsLogger);
+        this.zkServers = ZKMetadataDriverBase.resolveZkServers(bkServerConf);
+        // reuse the shared path helper so the servers path is defined in one place
+        this.regPath = getServersPath(ZK_METADATA_ROOT_PATH);
+        this.bkZkRetryPolicy = new BoundExponentialBackoffRetryPolicy(
+            bkServerConf.getZkRetryBackoffStartMs(),
+            bkServerConf.getZkRetryBackoffMaxMs(),
+            Integer.MAX_VALUE);
+        this.regExecutor = Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder().setNameFormat("registration-service-provider-scheduler").build());
+    }
+
+    /**
+     * Returns the registration client created by {@link #doStart()}.
+     *
+     * @return the registration client
+     * @throws NullPointerException if the service has not been started yet
+     */
+    @Override
+    public RegistrationClient get() {
+        checkNotNull(client, "retrieve registration client before starting registration service");
+        return client;
+    }
+
+    /**
+     * Returns the zookeeper client, or {@code null} if the service has not been started yet.
+     */
+    public ZooKeeperClient getZkClient() {
+        return zkClient;
+    }
+
+    /**
+     * Returns the zookeeper path under which servers are registered.
+     */
+    public String getRegistrationPath() {
+        return regPath;
+    }
+
+    @SneakyThrows
+    @Override
+    protected void doStart() {
+        if (null == zkClient) {
+            try {
+                zkClient = ZooKeeperClient.newBuilder()
+                    .operationRetryPolicy(bkZkRetryPolicy)
+                    .connectString(zkServers)
+                    .statsLogger(statsLogger.scope("zk"))
+                    .build();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.error("Interrupted at creating zookeeper client to {}", zkServers, e);
+                throw e;
+            } catch (Exception e) {
+                log.error("Failed to create zookeeper client to {}", zkServers, e);
+                throw e;
+            }
+            client = new ZKRegistrationClient(zkClient, regPath, regExecutor);
+        }
+    }
+
+    @Override
+    protected void doStop() {
+        // no-op: all resources are released in doClose()
+    }
+
+    /**
+     * Closes the registration client and the zookeeper client. The registration executor is
+     * always shut down, even if closing one of the clients fails.
+     */
+    @Override
+    protected void doClose() throws IOException {
+        try {
+            if (null != client) {
+                client.close();
+            }
+        } finally {
+            try {
+                closeZkClient();
+            } finally {
+                regExecutor.shutdown();
+            }
+        }
+    }
+
+    // closes the zookeeper client (if any), restoring the interrupt flag when interrupted
+    private void closeZkClient() {
+        if (null != zkClient) {
+            try {
+                zkClient.close();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.warn("Interrupted at closing zookeeper client to {}", zkServers, e);
+            }
+        }
+    }
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationStateService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationStateService.java
new file mode 100644
index 000000000..40261c0cf
--- /dev/null
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationStateService.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.server.service;
+
+import java.io.IOException;
+import java.util.Collections;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.BookieStateManager;
+import org.apache.bookkeeper.clients.utils.NetUtils;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
+
+/**
+ * A service that manages the registration state for a given server.
+ *
+ * <p>It registers the server to the registration service and handles the state transitions.
+ */
+@Slf4j
+public class RegistrationStateService
+    extends AbstractLifecycleComponent<BookieConfiguration> {
+
+    private final Endpoint myEndpoint;
+    private final ServerConfiguration bkServerConf;
+    private final RegistrationServiceProvider regServiceProvider;
+    private RegistrationManager regManager;
+    private BookieStateManager stateManager;
+
+    public RegistrationStateService(Endpoint myEndpoint,
+                                    ServerConfiguration bkServerConf,
+                                    BookieConfiguration bookieConf,
+                                    RegistrationServiceProvider serviceProvider,
+                                    StatsLogger statsLogger) {
+        super("registration-state-service", bookieConf, statsLogger);
+        this.myEndpoint = myEndpoint;
+        this.bkServerConf = bkServerConf;
+        this.regServiceProvider = serviceProvider;
+    }
+
+    /**
+     * Creates the registration and state managers, registers this server and waits until the
+     * registration completes.
+     */
+    @Override
+    protected void doStart() {
+        if (null == regManager) {
+            regManager = new ZKRegistrationManager(
+                bkServerConf,
+                regServiceProvider.getZkClient(),
+                regServiceProvider.getRegistrationPath(),
+                () -> {
+                    if (null == stateManager) {
+                        log.warn("Registration state manager is not initialized yet");
+                        return;
+                    }
+                    stateManager.forceToUnregistered();
+                    // schedule a re-register operation
+                    stateManager.registerBookie(false);
+                });
+            try {
+                stateManager = new BookieStateManager(
+                    bkServerConf,
+                    statsLogger.scope("state"),
+                    () -> regManager,
+                    Collections.emptyList(),
+                    () -> NetUtils.endpointToString(myEndpoint));
+                stateManager.initState();
+                stateManager.registerBookie(true).get();
+                log.info("Successfully register myself under registration path {}/{}",
+                    regServiceProvider.getRegistrationPath(), NetUtils.endpointToString(myEndpoint));
+            } catch (Exception e) {
+                if (e instanceof InterruptedException) {
+                    // restore the interrupt flag cleared while waiting on the registration future
+                    Thread.currentThread().interrupt();
+                }
+                throw new RuntimeException("Failed to initialize a registration state service", e);
+            }
+        }
+    }
+
+    @Override
+    protected void doStop() {
+        if (null == stateManager) {
+            // never started, or failed before the state manager was created: nothing to transition
+            return;
+        }
+        stateManager.forceToShuttingDown();
+        // turn the server to readonly during shutting down process
+        stateManager.forceToReadOnly();
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        // close the state manager first, but always close the registration manager as well
+        try {
+            if (null != stateManager) {
+                stateManager.close();
+            }
+        } finally {
+            if (null != regManager) {
+                regManager.close();
+            }
+        }
+    }
+}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterController.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterController.java
new file mode 100644
index 000000000..9ef7fd2e3
--- /dev/null
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterController.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.api.cluster;
+
+/**
+ * A cluster controller that operates on the cluster state.
+ */
+public interface ClusterController {
+
+    /**
+     * Start the cluster controller.
+     */
+    void start();
+
+    /**
+     * Stop the cluster controller.
+     */
+    void stop();
+
+}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterControllerLeader.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterControllerLeader.java
new file mode 100644
index 000000000..3d1114416
--- /dev/null
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterControllerLeader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.api.cluster;
+
+/**
+ * Cluster controller that processes the leader logic.
+ */
+public interface ClusterControllerLeader {
+
+    /**
+     * Process the controller logic as a leader.
+     *
+     * @throws Exception if processing the leader logic fails
+     */
+    void processAsLeader() throws Exception;
+
+    /**
+     * Suspend the leadership.
+     */
+    void suspend();
+
+    /**
+     * Resume the leadership.
+     */
+    void resume();
+
+}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterControllerLeaderSelector.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterControllerLeaderSelector.java
new file mode 100644
index 000000000..dff799df2
--- /dev/null
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterControllerLeaderSelector.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.api.cluster;
+
+/**
+ * A selector to select a leader among controller instances.
+ */
+public interface ClusterControllerLeaderSelector extends AutoCloseable {
+
+    /**
+     * Initialize the selector with the {@code leader} logic.
+     *
+     * @param leader controller leader instance
+     */
+    void initialize(ClusterControllerLeader leader);
+
+    /**
+     * Start the selector.
+     */
+    void start();
+
+    /**
+     * Close the selector. Unlike {@link AutoCloseable#close()}, it does not throw checked exceptions.
+     */
+    @Override
+    void close();
+}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
new file mode 100644
index 000000000..935552a9d
--- /dev/null
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.api.cluster;
+
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+
+/**
+ * Stores the cluster related metadata, such as the number of storage containers and the mapping from servers
+ * to storage containers.
+ */
+public interface ClusterMetadataStore extends AutoCloseable {
+
+    /**
+     * Initialize the cluster metadata with the provided {@code numStorageContainers}.
+     *
+     * @param numStorageContainers number of storage containers.
+     */
+    void initializeCluster(int numStorageContainers);
+
+    /**
+     * Get the current cluster assignment data.
+     *
+     * @return the cluster assignment data.
+     */
+    ClusterAssignmentData getClusterAssignmentData();
+
+    /**
+     * Update the current cluster assignment data.
+     *
+     * @param assignmentData cluster assignment data
+     */
+    void updateClusterAssignmentData(ClusterAssignmentData assignmentData);
+
+    /**
+     * Watch the current cluster assignment data. The watcher is a {@code Consumer<Void>}, so it carries no payload.
+     *
+     * @param watcher current cluster assignment data watcher
+     * @param executor the executor to run the {@code watcher}
+     */
+    void watchClusterAssignmentData(Consumer<Void> watcher, Executor executor);
+
+    /**
+     * Unwatch the current cluster assignment data.
+     *
+     * @param watcher current cluster assignment data watcher to remove
+     */
+    void unwatchClusterAssignmentData(Consumer<Void> watcher);
+
+    /**
+     * Returns the cluster metadata present in the system.
+     *
+     * @return cluster metadata.
+     */
+    ClusterMetadata getClusterMetadata();
+
+    /**
+     * Update the current cluster metadata.
+     *
+     * @param clusterMetadata cluster metadata to update.
+     */
+    void updateClusterMetadata(ClusterMetadata clusterMetadata);
+
+    /**
+     * Close the metadata store. Unlike {@link AutoCloseable#close()}, it does not throw checked exceptions.
+     */
+    @Override
+    void close();
+
+}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/package-info.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/package-info.java
new file mode 100644
index 000000000..ccf4fac68
--- /dev/null
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Define the cluster operations for operating the storage.
+ */
+package org.apache.bookkeeper.stream.storage.api.cluster;
\ No newline at end of file
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/controller/StorageController.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/controller/StorageController.java
deleted file mode 100644
index 28dd31aae..000000000
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/controller/StorageController.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.api.controller;
-
-import java.util.Optional;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
-
-/**
- * Interface for managing the storage cluster.
- */
-public interface StorageController extends AutoCloseable {
-
-    /**
-     * Create the cluster.
-     *
-     * @param clusterName          cluster name.
-     * @param numStorageContainers num storage containers.
-     * @param numReplicas          num replicas per storage container.
-     */
-    void createCluster(String clusterName,
-                       int numStorageContainers,
-                       int numReplicas);
-
-    /**
-     * Add a node to a cluster.
-     *
-     * @param clusterName  cluster name.
-     * @param endpointName node endpoint name.
-     */
-    void addNode(String clusterName,
-                 Endpoint endpoint,
-                 Optional<String> endpointName);
-
-}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/controller/package-info.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/controller/package-info.java
deleted file mode 100644
index 27f7cfee1..000000000
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/controller/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Define the controller operations for operating the storage.
- */
-package org.apache.bookkeeper.stream.storage.api.controller;
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
index 5d5c75d82..47855b034 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
@@ -41,7 +41,7 @@
      *
      * @return a future represents the result of starting a storage container.
      */
-    CompletableFuture<Void> start();
+    CompletableFuture<StorageContainer> start();
 
     /**
      * Stop the storage container.
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
index c793742ac..57634d32d 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
@@ -41,15 +41,26 @@
      * @param scId storage container id
      * @return a future represents the started storage container or exception if failed to start.
      */
-    CompletableFuture<Void> startStorageContainer(long scId);
+    CompletableFuture<StorageContainer> startStorageContainer(long scId);
+
+    /**
+     * Stop the storage container in this registry blindly.
+     *
+     * @param scId storage container id
+     * @return a future representing the result of stopping a storage container, or an exception if it failed to stop.
+     */
+    default CompletableFuture<Void> stopStorageContainer(long scId) {
+        return stopStorageContainer(scId, null);
+    }
 
     /**
      * Stop the storage container in this registry.
      *
      * @param scId storage container id
+     * @param container storage container instance to stop
      * @return a future represents the result of stopping a storage container or exception if failed to start.
      */
-    CompletableFuture<Void> stopStorageContainer(long scId);
+    CompletableFuture<Void> stopStorageContainer(long scId, StorageContainer container);
 
     /**
      * Close the registry.
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
index b0f8909ad..0699a13e1 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
@@ -14,6 +14,8 @@
 package org.apache.bookkeeper.stream.storage.conf;
 
 import java.io.File;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.common.conf.ComponentConfiguration;
 import org.apache.commons.configuration.CompositeConfiguration;
 
@@ -28,6 +30,8 @@
 
     private static final String SERVE_READONLY_TABLES = "serve.readonly.tables";
 
+    private static final String CONTROLLER_SCHEDULE_INTERVAL_MS = "cluster.controller.schedule.interval.ms";
+
     public StorageConfiguration(CompositeConfiguration conf) {
         super(conf, COMPONENT_PREFIX);
     }
@@ -65,4 +69,25 @@ public boolean getServeReadOnlyTables() {
         return getBoolean(SERVE_READONLY_TABLES, false);
     }
 
+    /**
+     * Returns the cluster controller schedule interval. Defaults to 30 seconds when not configured.
+     *
+     * @return the cluster controller schedule interval, in milliseconds.
+     */
+    public long getClusterControllerScheduleIntervalMs() {
+        return getLong(CONTROLLER_SCHEDULE_INTERVAL_MS, Duration.ofSeconds(30).toMillis());
+    }
+
+    /**
+     * Configures the cluster controller schedule interval.
+     *
+     * @param time interval length, expressed in {@code timeUnit}
+     * @param timeUnit unit of {@code time}
+     * @return this storage configuration, to allow chaining
+     */
+    public StorageConfiguration setClusterControllerScheduleInterval(long time, TimeUnit timeUnit) {
+        this.setProperty(CONTROLLER_SCHEDULE_INTERVAL_MS, timeUnit.toMillis(time));
+        return this;
+    }
+
 }
diff --git a/stream/storage/impl/pom.xml b/stream/storage/impl/pom.xml
index 6ed2f27da..a75403c38 100644
--- a/stream/storage/impl/pom.xml
+++ b/stream/storage/impl/pom.xml
@@ -38,11 +38,20 @@
       <artifactId>stream-storage-service-api</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-java-client-base</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>statelib</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.helix</groupId>
       <artifactId>helix-core</artifactId>
@@ -55,6 +64,13 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common</artifactId>
+      <version>${project.parent.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageConstants.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageConstants.java
new file mode 100644
index 000000000..d6ec8a309
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageConstants.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage;
+
+import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
+
+/**
+ * Defines the storage constants.
+ */
+public final class StorageConstants {
+
+    private StorageConstants() {}
+
+    //
+    // metadata related
+    //
+
+    public static final String CONTROLLER_PATH = "controller";
+    public static final String SERVERS_PATH = "servers";
+    public static final String CLUSTER_METADATA_PATH = "metadata";
+    public static final String CLUSTER_ASSIGNMENT_PATH = "assignment";
+    public static final String SEGMENTS_PATH = "segments";
+    public static final String STORAGE_PATH = "storage";
+
+    //
+    // ZooKeeper metadata related.
+    //
+
+    public static final String ZK_METADATA_ROOT_PATH = "/stream";
+
+    /** Returns {@code <rootPath>/controller}. */
+    public static String getControllerPath(String rootPath) {
+        return childPath(rootPath, CONTROLLER_PATH);
+    }
+
+    /** Returns {@code <rootPath>/servers}. */
+    public static String getServersPath(String rootPath) {
+        return childPath(rootPath, SERVERS_PATH);
+    }
+
+    /** Returns the servers path under {@code rootPath}, followed by {@code /} and {@code AVAILABLE_NODE}. */
+    public static String getWritableServersPath(String rootPath) {
+        return childPath(getServersPath(rootPath), AVAILABLE_NODE);
+    }
+
+    /** Returns {@code <rootPath>/metadata}. */
+    public static String getClusterMetadataPath(String rootPath) {
+        return childPath(rootPath, CLUSTER_METADATA_PATH);
+    }
+
+    /** Returns {@code <rootPath>/assignment}. */
+    public static String getClusterAssignmentPath(String rootPath) {
+        return childPath(rootPath, CLUSTER_ASSIGNMENT_PATH);
+    }
+
+    /** Returns {@code <rootPath>/segments}. */
+    public static String getSegmentsRootPath(String rootPath) {
+        return childPath(rootPath, SEGMENTS_PATH);
+    }
+
+    /** Returns {@code <rootPath>/storage}. */
+    public static String getStoragePath(String rootPath) {
+        return childPath(rootPath, STORAGE_PATH);
+    }
+
+    // joins a parent path and a child node name with a single '/'
+    private static String childPath(String parent, String child) {
+        return parent + "/" + child;
+    }
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerImpl.java
new file mode 100644
index 000000000..598e9b294
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerImpl.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import java.time.Duration;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterController;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeaderSelector;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerController;
+
+/**
+ * A {@link ClusterController} that elects a cluster controller leader for performing cluster actions,
+ * such as assigning containers to servers.
+ *
+ * <p>The leader logic is a {@link ClusterControllerLeaderImpl} built in the constructor and registered
+ * with the supplied {@link ClusterControllerLeaderSelector}; {@link #start()} starts that selector and
+ * {@link #stop()} closes it.
+ */
+public class ClusterControllerImpl implements ClusterController {
+
+    // the selector the leader logic is registered with
+    private final ClusterControllerLeaderSelector controllerLeaderSelector;
+
+    public ClusterControllerImpl(ClusterMetadataStore clusterMetadataStore,
+                                 RegistrationClient registrationClient,
+                                 StorageContainerController scController,
+                                 ClusterControllerLeaderSelector clusterControllerLeaderSelector,
+                                 StorageConfiguration conf) {
+        this.controllerLeaderSelector = clusterControllerLeaderSelector;
+
+        // minimum interval between two container assignments performed by the leader
+        Duration scheduleInterval = Duration.ofMillis(conf.getClusterControllerScheduleIntervalMs());
+        ClusterControllerLeader leaderLogic = new ClusterControllerLeaderImpl(
+            clusterMetadataStore, scController, registrationClient, scheduleInterval);
+        clusterControllerLeaderSelector.initialize(leaderLogic);
+    }
+
+    /** Start the leader selector so this instance takes part in controller leader election. */
+    @Override
+    public void start() {
+        controllerLeaderSelector.start();
+    }
+
+    /** Close the leader selector. */
+    @Override
+    public void stop() {
+        controllerLeaderSelector.close();
+    }
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImpl.java
new file mode 100644
index 000000000..807bf6974
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImpl.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerController;
+import org.apache.bookkeeper.versioning.Versioned;
+
+/**
+ * Controller leader logic that keeps storage containers assigned to the available servers.
+ *
+ * <p>While {@link #processAsLeader()} runs, this instance watches the writable servers through the
+ * {@link RegistrationClient}. Every server change releases a permit; the leader loop then recomputes the
+ * ideal assignment with the {@link StorageContainerController} and writes it to the
+ * {@link ClusterMetadataStore} when it differs from the current one. A new assignment is not computed
+ * until at least {@code scheduleDuration} has elapsed since the last successful one.
+ *
+ * <p>The leader can be {@link #suspend() suspended} and {@link #resume() resumed}; while suspended the
+ * loop blocks before processing server changes. Any suspension is cleared when leadership ends.
+ */
+@Slf4j
+public class ClusterControllerLeaderImpl implements ClusterControllerLeader, RegistrationListener {
+
+    // the metadata store for reading and writing cluster metadata.
+    private final ClusterMetadataStore clusterMetadataStore;
+
+    // the controller logic for assigning containers
+    private final StorageContainerController scController;
+
+    // permits that the controller leader can perform server changes.
+    // released on every server change (and once when leadership starts); the leader loop acquires one
+    // and then drains the rest, so a burst of changes is handled by a single assignment pass.
+    @Getter(AccessLevel.PACKAGE)
+    private final Semaphore performServerChangesPermits;
+
+    // registration client that watch/unwatch registered servers.
+    private final RegistrationClient regClient;
+
+    // latest set of available servers; written by onBookiesChanged(), read by the leader loop
+    private volatile Set<BookieSocketAddress> availableServers;
+
+    // variables for suspending controller
+    @Getter(AccessLevel.PACKAGE)
+    private final Object suspensionLock = new Object();
+    private volatile boolean suspended = false;
+
+    // last successful assignment happened at (System.currentTimeMillis()); -1 until the first one.
+    // volatile because it is written by the leader loop and may be read through the getter elsewhere.
+    @Getter(AccessLevel.PACKAGE)
+    private volatile long lastSuccessfulAssigmentAt;
+
+    // the min interval that controller is scheduled to assign containers
+    private final Duration scheduleDuration;
+
+    /**
+     * @param clusterMetadataStore store to read cluster metadata / assignment from and write assignment to
+     * @param scController computes the ideal container assignment
+     * @param regClient used to watch the writable servers while leading
+     * @param scheduleDuration minimum time between two successful assignments
+     */
+    ClusterControllerLeaderImpl(ClusterMetadataStore clusterMetadataStore,
+                                StorageContainerController scController,
+                                RegistrationClient regClient,
+                                Duration scheduleDuration) {
+        this.clusterMetadataStore = clusterMetadataStore;
+        this.scController = scController;
+        this.regClient = regClient;
+        this.performServerChangesPermits = new Semaphore(0);
+        this.lastSuccessfulAssigmentAt = -1L;
+        this.scheduleDuration = scheduleDuration;
+    }
+
+    /**
+     * Suspend the controller if the leader disconnects from zookeeper.
+     */
+    @Override
+    public void suspend() {
+        synchronized (suspensionLock) {
+            suspended = true;
+            suspensionLock.notifyAll();
+        }
+    }
+
+    boolean isSuspended() {
+        return suspended;
+    }
+
+    /**
+     * Resume the controller and wake up the leader loop if it is waiting in suspension.
+     */
+    @Override
+    public void resume() {
+        synchronized (suspensionLock) {
+            suspended = false;
+            suspensionLock.notifyAll();
+        }
+    }
+
+    /**
+     * Run the leader loop. This only returns by throwing: {@link InterruptedException} when the leader
+     * thread is interrupted, or the underlying exception when watching servers or processing a server
+     * change fails while not suspended. Failures that happen while suspended are retried after resume.
+     */
+    @Override
+    public void processAsLeader() throws Exception {
+        log.info("Become controller leader to monitor servers for assigning storage containers.");
+
+        // hand out an initial permit so the loop runs one assignment pass without waiting for a change
+        performServerChangesPermits.release();
+
+        try {
+            // monitoring the servers
+            try {
+                this.regClient.watchWritableBookies(this).get();
+            } catch (Exception e) {
+                log.warn("Controller leader fails to watch servers : {}, giving up leadership", e.getMessage(), e);
+                throw e;
+            }
+
+            // the leader is looping here to react to changes
+            while (true) {
+                try {
+                    checkSuspension();
+
+                    processServerChange();
+                } catch (InterruptedException ie) {
+                    log.warn("Controller leader is interrupted, giving up leadership");
+
+                    // stop monitoring the servers
+                    this.regClient.unwatchWritableBookies(this);
+                    throw ie;
+                } catch (Exception e) {
+                    if (!suspended) {
+                        log.warn("Controller leader encountered exceptions on processing server changes,"
+                            + " giving up leadership", e);
+
+                        // stop monitoring the servers
+                        this.regClient.unwatchWritableBookies(this);
+                        throw e;
+                    }
+                    // The leader is suspended (e.g. lost its zookeeper connection): keep leadership until it
+                    // is resumed or interrupted. processServerChange() only fails after it has drained the
+                    // permits, so re-arm one to retry this assignment after resumption instead of waiting
+                    // for the next server change.
+                    log.info("Controller leader failed to process server changes while suspended,"
+                        + " will retry after it is resumed", e);
+                    performServerChangesPermits.release();
+                }
+            }
+        } finally {
+            // Leadership is being given up. Clear any suspension so that a later re-election does not start
+            // out blocked in checkSuspension() waiting for a resume() that may never be delivered (for
+            // example SUSPENDED -> LOST -> RECONNECTED while this instance no longer holds leadership).
+            resume();
+        }
+    }
+
+    /**
+     * Block while the controller is suspended.
+     *
+     * @throws InterruptedException if interrupted while waiting to be resumed
+     */
+    private void checkSuspension() throws InterruptedException {
+        synchronized (suspensionLock) {
+            while (suspended) {
+                log.info("Controller leader is suspended, waiting for to be resumed");
+                suspensionLock.wait();
+                log.info("Controller leader is woke up from suspension");
+            }
+        }
+    }
+
+    /**
+     * Wait for a server change, honour the schedule interval, then recompute and persist the assignment.
+     *
+     * @throws InterruptedException if interrupted while waiting for a permit or sleeping
+     */
+    private void processServerChange() throws InterruptedException {
+        // check if the leader can perform server changes
+        performServerChangesPermits.acquire();
+
+        long elapsedMs = System.currentTimeMillis() - lastSuccessfulAssigmentAt;
+        long remainingMs = scheduleDuration.toMillis() - elapsedMs;
+        if (remainingMs > 0) {
+            log.info("Waiting {} milliseconds for controller to assign containers", remainingMs);
+            TimeUnit.MILLISECONDS.sleep(remainingMs);
+        }
+
+        // now, the controller has permits and meet the time requirement for assigning containers.
+        // changes that arrived while waiting are covered by this pass, so drop their permits.
+        performServerChangesPermits.drainPermits();
+
+        Set<BookieSocketAddress> availableServersSnapshot = availableServers;
+        if (null == availableServersSnapshot || availableServersSnapshot.isEmpty()) {
+            // haven't received any servers from registration service, wait for 200ms and retry.
+            if (lastSuccessfulAssigmentAt < 0) {
+                log.info("No servers is alive yet. Backoff 200ms and retry.");
+                TimeUnit.MILLISECONDS.sleep(200);
+                performServerChangesPermits.release();
+                return;
+            } else {
+                // there was already a successful assignment but all the servers are gone.
+                // it can be a registration service issue, so don't attempt to reassign the containers.
+                // return here direct to wait next server change
+                return;
+            }
+        }
+
+        ClusterMetadata clusterMetadata = clusterMetadataStore.getClusterMetadata();
+        ClusterAssignmentData currentState = clusterMetadataStore.getClusterAssignmentData();
+
+        // servers are changed, process the change.
+        ClusterAssignmentData newState = scController.computeIdealState(
+            clusterMetadata,
+            currentState,
+            availableServersSnapshot);
+
+        if (newState.equals(currentState)) {
+            // no assignment state is changed, so do nothing
+            if (log.isDebugEnabled()) {
+                log.debug("Assignment state is unchanged - {}", newState);
+            }
+        } else {
+            // persist first and only record the timestamp once the update has actually succeeded;
+            // a failed write must not count as a successful assignment.
+            clusterMetadataStore.updateClusterAssignmentData(newState);
+            lastSuccessfulAssigmentAt = System.currentTimeMillis();
+        }
+    }
+
+    /**
+     * Record the new set of servers and wake up the leader loop to process it.
+     */
+    @Override
+    public void onBookiesChanged(Versioned<Set<BookieSocketAddress>> bookies) {
+        log.info("Cluster topology is changed - new cluster : {}", bookies);
+        // when bookies are changed, notify the leader to take actions
+        this.availableServers = bookies.getValue();
+        performServerChangesPermits.release();
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
new file mode 100644
index 000000000..da7486711
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import lombok.AccessLevel;
+import lombok.Data;
+import lombok.Getter;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
+
+/**
+ * An in-memory implementation of {@link ClusterMetadataStore}.
+ */
+public class InMemClusterMetadataStore implements ClusterMetadataStore {
+
+    @Data
+    private static class WatcherAndExecutor {
+        private final Consumer<Void> watcher;
+        private final Executor executor;
+    }
+
+    private final Map<Consumer<Void>, WatcherAndExecutor> watchers;
+
+    private ClusterMetadata metadata;
+    private ClusterAssignmentData assignmentData;
+
+    InMemClusterMetadataStore(int numStorageContainers) {
+        this.watchers = Maps.newHashMap();
+        initializeCluster(numStorageContainers);
+    }
+
+    synchronized int getNumWatchers() {
+        return watchers.size();
+    }
+
+    @Override
+    public synchronized void initializeCluster(int numStorageContainers) {
+        this.metadata = ClusterMetadata.newBuilder()
+            .setNumStorageContainers(numStorageContainers)
+            .build();
+        this.assignmentData = ClusterAssignmentData.newBuilder().build();
+    }
+
+    @Override
+    public synchronized ClusterAssignmentData getClusterAssignmentData() {
+        return assignmentData;
+    }
+
+    @Override
+    public synchronized void updateClusterAssignmentData(ClusterAssignmentData assignmentData) {
+        this.assignmentData = assignmentData;
+        watchers.values().forEach(wae -> wae.executor.execute(() -> wae.watcher.accept(null)));
+    }
+
+    @Override
+    public synchronized void watchClusterAssignmentData(Consumer<Void> watcher, Executor executor) {
+        WatcherAndExecutor wae = watchers.get(watcher);
+        if (null == wae) {
+            wae = new WatcherAndExecutor(watcher, executor);
+            watchers.put(watcher, wae);
+        }
+    }
+
+    @Override
+    public synchronized void unwatchClusterAssignmentData(Consumer<Void> watcher) {
+        watchers.remove(watcher);
+    }
+
+    @Override
+    public synchronized ClusterMetadata getClusterMetadata() {
+        return metadata;
+    }
+
+    @Override
+    public synchronized void updateClusterMetadata(ClusterMetadata clusterMetadata) {
+        this.metadata = clusterMetadata;
+    }
+
+    @Override
+    public void close() {
+        // no-op
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelector.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelector.java
new file mode 100644
index 000000000..4f4a096a8
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelector.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getControllerPath;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeaderSelector;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+
+/**
+ * A controller leader selector implemented using zookeeper.
+ *
+ * <p>Election is delegated to a Curator {@link LeaderSelector} on the controller path under the given
+ * root. This class also listens to the client's connection state. On {@code SUSPENDED} the leader is
+ * suspended and on {@code RECONNECTED} it is resumed, in both cases only if this instance holds
+ * leadership. On {@code LOST} the current leadership is interrupted.
+ */
+@Slf4j
+public class ZkClusterControllerLeaderSelector implements ClusterControllerLeaderSelector, ConnectionStateListener {
+
+    private final CuratorFramework client;
+    // the zookeeper path that controller is using for leader election
+    private final String controllerZkPath;
+
+    // leader selector to select leader
+    private ClusterControllerLeader leader;
+    private LeaderSelector leaderSelector;
+
+    public ZkClusterControllerLeaderSelector(CuratorFramework client,
+                                             String zkRootPath) {
+        this.client = client;
+        this.controllerZkPath = getControllerPath(zkRootPath);
+    }
+
+    /**
+     * Bind the leader logic and create, but do not start, the underlying leader selector.
+     * Also registers this instance as a connection state listener on the client.
+     */
+    @Override
+    public void initialize(ClusterControllerLeader leader) {
+        this.leader = leader;
+        ZkClusterControllerLeaderSelectorListener zkLeader = new ZkClusterControllerLeaderSelectorListener(leader);
+        this.leaderSelector = new LeaderSelector(client, controllerZkPath, zkLeader);
+        client.getConnectionStateListenable().addListener(this);
+    }
+
+    /**
+     * Start the leader selector; it re-queues itself for election after each leadership ends.
+     */
+    @Override
+    public void start() {
+        checkNotNull(leaderSelector, "leader selector is not initialized");
+        leaderSelector.autoRequeue();
+        leaderSelector.start();
+    }
+
+    /**
+     * Stop listening to connection state changes, interrupt any current leadership and close the leader
+     * selector. Does nothing if {@link #initialize(ClusterControllerLeader)} was never called.
+     */
+    @Override
+    public void close() {
+        if (null != leaderSelector) {
+            // unregister first: the listener was added to the (possibly shared) client in initialize(), and
+            // leaving it there would leak it and keep forwarding events to the closed leader selector
+            client.getConnectionStateListenable().removeListener(this);
+            leaderSelector.interruptLeadership();
+            leaderSelector.close();
+        }
+    }
+
+    @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState) {
+        switch (newState) {
+            case LOST:
+                log.warn("Connection to zookeeper is lost. So interrupt my current leadership.");
+                leaderSelector.interruptLeadership();
+                break;
+            case SUSPENDED:
+                if (leaderSelector.hasLeadership()) {
+                    log.info("Connection to zookeeper is disconnected, suspend the leader until it is reconnected.");
+                    leader.suspend();
+                }
+                break;
+            case RECONNECTED:
+                if (leaderSelector.hasLeadership()) {
+                    log.info("Connection to zookeeper is reconnected, resume the leader");
+                    leader.resume();
+                }
+                break;
+            default:
+                break;
+        }
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorListener.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorListener.java
new file mode 100644
index 000000000..87af0f403
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorListener.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
+import org.apache.curator.framework.state.ConnectionState;
+
+/**
+ * Adapts a {@link ClusterControllerLeader} to Curator's {@link LeaderSelectorListener}.
+ *
+ * <p>When this participant is elected, {@link #takeLeadership(CuratorFramework)} runs
+ * {@link ClusterControllerLeader#processAsLeader()} on the calling thread. Curator treats leadership
+ * as held until that call returns or throws. Connection state changes are only logged here.
+ */
+@Slf4j
+class ZkClusterControllerLeaderSelectorListener implements LeaderSelectorListener {
+
+    // the leader logic to run while this participant holds leadership
+    private final ClusterControllerLeader leader;
+
+    ZkClusterControllerLeaderSelectorListener(ClusterControllerLeader controller) {
+        this.leader = controller;
+    }
+
+    @Override
+    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
+        // processAsLeader() is expected to block for as long as this instance should remain leader
+        leader.processAsLeader();
+    }
+
+    @Override
+    public void stateChanged(CuratorFramework curator, ConnectionState state) {
+        log.info("zookeeper connection state changed to {} for cluster controller", state);
+    }
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
new file mode 100644
index 000000000..bcf70c3f3
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getClusterAssignmentPath;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getClusterMetadataPath;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getSegmentsRootPath;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getServersPath;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getStoragePath;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getWritableServersPath;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.metadata.DLMetadata;
+
+/**
+ * A zookeeper based implementation of cluster metadata store.
+ *
+ * <p>Cluster metadata and cluster assignment data are stored as serialized protobufs in two znodes under
+ * the root path. All assignment watchers share one Curator {@link NodeCache} on the assignment znode.
+ * The cache is created and started by the first watch and closed once the last watcher is removed, so
+ * a later watch starts a fresh one.
+ */
+@Slf4j
+public class ZkClusterMetadataStore implements ClusterMetadataStore {
+
+    private final CuratorFramework client;
+
+    private final String zkServers;
+    private final String zkRootPath;
+    private final String zkClusterMetadataPath;
+    private final String zkClusterAssignmentPath;
+
+    // watcher -> NodeCache listener registered on its behalf; guarded by this
+    private final Map<Consumer<Void>, NodeCacheListener> assignmentDataConsumers;
+    // shared cache on the assignment znode; only ever assigned after a successful start(); guarded by this
+    private NodeCache assignmentDataCache;
+
+    private volatile boolean closed = false;
+
+    public ZkClusterMetadataStore(CuratorFramework client, String zkServers, String zkRootPath) {
+        this.client = client;
+        this.zkServers = zkServers;
+        this.zkRootPath = zkRootPath;
+        this.zkClusterMetadataPath = getClusterMetadataPath(zkRootPath);
+        this.zkClusterAssignmentPath = getClusterAssignmentPath(zkRootPath);
+        this.assignmentDataConsumers = new HashMap<>();
+    }
+
+    synchronized int getNumWatchers() {
+        return assignmentDataConsumers.size();
+    }
+
+    /**
+     * Close the assignment data cache, if any. Subsequent calls are no-ops.
+     */
+    @Override
+    public void close() {
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+
+            if (null != assignmentDataCache) {
+                try {
+                    assignmentDataCache.close();
+                } catch (IOException e) {
+                    log.warn("Failed to close assignment data cache", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Create the cluster layout in zookeeper with a single transaction. It creates the root node, the
+     * cluster metadata node, an empty assignment node, the servers and writable-servers nodes, and the
+     * storage node holding the distributedlog metadata for the segments path.
+     *
+     * @throws StorageRuntimeException if any part of the layout cannot be created
+     */
+    @Override
+    public void initializeCluster(int numStorageContainers) {
+        ClusterMetadata metadata = ClusterMetadata.newBuilder()
+            .setNumStorageContainers(numStorageContainers)
+            .build();
+        ClusterAssignmentData assignmentData = ClusterAssignmentData.newBuilder()
+            .build();
+        try {
+            // we are using dlog for the storage backend, so we need to initialize the dlog namespace
+            BKDLConfig dlogConfig = new BKDLConfig(
+                zkServers, getSegmentsRootPath(zkRootPath));
+            DLMetadata dlogMetadata = DLMetadata.create(dlogConfig);
+
+            client.transaction()
+                .forOperations(
+                    client.transactionOp().create().forPath(zkRootPath),
+                    client.transactionOp().create().forPath(zkClusterMetadataPath, metadata.toByteArray()),
+                    client.transactionOp().create().forPath(zkClusterAssignmentPath, assignmentData.toByteArray()),
+                    client.transactionOp().create().forPath(getServersPath(zkRootPath)),
+                    client.transactionOp().create().forPath(getWritableServersPath(zkRootPath)),
+                    client.transactionOp().create().forPath(getStoragePath(zkRootPath), dlogMetadata.serialize()));
+        } catch (Exception e) {
+            throw new StorageRuntimeException("Failed to initialize storage cluster with "
+                + numStorageContainers + " storage containers", e);
+        }
+    }
+
+    /**
+     * Read and parse the cluster assignment data.
+     *
+     * @throws StorageRuntimeException if the znode cannot be read or does not parse
+     */
+    @Override
+    public ClusterAssignmentData getClusterAssignmentData() {
+        try {
+            byte[] data = client.getData().forPath(zkClusterAssignmentPath);
+            return ClusterAssignmentData.parseFrom(data);
+        } catch (InvalidProtocolBufferException ie) {
+            throw new StorageRuntimeException("The cluster assignment data from zookeeper @"
+                + zkClusterAssignmentPath + " is corrupted", ie);
+        } catch (Exception e) {
+            throw new StorageRuntimeException("Failed to fetch cluster assignment data from zookeeper @"
+                + zkClusterAssignmentPath, e);
+        }
+    }
+
+    /**
+     * Overwrite the cluster assignment data (unconditional set, no version check).
+     *
+     * @throws StorageRuntimeException if the write fails
+     */
+    @Override
+    public void updateClusterAssignmentData(ClusterAssignmentData assignmentData) {
+        byte[] data = assignmentData.toByteArray();
+        try {
+            client.setData().forPath(zkClusterAssignmentPath, data);
+        } catch (Exception e) {
+            throw new StorageRuntimeException("Failed to update cluster assignment data to zookeeper @"
+                + zkClusterAssignmentPath, e);
+        }
+    }
+
+    /**
+     * Register {@code watcher} to be called on {@code executor} whenever the assignment znode changes.
+     * Registering the same watcher again is a no-op.
+     *
+     * @throws StorageRuntimeException if the shared node cache cannot be started
+     */
+    @Override
+    public void watchClusterAssignmentData(Consumer<Void> watcher, Executor executor) {
+        synchronized (this) {
+            if (assignmentDataCache == null) {
+                NodeCache cache = new NodeCache(client, zkClusterAssignmentPath);
+                try {
+                    cache.start();
+                } catch (Exception e) {
+                    // don't keep a cache that failed to start; the next watch will retry from scratch
+                    try {
+                        cache.close();
+                    } catch (IOException ioe) {
+                        log.warn("Failed to close assignment data cache after it failed to start", ioe);
+                    }
+                    throw new StorageRuntimeException("Failed to watch cluster assignment data", e);
+                }
+                assignmentDataCache = cache;
+            }
+            NodeCacheListener listener = assignmentDataConsumers.get(watcher);
+            if (null == listener) {
+                listener = () -> watcher.accept(null);
+                assignmentDataConsumers.put(watcher, listener);
+                assignmentDataCache.getListenable().addListener(listener, executor);
+            }
+        }
+    }
+
+    /**
+     * Unregister {@code watcher}; closes the shared node cache once no watchers remain.
+     */
+    @Override
+    public void unwatchClusterAssignmentData(Consumer<Void> watcher) {
+        synchronized (this) {
+            NodeCacheListener listener = assignmentDataConsumers.remove(watcher);
+            if (null != listener && null != assignmentDataCache) {
+                assignmentDataCache.getListenable().removeListener(listener);
+            }
+            if (assignmentDataConsumers.isEmpty() && null != assignmentDataCache) {
+                try {
+                    assignmentDataCache.close();
+                } catch (IOException e) {
+                    log.warn("Failed to close assignment data cache when there is no watcher", e);
+                }
+                // drop the closed cache so that the next watch creates and starts a new one instead of
+                // attaching its listener to a cache that will never fire again
+                assignmentDataCache = null;
+            }
+        }
+    }
+
+    /**
+     * Read and parse the cluster metadata.
+     *
+     * @throws StorageRuntimeException if the znode cannot be read or does not parse
+     */
+    @Override
+    public ClusterMetadata getClusterMetadata() {
+        try {
+            byte[] data = client.getData().forPath(zkClusterMetadataPath);
+            return ClusterMetadata.parseFrom(data);
+        } catch (InvalidProtocolBufferException ie) {
+            throw new StorageRuntimeException("The cluster metadata from zookeeper @"
+                + zkClusterMetadataPath + " is corrupted", ie);
+        } catch (Exception e) {
+            throw new StorageRuntimeException("Failed to fetch cluster metadata from zookeeper @"
+                + zkClusterMetadataPath, e);
+        }
+    }
+
+    /**
+     * Overwrite the cluster metadata (unconditional set, no version check).
+     *
+     * @throws StorageRuntimeException if the write fails
+     */
+    @Override
+    public void updateClusterMetadata(ClusterMetadata metadata) {
+        byte[] data = metadata.toByteArray();
+        try {
+            client.setData().forPath(zkClusterMetadataPath, data);
+        } catch (Exception e) {
+            throw new StorageRuntimeException("Failed to update cluster metadata to zookeeper @"
+                + zkClusterMetadataPath, e);
+        }
+    }
+
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/package-info.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/package-info.java
new file mode 100644
index 000000000..b843c8e3c
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Cluster operation implementation.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
\ No newline at end of file
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerController.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerController.java
new file mode 100644
index 000000000..2a3d70478
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerController.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.sc;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * The default implementation of storage container controller.
+ *
+ * <p>The goal of this controller is uniformly distributing storage containers across all alive servers in
+ * the cluster.
+ *
+ * <p>The algorithm here is based on the count-based stream balancer in distributedlog-proxy-server.
+ */
+@Slf4j
+public class DefaultStorageContainerController implements StorageContainerController {
+
+    /**
+     * Orders (server, assigned containers) pairs by the number of assigned containers, ascending.
+     *
+     * <p>This comparator backs the {@link TreeSet} used as a priority deque in {@code computeIdealState},
+     * so it must only return 0 for pairs that refer to the same server: a {@link TreeSet} rejects an element
+     * that compares equal to an existing one, which would silently lose that server together with every
+     * container assigned to it. Ties are therefore broken by a case-sensitive comparison of the server
+     * address string; a case-insensitive comparison would collapse addresses differing only in letter case.
+     */
+    static final class ServerAssignmentDataComparator
+        implements Comparator<Pair<BookieSocketAddress, LinkedList<Long>>> {
+
+        @Override
+        public int compare(Pair<BookieSocketAddress, LinkedList<Long>> o1, Pair<BookieSocketAddress, LinkedList<Long>> o2) {
+            int res = Integer.compare(o1.getValue().size(), o2.getValue().size());
+            if (0 != res) {
+                return res;
+            }
+            // two servers have same number of containers. the order between them doesn't matter as long as
+            // it is deterministic and never reports two distinct servers as equal.
+            return o1.getKey().toString().compareTo(o2.getKey().toString());
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Behavior of this implementation:
+     * <ul>
+     *   <li>if {@code currentCluster} is empty, {@code currentState} is returned unchanged;</li>
+     *   <li>if {@code currentState} has no (parsable) server assignment, a fresh assignment is computed
+     *       from scratch;</li>
+     *   <li>if the set of assigned servers equals {@code currentCluster}, {@code currentState} is returned
+     *       unchanged;</li>
+     *   <li>otherwise containers owned by departed servers are handed to the least loaded alive servers, and
+     *       containers are then moved from the most loaded to the least loaded server until the difference
+     *       in container counts is within the allowed bound.</li>
+     * </ul>
+     */
+    @Override
+    public ClusterAssignmentData computeIdealState(ClusterMetadata clusterMetadata,
+                                                   ClusterAssignmentData currentState,
+                                                   Set<BookieSocketAddress> currentCluster) {
+
+        if (currentCluster.isEmpty()) {
+            log.info("Current cluster is empty. No alive server is found.");
+            return currentState;
+        }
+
+        // 1. get current server assignments
+        Map<BookieSocketAddress, Set<Long>> currentServerAssignments;
+        try {
+            currentServerAssignments = currentState.getServersMap()
+                .entrySet()
+                .stream()
+                .collect(Collectors.toMap(
+                    e1 -> {
+                        try {
+                            return new BookieSocketAddress(e1.getKey());
+                        } catch (UnknownHostException uhe) {
+                            log.error("Invalid server {} found in current assignment map", e1.getKey(), uhe);
+                            throw new UncheckedExecutionException("Invalid server found in current assignment map : "
+                                + e1.getKey(), uhe);
+                        }
+                    },
+                    e2 -> e2.getValue().getContainersList().stream().collect(Collectors.toSet())
+                ));
+        } catch (UncheckedExecutionException uee) {
+            log.warn("Invalid cluster assignment data is found : {} - {}. Recompute assignment from empty state",
+                currentState, uee.getCause().getMessage());
+            currentServerAssignments = Maps.newHashMap();
+        }
+        Set<BookieSocketAddress> currentServersAssigned = currentServerAssignments.keySet();
+
+        // 2. if no servers is assigned, initialize the ideal state
+        if (currentServersAssigned.isEmpty()) {
+            return initializeIdealState(clusterMetadata, currentCluster);
+        }
+
+        // 3. get the cluster diffs
+        Set<BookieSocketAddress> serversAdded =
+            Sets.difference(currentCluster, currentServersAssigned).immutableCopy();
+        Set<BookieSocketAddress> serversRemoved =
+            Sets.difference(currentServersAssigned, currentCluster).immutableCopy();
+
+        if (serversAdded.isEmpty() && serversRemoved.isEmpty()) {
+            // cluster is unchanged, assuming the current state is ideal, no re-assignment is required.
+            return currentState;
+        }
+
+        log.info("Storage container controller detects cluster changed:\n"
+                + "\t {} servers added: {}\n\t {} servers removed: {}",
+            serversAdded.size(), serversAdded, serversRemoved.size(), serversRemoved);
+
+        // 4. compute the containers that owned by servers removed. these containers are needed to be reassigned.
+        Set<Long> containersToReassign = currentServerAssignments.entrySet().stream()
+            .filter(serverEntry -> !currentCluster.contains(serverEntry.getKey()))
+            .flatMap(serverEntry -> serverEntry.getValue().stream())
+            .collect(Collectors.toSet());
+
+        // 5. use an ordered set as priority deque to sort the servers by the number of assigned containers
+        TreeSet<Pair<BookieSocketAddress, LinkedList<Long>>> assignmentQueue
+            = new TreeSet<>(new ServerAssignmentDataComparator());
+        for (Map.Entry<BookieSocketAddress, Set<Long>> entry : currentServerAssignments.entrySet()) {
+            BookieSocketAddress host = entry.getKey();
+
+            if (!currentCluster.contains(host)) {
+                if (log.isTraceEnabled()) {
+                    log.trace("Host {} is not in current cluster anymore", host);
+                }
+                continue;
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Adding host {} to assignment queue", host);
+            }
+            assignmentQueue.add(Pair.of(host, Lists.newLinkedList(entry.getValue())));
+        }
+
+        // 6. add new servers. after this step the queue holds every server of the non-empty current cluster,
+        //    so pollFirst() below never returns null.
+        for (BookieSocketAddress server : serversAdded) {
+            assignmentQueue.add(Pair.of(server, Lists.newLinkedList()));
+        }
+
+        // 7. assign the containers that are needed to be reassigned.
+        //    entries are polled before being mutated, so the TreeSet ordering is never corrupted.
+        for (Long containerId : containersToReassign) {
+            Pair<BookieSocketAddress, LinkedList<Long>> leastLoadedServer = assignmentQueue.pollFirst();
+            leastLoadedServer.getValue().add(containerId);
+            assignmentQueue.add(leastLoadedServer);
+        }
+
+        // 8. rebalance the containers if needed.
+        //    a perfectly even split is only possible when the containers divide evenly across servers.
+        int diffAllowed;
+        if (assignmentQueue.size() > clusterMetadata.getNumStorageContainers()) {
+            diffAllowed = 1;
+        } else {
+            diffAllowed = clusterMetadata.getNumStorageContainers() % assignmentQueue.size() == 0 ? 0 : 1;
+        }
+
+        Pair<BookieSocketAddress, LinkedList<Long>> leastLoaded = assignmentQueue.first();
+        Pair<BookieSocketAddress, LinkedList<Long>> mostLoaded = assignmentQueue.last();
+        while (mostLoaded.getValue().size() - leastLoaded.getValue().size() > diffAllowed) {
+            leastLoaded = assignmentQueue.pollFirst();
+            mostLoaded = assignmentQueue.pollLast();
+
+            // move container from mostLoaded to leastLoaded
+            Long containerId = mostLoaded.getValue().removeFirst();
+            // add the container to the end to avoid balancing this container again.
+            leastLoaded.getValue().addLast(containerId);
+
+            assignmentQueue.add(leastLoaded);
+            assignmentQueue.add(mostLoaded);
+
+            leastLoaded = assignmentQueue.first();
+            mostLoaded = assignmentQueue.last();
+        }
+
+        // 9. the new ideal state is computed, finalize it
+        Map<String, ServerAssignmentData> newAssignmentMap = Maps.newHashMap();
+        assignmentQueue.forEach(assignment -> newAssignmentMap.put(
+            assignment.getKey().toString(),
+            ServerAssignmentData.newBuilder()
+                .addAllContainers(assignment.getValue())
+                .build()));
+        return ClusterAssignmentData.newBuilder()
+            .putAllServers(newAssignmentMap)
+            .build();
+    }
+
+    /**
+     * Computes an assignment from scratch by spreading containers round-robin over the given servers.
+     *
+     * <p>The servers are shuffled first. The server at shuffled index {@code i} then receives container ids
+     * {@code i, i + numServers, i + 2 * numServers, ...} that are below the total number of containers, so
+     * per-server counts differ by at most one.
+     *
+     * <p>NOTE(review): {@code currentCluster} must be non-empty (the only caller in this class checks
+     * that); an empty set leads to a division by zero.
+     *
+     * @param clusterMetadata cluster metadata providing the total number of storage containers
+     * @param currentCluster servers to distribute the containers over
+     * @return the initial assignment, keyed by server address string
+     */
+    static ClusterAssignmentData initializeIdealState(ClusterMetadata clusterMetadata,
+                                                      Set<BookieSocketAddress> currentCluster) {
+        List<BookieSocketAddress> serverList = Lists.newArrayListWithExpectedSize(currentCluster.size());
+        serverList.addAll(currentCluster);
+        Collections.shuffle(serverList);
+
+        int numServers = serverList.size();
+        int numTotalContainers = (int) clusterMetadata.getNumStorageContainers();
+        int numContainersPerServer = numTotalContainers / numServers;
+
+        Map<String, ServerAssignmentData> assignmentMap = Maps.newHashMap();
+        for (int serverIdx = 0; serverIdx < numServers; serverIdx++) {
+            BookieSocketAddress server = serverList.get(serverIdx);
+
+            int finalServerIdx = serverIdx;
+            // rangeClosed includes one extra round so the first (numTotalContainers % numServers) servers pick up
+            // the remainder; ids at or beyond the total are filtered out.
+            ServerAssignmentData assignmentData = ServerAssignmentData.newBuilder()
+                .addAllContainers(
+                    LongStream.rangeClosed(0, numContainersPerServer).boxed()
+                        .map(j -> j * numServers + finalServerIdx)
+                        .filter(containerId -> containerId < numTotalContainers)
+                        .collect(Collectors.toSet()))
+                .build();
+            assignmentMap.put(server.toString(), assignmentData);
+        }
+
+        return ClusterAssignmentData.newBuilder()
+            .putAllServers(assignmentMap)
+            .build();
+    }
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
index 39e00c06c..8217e8922 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
@@ -63,8 +63,8 @@ public long getId() {
     }
 
     @Override
-    public CompletableFuture<Void> start() {
-        return CompletableFuture.completedFuture(null);
+    public CompletableFuture<StorageContainer> start() {
+        return CompletableFuture.completedFuture(this);
     }
 
     @Override
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/LocalStorageContainerManager.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/LocalStorageContainerManager.java
index edb1e5f5d..10469ae6c 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/LocalStorageContainerManager.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/LocalStorageContainerManager.java
@@ -22,6 +22,7 @@
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
@@ -54,7 +55,7 @@ public Endpoint getStorageContainer(long scId) {
 
     @Override
     protected void doStart() {
-        List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(numStorageContainers);
+        List<CompletableFuture<StorageContainer>> futures = Lists.newArrayListWithExpectedSize(numStorageContainers);
         for (int scId = 0; scId < numStorageContainers; scId++) {
             futures.add(this.registry.startStorageContainer(scId));
         }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerController.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerController.java
new file mode 100644
index 000000000..246f52c58
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerController.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.sc;
+
+import java.util.Set;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+
+/**
+ * Storage container controller is used for assigning containers to servers.
+ */
+public interface StorageContainerController {
+
+    /**
+     * Compute the ideal container assignment state based on hosts alive in the cluster.
+     *
+     * @param clusterMetadata cluster metadata
+     * @param currentState current container assignment state
+     * @param currentCluster current servers alive in the cluster
+     * @return the computed ideal assignment state
+     */
+    ClusterAssignmentData computeIdealState(ClusterMetadata clusterMetadata,
+                                            ClusterAssignmentData currentState,
+                                            Set<BookieSocketAddress> currentCluster);
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
index 0905ce972..52fa06085 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
@@ -172,7 +172,7 @@ public long getId() {
     }
 
     @Override
-    public CompletableFuture<Void> start() {
+    public CompletableFuture<StorageContainer> start() {
         log.info("Starting storage container ({}) ...", getId());
 
         List<CompletableFuture<Void>> futures = Lists.newArrayList(
@@ -181,7 +181,7 @@ public long getId() {
 
         return FutureUtils.collect(futures).thenApply(ignored -> {
             log.info("Successfully started storage container ({}).", getId());
-            return null;
+            return StorageContainerImpl.this;
         });
     }
 
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
index 6d27f6616..4499801dd 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
@@ -37,7 +37,7 @@
     private static final String COMPONENT_NAME = StorageContainerRegistry.class.getSimpleName();
 
     private final StorageContainerFactory scFactory;
-    private final ConcurrentMap<Long, StorageContainer> groups;
+    private final ConcurrentMap<Long, StorageContainer> containers;
     private final ReentrantReadWriteLock closeLock;
     private final StorageContainer failRequestStorageContainer;
     private boolean closed = false;
@@ -46,27 +46,27 @@ public StorageContainerRegistryImpl(StorageContainerFactory factory,
                                         OrderedScheduler scheduler) {
         this.scFactory = factory;
         this.failRequestStorageContainer = FailRequestStorageContainer.of(scheduler);
-        this.groups = Maps.newConcurrentMap();
+        this.containers = Maps.newConcurrentMap();
         this.closeLock = new ReentrantReadWriteLock();
     }
 
     @VisibleForTesting
     public void setStorageContainer(long scId, StorageContainer group) {
-        groups.put(scId, group);
+        containers.put(scId, group);
     }
 
     @Override
     public int getNumStorageContainers() {
-        return groups.size();
+        return containers.size();
     }
 
     @Override
     public StorageContainer getStorageContainer(long storageContainerId) {
-        return groups.getOrDefault(storageContainerId, failRequestStorageContainer);
+        return containers.getOrDefault(storageContainerId, failRequestStorageContainer);
     }
 
     @Override
-    public CompletableFuture<Void> startStorageContainer(long scId) {
+    public CompletableFuture<StorageContainer> startStorageContainer(long scId) {
         closeLock.readLock().lock();
         try {
             return unsafeStartStorageContainer(scId);
@@ -75,49 +75,71 @@ public StorageContainer getStorageContainer(long storageContainerId) {
         }
     }
 
-    private CompletableFuture<Void> unsafeStartStorageContainer(long scId) {
+    private CompletableFuture<StorageContainer> unsafeStartStorageContainer(long scId) {
         if (closed) {
             return FutureUtils.exception(new ObjectClosedException(COMPONENT_NAME));
         }
 
-        if (groups.containsKey(scId)) {
+        if (containers.containsKey(scId)) {
             return FutureUtils.exception(new StorageException("StorageContainer " + scId + " already registered"));
         }
 
         StorageContainer newStorageContainer = scFactory.createStorageContainer(scId);
-        StorageContainer oldStorageContainer = groups.putIfAbsent(scId, newStorageContainer);
+        StorageContainer oldStorageContainer = containers.putIfAbsent(scId, newStorageContainer);
         if (null != oldStorageContainer) {
             newStorageContainer.close();
             return FutureUtils.exception(new StorageException("StorageContainer " + scId + " already registered"));
         }
 
         log.info("Registered StorageContainer ('{}').", scId);
-        return newStorageContainer.start();
+        return newStorageContainer.start()
+            .whenComplete((container, cause) -> {
+                if (null != cause) {
+                    if (containers.remove(scId, newStorageContainer)) {
+                        log.warn("De-registered StorageContainer ('{}') when failed to start", scId, cause);
+                    } else {
+                        log.warn("Fail to de-register StorageContainer ('{}') when failed to start", scId, cause);
+                    }
+                } else {
+                    log.info("Successfully started registered StorageContainer ('{}').", scId);
+                }
+            });
     }
 
     @Override
-    public CompletableFuture<Void> stopStorageContainer(long scId) {
+    public CompletableFuture<Void> stopStorageContainer(long scId, StorageContainer container) {
         closeLock.readLock().lock();
         try {
-            return unsafeStopStorageContainer(scId);
+            return unsafeStopStorageContainer(scId, container);
         } finally {
             closeLock.readLock().unlock();
         }
     }
 
-    private CompletableFuture<Void> unsafeStopStorageContainer(long scId) {
+    private CompletableFuture<Void> unsafeStopStorageContainer(long scId, StorageContainer container) {
         if (closed) {
             return FutureUtils.exception(new ObjectClosedException(COMPONENT_NAME));
         }
 
-        StorageContainer group = groups.remove(scId);
+        if (null == container) {
+            StorageContainer existingContainer = containers.remove(scId);
+            if (null != existingContainer) {
+                log.info("Unregistered StorageContainer ('{}').", scId);
+                return existingContainer.stop();
+            } else {
+                return FutureUtils.Void();
+            }
+        } else {
+            boolean removed = containers.remove(scId, container);
 
-        if (null == group) {
-            return FutureUtils.value(null);
-        }
+            if (removed) {
+                log.info("Unregistered StorageContainer ('{}').", scId);
+            }
 
-        log.info("Unregistered StorageContainer ('{}').", scId);
-        return group.stop();
+            // no matter we successfully removed the containers or not, we need to close the current container.
+            // this ensure the resources held by the passed-in container are released.
+            return container.stop();
+        }
     }
 
     @Override
@@ -133,7 +155,7 @@ public void close() {
         }
 
         // close all the ranges
-        groups.values().forEach(StorageContainer::close);
-        groups.clear();
+        containers.values().forEach(StorageContainer::close);
+        containers.clear();
     }
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java
new file mode 100644
index 000000000..f77244cce
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.sc;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.utils.NetUtils;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
+import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
+import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
+
+/**
+ * A zookeeper based implementation of {@link StorageContainerManager}.
+ */
+@Slf4j
+public class ZkStorageContainerManager
+    extends AbstractLifecycleComponent<StorageConfiguration>
+    implements StorageContainerManager, Consumer<Void> {
+
+    private final Endpoint endpoint;
+    private final ClusterMetadataStore metadataStore;
+    private final StorageContainerRegistry registry;
+    private final ScheduledExecutorService executor;
+
+    // for routing purpose
+    private volatile ClusterAssignmentData clusterAssignmentData;
+    private volatile Map<Endpoint, ServerAssignmentData> clusterAssignmentMap;
+    private volatile ServerAssignmentData myAssignmentData;
+    private volatile ConcurrentLongHashMap<Endpoint> containerAssignmentMap;
+
+    // a probe task to probe containers and make sure this manager running containers as assigned
+    private ScheduledFuture<?> containerProbeTask;
+    private final Duration probeInterval;
+
+    @Getter(AccessLevel.PACKAGE)
+    private final Map<Long, StorageContainer> liveContainers;
+    @Getter(AccessLevel.PACKAGE)
+    private final Set<Long> pendingStartStopContainers;
+
+    /**
+     * Creates a manager for the storage containers assigned to {@code myEndpoint}.
+     *
+     * @param myEndpoint endpoint of this server, used to look up its own assignment
+     * @param conf storage configuration; its cluster controller schedule interval drives the probe interval
+     * @param clusterMetadataStore store holding the cluster assignment data
+     * @param registry registry used to manage the storage containers
+     * @param statsLogger stats logger
+     */
+    public ZkStorageContainerManager(Endpoint myEndpoint,
+                                     StorageConfiguration conf,
+                                     ClusterMetadataStore clusterMetadataStore,
+                                     StorageContainerRegistry registry,
+                                     StatsLogger statsLogger) {
+        super("zk-storage-container-manager", conf, statsLogger);
+        this.endpoint = myEndpoint;
+        this.metadataStore = clusterMetadataStore;
+        this.registry = registry;
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder().setNameFormat("zk-storage-container-manager").build());
+        this.liveContainers = Collections.synchronizedMap(Maps.newConcurrentMap());
+        this.pendingStartStopContainers = Collections.synchronizedSet(Sets.newConcurrentHashSet());
+        this.containerAssignmentMap = new ConcurrentLongHashMap<>();
+        this.clusterAssignmentMap = Maps.newHashMap();
+        // probe the containers every 1/2 of controller scheduling interval. this ensures the manager
+        // can attempt to start containers before controller reassign them.
+        // the interval is clamped to at least 1ms: scheduleAtFixedRate rejects a non-positive period, which
+        // would otherwise make doStart() fail for very small (or non-positive) schedule intervals.
+        this.probeInterval = Duration.ofMillis(Math.max(1L, conf.getClusterControllerScheduleIntervalMs() / 2));
+    }
+
+    @Override
+    protected void doStart() {
+        // watch the cluster assignment data
+        metadataStore.watchClusterAssignmentData(this, executor);
+        log.info("Watched cluster assignment data.");
+
+        // schedule the container probe task
+        containerProbeTask = executor.scheduleAtFixedRate(
+            this::probeContainers, 0, probeInterval.toMillis(), TimeUnit.MILLISECONDS);
+        log.info("Scheduled storage container probe task at every {} ms", probeInterval.toMillis());
+    }
+
+    /**
+     * Stops watching assignment changes, cancels the probe task and stops the running containers.
+     */
+    @Override
+    protected void doStop() {
+        // unwatch the cluster assignment data
+        metadataStore.unwatchClusterAssignmentData(this);
+
+        // cancel the probe task. it is still null if doStart() failed before scheduling it,
+        // in which case there is nothing to cancel.
+        if (null != containerProbeTask && !containerProbeTask.cancel(true)) {
+            log.warn("Failed to cancel the container probe task.");
+        }
+
+        stopContainers();
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        // Tear down in two steps: closing the registry shuts down the containers it still holds,
+        // then the scheduler used for the assignment watcher and the probe task is shut down.
+        registry.close();
+        executor.shutdown();
+    }
+
+    /**
+     * Looks up the endpoint of the server currently assigned the given storage container.
+     *
+     * @param scId storage container id
+     * @return the owning endpoint, or {@code null} when no assignment is known for {@code scId}
+     */
+    @Override
+    public Endpoint getStorageContainer(long scId) {
+        final Endpoint owner = containerAssignmentMap.get(scId);
+        return owner;
+    }
+
+    /**
+     * Refreshes the cluster assignment and reconciles the containers this server runs with it.
+     *
+     * <p>This runs periodically via {@code scheduleAtFixedRate}. Any runtime exception is caught and logged
+     * here: a periodic task that throws is never executed again, which would silently stop this manager
+     * from reacting to assignment changes.
+     */
+    void probeContainers() {
+        try {
+            boolean isMyAssignmentRefreshed = refreshMyAssignment();
+            if (!isMyAssignmentRefreshed) {
+                // no cluster assignment data is available yet, nothing to reconcile
+                return;
+            }
+
+            // read the volatile field once so the null check and the use see the same value
+            ServerAssignmentData myAssignment = myAssignmentData;
+            if (myAssignment == null) {
+                // I don't have any containers assigned to me, so stop containers that I am running.
+                stopContainers();
+            } else {
+                processMyAssignment(myAssignment);
+            }
+        } catch (RuntimeException e) {
+            log.error("Failed to probe storage containers, will retry in {} ms", probeInterval.toMillis(), e);
+        }
+    }
+
+    /**
+     * Read the latest cluster assignment from the metadata store and apply the per-server
+     * differences to {@code containerAssignmentMap}.
+     *
+     * <p>Also replaces {@code clusterAssignmentMap} and sets {@code myAssignmentData} to this
+     * server's entry (or {@code null} if this server has no entry).
+     *
+     * @return {@code false} if the metadata store has no cluster assignment data, {@code true} otherwise
+     */
+    private boolean refreshMyAssignment() {
+        ClusterAssignmentData latest = metadataStore.getClusterAssignmentData();
+        if (latest == null) {
+            log.info("Cluster assignment data is empty, so skip refreshing");
+            return false;
+        }
+
+        // NOTE(review): Collectors.toMap throws IllegalStateException if two server keys parse to
+        // the same Endpoint.
+        Map<Endpoint, ServerAssignmentData> newAssignmentMap = latest.getServersMap()
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(
+                entry -> NetUtils.parseEndpoint(entry.getKey()),
+                Map.Entry::getValue));
+
+        Map<Endpoint, ServerAssignmentData> oldAssignmentMap = this.clusterAssignmentMap;
+        Set<Endpoint> previousServers = oldAssignmentMap.keySet();
+        Set<Endpoint> currentServers = newAssignmentMap.keySet();
+
+        // Snapshot the differences as immutable copies before the maps are swapped below.
+        Set<Endpoint> joinedServers = Sets.difference(currentServers, previousServers).immutableCopy();
+        Set<Endpoint> leftServers = Sets.difference(previousServers, currentServers).immutableCopy();
+        Set<Endpoint> retainedServers = Sets.intersection(currentServers, previousServers).immutableCopy();
+
+        processServersLeft(leftServers, oldAssignmentMap);
+        processServersJoined(joinedServers, newAssignmentMap);
+        processServersAssignmentChanged(retainedServers, oldAssignmentMap, newAssignmentMap);
+
+        this.clusterAssignmentMap = newAssignmentMap;
+        myAssignmentData = newAssignmentMap.get(endpoint);
+        return true;
+    }
+
+    /**
+     * Map every container owned by a newly joined server to that server in
+     * {@code containerAssignmentMap}.
+     *
+     * @param serversJoined servers present in the new assignment but not in the old one
+     * @param newAssignmentMap the new per-server assignment
+     */
+    private void processServersJoined(Set<Endpoint> serversJoined,
+                                      Map<Endpoint, ServerAssignmentData> newAssignmentMap) {
+        if (serversJoined.isEmpty()) {
+            // This runs on every probe; don't log an empty set at INFO each time.
+            return;
+        }
+        log.info("Servers joined : {}", serversJoined);
+        serversJoined.forEach(ep -> {
+            ServerAssignmentData sad = newAssignmentMap.get(ep);
+            if (null != sad) {
+                sad.getContainersList().forEach(container -> containerAssignmentMap.put(container, ep));
+            }
+        });
+    }
+
+    /**
+     * Unmap the containers of servers that disappeared from the assignment.
+     *
+     * <p>A mapping is removed only if it still points at the departed server, so containers
+     * already re-mapped elsewhere are left untouched.
+     *
+     * @param serversLeft servers present in the old assignment but not in the new one
+     * @param oldAssignmentMap the previous per-server assignment
+     */
+    private void processServersLeft(Set<Endpoint> serversLeft,
+                                    Map<Endpoint, ServerAssignmentData> oldAssignmentMap) {
+        if (serversLeft.isEmpty()) {
+            // This runs on every probe; don't log an empty set at INFO each time.
+            return;
+        }
+        log.info("Servers left : {}", serversLeft);
+        serversLeft.forEach(ep -> {
+            ServerAssignmentData sad = oldAssignmentMap.get(ep);
+            if (null != sad) {
+                sad.getContainersList().forEach(container -> containerAssignmentMap.remove(container, ep));
+            }
+        });
+    }
+
+    /**
+     * Update {@code containerAssignmentMap} for servers present in both assignments whose
+     * assignment data differs.
+     *
+     * <p>Old mappings are removed only if they still point at the same server, so a container
+     * that has already been mapped to another server is not unmapped.
+     *
+     * @param commonServers servers present in both the old and the new assignment
+     * @param oldAssignmentMap the previous per-server assignment
+     * @param newAssignmentMap the new per-server assignment
+     */
+    private void processServersAssignmentChanged(Set<Endpoint> commonServers,
+                                                 Map<Endpoint, ServerAssignmentData> oldAssignmentMap,
+                                                 Map<Endpoint, ServerAssignmentData> newAssignmentMap) {
+        ServerAssignmentData emptyAssignment = ServerAssignmentData.getDefaultInstance();
+        for (Endpoint ep : commonServers) {
+            ServerAssignmentData before = oldAssignmentMap.getOrDefault(ep, emptyAssignment);
+            ServerAssignmentData after = newAssignmentMap.getOrDefault(ep, emptyAssignment);
+            if (before.equals(after)) {
+                continue;
+            }
+
+            log.info("Server assignment is change for {}:\nold assignment: {}\nnew assignment: {}",
+                NetUtils.endpointToString(ep), before, after);
+            before.getContainersList().forEach(container -> containerAssignmentMap.remove(container, ep));
+            after.getContainersList().forEach(container -> containerAssignmentMap.put(container, ep));
+        }
+    }
+
+
+    /**
+     * Issue a stop for every container currently in the live set.
+     *
+     * <p>Iterates over a snapshot of the keys because a completed stop removes its entry from
+     * {@code liveContainers}.
+     */
+    private void stopContainers() {
+        for (Long scId : ImmutableSet.copyOf(liveContainers.keySet())) {
+            stopStorageContainer(scId);
+        }
+    }
+
+    /**
+     * Reconcile the locally running containers with the containers assigned to this server.
+     *
+     * <p>Assigned containers that are not live are started, and live containers that are no
+     * longer assigned are stopped. Containers with a start/stop already in flight (tracked in
+     * {@code pendingStartStopContainers}) are skipped and left for a later probe.
+     *
+     * @param myAssignment this server's entry from the latest cluster assignment data
+     */
+    private void processMyAssignment(ServerAssignmentData myAssignment) {
+        Set<Long> assignedContainerSet = myAssignment.getContainersList().stream().collect(Collectors.toSet());
+        Set<Long> liveContainerSet = ImmutableSet.copyOf(liveContainers.keySet());
+
+        // If the containers are already in the pending start/stop list, we don't touch them until
+        // they are completed. The filtered results are materialized once on purpose: a lazy
+        // Sets.filter view would re-evaluate pendingStartStopContainers on every access, and that
+        // set is mutated by the start/stop calls below and by their asynchronous completions.
+        // The sets that get logged and the sets that get acted upon could otherwise disagree.
+        Set<Long> containersToStart = ImmutableSet.copyOf(Sets.filter(
+            Sets.difference(assignedContainerSet, liveContainerSet),
+            container -> !pendingStartStopContainers.contains(container)));
+        Set<Long> containersToStop = ImmutableSet.copyOf(Sets.filter(
+            Sets.difference(liveContainerSet, assignedContainerSet),
+            container -> !pendingStartStopContainers.contains(container)));
+
+        log.info("Process container changes:\n\tIdeal = {}\n\tLive = {}\n\t"
+            + "Pending = {}\n\tToStart = {}\n\tToStop = {}",
+            assignedContainerSet, liveContainerSet, pendingStartStopContainers, containersToStart, containersToStop);
+
+        containersToStart.forEach(this::startStorageContainer);
+        containersToStop.forEach(this::stopStorageContainer);
+    }
+
+    /**
+     * Start the given storage container through the registry unless it is already live.
+     *
+     * <p>While the start is in flight, the id stays in {@code pendingStartStopContainers}. On
+     * success, the started container is added to the live set.
+     *
+     * @param scId storage container id
+     * @return the already-live container, or the registry's start future
+     */
+    private CompletableFuture<StorageContainer> startStorageContainer(long scId) {
+        log.info("Starting storage container ({})", scId);
+        StorageContainer existing = liveContainers.get(scId);
+        if (existing != null) {
+            log.warn("Storage container ({}) is already started", scId);
+            return FutureUtils.value(existing);
+        }
+
+        // register the container to pending list
+        pendingStartStopContainers.add(scId);
+        return registry.startStorageContainer(scId).whenComplete((startedContainer, cause) -> {
+            try {
+                if (cause == null) {
+                    log.info("Successfully started storage container ({})", scId);
+                    addStorageContainer(scId, startedContainer);
+                } else {
+                    log.warn("Failed to start storage container ({})", scId, cause);
+                }
+            } finally {
+                pendingStartStopContainers.remove(scId);
+            }
+        });
+    }
+
+    /**
+     * Stop the given storage container through the registry if it is currently live.
+     *
+     * <p>While the stop is in flight, the id stays in {@code pendingStartStopContainers}. On
+     * success, the container is removed from the live set.
+     *
+     * @param scId storage container id
+     * @return a completed future if the container is not live, otherwise the registry's stop future
+     */
+    private CompletableFuture<Void> stopStorageContainer(long scId) {
+        log.info("Stopping storage container ({})", scId);
+        StorageContainer liveSc = liveContainers.get(scId);
+        if (liveSc == null) {
+            log.warn("Storage container ({}) is not alive anymore", scId);
+            return FutureUtils.Void();
+        }
+
+        // register the container to pending list
+        pendingStartStopContainers.add(scId);
+        return registry.stopStorageContainer(scId, liveSc).whenComplete((ignored, cause) -> {
+            try {
+                if (cause == null) {
+                    log.info("Successfully stopped storage container ({})", scId);
+                    removeStorageContainer(scId, liveSc);
+                } else {
+                    log.warn("Failed to stop storage container ({})", scId, cause);
+                }
+            } finally {
+                pendingStartStopContainers.remove(scId);
+            }
+        });
+    }
+
+    /**
+     * Add a started container to the live set.
+     *
+     * <p>If a container is already registered under {@code scId}, the existing one is kept and
+     * {@code sc} is stopped.
+     *
+     * @param scId storage container id
+     * @param sc the container that has just been started
+     * @return the container that ends up in the live set
+     */
+    private StorageContainer addStorageContainer(long scId, StorageContainer sc) {
+        StorageContainer oldSc = liveContainers.putIfAbsent(scId, sc);
+        if (null == oldSc) {
+            log.info("Storage container ({}) is added to live set.", scId);
+            return sc;
+        } else {
+            log.warn("Storage container ({}) has already been added to live set", scId);
+            // NOTE(review): whatever sc.stop() returns is not awaited or checked here.
+            sc.stop();
+            return oldSc;
+        }
+    }
+
+    /**
+     * Remove {@code sc} from the live set, but only if it is still the container registered
+     * under {@code scId}.
+     *
+     * @param scId storage container id
+     * @param sc the container instance expected to be registered
+     */
+    private void removeStorageContainer(long scId, StorageContainer sc) {
+        boolean removed = liveContainers.remove(scId, sc);
+        if (!removed) {
+            log.warn("Storage container ({}) can't be removed from live set.", scId);
+            return;
+        }
+        log.info("Storage container ({}) is removed from live set.", scId);
+    }
+
+    //
+    // Callback invoked when the cluster assignment data changes.
+    //
+
+    /**
+     * Invoked when the cluster assignment data changes; submits a probe to the executor.
+     *
+     * @param aVoid unused
+     */
+    @Override
+    public void accept(Void aVoid) {
+        // any time if the cluster assignment data is changed, schedule a probe task
+        executor.submit(this::probeContainers);
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/HelixStorageContainerManager.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/HelixStorageContainerManager.java
deleted file mode 100644
index 62a0f2580..000000000
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/HelixStorageContainerManager.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.sc.helix;
-
-import static org.apache.bookkeeper.stream.storage.impl.sc.helix.HelixStorageController.RESOURCE_NAME;
-import static org.apache.bookkeeper.stream.storage.impl.sc.helix.HelixStorageController.getEndpointName;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Optional;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.spectator.RoutingTableProvider;
-
-/**
- * A storage container manager implemented using Helix.
- *
- * <p>The node joins the Helix cluster as a {@code CONTROLLER_PARTICIPANT}. Storage containers
- * are started and stopped by the {@link WriteReadStateModelFactory} state transitions.
- * Container ownership is answered from a {@link RoutingTableProvider} that is registered as an
- * external-view change listener.
- */
-@Slf4j
-public class HelixStorageContainerManager
-    extends AbstractLifecycleComponent<StorageConfiguration>
-    implements StorageContainerManager {
-
-    private final String zkServers;
-    private final String clusterName;
-    private final StorageContainerRegistry registry;
-    private final Endpoint endpoint;
-    // Explicit Helix instance name; when absent, getEndpointName(endpoint) ("<hostname>_<port>") is used.
-    private final Optional<String> endpointName;
-    private final HelixManager manager;
-    // Routing table consulted by getStorageContainer(long).
-    private final RoutingTableProvider rtProvider;
-
-    public HelixStorageContainerManager(String zkServers,
-                                        String clusterName,
-                                        StorageConfiguration conf,
-                                        StorageContainerRegistry registry,
-                                        Endpoint endpoint,
-                                        Optional<String> endpointName,
-                                        StatsLogger statsLogger) {
-        super("helix-storage-container-manager", conf, statsLogger);
-        this.zkServers = zkServers;
-        this.clusterName = clusterName;
-        this.registry = registry;
-        this.endpoint = endpoint;
-        this.endpointName = endpointName;
-        // create the helix manager
-        this.manager = HelixManagerFactory.getZKHelixManager(
-            clusterName,
-            endpointName.orElse(getEndpointName(endpoint)),
-            InstanceType.CONTROLLER_PARTICIPANT,
-            zkServers);
-        this.rtProvider = new RoutingTableProvider();
-    }
-
-    /**
-     * Find the endpoint serving the given storage container.
-     *
-     * <p>Looks up the instances holding partition {@code storagecontainers_<scId>} in the
-     * {@code WRITE} state and returns the first one.
-     *
-     * @param scId storage container id
-     * @return the owner's endpoint, or {@code null} if no instance is in the WRITE state
-     */
-    @Override
-    public Endpoint getStorageContainer(long scId) {
-        List<InstanceConfig> instances = this.rtProvider.getInstances(
-            RESOURCE_NAME, RESOURCE_NAME + "_" + scId, "WRITE");
-        if (instances.isEmpty()) {
-            return null;
-        } else {
-            InstanceConfig instance = instances.get(0);
-            // NOTE(review): Integer.parseInt throws NumberFormatException if the stored port is not numeric.
-            return Endpoint.newBuilder()
-                .setHostname(instance.getHostName())
-                .setPort(Integer.parseInt(instance.getPort()))
-                .build();
-        }
-    }
-
-    /**
-     * Add this node to the cluster, register the WriteRead state model factory and connect to Helix.
-     *
-     * @throws StorageRuntimeException if connecting or adding the external-view listener fails
-     */
-    @Override
-    protected void doStart() {
-        // create the controller; it is only used to add this node and is closed right after
-        try (HelixStorageController controller = new HelixStorageController(zkServers)) {
-            controller.addNode(clusterName, endpoint, endpointName);
-        }
-        StateMachineEngine sme = this.manager.getStateMachineEngine();
-        StateModelFactory<StateModel> smFactory = new WriteReadStateModelFactory(registry);
-        sme.registerStateModelFactory(WriteReadSMD.NAME, smFactory);
-        try {
-            manager.connect();
-            manager.addExternalViewChangeListener(rtProvider);
-        } catch (Exception e) {
-            throw new StorageRuntimeException(e);
-        }
-    }
-
-    @Override
-    protected void doStop() {
-        // NOTE(review): the external-view listener is not removed explicitly before disconnecting.
-        manager.disconnect();
-    }
-
-    @Override
-    protected void doClose() throws IOException {
-        // Nothing is released here; the Helix connection is dropped in doStop().
-    }
-}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/HelixStorageController.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/HelixStorageController.java
deleted file mode 100644
index f76c21df0..000000000
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/HelixStorageController.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.sc.helix;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.List;
-import java.util.Optional;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
-import org.apache.bookkeeper.stream.storage.api.controller.StorageController;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.model.InstanceConfig;
-
-/**
- * A storage controller client based on Helix.
- *
- * <p>Wraps a {@link ZKHelixAdmin}. Storage containers are modeled as the partitions of a single
- * Helix resource named {@code "storagecontainers"}.
- */
-@Slf4j
-public class HelixStorageController implements StorageController {
-
-    // Helix resource whose partitions are the storage containers; partitions are named "<RESOURCE_NAME>_<scId>".
-    static final String RESOURCE_NAME = "storagecontainers";
-
-    private final ZKHelixAdmin admin;
-
-    public HelixStorageController(String zkServers) {
-        this.admin = new ZKHelixAdmin(zkServers);
-    }
-
-    @VisibleForTesting
-    ZKHelixAdmin getAdmin() {
-        return admin;
-    }
-
-    /**
-     * Parse the storage container id from a Helix partition name such as {@code storagecontainers_3}.
-     *
-     * @throws NumberFormatException if what remains after stripping the prefix is not a valid long
-     */
-    static long getStorageContainerFromPartitionName(String partitionName) {
-        return Long.parseLong(partitionName.replace(RESOURCE_NAME + "_", ""));
-    }
-
-    /**
-     * Create the cluster, its WriteRead state model and the storage-container resource.
-     *
-     * <p>Does nothing if a cluster named {@code clusterName} already exists.
-     */
-    @Override
-    public void createCluster(String clusterName,
-                              int numStorageContainers,
-                              int numReplicas) {
-        List<String> clusters = admin.getClusters();
-        if (clusters.contains(clusterName)) {
-            return;
-        }
-        log.info("Creating new cluster : {}", clusterName);
-
-        admin.addCluster(clusterName);
-        // "WriteRead" is the same value as WriteReadSMD.NAME.
-        admin.addStateModelDef(clusterName, "WriteRead", WriteReadSMD.build());
-        admin.addResource(
-            clusterName,
-            RESOURCE_NAME,
-            numStorageContainers,
-            WriteReadSMD.NAME,
-            "FULL_AUTO");
-        // NOTE(review): the replica count is hard-coded to 3; the numReplicas argument is ignored.
-        admin.rebalance(clusterName, RESOURCE_NAME, 3);
-
-        log.info("Created new cluster : {}", clusterName);
-    }
-
-    // Default Helix instance name for an endpoint: "<hostname>_<port>".
-    static String getEndpointName(Endpoint endpoint) {
-        return endpoint.getHostname() + "_" + endpoint.getPort();
-    }
-
-    /**
-     * Register the endpoint as a Helix instance, unless an instance with the same name already exists.
-     *
-     * <p>The instance name is {@code endpointNameOptional} if present, otherwise {@code <hostname>_<port>}.
-     */
-    @Override
-    public void addNode(String clusterName,
-                        Endpoint endpoint,
-                        Optional<String> endpointNameOptional) {
-        String endpointName = endpointNameOptional.orElse(getEndpointName(endpoint));
-        if (admin.getInstancesInCluster(clusterName)
-            .contains(endpointName)) {
-            log.info("Instance {} already exists in cluster {}, skip creating the instance",
-                endpointName, clusterName);
-            return;
-        }
-        log.info("Adding a new instance {} ({}) to the cluster {}.",
-            new Object[]{endpointName, endpoint, clusterName});
-        InstanceConfig config = new InstanceConfig(endpointName);
-        config.setHostName(endpoint.getHostname());
-        config.setPort(Integer.toString(endpoint.getPort()));
-        admin.addInstance(clusterName, config);
-    }
-
-    // Closes the underlying ZKHelixAdmin.
-    @Override
-    public void close() {
-        admin.close();
-    }
-}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/WriteReadSMD.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/WriteReadSMD.java
deleted file mode 100644
index 6104919ca..000000000
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/WriteReadSMD.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.sc.helix;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.model.StateModelDefinition;
-
-/**
- * The state model definition for write-read transition.
- *
- * <p>States are {@code WRITE}, {@code READ} and {@code OFFLINE} (plus Helix's built-in states).
- * {@code OFFLINE} is the initial state, and {@code WRITE} has an upper bound of one.
- */
-public final class WriteReadSMD extends StateModelDefinition {
-
-    public static final String NAME = "WriteRead";
-
-    /**
-     * The state for write and read.
-     */
-    public enum States {
-        WRITE,
-        READ,
-        OFFLINE
-    }
-
-    // Builds the definition from the raw ZNRecord produced by generateConfigForWriteRead().
-    public WriteReadSMD() {
-        super(generateConfigForWriteRead());
-    }
-
-    /**
-     * Build Write-Read state model definition.
-     *
-     * @return State model definition.
-     */
-    public static StateModelDefinition build() {
-        StateModelDefinition.Builder builder = new StateModelDefinition.Builder(NAME);
-        // init state
-        builder.initialState(States.OFFLINE.name());
-
-        // add states; the int argument is the state priority, in the same order as the
-        // STATE_PRIORITY_LIST written by generateConfigForWriteRead()
-        builder.addState(States.WRITE.name(), 0);
-        builder.addState(States.READ.name(), 1);
-        builder.addState(States.OFFLINE.name(), 2);
-        for (HelixDefinedState state : HelixDefinedState.values()) {
-            builder.addState(state.name());
-        }
-
-        // add transitions; the int argument is the transition priority, in the same order as the
-        // STATE_TRANSITION_PRIORITYLIST written by generateConfigForWriteRead()
-        builder.addTransition(States.WRITE.name(), States.READ.name(), 0);
-        builder.addTransition(States.READ.name(), States.WRITE.name(), 1);
-        builder.addTransition(States.OFFLINE.name(), States.READ.name(), 2);
-        builder.addTransition(States.READ.name(), States.OFFLINE.name(), 3);
-        builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());
-
-        // bounds: WRITE is capped at 1; READ uses the dynamic bound "R"
-        // (presumably the resource's replica count -- TODO confirm against the Helix docs)
-        builder.upperBound(States.WRITE.name(), 1);
-        builder.dynamicUpperBound(States.READ.name(), "R");
-
-        return builder.build();
-    }
-
-    /**
-     * Generate the Write-Read state model definition as a raw ZNRecord.
-     *
-     * <p>The record holds the state priority list, a {@code <state>.meta} map with each state's
-     * count bound, a {@code <state>.next} map giving the next hop from a state towards each target
-     * state, and the transition priority list. It appears to describe the same model as
-     * {@link #build()}.
-     *
-     * @return ZNRecord.
-     */
-    public static ZNRecord generateConfigForWriteRead() {
-        ZNRecord record = new ZNRecord("WriteRead");
-        record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(), "OFFLINE");
-        List<String> statePriorityList = new ArrayList<String>();
-        statePriorityList.add("WRITE");
-        statePriorityList.add("READ");
-        statePriorityList.add("OFFLINE");
-        statePriorityList.add("DROPPED");
-        statePriorityList.add("ERROR");
-        record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
-            statePriorityList);
-        // per-state count bound: "1" for WRITE, "R" for READ, "-1" for OFFLINE/DROPPED/ERROR
-        for (String state : statePriorityList) {
-            String key = state + ".meta";
-            Map<String, String> metadata = new HashMap<String, String>();
-            if (state.equals("WRITE")) {
-                metadata.put("count", "1");
-                record.setMapField(key, metadata);
-            } else if (state.equals("READ")) {
-                metadata.put("count", "R");
-                record.setMapField(key, metadata);
-            } else if (state.equals("OFFLINE")) {
-                metadata.put("count", "-1");
-                record.setMapField(key, metadata);
-            } else if (state.equals("DROPPED")) {
-                metadata.put("count", "-1");
-                record.setMapField(key, metadata);
-            } else if (state.equals("ERROR")) {
-                metadata.put("count", "-1");
-                record.setMapField(key, metadata);
-            }
-        }
-        // next-hop table: maps a target state to the next state on the way there,
-        // e.g. WRITE reaches OFFLINE and DROPPED by going through READ
-        for (String state : statePriorityList) {
-            String key = state + ".next";
-            if (state.equals("WRITE")) {
-                Map<String, String> metadata = new HashMap<String, String>();
-                metadata.put("READ", "READ");
-                metadata.put("OFFLINE", "READ");
-                metadata.put("DROPPED", "READ");
-                record.setMapField(key, metadata);
-            } else if (state.equals("READ")) {
-                Map<String, String> metadata = new HashMap<String, String>();
-                metadata.put("WRITE", "WRITE");
-                metadata.put("OFFLINE", "OFFLINE");
-                metadata.put("DROPPED", "OFFLINE");
-                record.setMapField(key, metadata);
-            } else if (state.equals("OFFLINE")) {
-                Map<String, String> metadata = new HashMap<String, String>();
-                metadata.put("READ", "READ");
-                metadata.put("WRITE", "READ");
-                metadata.put("DROPPED", "DROPPED");
-                record.setMapField(key, metadata);
-            } else if (state.equals("ERROR")) {
-                Map<String, String> metadata = new HashMap<String, String>();
-                metadata.put("OFFLINE", "OFFLINE");
-                record.setMapField(key, metadata);
-            }
-        }
-        List<String> stateTransitionPriorityList = new ArrayList<String>();
-        stateTransitionPriorityList.add("WRITE-READ");
-        stateTransitionPriorityList.add("READ-WRITE");
-        stateTransitionPriorityList.add("OFFLINE-READ");
-        stateTransitionPriorityList.add("READ-OFFLINE");
-        stateTransitionPriorityList.add("OFFLINE-DROPPED");
-        record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
-            stateTransitionPriorityList);
-        return record;
-    }
-}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/WriteReadStateModelFactory.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/WriteReadStateModelFactory.java
deleted file mode 100644
index eb6e86b09..000000000
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/WriteReadStateModelFactory.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.sc.helix;
-
-import static org.apache.bookkeeper.stream.storage.impl.sc.helix.HelixStorageController.getStorageContainerFromPartitionName;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.participant.statemachine.StateModelInfo;
-import org.apache.helix.participant.statemachine.Transition;
-
-/**
- * Storage Container State Model Factory.
- *
- * <p>It is used by helix for managing the state transition.
- * Every partition gets a {@link WriteReadStateModel} sharing the same registry. The
- * {@code READ -> WRITE} transition starts the storage container, {@code WRITE -> READ} stops it,
- * and the {@code OFFLINE <-> READ} transitions currently do nothing beyond logging.
- */
-@Slf4j
-public class WriteReadStateModelFactory extends StateModelFactory<StateModel> {
-
-    private final StorageContainerRegistry registry;
-
-    public WriteReadStateModelFactory(StorageContainerRegistry registry) {
-        this.registry = registry;
-    }
-
-    // resourceName/partitionName are unused here; each transition reads the partition from its message.
-    @Override
-    public StateModel createNewStateModel(String resourceName, String partitionName) {
-        return new WriteReadStateModel(registry);
-    }
-
-    /**
-     * The state model for storage container.
-     */
-    @StateModelInfo(states = "{'OFFLINE','READ','WRITE'}", initialState = "OFFLINE")
-    public static class WriteReadStateModel extends StateModel {
-
-        private final StorageContainerRegistry registry;
-
-        WriteReadStateModel(StorageContainerRegistry registry) {
-            this.registry = registry;
-        }
-
-        @Transition(from = "OFFLINE", to = "READ")
-        public void offlineToRead(Message msg,
-                                  NotificationContext ctx) {
-            log.info("----- [OFFLINE --> READ] {} / {}", msg.getResourceName(), msg.getPartitionName());
-            // do nothing now
-        }
-
-        // Blocks until the registry has started the container whose id is parsed from the partition name.
-        @Transition(from = "READ", to = "WRITE")
-        public void readToWrite(Message msg,
-                                NotificationContext ctx) throws Exception {
-            log.info("----- [READ --> WRITE] {} / {}", msg.getResourceName(), msg.getPartitionName());
-            try {
-                FutureUtils.result(registry.startStorageContainer(
-                    getStorageContainerFromPartitionName(msg.getPartitionName())));
-            } catch (Exception e) {
-                // SLF4J logs a trailing Throwable in the argument array as the exception
-                log.error("----- [READ --> WRITE] {} / {} failed",
-                    new Object[]{msg.getResourceName(), msg.getPartitionName(), e});
-                throw e;
-            }
-        }
-
-        // Blocks until the registry has stopped the container whose id is parsed from the partition name.
-        @Transition(from = "WRITE", to = "READ")
-        public void writeToRead(Message msg,
-                                NotificationContext ctx) throws Exception {
-            log.info("----- [WRITE --> READ] {} / {}", msg.getResourceName(), msg.getPartitionName());
-            try {
-                FutureUtils.result(registry.stopStorageContainer(
-                    getStorageContainerFromPartitionName(msg.getPartitionName())));
-            } catch (Exception e) {
-                // SLF4J logs a trailing Throwable in the argument array as the exception
-                log.error("----- [WRITE --> READ] {} / {} failed",
-                    new Object[]{msg.getResourceName(), msg.getPartitionName(), e});
-                throw e;
-            }
-        }
-
-        @Transition(from = "READ", to = "OFFLINE")
-        public void readToOffline(Message msg,
-                                  NotificationContext ctx) {
-            log.info("----- [READ --> OFFLINE] {} / {}", msg.getResourceName(), msg.getPartitionName());
-            // do nothing now
-        }
-
-    }
-}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/package-info.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/package-info.java
deleted file mode 100644
index 018df173b..000000000
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Helix based storage container management.
- */
-package org.apache.bookkeeper.stream.storage.impl.sc.helix;
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerImplTest.java
new file mode 100644
index 000000000..63967a683
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerImplTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterController;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeaderSelector;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerController;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ClusterControllerImpl}.
+ *
+ * <p>Every collaborator except the configuration is a mock, so these tests only check how the
+ * controller drives its {@link ClusterControllerLeaderSelector}.
+ */
+public class ClusterControllerImplTest {
+
+    private ClusterControllerLeaderSelector leaderSelector;
+
+    private ClusterController service;
+
+    @Before
+    public void setup() {
+        ClusterMetadataStore metadataStore = mock(ClusterMetadataStore.class);
+        RegistrationClient registrationClient = mock(RegistrationClient.class);
+        StorageContainerController scController = mock(StorageContainerController.class);
+        StorageConfiguration conf = new StorageConfiguration(new CompositeConfiguration());
+
+        leaderSelector = mock(ClusterControllerLeaderSelector.class);
+        service = new ClusterControllerImpl(
+            metadataStore, registrationClient, scController, leaderSelector, conf);
+    }
+
+    /**
+     * Constructing the controller should hand a leader to the selector exactly once.
+     */
+    @Test
+    public void testInitialize() {
+        verify(leaderSelector).initialize(any(ClusterControllerLeader.class));
+    }
+
+    /**
+     * Starting the controller should start the leader selector.
+     */
+    @Test
+    public void testStart() {
+        service.start();
+        verify(leaderSelector).start();
+    }
+
+    /**
+     * Stopping a started controller should close the leader selector.
+     */
+    @Test
+    public void testStop() {
+        service.start();
+        service.stop();
+        verify(leaderSelector).close();
+    }
+
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
new file mode 100644
index 000000000..4d2828bb9
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerController;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ClusterControllerLeaderImpl}.
+ */
+@Slf4j
+public class ClusterControllerLeaderImplTest {
+
+    private static final int NUM_STORAGE_CONTAINERS = 32;
+    // upper bound for waits expected to succeed: a regression fails the test instead of hanging it
+    private static final long WAIT_TIMEOUT_SECONDS = 30;
+
+    private ClusterMetadataStore metadataStore;
+    private ClusterControllerLeaderImpl clusterController;
+    private ExecutorService leaderExecutor;
+
+    // released once per write of the cluster assignment data (see the store wrapper in `setup`)
+    private final Semaphore coordSem = new Semaphore(0);
+    // listener most recently passed to `watchWritableBookies`; cleared by `unwatchWritableBookies`
+    private final AtomicReference<RegistrationListener> regListenerRef
+        = new AtomicReference<>(null);
+    // completed with the listener when `watchWritableBookies` is invoked
+    private final CompletableFuture<RegistrationListener> listenerRegistered = new CompletableFuture<>();
+    // returned from `watchWritableBookies`; the test completes it to simulate the watch being established
+    private final CompletableFuture<Void> watchFuture = new CompletableFuture<>();
+
+    @Before
+    public void setup() {
+        ClusterMetadataStore originalStore = spy(new InMemClusterMetadataStore(NUM_STORAGE_CONTAINERS));
+        originalStore.initializeCluster(NUM_STORAGE_CONTAINERS);
+        // wrap the in-memory store so that every assignment update releases a `coordSem` permit,
+        // which lets the test wait for (or assert the absence of) updates made by the leader
+        this.metadataStore = new ClusterMetadataStore() {
+            @Override
+            public void initializeCluster(int numStorageContainers) {
+                originalStore.initializeCluster(numStorageContainers);
+            }
+
+            @Override
+            public ClusterAssignmentData getClusterAssignmentData() {
+                return originalStore.getClusterAssignmentData();
+            }
+
+            @Override
+            public void updateClusterAssignmentData(ClusterAssignmentData assignmentData) {
+                originalStore.updateClusterAssignmentData(assignmentData);
+
+                // notify when cluster assignment data is updated.
+                coordSem.release();
+            }
+
+            @Override
+            public void watchClusterAssignmentData(Consumer<Void> watcher, Executor executor) {
+                originalStore.watchClusterAssignmentData(watcher, executor);
+            }
+
+            @Override
+            public void unwatchClusterAssignmentData(Consumer<Void> watcher) {
+                originalStore.unwatchClusterAssignmentData(watcher);
+            }
+
+            @Override
+            public ClusterMetadata getClusterMetadata() {
+                return originalStore.getClusterMetadata();
+            }
+
+            @Override
+            public void updateClusterMetadata(ClusterMetadata clusterMetadata) {
+                originalStore.updateClusterMetadata(clusterMetadata);
+            }
+
+            @Override
+            public void close() {
+                originalStore.close();
+            }
+        };
+
+        StorageContainerController scController = spy(new DefaultStorageContainerController());
+        RegistrationClient mockRegClient = mock(RegistrationClient.class);
+        when(mockRegClient.watchWritableBookies(any(RegistrationListener.class)))
+            .thenAnswer(invocationOnMock -> {
+                RegistrationListener listener = invocationOnMock.getArgument(0);
+                regListenerRef.set(listener);
+                // published after `regListenerRef` is set, so a waiter always observes the listener there
+                listenerRegistered.complete(listener);
+                return watchFuture;
+            });
+        doAnswer(invocationOnMock -> {
+            RegistrationListener listener = invocationOnMock.getArgument(0);
+            regListenerRef.compareAndSet(listener, null);
+            return null;
+        }).when(mockRegClient).unwatchWritableBookies(any(RegistrationListener.class));
+
+        this.clusterController = new ClusterControllerLeaderImpl(
+            metadataStore,
+            scController,
+            mockRegClient,
+            Duration.ofMillis(10));
+        this.leaderExecutor = Executors.newSingleThreadExecutor();
+    }
+
+    @After
+    public void teardown() {
+        if (null != leaderExecutor) {
+            // the test never stops `processAsLeader`; interrupt the leader thread (best effort) so it
+            // does not keep running against the metadata store after the store is closed below
+            leaderExecutor.shutdownNow();
+        }
+        if (null != metadataStore) {
+            metadataStore.close();
+        }
+    }
+
+    @Test
+    public void testProcessAsLeader() throws Exception {
+        clusterController.suspend();
+        assertTrue(clusterController.isSuspended());
+
+        // start the leader controller
+        leaderExecutor.submit(() -> {
+            try {
+                clusterController.processAsLeader();
+            } catch (Exception e) {
+                log.info("Encountered exception when cluster controller processes as a leader", e);
+            }
+        });
+
+        // resume the controller
+        clusterController.resume();
+        assertFalse(clusterController.isSuspended());
+
+        // simulate `watchWritableBookies` is done, the listener should be registered
+        FutureUtils.complete(watchFuture, null);
+        // registration happens on the leader thread, so wait for it instead of checking immediately
+        assertNotNull(listenerRegistered.get(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+
+        // once the controller is resumed, it should start processing server changes,
+        // but since there are no servers available, the storage controller will not compute any ideal state
+        // for the assignment and `lastSuccessfulAssignmentAt` will remain negative.
+        assertFalse(coordSem.tryAcquire(1, TimeUnit.SECONDS));
+        assertTrue(clusterController.getLastSuccessfulAssigmentAt() < 0);
+
+        // notify the registration client that a new host is added
+        Set<BookieSocketAddress> cluster = new HashSet<>();
+        cluster.add(new BookieSocketAddress("127.0.0.1", 4181));
+        Version version = new LongVersion(0L);
+
+        regListenerRef.get().onBookiesChanged(new Versioned<>(cluster, version));
+        // the cluster controller will be notified with cluster change and storage controller will compute
+        // the assignment state. cluster metadata store should be used for updating cluster assignment data.
+        assertTrue(coordSem.tryAcquire(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertTrue(clusterController.getLastSuccessfulAssigmentAt() > 0);
+        long lastSuccessfulAssignmentAt = clusterController.getLastSuccessfulAssigmentAt();
+
+        // notify the cluster controller with same cluster, cluster controller should not attempt to update
+        // the assignment
+        regListenerRef.get().onBookiesChanged(new Versioned<>(cluster, version));
+        assertFalse(coordSem.tryAcquire(200, TimeUnit.MILLISECONDS));
+        assertEquals(lastSuccessfulAssignmentAt, clusterController.getLastSuccessfulAssigmentAt());
+
+        // multiple hosts added and removed
+        cluster.add(new BookieSocketAddress("127.0.0.1", 4182));
+        cluster.add(new BookieSocketAddress("127.0.0.1", 4183));
+        cluster.add(new BookieSocketAddress("127.0.0.1", 4184));
+        cluster.add(new BookieSocketAddress("127.0.0.1", 4185));
+        version = new LongVersion(1L);
+
+        regListenerRef.get().onBookiesChanged(new Versioned<>(cluster, version));
+        // the cluster controller should update assignment data if cluster is changed
+        assertTrue(coordSem.tryAcquire(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertTrue(clusterController.getLastSuccessfulAssigmentAt() > lastSuccessfulAssignmentAt);
+        lastSuccessfulAssignmentAt = clusterController.getLastSuccessfulAssigmentAt();
+
+        // if cluster information is changed to empty, cluster controller should not be eager to change
+        // the assignment.
+        regListenerRef.get().onBookiesChanged(new Versioned<>(Collections.emptySet(), new LongVersion(2L)));
+        assertFalse(coordSem.tryAcquire(1, TimeUnit.SECONDS));
+        assertEquals(lastSuccessfulAssignmentAt, clusterController.getLastSuccessfulAssigmentAt());
+    }
+
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStoreTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStoreTest.java
new file mode 100644
index 000000000..3da879140
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStoreTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import lombok.Cleanup;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link InMemClusterMetadataStore}.
+ */
+public class InMemClusterMetadataStoreTest {
+
+    private static final int NUM_STORAGE_CONTAINERS = 1024;
+    // upper bound for waits expected to succeed: a regression fails the test instead of hanging it
+    private static final long WAIT_TIMEOUT_SECONDS = 10;
+
+    private InMemClusterMetadataStore store;
+
+    @Before
+    public void setup() {
+        store = new InMemClusterMetadataStore(NUM_STORAGE_CONTAINERS);
+    }
+
+    @After
+    public void teardown() {
+        if (null != store) {
+            store.close();
+        }
+    }
+
+    /**
+     * Builds the assignment data shared by the update/watch tests: {@code server-0} owns containers 1 and 2.
+     */
+    private static ClusterAssignmentData newAssignmentData() {
+        return ClusterAssignmentData.newBuilder()
+            .putServers(
+                "server-0",
+                ServerAssignmentData.newBuilder()
+                    .addContainers(1L)
+                    .addContainers(2L)
+                    .build())
+            .build();
+    }
+
+    @Test
+    public void testUninitialized() {
+        assertEquals(
+            ClusterMetadata.newBuilder().setNumStorageContainers(NUM_STORAGE_CONTAINERS).build(),
+            store.getClusterMetadata());
+        assertEquals(
+            ClusterAssignmentData.newBuilder().build(),
+            store.getClusterAssignmentData());
+    }
+
+    @Test
+    public void testInitialize() {
+        int numStorageContainers = 2048;
+        store.initializeCluster(numStorageContainers);
+        assertEquals(
+            ClusterMetadata.newBuilder().setNumStorageContainers(numStorageContainers).build(),
+            store.getClusterMetadata());
+        assertEquals(
+            ClusterAssignmentData.newBuilder().build(),
+            store.getClusterAssignmentData());
+    }
+
+    @Test
+    public void testUpdateClusterMetadata() {
+        int numStorageContainers = 4096;
+        ClusterMetadata metadata = ClusterMetadata.newBuilder()
+            .setNumStorageContainers(numStorageContainers)
+            .build();
+        store.updateClusterMetadata(metadata);
+        assertEquals(metadata, store.getClusterMetadata());
+    }
+
+    @Test
+    public void testUpdateClusterAssignmentData() {
+        ClusterAssignmentData assignmentData = newAssignmentData();
+        store.updateClusterAssignmentData(assignmentData);
+        assertEquals(assignmentData, store.getClusterAssignmentData());
+    }
+
+    @Test
+    public void testWatchClusterAssignmentData() throws Exception {
+        ClusterAssignmentData assignmentData = newAssignmentData();
+
+        @Cleanup("shutdown")
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        CompletableFuture<Void> watchFuture = new CompletableFuture<>();
+
+        store.watchClusterAssignmentData(data -> {
+            FutureUtils.complete(watchFuture, null);
+        }, executor);
+
+        store.updateClusterAssignmentData(assignmentData);
+
+        // the watcher is registered with `executor` and may fire asynchronously, so wait for it (bounded)
+        watchFuture.get(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertEquals(assignmentData, store.getClusterAssignmentData());
+    }
+
+    @Test
+    public void testUnwatchClusterAssignmentData() throws Exception {
+        ClusterAssignmentData assignmentData = newAssignmentData();
+
+        @Cleanup("shutdown")
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        CompletableFuture<Void> watchFuture = new CompletableFuture<>();
+        CountDownLatch latch = new CountDownLatch(2);
+
+        Consumer<Void> dataConsumer = ignored -> {
+            latch.countDown();
+            FutureUtils.complete(watchFuture, null);
+        };
+
+        assertEquals(0, store.getNumWatchers());
+        store.watchClusterAssignmentData(dataConsumer, executor);
+        assertEquals(1, store.getNumWatchers());
+        store.updateClusterAssignmentData(assignmentData);
+
+        watchFuture.get(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertEquals(1, latch.getCount());
+        assertEquals(assignmentData, store.getClusterAssignmentData());
+
+        store.unwatchClusterAssignmentData(dataConsumer);
+        assertEquals(0, store.getNumWatchers());
+        store.updateClusterAssignmentData(assignmentData);
+
+        // `watchFuture` completed above, so waiting on it again returns immediately and proves nothing.
+        // Instead give a watcher that was (wrongly) left registered time to fire, and require that it never
+        // counts the latch down a second time.
+        assertFalse(latch.await(100, TimeUnit.MILLISECONDS));
+        assertEquals(1, latch.getCount());
+    }
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorListenerTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorListenerTest.java
new file mode 100644
index 000000000..bebed2b88
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorListenerTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ZkClusterControllerLeaderSelectorListener}.
+ */
+public class ZkClusterControllerLeaderSelectorListenerTest {
+
+    /**
+     * Taking leadership should run the wrapped leader logic exactly once.
+     */
+    @Test
+    public void testTakeLeadership() throws Exception {
+        ClusterControllerLeader leader = mock(ClusterControllerLeader.class);
+        CuratorFramework client = mock(CuratorFramework.class);
+
+        ZkClusterControllerLeaderSelectorListener listener =
+            new ZkClusterControllerLeaderSelectorListener(leader);
+        listener.takeLeadership(client);
+
+        verify(leader).processAsLeader();
+    }
+
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorTest.java
new file mode 100644
index 000000000..c072d004e
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.distributedlog.ZooKeeperClusterTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Unit test of {@link ZkClusterControllerLeaderSelector}.
+ */
+@Slf4j
+public class ZkClusterControllerLeaderSelectorTest extends ZooKeeperClusterTestCase {
+
+    // upper bound for waits expected to succeed: a regression fails the test instead of hanging it
+    private static final long WAIT_TIMEOUT_SECONDS = 30;
+
+    @Rule
+    public final TestName runtime = new TestName();
+
+    private CuratorFramework curatorClient;
+    private String zkRootPath;
+    private ZkClusterControllerLeaderSelector selector;
+
+    @Before
+    public void setup() throws Exception {
+        curatorClient = CuratorFrameworkFactory.newClient(
+            zkServers,
+            new ExponentialBackoffRetry(200, 10, 5000));
+        curatorClient.start();
+
+        // every test method gets its own root path, named after the method
+        zkRootPath = "/" + runtime.getMethodName();
+        curatorClient.create().forPath(zkRootPath);
+
+        selector = new ZkClusterControllerLeaderSelector(curatorClient, zkRootPath);
+    }
+
+    @After
+    public void teardown() {
+        if (null != selector) {
+            selector.close();
+        }
+        // `setup` may have failed before the client was assigned
+        if (null != curatorClient) {
+            curatorClient.close();
+        }
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testStartBeforeInitialize() {
+        selector.start();
+    }
+
+    @Test
+    public void testLeaderElection() throws InterruptedException {
+        CountDownLatch leaderLatch = new CountDownLatch(1);
+        selector.initialize(new ClusterControllerLeader() {
+            @Override
+            public void processAsLeader() throws Exception {
+                log.info("Become leader");
+                leaderLatch.countDown();
+                try {
+                    TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
+                } catch (InterruptedException ie) {
+                    log.info("Leadership is interrupted");
+                    Thread.currentThread().interrupt();
+                }
+                log.info("Ended leadership");
+            }
+
+            @Override
+            public void suspend() {
+
+            }
+
+            @Override
+            public void resume() {
+
+            }
+        });
+
+        selector.start();
+        assertTrue("Should successfully become leader",
+            leaderLatch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+
+        log.info("Ended test");
+    }
+
+    @Test
+    public void testStateChangedToLost() throws InterruptedException {
+        CountDownLatch leaderLatch = new CountDownLatch(1);
+        CountDownLatch interruptedLatch = new CountDownLatch(1);
+        selector.initialize(new ClusterControllerLeader() {
+            @Override
+            public void processAsLeader() throws Exception {
+                log.info("Become leader");
+                leaderLatch.countDown();
+                try {
+                    TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
+                } catch (InterruptedException ie) {
+                    log.info("Leader is interrupted", ie);
+                    Thread.currentThread().interrupt();
+                    interruptedLatch.countDown();
+                }
+            }
+
+            @Override
+            public void suspend() {
+            }
+
+            @Override
+            public void resume() {
+            }
+        });
+
+        selector.start();
+        assertTrue("Should successfully become leader",
+            leaderLatch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+
+        // losing the connection should interrupt the running leader
+        selector.stateChanged(curatorClient, ConnectionState.LOST);
+        assertTrue("Leader should be interrupted",
+            interruptedLatch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testStateChangedToSuspendedResumed() throws InterruptedException {
+        CountDownLatch leaderLatch = new CountDownLatch(1);
+        CountDownLatch suspendedLatch = new CountDownLatch(1);
+        CountDownLatch resumeLatch = new CountDownLatch(1);
+        selector.initialize(new ClusterControllerLeader() {
+            @Override
+            public void processAsLeader() throws Exception {
+                log.info("Become leader");
+                leaderLatch.countDown();
+                try {
+                    TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
+                } catch (InterruptedException ie) {
+                    log.info("Leader is interrupted", ie);
+                    Thread.currentThread().interrupt();
+                }
+            }
+
+            @Override
+            public void suspend() {
+                suspendedLatch.countDown();
+            }
+
+            @Override
+            public void resume() {
+                resumeLatch.countDown();
+            }
+        });
+
+        selector.start();
+        assertTrue("Should successfully become leader",
+            leaderLatch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+
+        selector.stateChanged(curatorClient, ConnectionState.SUSPENDED);
+        assertTrue("Leader should be suspended",
+            suspendedLatch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+
+        selector.stateChanged(curatorClient, ConnectionState.RECONNECTED);
+        assertTrue("Leader should be resumed",
+            resumeLatch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+    }
+
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStoreTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStoreTest.java
new file mode 100644
index 000000000..1c224db3e
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStoreTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
+import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.distributedlog.ZooKeeperClusterTestCase;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Unit test {@link ZkClusterMetadataStore}.
+ */
+@Slf4j
+public class ZkClusterMetadataStoreTest extends ZooKeeperClusterTestCase {
+
+    private static final int NUM_STORAGE_CONTAINERS = 1024;
+
+    // Upper bound on waiting for an asynchronous watch notification, so a lost
+    // notification fails the test instead of hanging the build.
+    private static final long WATCH_TIMEOUT_SECS = 30;
+
+    @Rule
+    public final TestName runtime = new TestName();
+
+    private CuratorFramework curatorClient;
+    private ZkClusterMetadataStore store;
+
+    /**
+     * Starts a curator client and initializes a cluster with {@link #NUM_STORAGE_CONTAINERS}
+     * containers under a root path named after the running test method.
+     */
+    @Before
+    public void setup() {
+        curatorClient = CuratorFrameworkFactory.newClient(
+            zkServers,
+            new ExponentialBackoffRetry(200, 10, 5000));
+        curatorClient.start();
+        store = new ZkClusterMetadataStore(curatorClient, zkServers, "/" + runtime.getMethodName());
+        store.initializeCluster(NUM_STORAGE_CONTAINERS);
+    }
+
+    /**
+     * Closes the store before the curator client it was created with.
+     */
+    @After
+    public void teardown() {
+        if (null != store) {
+            store.close();
+        }
+        if (null != curatorClient) {
+            curatorClient.close();
+        }
+    }
+
+    /**
+     * Asserts that {@code sre} wraps a {@link KeeperException} carrying {@code expectedCode}.
+     */
+    private static void assertKeeperErrorCode(StorageRuntimeException sre, Code expectedCode) {
+        assertTrue(sre.getCause() instanceof KeeperException);
+        KeeperException cause = (KeeperException) sre.getCause();
+        assertEquals(expectedCode, cause.code());
+    }
+
+    /**
+     * Builds an assignment that places containers 1 and 2 on {@code server-0}.
+     */
+    private static ClusterAssignmentData newAssignmentData() {
+        return ClusterAssignmentData.newBuilder()
+            .putServers(
+                "server-0",
+                ServerAssignmentData.newBuilder()
+                    .addContainers(1L)
+                    .addContainers(2L)
+                    .build())
+            .build();
+    }
+
+    @Test
+    public void testUnitialized() {
+        // a store over a path that was never initialized; closed when the test ends
+        @Cleanup
+        ZkClusterMetadataStore newStore = new ZkClusterMetadataStore(
+            curatorClient, zkServers, "/" + runtime.getMethodName() + "-new");
+
+        try {
+            newStore.getClusterMetadata();
+            fail("Should fail to get cluster metadata if not initialized");
+        } catch (StorageRuntimeException sre) {
+            assertKeeperErrorCode(sre, Code.NONODE);
+        }
+
+        try {
+            newStore.getClusterAssignmentData();
+            fail("Should fail to get cluster assignment data if not initialized");
+        } catch (StorageRuntimeException sre) {
+            assertKeeperErrorCode(sre, Code.NONODE);
+        }
+    }
+
+    @Test
+    public void testInitialize() {
+        int numStorageContainers = 2048;
+        try {
+            store.initializeCluster(numStorageContainers);
+            fail("Should fail to initialize cluster if already initialized");
+        } catch (StorageRuntimeException sre) {
+            assertKeeperErrorCode(sre, Code.NODEEXISTS);
+        }
+    }
+
+    @Test
+    public void testUpdateClusterMetadata() {
+        int numStorageContainers = 4096;
+        ClusterMetadata metadata = ClusterMetadata.newBuilder()
+            .setNumStorageContainers(numStorageContainers)
+            .build();
+        store.updateClusterMetadata(metadata);
+        assertEquals(metadata, store.getClusterMetadata());
+    }
+
+    @Test
+    public void testUpdateClusterAssignmentData() {
+        ClusterAssignmentData assignmentData = newAssignmentData();
+        store.updateClusterAssignmentData(assignmentData);
+        assertEquals(assignmentData, store.getClusterAssignmentData());
+    }
+
+    @Test
+    public void testWatchClusterAssignmentData() throws Exception {
+        ClusterAssignmentData assignmentData = newAssignmentData();
+
+        @Cleanup("shutdown")
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        CompletableFuture<Void> watchFuture = new CompletableFuture<>();
+
+        store.watchClusterAssignmentData(data -> {
+            FutureUtils.complete(watchFuture, null);
+        }, executor);
+
+        store.updateClusterAssignmentData(assignmentData);
+
+        watchFuture.get(WATCH_TIMEOUT_SECS, TimeUnit.SECONDS);
+        assertEquals(assignmentData, store.getClusterAssignmentData());
+    }
+
+    @Test
+    public void testUnwatchClusterAssignmentData() throws Exception {
+        ClusterAssignmentData assignmentData = newAssignmentData();
+
+        @Cleanup("shutdown")
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        CompletableFuture<Void> watchFuture = new CompletableFuture<>();
+        CountDownLatch latch = new CountDownLatch(2);
+
+        Consumer<Void> dataConsumer = ignored -> {
+            log.info("Notify cluster assignment data changed");
+            latch.countDown();
+            FutureUtils.complete(watchFuture, null);
+        };
+
+        assertEquals(0, store.getNumWatchers());
+        store.watchClusterAssignmentData(dataConsumer, executor);
+        assertEquals(1, store.getNumWatchers());
+        store.updateClusterAssignmentData(assignmentData);
+
+        watchFuture.get(WATCH_TIMEOUT_SECS, TimeUnit.SECONDS);
+        assertEquals(1, latch.getCount());
+        assertEquals(assignmentData, store.getClusterAssignmentData());
+
+        store.unwatchClusterAssignmentData(dataConsumer);
+        assertEquals(0, store.getNumWatchers());
+    }
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerControllerTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerControllerTest.java
new file mode 100644
index 000000000..e152998c3
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerControllerTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.sc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
+import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController.ServerAssignmentDataComparator;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Test;
+
+/**
+ * Unit test {@link DefaultStorageContainerController}.
+ */
+@Slf4j
+public class DefaultStorageContainerControllerTest {
+
+    private static final int NUM_STORAGE_CONTAINERS = 32;
+
+    private final ClusterMetadata clusterMetadata;
+    private final StorageContainerController controller;
+    private final ClusterAssignmentData currentAssignment;
+
+    public DefaultStorageContainerControllerTest() {
+        this.controller = new DefaultStorageContainerController();
+        this.clusterMetadata = ClusterMetadata.newBuilder()
+            .setNumStorageContainers(NUM_STORAGE_CONTAINERS)
+            .build();
+        this.currentAssignment = ClusterAssignmentData.newBuilder()
+            .putServers("default-server", ServerAssignmentData.newBuilder()
+                .addContainers(0L)
+                .addContainers(1L)
+                .addContainers(3L)
+                .build())
+            .build();
+    }
+
+    @Test
+    public void testServerAssignmentDataComparator() {
+        ServerAssignmentDataComparator comparator = new ServerAssignmentDataComparator();
+
+        LinkedList<Long> serverList1 = new LinkedList<>();
+        serverList1.add(1L);
+        LinkedList<Long> serverList2 = new LinkedList<>();
+        serverList2.add(2L);
+        serverList2.add(3L);
+
+        BookieSocketAddress address1 = new BookieSocketAddress("127.0.0.1", 4181);
+        BookieSocketAddress address2 = new BookieSocketAddress("127.0.0.1", 4182);
+
+        Pair<BookieSocketAddress, LinkedList<Long>> pair1 = Pair.of(address1, serverList1);
+        Pair<BookieSocketAddress, LinkedList<Long>> pair2 = Pair.of(address1, serverList2);
+        Pair<BookieSocketAddress, LinkedList<Long>> pair3 = Pair.of(address2, serverList2);
+
+        // same address, fewer containers sorts first; the reversed call must have the opposite sign
+        assertEquals(-1, comparator.compare(pair1, pair2));
+        assertTrue(comparator.compare(pair2, pair1) > 0);
+        // equal container counts fall back to comparing the address hash codes
+        assertEquals(
+            Integer.compare(address1.hashCode(), address2.hashCode()),
+            comparator.compare(pair2, pair3));
+    }
+
+    @Test
+    public void testComputeIdealStateEmptyCluster() {
+        assertSame(
+            currentAssignment,
+            controller.computeIdealState(
+                clusterMetadata,
+                currentAssignment,
+                Collections.emptySet()));
+    }
+
+    /**
+     * Creates {@code numServers} local addresses on consecutive ports starting at 4181.
+     */
+    private static Set<BookieSocketAddress> newCluster(int numServers) {
+        return newCluster(numServers, 0);
+    }
+
+    /**
+     * Creates {@code numServers} local addresses on consecutive ports starting at
+     * {@code 4181 + startServerIdx}.
+     */
+    private static Set<BookieSocketAddress> newCluster(int numServers, int startServerIdx) {
+        Set<BookieSocketAddress> cluster = IntStream.range(0, numServers)
+            .mapToObj(idx -> new BookieSocketAddress("127.0.0.1", 4181 + startServerIdx + idx))
+            .collect(Collectors.toSet());
+        return ImmutableSet.copyOf(cluster);
+    }
+
+    /**
+     * Asserts that every container id in {@code [0, NUM_STORAGE_CONTAINERS)} is contained in
+     * {@code assignedContainers}.
+     */
+    private static void assertAllContainersAssigned(Set<Long> assignedContainers) {
+        Set<Long> expectedContainers = LongStream.range(0L, NUM_STORAGE_CONTAINERS)
+            .boxed()
+            .collect(Collectors.toSet());
+        assertTrue(Sets.difference(expectedContainers, assignedContainers).isEmpty());
+    }
+
+    /**
+     * Verifies an assignment in which the containers are split evenly across the cluster.
+     *
+     * @param newAssignment the assignment to check
+     * @param currentCluster the servers that must all appear in the assignment
+     * @param isInitialIdealState when true, additionally require that each server's sorted
+     *                            container ids form an arithmetic progression with step
+     *                            {@code currentCluster.size()}
+     */
+    private static void verifyAssignmentData(ClusterAssignmentData newAssignment,
+                                             Set<BookieSocketAddress> currentCluster,
+                                             boolean isInitialIdealState)
+            throws Exception {
+        int numServers = currentCluster.size();
+
+        assertEquals(numServers, newAssignment.getServersCount());
+        Set<Long> assignedContainers = Sets.newHashSet();
+        Set<BookieSocketAddress> assignedServers = Sets.newHashSet();
+
+        int numContainersPerServer = NUM_STORAGE_CONTAINERS / numServers;
+        for (Map.Entry<String, ServerAssignmentData> entry : newAssignment.getServersMap().entrySet()) {
+            log.info("Check assignment for server {} = {}", entry.getKey(), entry.getValue());
+
+            BookieSocketAddress address = new BookieSocketAddress(entry.getKey());
+            // each server entry must resolve to a distinct address
+            assertTrue("Duplicate server " + address, assignedServers.add(address));
+
+            ServerAssignmentData serverData = entry.getValue();
+            // each server should be assigned with equal number of containers
+            assertEquals(numContainersPerServer, serverData.getContainersCount());
+            List<Long> containers = Lists.newArrayList(serverData.getContainersList());
+            Collections.sort(containers);
+            assignedContainers.addAll(containers);
+
+            if (isInitialIdealState) {
+                long startContainerId = containers.get(0);
+                for (int i = 0; i < containers.size(); i++) {
+                    assertEquals(startContainerId + i * numServers, containers.get(i).longValue());
+                }
+            }
+        }
+
+        // every server in the cluster should appear in the assignment
+        assertTrue(Sets.difference(currentCluster, assignedServers).isEmpty());
+        // all containers should be assigned
+        assertAllContainersAssigned(assignedContainers);
+    }
+
+    /**
+     * Verifies an assignment for a cluster with more servers than containers: exactly
+     * {@link #NUM_STORAGE_CONTAINERS} servers hold one container each and the rest hold none.
+     */
+    private static void verifyAssignmentDataWhenHasMoreServers(ClusterAssignmentData newAssignment,
+                                                               Set<BookieSocketAddress> currentCluster)
+            throws Exception {
+        int numServers = currentCluster.size();
+
+        assertEquals(numServers, newAssignment.getServersCount());
+        Set<Long> assignedContainers = Sets.newHashSet();
+        Set<BookieSocketAddress> assignedServers = Sets.newHashSet();
+
+        int numEmptyServers = 0;
+        int numAssignedServers = 0;
+        for (Map.Entry<String, ServerAssignmentData> entry : newAssignment.getServersMap().entrySet()) {
+            log.info("Check assignment for server {} = {}", entry.getKey(), entry.getValue());
+
+            BookieSocketAddress address = new BookieSocketAddress(entry.getKey());
+            // each server entry must resolve to a distinct address
+            assertTrue("Duplicate server " + address, assignedServers.add(address));
+
+            ServerAssignmentData serverData = entry.getValue();
+            if (serverData.getContainersCount() > 0) {
+                // a non-empty server holds exactly one container
+                assertEquals(1, serverData.getContainersCount());
+                ++numAssignedServers;
+            } else {
+                ++numEmptyServers;
+            }
+            assignedContainers.addAll(serverData.getContainersList());
+        }
+
+        // one server per container, the remaining servers stay empty
+        assertEquals(NUM_STORAGE_CONTAINERS, numAssignedServers);
+        assertEquals(numServers - NUM_STORAGE_CONTAINERS, numEmptyServers);
+
+        // every server in the cluster should appear in the assignment
+        assertTrue(Sets.difference(currentCluster, assignedServers).isEmpty());
+        // all containers should be assigned
+        assertAllContainersAssigned(assignedContainers);
+    }
+
+    @Test
+    public void testComputeIdealStateFromEmptyAssignment() throws Exception {
+        ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build();
+
+        int numServers = 8;
+        Set<BookieSocketAddress> currentCluster = newCluster(numServers);
+
+        ClusterAssignmentData newAssignment = controller.computeIdealState(
+            clusterMetadata,
+            emptyAssignment,
+            currentCluster);
+
+        verifyAssignmentData(newAssignment, currentCluster, true);
+    }
+
+    @Test
+    public void testComputeIdealStateIfClusterUnchanged() throws Exception {
+        ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build();
+
+        int numServers = 8;
+        Set<BookieSocketAddress> currentCluster = newCluster(numServers);
+        ClusterAssignmentData newAssignment = controller.computeIdealState(
+            clusterMetadata,
+            emptyAssignment,
+            currentCluster);
+        verifyAssignmentData(newAssignment, currentCluster, true);
+
+        ClusterAssignmentData newAssignment2 = controller.computeIdealState(
+            clusterMetadata,
+            newAssignment,
+            currentCluster);
+
+        // the state should not change if cluster is unchanged.
+        assertSame(newAssignment, newAssignment2);
+    }
+
+    @Test
+    public void testComputeIdealStateWhenHostsRemoved() throws Exception {
+        ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build();
+
+        int numServers = 8;
+        Set<BookieSocketAddress> currentCluster = newCluster(numServers);
+
+        ClusterAssignmentData assignmentData = controller.computeIdealState(
+            clusterMetadata,
+            emptyAssignment,
+            currentCluster);
+        verifyAssignmentData(assignmentData, currentCluster, true);
+
+        int newNumServers = 4;
+        Set<BookieSocketAddress> newCluster = newCluster(newNumServers);
+
+        ClusterAssignmentData newAssignmentData = controller.computeIdealState(
+            clusterMetadata,
+            assignmentData,
+            newCluster);
+        verifyAssignmentData(newAssignmentData, newCluster, false);
+    }
+
+    @Test
+    public void testComputeIdealStateWhenHostsAdded() throws Exception {
+        ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build();
+
+        int numServers = 4;
+        Set<BookieSocketAddress> currentCluster = newCluster(numServers);
+
+        ClusterAssignmentData assignmentData = controller.computeIdealState(
+            clusterMetadata,
+            emptyAssignment,
+            currentCluster);
+        verifyAssignmentData(assignmentData, currentCluster, true);
+
+        int newNumServers = 8;
+        Set<BookieSocketAddress> newCluster = newCluster(newNumServers);
+
+        ClusterAssignmentData newAssignmentData = controller.computeIdealState(
+            clusterMetadata,
+            assignmentData,
+            newCluster);
+        verifyAssignmentData(newAssignmentData, newCluster, false);
+    }
+
+    @Test
+    public void testComputeIdealStateWhenHostsRemovedAdded() throws Exception {
+        ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build();
+
+        int numServers = 4;
+        Set<BookieSocketAddress> currentCluster = newCluster(numServers);
+
+        ClusterAssignmentData assignmentData = controller.computeIdealState(
+            clusterMetadata,
+            emptyAssignment,
+            currentCluster);
+        verifyAssignmentData(assignmentData, currentCluster, true);
+
+        Set<BookieSocketAddress> serversToAdd = newCluster(6, numServers);
+        Set<BookieSocketAddress> serversToRemove = newCluster(2);
+
+        Set<BookieSocketAddress> newCluster = Sets.newHashSet(currentCluster);
+        newCluster.addAll(serversToAdd);
+        serversToRemove.forEach(newCluster::remove);
+
+        ClusterAssignmentData newAssignmentData = controller.computeIdealState(
+            clusterMetadata,
+            assignmentData,
+            newCluster);
+        verifyAssignmentData(newAssignmentData, newCluster, false);
+    }
+
+    @Test
+    public void testComputeIdealStateWhenHasMoreServers() throws Exception {
+        ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build();
+
+        int numServers = 2 * NUM_STORAGE_CONTAINERS;
+        Set<BookieSocketAddress> currentCluster = newCluster(numServers);
+
+        ClusterAssignmentData assignmentData = controller.computeIdealState(
+            clusterMetadata,
+            emptyAssignment,
+            currentCluster);
+        verifyAssignmentDataWhenHasMoreServers(assignmentData, currentCluster);
+    }
+
+    @Test
+    public void testComputeIdealStateWhenScaleToMoreServers() throws Exception {
+        ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build();
+
+        int numServers = 4;
+        Set<BookieSocketAddress> currentCluster = newCluster(numServers);
+
+        ClusterAssignmentData assignmentData = controller.computeIdealState(
+            clusterMetadata,
+            emptyAssignment,
+            currentCluster);
+        verifyAssignmentData(assignmentData, currentCluster, true);
+
+        numServers = 2 * NUM_STORAGE_CONTAINERS;
+        Set<BookieSocketAddress> newCluster = newCluster(numServers);
+        ClusterAssignmentData newAssignment = controller.computeIdealState(
+            clusterMetadata,
+            assignmentData,
+            newCluster);
+        verifyAssignmentDataWhenHasMoreServers(newAssignment, newCluster);
+    }
+
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java
index 34a1474b7..7031629ac 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java
@@ -69,7 +69,7 @@ public void tearDown() {
 
     private StorageContainer createStorageContainer() {
         StorageContainer sc = mock(StorageContainer.class);
-        when(sc.start()).thenReturn(FutureUtils.value(null));
+        // Stub the lifecycle: start() completes with the mock itself, stop() completes with null.
+        when(sc.start()).thenReturn(FutureUtils.value(sc));
         when(sc.stop()).thenReturn(FutureUtils.value(null));
         return sc;
     }
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java
new file mode 100644
index 000000000..193503bcd
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.sc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.clients.utils.NetUtils;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.testing.MoreAsserts;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
+import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerFactory;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
+import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.distributedlog.ZooKeeperClusterTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Unit test {@link ZkStorageContainerManager}.
+ */
+public class ZkStorageContainerManagerTest extends ZooKeeperClusterTestCase {
+
+    // Number of storage containers the test cluster is initialized with in setup().
+    private static final int NUM_STORAGE_CONTAINERS = 32;
+
+    // Supplies the running test method name, used to give each test its own ZooKeeper root path.
+    @Rule
+    public final TestName runtime = new TestName();
+
+    // Endpoint of the manager under test; assignment data in the tests is keyed by
+    // NetUtils.endpointToString(myEndpoint).
+    private final Endpoint myEndpoint = Endpoint.newBuilder()
+        .setHostname("127.0.0.1")
+        .setPort(4181)
+        .build();
+
+    private CuratorFramework curatorClient;
+    // Mocked so each test decides which StorageContainer is created for a given container id.
+    private StorageContainerFactory mockScFactory;
+    // Spied so tests can verify the start/stop calls the manager issues.
+    private StorageContainerRegistry scRegistry;
+    private ZkClusterMetadataStore clusterMetadataStore;
+    private ZkStorageContainerManager scManager;
+    private OrderedScheduler scheduler;
+
+    /**
+     * Builds the manager under test on top of a per-test cluster metadata store, a one-thread
+     * scheduler and a spied registry whose container factory is a mock.
+     */
+    @Before
+    public void setup() {
+        curatorClient = CuratorFrameworkFactory.newClient(
+            zkServers, new ExponentialBackoffRetry(200, 10, 5000));
+        curatorClient.start();
+
+        // every test method gets its own cluster root in ZooKeeper
+        String clusterRoot = "/" + runtime.getMethodName();
+        clusterMetadataStore = spy(new ZkClusterMetadataStore(curatorClient, zkServers, clusterRoot));
+        clusterMetadataStore.initializeCluster(NUM_STORAGE_CONTAINERS);
+
+        scheduler = OrderedScheduler.newSchedulerBuilder()
+            .name("test-scheduler")
+            .numThreads(1)
+            .build();
+
+        mockScFactory = mock(StorageContainerFactory.class);
+        scRegistry = spy(new StorageContainerRegistryImpl(mockScFactory, scheduler));
+
+        StorageConfiguration storageConf = new StorageConfiguration(new CompositeConfiguration())
+            .setClusterControllerScheduleInterval(1, TimeUnit.SECONDS);
+        scManager = new ZkStorageContainerManager(
+            myEndpoint, storageConf, clusterMetadataStore, scRegistry, NullStatsLogger.INSTANCE);
+    }
+
+    /**
+     * Releases test resources. The cluster metadata store was created on top of
+     * {@code curatorClient}, so it is closed before that client rather than after it.
+     */
+    @After
+    public void teardown() {
+        if (null != scManager) {
+            scManager.close();
+        }
+
+        if (null != scheduler) {
+            scheduler.shutdown();
+        }
+
+        if (null != clusterMetadataStore) {
+            clusterMetadataStore.close();
+        }
+
+        if (null != curatorClient) {
+            curatorClient.close();
+        }
+    }
+
+    /**
+     * Creates a mock container whose id and start/stop results are supplied by the caller, so a
+     * test decides exactly when the container finishes starting or stopping.
+     */
+    private static StorageContainer createStorageContainer(long scId,
+                                                           CompletableFuture<StorageContainer> startFuture,
+                                                           CompletableFuture<Void> stopFuture) {
+        StorageContainer container = mock(StorageContainer.class);
+        when(container.getId()).thenReturn(scId);
+        when(container.start()).thenReturn(startFuture);
+        when(container.stop()).thenReturn(stopFuture);
+        return container;
+    }
+
+    /**
+     * Test basic operations such as starting or stopping containers.
+     *
+     * <p>Assigns one container to this server and waits for it to become live. It then swaps the
+     * assignment to a second container and checks the first is stopped and the second started.
+     */
+    @Test
+    public void testBasicOps() throws Exception {
+        // start the storage container manager
+        scManager.start();
+
+        long containerId = 11L;
+        long containerId2 = 22L;
+        // assignment data is keyed by the string form of this server's endpoint
+        String myServerKey = NetUtils.endpointToString(myEndpoint);
+
+        // mock a container and start it in the registry
+        CompletableFuture<StorageContainer> startFuture = new CompletableFuture<>();
+        CompletableFuture<Void> stopFuture = new CompletableFuture<>();
+        CompletableFuture<StorageContainer> startFuture2 = new CompletableFuture<>();
+        CompletableFuture<Void> stopFuture2 = new CompletableFuture<>();
+
+        StorageContainer mockSc = createStorageContainer(containerId, startFuture, stopFuture);
+        when(mockScFactory.createStorageContainer(eq(containerId)))
+            .thenReturn(mockSc);
+
+        StorageContainer mockSc2 = createStorageContainer(containerId2, startFuture2, stopFuture2);
+        when(mockScFactory.createStorageContainer(eq(containerId2)))
+            .thenReturn(mockSc2);
+
+        // update assignment map
+        ClusterAssignmentData cad = ClusterAssignmentData.newBuilder()
+            .putServers(
+                myServerKey,
+                ServerAssignmentData.newBuilder()
+                    .addContainers(containerId)
+                    .build())
+            .build();
+        clusterMetadataStore.updateClusterAssignmentData(cad);
+
+        // notify the container to complete startup
+        startFuture.complete(mockSc);
+        verify(scRegistry, timeout(10000).times(1)).startStorageContainer(eq(containerId));
+        MoreAsserts.assertUtil(
+            ignored -> scManager.getLiveContainers().size() >= 1,
+            () -> null);
+        assertEquals(1, scManager.getLiveContainers().size());
+        assertTrue(scManager.getLiveContainers().containsKey(containerId));
+
+        // update assignment map to remove containerId and add containerId2
+        ClusterAssignmentData newCad = ClusterAssignmentData.newBuilder()
+            .putServers(
+                myServerKey,
+                ServerAssignmentData.newBuilder()
+                    .addContainers(containerId2)
+                    .build())
+            .build();
+        clusterMetadataStore.updateClusterAssignmentData(newCad);
+
+        // notify the container1 to stop and container2 to start
+        FutureUtils.complete(stopFuture, null);
+        startFuture2.complete(mockSc2);
+        verify(scRegistry, timeout(10000).times(1)).stopStorageContainer(eq(containerId), same(mockSc));
+        verify(scRegistry, timeout(10000).times(1)).startStorageContainer(eq(containerId2));
+        MoreAsserts.assertUtil(
+            ignored -> !scManager.getLiveContainers().containsKey(containerId)
+                && scManager.getLiveContainers().containsKey(containerId2),
+            () -> null);
+        assertEquals(1, scManager.getLiveContainers().size());
+        assertFalse(scManager.getLiveContainers().containsKey(containerId));
+        assertTrue(scManager.getLiveContainers().containsKey(containerId2));
+    }
+
+    /**
+     * A container whose start is still pending must not be stopped when it is removed from
+     * this server's assignment; it is stopped only after its start future completes.
+     */
+    @Test
+    public void testShutdownPendingStartStorageContainer() throws Exception {
+        // start the storage container manager
+        scManager.start();
+
+        long containerId = 11L;
+
+        // mock a container whose start/stop completion is controlled by the futures below
+        CompletableFuture<StorageContainer> startFuture = new CompletableFuture<>();
+        CompletableFuture<Void> stopFuture = new CompletableFuture<>();
+
+        StorageContainer mockSc = createStorageContainer(
+            containerId, startFuture, stopFuture);
+        when(mockScFactory.createStorageContainer(eq(containerId)))
+            .thenReturn(mockSc);
+
+        // assign the container to this server
+        ClusterAssignmentData cad = ClusterAssignmentData.newBuilder()
+            .putServers(
+                NetUtils.endpointToString(myEndpoint),
+                ServerAssignmentData.newBuilder()
+                    .addContainers(containerId)
+                    .build())
+            .build();
+        clusterMetadataStore.updateClusterAssignmentData(cad);
+
+        // wait until container start is called; startFuture is not completed yet,
+        // so the container is pending rather than live
+        verify(scRegistry, timeout(10000).times(1)).startStorageContainer(eq(containerId));
+        assertEquals(0, scManager.getLiveContainers().size());
+        assertEquals(1, scManager.getPendingStartStopContainers().size());
+        assertTrue(scManager.getPendingStartStopContainers().contains(containerId));
+
+        // unassign the container by publishing an empty assignment map
+        cad = ClusterAssignmentData.newBuilder().build();
+        clusterMetadataStore.updateClusterAssignmentData(cad);
+
+        // the container must not be stopped while it is still pending start.
+        // `after` waits the whole delay before checking, whereas `timeout(...).times(0)`
+        // would return as soon as zero invocations is first observed (i.e. immediately).
+        verify(scRegistry, org.mockito.Mockito.after(200).never())
+            .stopStorageContainer(eq(containerId), same(mockSc));
+        assertEquals(1, scManager.getPendingStartStopContainers().size());
+        assertTrue(scManager.getPendingStartStopContainers().contains(containerId));
+
+        // now complete the start future and the container is eventually going to shutdown
+        FutureUtils.complete(startFuture, mockSc);
+        FutureUtils.complete(stopFuture, null);
+
+        verify(scRegistry, timeout(10000).times(1)).stopStorageContainer(eq(containerId), same(mockSc));
+        MoreAsserts.assertUtil(
+            ignored -> scManager.getPendingStartStopContainers().size() == 0,
+            () -> null);
+        assertEquals(0, scManager.getLiveContainers().size());
+        assertEquals(0, scManager.getPendingStartStopContainers().size());
+    }
+
+    /**
+     * While the container factory produces containers whose start fails, nothing becomes live.
+     * Once the factory produces a container whose start succeeds, the manager's next start
+     * attempt should bring that container up.
+     */
+    @Test
+    public void testStartContainerOnFailures() throws Exception {
+        // replace the default manager with one backed by a failing-then-healthy factory
+        scManager.close();
+
+        long containerId = 11L;
+        AtomicBoolean useHealthyContainer = new AtomicBoolean(false);
+
+        CompletableFuture<StorageContainer> healthyStartFuture = new CompletableFuture<>();
+        StorageContainer healthySc =
+            createStorageContainer(containerId, healthyStartFuture, FutureUtils.Void());
+        mockScFactory = (scId) -> useHealthyContainer.get()
+            ? healthySc
+            : createStorageContainer(
+                scId,
+                FutureUtils.exception(new Exception("Failed to start")),
+                FutureUtils.Void());
+        scRegistry = spy(new StorageContainerRegistryImpl(mockScFactory, scheduler));
+
+        StorageConfiguration conf = new StorageConfiguration(new CompositeConfiguration())
+            .setClusterControllerScheduleInterval(1, TimeUnit.SECONDS);
+        scManager = new ZkStorageContainerManager(
+            myEndpoint,
+            conf,
+            clusterMetadataStore,
+            scRegistry,
+            NullStatsLogger.INSTANCE);
+
+        // start the storage container manager
+        scManager.start();
+
+        // assign the container to this server
+        clusterMetadataStore.updateClusterAssignmentData(
+            ClusterAssignmentData.newBuilder()
+                .putServers(
+                    NetUtils.endpointToString(myEndpoint),
+                    ServerAssignmentData.newBuilder()
+                        .addContainers(containerId)
+                        .build())
+                .build());
+
+        // start is attempted, but every container handed out so far fails to start
+        verify(scRegistry, timeout(10000).atLeastOnce()).startStorageContainer(eq(containerId));
+        assertEquals(0, scManager.getLiveContainers().size());
+
+        // from now on the factory hands out the container whose start succeeds
+        useHealthyContainer.set(true);
+        FutureUtils.complete(healthyStartFuture, healthySc);
+
+        // a later start attempt should bring the container up
+        MoreAsserts.assertUtil(
+            ignored -> scManager.getLiveContainers().size() >= 1,
+            () -> null);
+        assertEquals(1, scManager.getLiveContainers().size());
+        assertTrue(scManager.getLiveContainers().containsKey(containerId));
+    }
+
+
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/TestHelixStorageContainerManager.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/TestHelixStorageContainerManager.java
deleted file mode 100644
index e3efe942f..000000000
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/TestHelixStorageContainerManager.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.sc.helix;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.distributedlog.ZooKeeperClusterTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Unit test for {@link HelixStorageContainerManager}.
- */
-@Slf4j
-public class TestHelixStorageContainerManager extends ZooKeeperClusterTestCase {
-
-    @Rule
-    public TestName runtime = new TestName();
-
-    private HelixStorageController controller;
-    private OrderedScheduler scheduler;
-
-    @Before
-    public void setup() {
-        controller = new HelixStorageController(zkServers);
-        scheduler = OrderedScheduler.newSchedulerBuilder()
-            .numThreads(1)
-            .name("test-helix-storagecontainer-manager")
-            .build();
-    }
-
-    @After
-    public void tearDown() {
-        if (null != controller) {
-            controller.close();
-        }
-        if (null != scheduler) {
-            scheduler.shutdown();
-        }
-    }
-
-    private static Endpoint createEndpoint(String hostname,
-                                           int port) {
-        return Endpoint.newBuilder()
-            .setHostname(hostname)
-            .setPort(port)
-            .build();
-    }
-
-    @Test
-    public void testCreateCluster() {
-        String clusterName = runtime.getMethodName();
-
-        controller.createCluster(clusterName, 1024, 3);
-        List<String> clusters = controller.getAdmin().getClusters();
-        assertTrue(clusters.contains(clusterName));
-
-        // create an existing cluster
-        controller.createCluster(clusterName, 1024, 3);
-        clusters = controller.getAdmin().getClusters();
-        assertTrue(clusters.contains(clusterName));
-    }
-
-    @Test
-    public void testAddNode() {
-        String clusterName = runtime.getMethodName();
-        controller.createCluster(clusterName, 1024, 3);
-        List<String> instances = controller.getAdmin().getInstancesInCluster(clusterName);
-        assertEquals(0, instances.size());
-
-        Endpoint endpoint = createEndpoint("127.0.0.1", 4181);
-        controller.addNode(clusterName, endpoint, Optional.empty());
-        instances = controller.getAdmin().getInstancesInCluster(clusterName);
-        assertEquals(1, instances.size());
-        assertTrue(instances.contains(HelixStorageController.getEndpointName(endpoint)));
-
-        // add the instance again
-        controller.addNode(clusterName, endpoint, Optional.empty());
-        instances = controller.getAdmin().getInstancesInCluster(clusterName);
-        assertEquals(1, instances.size());
-        assertTrue(instances.contains(HelixStorageController.getEndpointName(endpoint)));
-
-        // add a different instance
-        Endpoint endpoint2 = createEndpoint("127.0.0.1", 4481);
-        controller.addNode(clusterName, endpoint2, Optional.empty());
-        instances = controller.getAdmin().getInstancesInCluster(clusterName);
-        assertEquals(2, instances.size());
-        assertTrue(instances.contains(HelixStorageController.getEndpointName(endpoint)));
-        assertTrue(instances.contains(HelixStorageController.getEndpointName(endpoint2)));
-    }
-
-    private StorageContainerRegistryImpl createRegistry() {
-        return new StorageContainerRegistryImpl(
-            (scId) -> {
-                StorageContainer sc = mock(StorageContainer.class);
-                when(sc.getId()).thenReturn(scId);
-                when(sc.start()).thenReturn(FutureUtils.value(null));
-                when(sc.stop()).thenReturn(FutureUtils.value(null));
-                return sc;
-            },
-            scheduler);
-    }
-
-    private HelixStorageContainerManager createManager(String clusterName,
-                                                       StorageConfiguration conf,
-                                                       StorageContainerRegistryImpl registry,
-                                                       Endpoint endpoint) {
-        return new HelixStorageContainerManager(
-            zkServers,
-            clusterName,
-            conf,
-            registry,
-            endpoint,
-            Optional.empty(),
-            NullStatsLogger.INSTANCE);
-    }
-
-    @Ignore
-    @Test
-    public void testStorageContainerManager() throws Exception {
-        String clusterName = runtime.getMethodName();
-        int numStorageContainers = 12;
-        int numHosts = 3;
-        controller.createCluster(clusterName, numStorageContainers, 3);
-
-        StorageConfiguration conf = new StorageConfiguration(new CompositeConfiguration());
-        Endpoint[] endpoints = new Endpoint[numHosts];
-        StorageContainerRegistryImpl[] registries = new StorageContainerRegistryImpl[numHosts];
-        HelixStorageContainerManager[] managers = new HelixStorageContainerManager[numHosts];
-
-        int basePort = 80;
-        for (int i = 0; i < numHosts; i++) {
-            endpoints[i] = createEndpoint("127.0.0.1", basePort + i);
-            registries[i] = createRegistry();
-            managers[i] = createManager(
-                clusterName, conf, registries[i], endpoints[i]);
-        }
-
-        managers[0].start();
-        while (registries[0].getNumStorageContainers() < numStorageContainers) {
-            TimeUnit.MILLISECONDS.sleep(20);
-        }
-
-        assertEquals(numStorageContainers, registries[0].getNumStorageContainers());
-        assertEquals(0, registries[1].getNumStorageContainers());
-        assertEquals(0, registries[2].getNumStorageContainers());
-
-        // start the second node
-        managers[1].start();
-        while (registries[0].getNumStorageContainers() > numStorageContainers / 2) {
-            TimeUnit.MILLISECONDS.sleep(20);
-        }
-        while (registries[1].getNumStorageContainers() < numStorageContainers / 2) {
-            TimeUnit.MILLISECONDS.sleep(20);
-        }
-
-        assertEquals(numStorageContainers / 2, registries[0].getNumStorageContainers());
-        assertEquals(numStorageContainers / 2, registries[1].getNumStorageContainers());
-        assertEquals(0, registries[2].getNumStorageContainers());
-
-        // start the third node
-        managers[2].start();
-        while (registries[0].getNumStorageContainers() > numStorageContainers / 3) {
-            TimeUnit.MILLISECONDS.sleep(20);
-        }
-        while (registries[1].getNumStorageContainers() > numStorageContainers / 3) {
-            TimeUnit.MILLISECONDS.sleep(20);
-        }
-        while (registries[2].getNumStorageContainers() < numStorageContainers / 3) {
-            TimeUnit.MILLISECONDS.sleep(20);
-        }
-
-        int totalStorageContainers = registries[0].getNumStorageContainers()
-            + registries[1].getNumStorageContainers()
-            + registries[2].getNumStorageContainers();
-        assertEquals("Expected " + numStorageContainers + "But " + totalStorageContainers + " found",
-            numStorageContainers, totalStorageContainers);
-        assertEquals(numStorageContainers / 3, registries[0].getNumStorageContainers());
-        assertEquals(numStorageContainers / 3, registries[1].getNumStorageContainers());
-        assertEquals(numStorageContainers / 3, registries[2].getNumStorageContainers());
-
-        for (int i = 0; i < 10; i++) {
-            int nid = ThreadLocalRandom.current().nextInt(numHosts);
-            long scId = ThreadLocalRandom.current().nextLong(numStorageContainers);
-            Endpoint endpoint = managers[nid].getStorageContainer(scId);
-            if (null != endpoint) {
-                assertTrue(endpoint.equals(endpoints[0])
-                    || endpoint.equals(endpoints[1])
-                    || endpoint.equals(endpoints[2]));
-            }
-        }
-
-        for (HelixStorageContainerManager manager : managers) {
-            manager.close();
-        }
-
-    }
-
-
-}
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java
index 03bd10fdd..03edfb8df 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java
+++ b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java
@@ -23,7 +23,6 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.net.URI;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
@@ -54,7 +53,6 @@
 
     private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
-    private URI defaultBackendUri;
 
     @Override
     protected void doSetup() throws Exception {
@@ -69,7 +67,6 @@ protected void doSetup() throws Exception {
         adminClient = StorageClientBuilder.newBuilder()
             .withSettings(settings)
             .buildAdmin();
-        defaultBackendUri = URI.create("distributedlog://" + cluster.getZkServers() + "/stream/storage");
     }
 
     @Override
@@ -157,7 +154,7 @@ public void testStreamAPI() throws Exception {
         assertEquals(streamName, streamProps.getStreamName());
         assertEquals(
             StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(defaultBackendUri.toString())
+                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
                 .build(),
             streamProps.getStreamConf());
 
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
index 8a8eee4ec..ebe99f04e 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
+++ b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
@@ -19,7 +19,6 @@
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_SPLIT_POLICY;
 import static org.junit.Assert.assertEquals;
 
-import java.net.URI;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
@@ -59,11 +58,9 @@
     private final NamespaceConfiguration colConf = NamespaceConfiguration.newBuilder()
         .setDefaultStreamConf(streamConf)
         .build();
-    private URI defaultBackendUri;
 
     @Override
     protected void doSetup() throws Exception {
-        defaultBackendUri = URI.create("distributedlog://" + cluster.getZkServers() + "/stream/storage");
         StorageClientSettings settings = StorageClientSettings.newBuilder()
             .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
             .usePlaintext(true)
@@ -106,7 +103,7 @@ public void testAdmin() throws Exception {
             FutureUtils.result(adminClient.getStream(nsName, streamName));
         assertEquals(
             StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(defaultBackendUri.toString())
+                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
                 .build()
             , properties.getStreamConf());
     }
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
index e5a9a7dc4..21dc004fc 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
+++ b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
@@ -43,7 +43,7 @@
     @Rule
     public final TemporaryFolder testDir = new TemporaryFolder();
 
-    protected final StreamClusterSpec spec;
+    protected StreamClusterSpec spec;
     protected StreamCluster cluster;
 
     protected StorageServerTestBase() {
@@ -59,7 +59,7 @@ protected StorageServerTestBase(StreamClusterSpec spec) {
 
     @Before
     public void setUp() throws Exception {
-        spec.storageRootDir(testDir.newFolder("tests"));
+        spec = spec.storageRootDir(testDir.newFolder("tests"));
         this.cluster = StreamCluster.build(spec);
         this.cluster.start();
         doSetup();
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java
index 00813ce26..7add6d72d 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java
+++ b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java
@@ -31,7 +31,6 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
-import java.net.URI;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.api.StorageClient;
@@ -65,11 +64,9 @@
     private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
     private StorageClient storageClient;
-    private URI defaultBackendUri;
 
     @Override
     protected void doSetup() throws Exception {
-        defaultBackendUri = URI.create("distributedlog://" + cluster.getZkServers() + "/stream/storage");
         scheduler = OrderedScheduler.newSchedulerBuilder()
             .name("table-client-test")
             .numThreads(1)
@@ -128,7 +125,7 @@ public void testTableSimpleAPI() throws Exception {
         assertEquals(streamName, streamProps.getStreamName());
         assertEquals(
             StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(defaultBackendUri.toString())
+                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
                 .build(),
             streamProps.getStreamConf());
 
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java
index d5fcde6b4..7db265b50 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java
+++ b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java
@@ -28,7 +28,6 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
-import java.net.URI;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.api.StorageClient;
@@ -76,11 +75,9 @@
     private StorageAdminClient adminClient;
     private StorageClient storageClient;
     private final OptionFactory<ByteBuf> optionFactory = new OptionFactoryImpl<>();
-    private URI defaultBackendUri;
 
     @Override
     protected void doSetup() throws Exception {
-        defaultBackendUri = URI.create("distributedlog://" + cluster.getZkServers() + "/stream/storage");
         scheduler = OrderedScheduler.newSchedulerBuilder()
             .name("table-client-test")
             .numThreads(1)
@@ -139,7 +136,7 @@ public void testTableAPI() throws Exception {
         assertEquals(streamName, streamProps.getStreamName());
         assertEquals(
             StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(defaultBackendUri.toString())
+                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
                 .build(),
             streamProps.getStreamConf());
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services