You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 20:41:20 UTC

[bookkeeper] 02/03: [TABLE SERVICE] a zk based storage container controller

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit ac1ec72033c0adb8a4dc9e77b687f08e43b748e6
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon May 7 16:50:29 2018 +0800

    [TABLE SERVICE] a zk based storage container controller
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    The original storage container "controller" was written using Helix. Helix's controller placement does not assume any properties of shared storage, whereas the table service leverages BookKeeper as its segment/log store. The table service is built in a more `stateless` way, where a storage container can be moved between servers in a much more lightweight way.
    
    Also, the Helix codebase is fairly large; if we eventually eliminate ZooKeeper, switching off Helix would be much more expensive. So this PR introduces a simple ZK-based controller for assigning storage containers to servers.
    
    *Solution*
    
    This ZK-based solution comprises 3 parts: server registration, the cluster & storage controller, and the storage container manager.
    
    **registration**
    
    This solution leverages the existing registration client/manager interfaces, so each storage server registers itself under `/stream/servers/available`.
    
    **cluster & storage controller**
    
    One storage server is elected as the leader, which runs a ClusterController. The cluster controller reads cluster metadata from ZooKeeper (the cluster metadata includes, for example, how many containers are available in the cluster), uses a storage controller to compute an assignment plan as the ideal state of the container assignment mapping, and then writes the assignment mapping to ZooKeeper under `/stream/assignment`.
    
    **storage container manager**
    
    Each storage server runs a storage container manager, which watches the assignment plan in ZooKeeper; the plan itself is updated by the cluster controller. Whenever it receives a watch notification/update of the assignment plan computed by the cluster controller, the storage container manager aligns itself to the ideal assignment map by starting/stopping storage containers.
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
    
    This closes #1392 from sijie/table_service_manager
---
 .../bookkeeper/common/testing/MoreAsserts.java     |   9 +
 pom.xml                                            |  14 +
 .../internal/RootRangeClientImplWithRetries.java   |   5 +-
 .../apache/bookkeeper/clients/utils/NetUtils.java  |  12 +
 .../distributedlog/ZooKeeperClusterTestCase.java   |  11 +
 stream/proto/src/main/proto/cluster.proto          |  38 +++
 .../bookkeeper/stream/cluster/StreamCluster.java   | 103 +++---
 .../bookkeeper/stream/server/StorageServer.java    |  66 +++-
 .../server/service/ClusterControllerService.java   |  66 ++++
 .../server/service/CuratorProviderService.java     |  75 +++++
 .../server/service/DLNamespaceProviderService.java |   6 +-
 .../service/RegistrationServiceProvider.java       | 127 ++++++++
 .../server/service/RegistrationStateService.java   | 107 +++++++
 .../storage/api/cluster/ClusterController.java     |  25 +-
 .../api/cluster/ClusterControllerLeader.java       |  36 +--
 .../cluster/ClusterControllerLeaderSelector.java   |  38 ++-
 .../storage/api/cluster/ClusterMetadataStore.java  |  88 ++++++
 .../stream/storage/api/cluster/package-info.java   |  26 +-
 .../storage/api/controller/StorageController.java  |  46 ---
 .../storage/api/controller/package-info.java       |  18 --
 .../stream/storage/api/sc/StorageContainer.java    |   2 +-
 .../storage/api/sc/StorageContainerRegistry.java   |  15 +-
 .../stream/storage/conf/StorageConfiguration.java  |  25 ++
 stream/storage/impl/pom.xml                        |  16 +
 .../stream/storage/StorageConstants.java           |  75 +++++
 .../impl/cluster/ClusterControllerImpl.java        |  64 ++++
 .../impl/cluster/ClusterControllerLeaderImpl.java  | 216 +++++++++++++
 .../impl/cluster/InMemClusterMetadataStore.java    | 104 ++++++
 .../cluster/ZkClusterControllerLeaderSelector.java |  98 ++++++
 .../ZkClusterControllerLeaderSelectorListener.java |  53 ++++
 .../impl/cluster/ZkClusterMetadataStore.java       | 209 ++++++++++++
 .../stream/storage/impl/cluster/package-info.java  |  26 +-
 .../impl/sc/DefaultStorageContainerController.java | 232 ++++++++++++++
 .../impl/sc/FailRequestStorageContainer.java       |   4 +-
 .../impl/sc/LocalStorageContainerManager.java      |   3 +-
 .../impl/sc/StorageContainerController.java        |  43 +++
 .../storage/impl/sc/StorageContainerImpl.java      |   4 +-
 .../impl/sc/StorageContainerRegistryImpl.java      |  64 ++--
 .../storage/impl/sc/ZkStorageContainerManager.java | 339 ++++++++++++++++++++
 .../sc/helix/HelixStorageContainerManager.java     | 119 -------
 .../impl/sc/helix/HelixStorageController.java      | 113 -------
 .../stream/storage/impl/sc/helix/WriteReadSMD.java | 163 ----------
 .../impl/sc/helix/WriteReadStateModelFactory.java  | 117 -------
 .../stream/storage/impl/sc/helix/package-info.java |  18 --
 .../impl/cluster/ClusterControllerImplTest.java    |  76 +++++
 .../cluster/ClusterControllerLeaderImplTest.java   | 226 +++++++++++++
 .../cluster/InMemClusterMetadataStoreTest.java     | 168 ++++++++++
 ...lusterControllerLeaderSelectorListenerTest.java |  46 +++
 .../ZkClusterControllerLeaderSelectorTest.java     | 196 ++++++++++++
 .../impl/cluster/ZkClusterMetadataStoreTest.java   | 204 ++++++++++++
 .../sc/DefaultStorageContainerControllerTest.java  | 352 +++++++++++++++++++++
 .../impl/sc/TestStorageContainerRegistryImpl.java  |   2 +-
 .../impl/sc/ZkStorageContainerManagerTest.java     | 322 +++++++++++++++++++
 .../sc/helix/TestHelixStorageContainerManager.java | 232 --------------
 .../tests/integration/StorageAdminClientTest.java  |   5 +-
 .../tests/integration/StorageClientTest.java       |   5 +-
 .../tests/integration/StorageServerTestBase.java   |   4 +-
 .../tests/integration/TableClientSimpleTest.java   |   5 +-
 .../stream/tests/integration/TableClientTest.java  |   5 +-
 59 files changed, 3838 insertions(+), 1048 deletions(-)

diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
index 938bf49..b62c8ae 100644
--- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
@@ -23,6 +23,9 @@ import static org.junit.Assert.assertTrue;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Sets.SetView;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
 
 /**
  * Assertion utils.
@@ -42,4 +45,38 @@ public final class MoreAsserts {
             diff.isEmpty());
     }
 
+    /**
+     * Polls {@code supplier} every 100 milliseconds until {@code predicate} accepts the supplied value.
+     *
+     * <p>Waits at most one minute, so a condition that never becomes true fails the calling test
+     * instead of hanging it forever.
+     *
+     * @throws AssertionError if the condition is not met within one minute
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    public static <T> void assertUtil(Predicate<T> predicate, Supplier<T> supplier) throws InterruptedException {
+        assertUtil(predicate, supplier, 1, TimeUnit.MINUTES);
+    }
+
+    /**
+     * Polls {@code supplier} every 100 milliseconds until {@code predicate} accepts the supplied value
+     * or the given timeout elapses.
+     *
+     * @throws AssertionError if the condition is not met within {@code timeout}
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    public static <T> void assertUtil(Predicate<T> predicate, Supplier<T> supplier,
+                                      long timeout, TimeUnit unit) throws InterruptedException {
+        final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
+        T value = supplier.get();
+        while (!predicate.test(value)) {
+            if (System.nanoTime() - deadlineNanos >= 0) {
+                throw new AssertionError(
+                    "Condition not met within " + timeout + " " + unit + ", last value : " + value);
+            }
+            TimeUnit.MILLISECONDS.sleep(100);
+            value = supplier.get();
+        }
+    }
+
 }
diff --git a/pom.xml b/pom.xml
index dd35a99..5472f2b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,6 +116,7 @@
     <commons-lang.version>2.6</commons-lang.version>
     <commons-lang3.version>3.3.2</commons-lang3.version>
     <commons-io.version>2.4</commons-io.version>
+    <curator.version>4.0.1</curator.version>
     <dropwizard.version>3.1.0</dropwizard.version>
     <finagle.version>6.44.0</finagle.version>
     <freebuilder.version>1.12.3</freebuilder.version>
@@ -414,6 +415,19 @@
         </exclusions>
       </dependency>
 
+      <!-- curator dependencies -->
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-recipes</artifactId>
+        <version>${curator.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
       <!-- http server dependencies -->
       <dependency>
         <groupId>com.twitter</groupId>
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java
index d83d5a7..ba2c247 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java
@@ -25,6 +25,7 @@ import io.grpc.StatusRuntimeException;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient;
 import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -37,6 +38,7 @@ import org.apache.bookkeeper.stream.proto.StreamProperties;
 /**
  * A root range client wrapper with retries.
  */
+@Slf4j
 class RootRangeClientImplWithRetries implements RootRangeClient {
 
     @VisibleForTesting
@@ -63,7 +65,8 @@ class RootRangeClientImplWithRetries implements RootRangeClient {
         } else if (cause instanceof RuntimeException) {
             return false;
         } else {
-            return true;
+            // storage level exceptions
+            return false;
         }
     }
 
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/NetUtils.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/NetUtils.java
index 23b9ca1..28fdd41 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/NetUtils.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/NetUtils.java
@@ -119,4 +119,16 @@ public class NetUtils {
             .build();
     }
 
+    /**
+     * Convert an endpoint to string.
+     *
+     * @param ep endpoint
+     * @return endpoint string representation
+     */
+    public static String endpointToString(Endpoint ep) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(ep.getHostname()).append(":").append(ep.getPort());
+        return sb.toString();
+    }
+
 }
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/ZooKeeperClusterTestCase.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/ZooKeeperClusterTestCase.java
index 1f7ac60..7fb8bef 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/ZooKeeperClusterTestCase.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/ZooKeeperClusterTestCase.java
@@ -18,6 +18,7 @@
 package org.apache.distributedlog;
 
 import java.io.File;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.commons.io.FileUtils;
@@ -29,8 +30,15 @@ import org.junit.BeforeClass;
 /**
  * ZooKeeperClusterTestCase.
  */
+@Slf4j
 public class ZooKeeperClusterTestCase {
 
+    static {
+        // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words
+        // are disabled by default due to security reasons
+        System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+    }
+
     protected static File zkDir;
     protected static ZooKeeperServerShim zks;
     protected static String zkServers;
@@ -43,10 +51,13 @@ public class ZooKeeperClusterTestCase {
         zks = serverAndPort.getLeft();
         zkPort = serverAndPort.getRight();
         zkServers = "127.0.0.1:" + zkPort;
+
+        log.info("--- Setup zookeeper at {} ---", zkServers);
     }
 
     @AfterClass
     public static void shutdownZooKeeper() throws Exception {
+        log.info("--- Shutdown zookeeper at {} ---", zkServers);
         zks.stop();
         if (null != zkDir) {
             FileUtils.forceDeleteOnExit(zkDir);
diff --git a/stream/proto/src/main/proto/cluster.proto b/stream/proto/src/main/proto/cluster.proto
new file mode 100644
index 0000000..1454071
--- /dev/null
+++ b/stream/proto/src/main/proto/cluster.proto
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+import "common.proto";
+
+package bookkeeper.proto.cluster;
+
+option java_multiple_files = true;
+option java_package = "org.apache.bookkeeper.stream.proto.cluster";
+
+message ServerAssignmentData {
+    // assigned containers
+    repeated int64 containers = 1;
+}
+
+message ClusterAssignmentData {
+    map<string, ServerAssignmentData> servers = 1;
+}
+
+message ClusterMetadata {
+    int64 num_storage_containers = 1;
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index 72a41e5..63e2679 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -16,17 +16,20 @@ package org.apache.bookkeeper.stream.cluster;
 
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getSegmentsRootPath;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getStoragePath;
 
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
+import java.net.URI;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
@@ -35,25 +38,25 @@ import org.apache.bookkeeper.clients.exceptions.NamespaceExistsException;
 import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.StorageServer;
-import org.apache.bookkeeper.stream.storage.api.controller.StorageController;
+import org.apache.bookkeeper.stream.storage.StorageConstants;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.sc.helix.HelixStorageController;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.LocalDLMEmulator;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.Transaction;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
 
 /**
  * A Cluster that runs a few storage nodes.
@@ -74,17 +77,10 @@ public class StreamCluster
         return new StreamCluster(spec);
     }
 
-    //
-    // DL Settings
-    //
-    private static final String ROOT_PATH = "/stream";
-    private static final String LEDGERS_PATH = "/stream/ledgers";
-    private static final String LEDGERS_AVAILABLE_PATH = "/stream/ledgers/available";
-    private static final String NAMESPACE = "/stream/storage";
-
-    private static ServerConfiguration newServerConfiguration(String zkEnsemble) {
+    private static ServerConfiguration newBookieConfiguration(String zkEnsemble) {
         ServerConfiguration serverConf = new ServerConfiguration();
-        serverConf.setMetadataServiceUri("zk://" + zkEnsemble + LEDGERS_PATH);
+        serverConf.setMetadataServiceUri(
+            "zk://" + zkEnsemble + getSegmentsRootPath(StorageConstants.ZK_METADATA_ROOT_PATH));
         serverConf.setAllowLoopback(true);
         serverConf.setGcWaitTime(300000);
         serverConf.setDiskUsageWarnThreshold(0.9999f);
@@ -117,8 +113,8 @@ public class StreamCluster
         return rpcEndpoints;
     }
 
-    public String getZkServers() {
-        return zkEnsemble;
+    public URI getDefaultBackendUri() {
+        return URI.create("distributedlog://" + zkEnsemble + getStoragePath(ZK_METADATA_ROOT_PATH));
     }
 
     private void startZooKeeper() throws Exception {
@@ -128,7 +124,7 @@ public class StreamCluster
             return;
         }
 
-        File zkDir = new File(spec.storageRootDir, "zookeeper");
+        File zkDir = new File(spec.storageRootDir(), "zookeeper");
         Pair<ZooKeeperServerShim, Integer> zkServerAndPort =
             LocalDLMEmulator.runZookeeperOnAnyPort(zkDir);
         zks = zkServerAndPort.getLeft();
@@ -145,39 +141,43 @@ public class StreamCluster
     }
 
     private void initializeCluster() throws Exception {
-        log.info("Initializing the stream cluster.");
-        ZooKeeper zkc = null;
-        try (StorageController controller = new HelixStorageController(zkEnsemble)) {
-            zkc = ZooKeeperClient.newBuilder()
-                .connectString(zkEnsemble)
-                .sessionTimeoutMs(60000)
-                .build();
-            Transaction txn = zkc.transaction();
-            txn.create(
-                ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            txn.create(
-                LEDGERS_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            txn.create(
-                LEDGERS_AVAILABLE_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(
+            zkEnsemble,
+            new ExponentialBackoffRetry(100, Integer.MAX_VALUE, 10000)
+        )) {
+            client.start();
+
+            ZkClusterMetadataStore store = new ZkClusterMetadataStore(client, zkEnsemble, ZK_METADATA_ROOT_PATH);
 
+            ClusterMetadata metadata;
             try {
-                txn.commit();
-            } catch (KeeperException ke) {
-                if (Code.NODEEXISTS != ke.code()) {
-                    throw ke;
+                metadata = store.getClusterMetadata();
+                log.info("Loaded cluster metadata : \n{}", metadata);
+            } catch (StorageRuntimeException sre) {
+                if (sre.getCause() instanceof KeeperException.NoNodeException) {
+                    log.info("Initializing the stream cluster.");
+                    store.initializeCluster(spec.numServers() * 2);
+                    log.info("Successfully initialized the stream cluster : \n{}", store.getClusterMetadata());
+                } else {
+                    throw sre;
                 }
             }
+        }
 
-            log.info("Initialize the bookkeeper metadata.");
-
-            // initialize the storage
-            controller.createCluster("stream/helix", spec.numServers() * 2, 1);
-            log.info("Initialized the helix metadata with {} storage containers.", spec.numServers() * 2);
-        } finally {
-            if (null != zkc) {
-                zkc.close();
+        // format the bookkeeper cluster
+        MetadataDrivers.runFunctionWithMetadataBookieDriver(newBookieConfiguration(zkEnsemble), driver -> {
+            try {
+                boolean initialized = driver.getRegistrationManager().initNewCluster();
+                if (initialized) {
+                    log.info("Successfully initialized the segment storage");
+                } else {
+                    log.info("The segment storage was already initialized");
+                }
+            } catch (Exception e) {
+                throw new StorageRuntimeException("Failed to initialize the segment storage", e);
             }
-        }
+            return null;
+        });
     }
 
     private LifecycleComponent startServer() throws Exception {
@@ -193,7 +193,7 @@ public class StreamCluster
             }
             LifecycleComponent server = null;
             try {
-                ServerConfiguration serverConf = newServerConfiguration(zkEnsemble);
+                ServerConfiguration serverConf = newBookieConfiguration(zkEnsemble);
                 serverConf.setBookiePort(bookiePort);
                 File bkDir = new File(spec.storageRootDir(), "bookie_" + bookiePort);
                 serverConf.setJournalDirName(bkDir.getPath());
@@ -300,9 +300,6 @@ public class StreamCluster
 
             // create default namespaces
             createDefaultNamespaces();
-
-            // wait for 10 seconds
-            TimeUnit.SECONDS.sleep(10);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index b48ea0d..d5ca397 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -13,6 +13,8 @@
  */
 package org.apache.bookkeeper.stream.server;
 
+import static org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH;
+
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import java.io.File;
@@ -35,14 +37,22 @@ import org.apache.bookkeeper.stream.server.conf.DLConfiguration;
 import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
 import org.apache.bookkeeper.stream.server.grpc.GrpcServerSpec;
 import org.apache.bookkeeper.stream.server.service.BookieService;
+import org.apache.bookkeeper.stream.server.service.ClusterControllerService;
+import org.apache.bookkeeper.stream.server.service.CuratorProviderService;
 import org.apache.bookkeeper.stream.server.service.DLNamespaceProviderService;
 import org.apache.bookkeeper.stream.server.service.GrpcService;
+import org.apache.bookkeeper.stream.server.service.RegistrationServiceProvider;
+import org.apache.bookkeeper.stream.server.service.RegistrationStateService;
 import org.apache.bookkeeper.stream.server.service.StatsProviderService;
 import org.apache.bookkeeper.stream.server.service.StorageService;
 import org.apache.bookkeeper.stream.storage.RangeStoreBuilder;
 import org.apache.bookkeeper.stream.storage.StorageResources;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.sc.helix.HelixStorageContainerManager;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
+import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactoryImpl;
 import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.Configuration;
@@ -184,11 +194,35 @@ public class StorageServer {
         // Create the bookie service
         BookieService bookieService = new BookieService(bkConf, rootStatsLogger);
 
+        // Create the curator provider service
+        CuratorProviderService curatorProviderService = new CuratorProviderService(
+            bookieService.serverConf(), dlConf, rootStatsLogger.scope("curator"));
+
         // Create the distributedlog namespace service
         DLNamespaceProviderService dlNamespaceProvider = new DLNamespaceProviderService(
             bookieService.serverConf(),
             dlConf,
-            rootStatsLogger.scope("dl"));
+            rootStatsLogger.scope("dlog"));
+
+        // Create a registration service provider
+        RegistrationServiceProvider regService = new RegistrationServiceProvider(
+            bookieService.serverConf(),
+            dlConf,
+            rootStatsLogger.scope("registration").scope("provider"));
+
+        // Create a cluster controller service
+        ClusterControllerService clusterControllerService = new ClusterControllerService(
+            storageConf,
+            () -> new ClusterControllerImpl(
+                new ZkClusterMetadataStore(
+                    curatorProviderService.get(),
+                    ZKMetadataDriverBase.resolveZkServers(bookieService.serverConf()),
+                    ZK_METADATA_ROOT_PATH),
+                regService.get(),
+                new DefaultStorageContainerController(),
+                new ZkClusterControllerLeaderSelector(curatorProviderService.get(), ZK_METADATA_ROOT_PATH),
+                storageConf),
+            rootStatsLogger.scope("cluster_controller"));
 
         // Create range (stream) store
         RangeStoreBuilder rangeStoreBuilder = RangeStoreBuilder.newBuilder()
@@ -200,16 +234,17 @@ public class StorageServer {
             .withNumStorageContainers(numStorageContainers)
             // the default log backend uri
             .withDefaultBackendUri(dlNamespaceProvider.getDlogUri())
-            // with the storage container manager (currently it is helix)
+            // with zk-based storage container manager
             .withStorageContainerManagerFactory((ignored, storeConf, registry) ->
-                new HelixStorageContainerManager(
-                    ZKMetadataDriverBase.resolveZkServers(bookieService.serverConf()),
-                    "stream/helix",
-                    storeConf,
-                    registry,
+                new ZkStorageContainerManager(
                     myEndpoint,
-                    instanceName,
-                    rootStatsLogger.scope("helix")))
+                    storageConf,
+                    new ZkClusterMetadataStore(
+                        curatorProviderService.get(),
+                        ZKMetadataDriverBase.resolveZkServers(bookieService.serverConf()),
+                        ZK_METADATA_ROOT_PATH),
+                    registry,
+                    rootStatsLogger.scope("sc").scope("manager")))
             // with the inter storage container client manager
             .withRangeStoreFactory(
                 new MVCCStoreFactoryImpl(
@@ -232,15 +267,26 @@ public class StorageServer {
         GrpcService grpcService = new GrpcService(
             serverConf, serverSpec, rpcStatsLogger);
 
+        // Create a registration state service only when service is ready.
+        RegistrationStateService regStateService = new RegistrationStateService(
+            myEndpoint,
+            bookieService.serverConf(),
+            bkConf,
+            regService,
+            rootStatsLogger.scope("registration"));
 
         // Create all the service stack
         return LifecycleComponentStack.newBuilder()
             .withName("storage-server")
             .addComponent(statsProviderService)     // stats provider
             .addComponent(bookieService)            // bookie server
+            .addComponent(curatorProviderService)   // service that provides curator client
             .addComponent(dlNamespaceProvider)      // service that provides dl namespace
+            .addComponent(regService)               // service that provides registration client
+            .addComponent(clusterControllerService) // service that run cluster controller service
             .addComponent(storageService)           // range (stream) store
             .addComponent(grpcService)              // range (stream) server (gRPC)
+            .addComponent(regStateService)          // service that manages server state
             .build();
     }
 
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/ClusterControllerService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/ClusterControllerService.java
new file mode 100644
index 0000000..8e45127
--- /dev/null
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/ClusterControllerService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.server.service;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterController;
+import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+
+/**
+ * A service that runs cluster controller.
+ *
+ * <p>The controller is obtained from the supplier and started in {@link #doStart()}, and stopped in
+ * {@link #doStop()}. The stopped instance is discarded, so restarting this service after a stop
+ * obtains and starts a fresh controller instead of silently keeping the stopped one.
+ */
+@Slf4j
+public class ClusterControllerService
+    extends AbstractLifecycleComponent<StorageConfiguration> {
+
+    private final Supplier<ClusterController> controllerSupplier;
+    // set in doStart(), cleared in doStop()
+    private ClusterController controller;
+
+    /**
+     * Construct a cluster controller service.
+     *
+     * @param conf storage configuration
+     * @param controllerSupplier factory invoked on start to create the controller to run
+     * @param statsLogger stats logger for this component
+     */
+    public ClusterControllerService(StorageConfiguration conf,
+                                    Supplier<ClusterController> controllerSupplier,
+                                    StatsLogger statsLogger) {
+        super("cluster-controller", conf, statsLogger);
+        this.controllerSupplier = controllerSupplier;
+    }
+
+    @Override
+    protected void doStart() {
+        if (null == controller) {
+            controller = controllerSupplier.get();
+            controller.start();
+            log.info("Successfully started the cluster controller.");
+        }
+    }
+
+    @Override
+    protected void doStop() {
+        if (null != controller) {
+            controller.stop();
+            // release the stopped controller; otherwise the null-check in doStart() would
+            // turn a subsequent restart of this service into a silent no-op.
+            controller = null;
+        }
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        // no-op: the controller (if any) is stopped in doStop()
+    }
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
new file mode 100644
index 0000000..7d404f0
--- /dev/null
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.server.service;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.server.conf.DLConfiguration;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+/**
+ * A service to provide a curator client.
+ *
+ * <p>The client targets the zookeeper servers resolved from the bookkeeper server configuration and
+ * uses an exponential backoff retry policy built from that configuration's zk retry backoff
+ * settings. It is created when this service is constructed, started in {@link #doStart()} and
+ * closed in {@link #doClose()}; {@link #doStop()} leaves it running.
+ */
+public class CuratorProviderService
+    extends AbstractLifecycleComponent<DLConfiguration>
+    implements Supplier<CuratorFramework> {
+
+    private final String zkServers;
+    private final RetryPolicy curatorRetryPolicy;
+    private final CuratorFramework curatorClient;
+
+    public CuratorProviderService(ServerConfiguration bkServerConf,
+                                  DLConfiguration conf,
+                                  StatsLogger statsLogger) {
+        super("curator-provider", conf, statsLogger);
+        String servers = ZKMetadataDriverBase.resolveZkServers(bkServerConf);
+        // NOTE(review): Integer.MAX_VALUE is passed as maxRetries; Curator's ExponentialBackoffRetry
+        // clamps oversized values to its own internal limit - confirm the effective retry count.
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(
+            bkServerConf.getZkRetryBackoffStartMs(),
+            Integer.MAX_VALUE,
+            bkServerConf.getZkRetryBackoffMaxMs());
+        this.zkServers = servers;
+        this.curatorRetryPolicy = retryPolicy;
+        this.curatorClient = CuratorFrameworkFactory.newClient(servers, retryPolicy);
+    }
+
+    @Override
+    public CuratorFramework get() {
+        return curatorClient;
+    }
+
+    @Override
+    protected void doStart() {
+        curatorClient.start();
+    }
+
+    @Override
+    protected void doStop() {
+        // intentionally empty: the client stays open until doClose()
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        curatorClient.close();
+    }
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
index 04d030d..e0359ce 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
@@ -23,6 +23,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stream.server.conf.DLConfiguration;
+import org.apache.bookkeeper.stream.storage.StorageConstants;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
@@ -79,8 +80,9 @@ public class DLNamespaceProviderService
                                       StatsLogger statsLogger) {
         super("namespace-provider", conf, statsLogger);
 
-        this.dlogUri = URI.create(String.format("distributedlog://%s/stream/storage",
-            ZKMetadataDriverBase.resolveZkServers(bkServerConf)));
+        this.dlogUri = URI.create(String.format("distributedlog://%s%s",
+            ZKMetadataDriverBase.resolveZkServers(bkServerConf),
+            StorageConstants.getStoragePath(StorageConstants.ZK_METADATA_ROOT_PATH)));
         this.bkServerConf = bkServerConf;
         this.dlConf = new DistributedLogConfiguration();
         this.dlConf.loadConf(conf);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
new file mode 100644
index 0000000..8db59fd
--- /dev/null
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.server.service;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.SERVERS_PATH;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.discover.ZKRegistrationClient;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.server.conf.DLConfiguration;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.RetryPolicy;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+
+/**
+ * A service that is responsible for registration using bookkeeper registration api.
+ */
+@Slf4j
+public class RegistrationServiceProvider
+    extends AbstractLifecycleComponent<DLConfiguration>
+    implements Supplier<RegistrationClient> {
+
+    private final String zkServers;
+    private final RetryPolicy bkZkRetryPolicy;
+    private final String regPath;
+    private final ScheduledExecutorService regExecutor;
+    private ZooKeeperClient zkClient;
+    private RegistrationClient client;
+
+    public RegistrationServiceProvider(ServerConfiguration bkServerConf,
+                                       DLConfiguration conf,
+                                       StatsLogger statsLogger) {
+        super("registration-service-provider", conf, statsLogger);
+        this.zkServers = ZKMetadataDriverBase.resolveZkServers(bkServerConf);
+        this.regPath = ZK_METADATA_ROOT_PATH + "/" + SERVERS_PATH;
+        this.bkZkRetryPolicy = new BoundExponentialBackoffRetryPolicy(
+            bkServerConf.getZkRetryBackoffStartMs(),
+            bkServerConf.getZkRetryBackoffMaxMs(),
+            Integer.MAX_VALUE);
+        this.regExecutor = Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder().setNameFormat("registration-service-provider-scheduler").build());
+    }
+
+    @Override
+    public RegistrationClient get() {
+        checkNotNull(client, "retrieve registration client before starting registration service");
+        return client;
+    }
+
+    public ZooKeeperClient getZkClient() {
+        return zkClient;
+    }
+
+    public String getRegistrationPath() {
+        return regPath;
+    }
+
+    @SneakyThrows
+    @Override
+    protected void doStart() {
+        if (null == zkClient) {
+            try {
+                zkClient = ZooKeeperClient.newBuilder()
+                    .operationRetryPolicy(bkZkRetryPolicy)
+                    .connectString(zkServers)
+                    .statsLogger(statsLogger.scope("zk"))
+                    .build();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.error("Interrupted at creating zookeeper client to {}", zkServers, e);
+                throw e;
+            } catch (Exception e) {
+                log.error("Failed to create zookeeper client to {}", zkServers, e);
+                throw e;
+            }
+            client = new ZKRegistrationClient(zkClient, regPath, regExecutor);
+        }
+    }
+
+    @Override
+    protected void doStop() {
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        if (null != client) {
+            client.close();
+        }
+        if (null != zkClient) {
+            try {
+                zkClient.close();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.warn("Interrupted at closing zookeeper client to {}", zkServers, e);
+            }
+        }
+        this.regExecutor.shutdown();
+    }
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationStateService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationStateService.java
new file mode 100644
index 0000000..40261c0
--- /dev/null
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationStateService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.server.service;
+
+import java.io.IOException;
+import java.util.Collections;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.BookieStateManager;
+import org.apache.bookkeeper.clients.utils.NetUtils;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
+
+/**
+ * A service that manages the registration state for a given server.
+ *
+ * <p>It registers the server to registration service and handles the state transition.
+ * On start it builds a {@link ZKRegistrationManager} on the zookeeper client and registration path
+ * exposed by the {@link RegistrationServiceProvider}, and registers this server's endpoint through a
+ * {@link BookieStateManager}. When the registration manager invokes its listener callback
+ * (presumably on zookeeper session expiry - confirm), the server is forced to unregistered and a
+ * re-registration is issued.
+ *
+ * <p>{@link #doStop()} and {@link #doClose()} tolerate being invoked when {@link #doStart()} never
+ * ran or failed part-way, in which case the state manager and/or registration manager are null.
+ */
+@Slf4j
+public class RegistrationStateService
+    extends AbstractLifecycleComponent<BookieConfiguration> {
+
+    private final Endpoint myEndpoint;
+    private final ServerConfiguration bkServerConf;
+    private final RegistrationServiceProvider regServiceProvider;
+    // both created in doStart(); may be null if the service was never started or start failed
+    private RegistrationManager regManager;
+    private BookieStateManager stateManager;
+
+    public RegistrationStateService(Endpoint myEndpoint,
+                                    ServerConfiguration bkServerConf,
+                                    BookieConfiguration bookieConf,
+                                    RegistrationServiceProvider serviceProvider,
+                                    StatsLogger statsLogger) {
+        super("registration-state-service", bookieConf, statsLogger);
+        this.myEndpoint = myEndpoint;
+        this.bkServerConf = bkServerConf;
+        this.regServiceProvider = serviceProvider;
+    }
+
+    @Override
+    protected void doStart() {
+        if (null == regManager) {
+            regManager = new ZKRegistrationManager(
+                bkServerConf,
+                regServiceProvider.getZkClient(),
+                regServiceProvider.getRegistrationPath(),
+                () -> {
+                    if (null == stateManager) {
+                        log.warn("Registration state manager is not initialized yet");
+                        return;
+                    }
+                    stateManager.forceToUnregistered();
+                    // schedule a re-register operation
+                    stateManager.registerBookie(false);
+                });
+            try {
+                stateManager = new BookieStateManager(
+                    bkServerConf,
+                    statsLogger.scope("state"),
+                    () -> regManager,
+                    Collections.emptyList(),
+                    () -> NetUtils.endpointToString(myEndpoint));
+                stateManager.initState();
+                // block until the initial registration completes
+                stateManager.registerBookie(true).get();
+                log.info("Successfully register myself under registration path {}/{}",
+                    regServiceProvider.getRegistrationPath(), NetUtils.endpointToString(myEndpoint));
+            } catch (Exception e) {
+                if (e instanceof InterruptedException) {
+                    // restore the interrupt status swallowed by the blocking get()
+                    Thread.currentThread().interrupt();
+                }
+                throw new RuntimeException("Failed to intiailize a registration state service", e);
+            }
+        }
+    }
+
+    @Override
+    protected void doStop() {
+        if (null == stateManager) {
+            // never started, or start failed before the state manager was created
+            return;
+        }
+        stateManager.forceToShuttingDown();
+
+        // turn the server to readonly during shutting down process
+
+        stateManager.forceToReadOnly();
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        if (null != stateManager) {
+            stateManager.close();
+        }
+        if (null != regManager) {
+            regManager.close();
+        }
+    }
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterController.java
similarity index 51%
copy from bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
copy to stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterController.java
index 938bf49..9ef7fd2 100644
--- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterController.java
@@ -16,30 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bookkeeper.common.testing;
-
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
-import java.util.Set;
+package org.apache.bookkeeper.stream.storage.api.cluster;
 
 /**
- * Assertion utils.
+ * Cluster controller operates the cluster state.
+ *
+ * <p>Instances are started and stopped by the service that hosts them.
  */
-public final class MoreAsserts {
+public interface ClusterController {
 
-    private MoreAsserts() {}
+    /**
+     * Start the cluster controller.
+     */
+    void start();
 
-    public static <T> void assertSetEquals(Set<T> expected, Set<T> actual) {
-        SetView<T> diff = Sets.difference(expected, actual);
-        assertTrue(
-            "Expected set contains items not exist at actual set : " + diff.immutableCopy(),
-            diff.isEmpty());
-        diff = Sets.difference(actual, expected);
-        assertTrue(
-            "Actual set contains items not exist at expected set : " + diff.immutableCopy(),
-            diff.isEmpty());
-    }
+    /**
+     * Stop the cluster controller.
+     */
+    void stop();
 
 }
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterControllerLeader.java
similarity index 52%
copy from bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
copy to stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterControllerLeader.java
index 938bf49..3d11144 100644
--- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterControllerLeader.java
@@ -16,30 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bookkeeper.common.testing;
-
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
-import java.util.Set;
+package org.apache.bookkeeper.stream.storage.api.cluster;
 
 /**
- * Assertion utils.
+ * Cluster controller that processes the leader logic.
+ *
+ * <p>An instance is handed to {@link ClusterControllerLeaderSelector#initialize}. NOTE(review): the
+ * selector is presumably responsible for invoking these callbacks as leadership changes - confirm
+ * against the selector implementation.
  */
-public final class MoreAsserts {
+public interface ClusterControllerLeader {
+
+    /**
+     * Process the controller logic as a leader.
+     *
+     * @throws Exception if the leader logic fails
+     */
+    void processAsLeader() throws Exception;
 
-    private MoreAsserts() {}
+    /**
+     * Suspend the leadership.
+     *
+     * <p>NOTE(review): presumably called when leadership is temporarily lost - confirm semantics.
+     */
+    void suspend();
 
-    public static <T> void assertSetEquals(Set<T> expected, Set<T> actual) {
-        SetView<T> diff = Sets.difference(expected, actual);
-        assertTrue(
-            "Expected set contains items not exist at actual set : " + diff.immutableCopy(),
-            diff.isEmpty());
-        diff = Sets.difference(actual, expected);
-        assertTrue(
-            "Actual set contains items not exist at expected set : " + diff.immutableCopy(),
-            diff.isEmpty());
-    }
+    /**
+     * Resume the leadership.
+     *
+     * <p>NOTE(review): presumably the counterpart of {@link #suspend()} - confirm semantics.
+     */
+    void resume();
 
 }
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterControllerLeaderSelector.java
similarity index 51%
copy from bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
copy to stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterControllerLeaderSelector.java
index 938bf49..dff799d 100644
--- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterControllerLeaderSelector.java
@@ -16,30 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bookkeeper.common.testing;
-
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
-import java.util.Set;
+package org.apache.bookkeeper.stream.storage.api.cluster;
 
 /**
- * Assertion utils.
+ * A selector to select a leader among controller instances.
+ *
+ * <p>The operations exposed are {@link #initialize}, {@link #start} and {@link #close}.
+ * NOTE(review): initialize presumably must be called before start - confirm.
  */
-public final class MoreAsserts {
+public interface ClusterControllerLeaderSelector extends AutoCloseable {
 
-    private MoreAsserts() {}
+    /**
+     * Initialize the selector with {@code leader} logic.
+     *
+     * @param leader controller leader instance whose callbacks run when this instance leads
+     */
+    void initialize(ClusterControllerLeader leader);
 
-    public static <T> void assertSetEquals(Set<T> expected, Set<T> actual) {
-        SetView<T> diff = Sets.difference(expected, actual);
-        assertTrue(
-            "Expected set contains items not exist at actual set : " + diff.immutableCopy(),
-            diff.isEmpty());
-        diff = Sets.difference(actual, expected);
-        assertTrue(
-            "Actual set contains items not exist at expected set : " + diff.immutableCopy(),
-            diff.isEmpty());
-    }
+    /**
+     * Start the selector.
+     */
+    void start();
 
+    /**
+     * Close the selector.
+     *
+     * <p>Overrides {@link AutoCloseable#close()} without declaring a checked exception.
+     */
+    @Override
+    void close();
 }
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
new file mode 100644
index 0000000..935552a
--- /dev/null
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.api.cluster;
+
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+
+/**
+ * Store for cluster-wide metadata: the cluster metadata itself (such as the number of storage
+ * containers) and the assignment of storage containers to servers.
+ */
+public interface ClusterMetadataStore extends AutoCloseable {
+
+    /**
+     * Initialize the cluster metadata so that the cluster has <i>numStorageContainers</i>
+     * storage containers.
+     *
+     * @param numStorageContainers number of storage containers in the cluster.
+     */
+    void initializeCluster(int numStorageContainers);
+
+    /**
+     * Read the cluster metadata currently held by the store.
+     *
+     * @return cluster metadata.
+     */
+    ClusterMetadata getClusterMetadata();
+
+    /**
+     * Update the cluster metadata held by the store.
+     *
+     * @param clusterMetadata cluster metadata to write.
+     */
+    void updateClusterMetadata(ClusterMetadata clusterMetadata);
+
+    /**
+     * Read the current mapping between servers and storage containers.
+     *
+     * @return the cluster assignment data.
+     */
+    ClusterAssignmentData getClusterAssignmentData();
+
+    /**
+     * Update the mapping between servers and storage containers.
+     *
+     * @param assignmentData cluster assignment data to write.
+     */
+    void updateClusterAssignmentData(ClusterAssignmentData assignmentData);
+
+    /**
+     * Register a watcher on the cluster assignment data.
+     *
+     * @param watcher watcher to notify about the cluster assignment data
+     * @param executor executor on which the <tt>watcher</tt> is run
+     */
+    void watchClusterAssignmentData(Consumer<Void> watcher, Executor executor);
+
+    /**
+     * Remove a watcher previously registered on the cluster assignment data.
+     *
+     * @param watcher watcher to remove
+     */
+    void unwatchClusterAssignmentData(Consumer<Void> watcher);
+
+    /**
+     * Close the metadata store; declares no checked exception.
+     */
+    @Override
+    void close();
+
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/package-info.java
similarity index 51%
copy from bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
copy to stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/package-info.java
index 938bf49..ccf4fac 100644
--- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/package-info.java
@@ -16,30 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bookkeeper.common.testing;
-
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
-import java.util.Set;
 
 /**
- * Assertion utils.
+ * Define the cluster operations for operating the storage.
  */
-public final class MoreAsserts {
-
-    private MoreAsserts() {}
-
-    public static <T> void assertSetEquals(Set<T> expected, Set<T> actual) {
-        SetView<T> diff = Sets.difference(expected, actual);
-        assertTrue(
-            "Expected set contains items not exist at actual set : " + diff.immutableCopy(),
-            diff.isEmpty());
-        diff = Sets.difference(actual, expected);
-        assertTrue(
-            "Actual set contains items not exist at expected set : " + diff.immutableCopy(),
-            diff.isEmpty());
-    }
-
-}
+package org.apache.bookkeeper.stream.storage.api.cluster;
\ No newline at end of file
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/controller/StorageController.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/controller/StorageController.java
deleted file mode 100644
index 28dd31a..0000000
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/controller/StorageController.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.api.controller;
-
-import java.util.Optional;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
-
-/**
- * Interface for managing the storage cluster.
- */
-public interface StorageController extends AutoCloseable {
-
-    /**
-     * Create the cluster.
-     *
-     * @param clusterName          cluster name.
-     * @param numStorageContainers num storage containers.
-     * @param numReplicas          num replicas per storage container.
-     */
-    void createCluster(String clusterName,
-                       int numStorageContainers,
-                       int numReplicas);
-
-    /**
-     * Add a node to a cluster.
-     *
-     * @param clusterName  cluster name.
-     * @param endpointName node endpoint name.
-     */
-    void addNode(String clusterName,
-                 Endpoint endpoint,
-                 Optional<String> endpointName);
-
-}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/controller/package-info.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/controller/package-info.java
deleted file mode 100644
index 27f7cfe..0000000
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/controller/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Define the controller operations for operating the storage.
- */
-package org.apache.bookkeeper.stream.storage.api.controller;
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
index 5d5c75d..47855b0 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainer.java
@@ -41,7 +41,7 @@ public interface StorageContainer
      *
      * @return a future represents the result of starting a storage container.
      */
-    CompletableFuture<Void> start();
+    CompletableFuture<StorageContainer> start();
 
     /**
      * Stop the storage container.
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
index c793742..57634d3 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
@@ -41,15 +41,26 @@ public interface StorageContainerRegistry extends AutoCloseable {
      * @param scId storage container id
      * @return a future represents the started storage container or exception if failed to start.
      */
-    CompletableFuture<Void> startStorageContainer(long scId);
+    CompletableFuture<StorageContainer> startStorageContainer(long scId);
+
+    /**
+     * Stop the storage container in this registry blindly.
+     *
+     * <p>This delegates to {@link #stopStorageContainer(long, StorageContainer)} with a {@code null}
+     * container, i.e. without naming the specific container instance that should be stopped.
+     *
+     * @param scId storage container id
+     * @return a future represents the result of stopping a storage container or exception if failed to stop.
+     */
+    default CompletableFuture<Void> stopStorageContainer(long scId) {
+        return stopStorageContainer(scId, null);
+    }
 
     /**
      * Stop the storage container in this registry.
      *
      * @param scId storage container id
+     * @param container storage container instance to stop
      * @return a future represents the result of stopping a storage container or exception if failed to start.
      */
-    CompletableFuture<Void> stopStorageContainer(long scId);
+    CompletableFuture<Void> stopStorageContainer(long scId, StorageContainer container);
 
     /**
      * Close the registry.
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
index b0f8909..0699a13 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
@@ -14,6 +14,8 @@
 package org.apache.bookkeeper.stream.storage.conf;
 
 import java.io.File;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.common.conf.ComponentConfiguration;
 import org.apache.commons.configuration.CompositeConfiguration;
 
@@ -28,6 +30,8 @@ public class StorageConfiguration extends ComponentConfiguration {
 
     private static final String SERVE_READONLY_TABLES = "serve.readonly.tables";
 
+    private static final String CONTROLLER_SCHEDULE_INTERVAL_MS = "cluster.controller.schedule.interval.ms";
+
     public StorageConfiguration(CompositeConfiguration conf) {
         super(conf, COMPONENT_PREFIX);
     }
@@ -65,4 +69,25 @@ public class StorageConfiguration extends ComponentConfiguration {
         return getBoolean(SERVE_READONLY_TABLES, false);
     }
 
+    /**
+     * Returns how often the cluster controller is scheduled, in milliseconds.
+     *
+     * <p>When the property is not configured, the interval falls back to 30 seconds.
+     *
+     * @return cluster controller schedule interval, in milliseconds.
+     */
+    public long getClusterControllerScheduleIntervalMs() {
+        final long defaultIntervalMs = TimeUnit.SECONDS.toMillis(30);
+        return getLong(CONTROLLER_SCHEDULE_INTERVAL_MS, defaultIntervalMs);
+    }
+
+    /**
+     * Set the cluster controller schedule interval.
+     *
+     * @param time time value
+     * @param timeUnit time unit
+     * @return storage configuration
+     */
+    public StorageConfiguration setClusterControllerScheduleInterval(long time, TimeUnit timeUnit) {
+        setProperty(CONTROLLER_SCHEDULE_INTERVAL_MS, timeUnit.toMillis(time));
+        return this;
+    }
+
 }
diff --git a/stream/storage/impl/pom.xml b/stream/storage/impl/pom.xml
index addd582..ae4a5e8 100644
--- a/stream/storage/impl/pom.xml
+++ b/stream/storage/impl/pom.xml
@@ -39,10 +39,19 @@
     </dependency>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-java-client-base</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
       <artifactId>statelib</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.helix</groupId>
       <artifactId>helix-core</artifactId>
       <version>${helix-core.version}</version>
@@ -54,6 +63,13 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common</artifactId>
+      <version>${project.parent.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageConstants.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageConstants.java
new file mode 100644
index 0000000..d6ec8a3
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageConstants.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage;
+
+import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
+
+/**
+ * Defines the storage constants: metadata node names and helpers that build metadata paths
+ * underneath a given root path.
+ */
+public final class StorageConstants {
+
+    //
+    // metadata related
+    //
+
+    public static final String CONTROLLER_PATH = "controller";
+    public static final String SERVERS_PATH = "servers";
+    public static final String CLUSTER_METADATA_PATH = "metadata";
+    public static final String CLUSTER_ASSIGNMENT_PATH = "assignment";
+    public static final String SEGMENTS_PATH = "segments";
+    public static final String STORAGE_PATH = "storage";
+
+    //
+    // ZooKeeper metadata related.
+    //
+
+    public static final String ZK_METADATA_ROOT_PATH = "/stream";
+
+    private StorageConstants() {
+        // constants holder, never instantiated
+    }
+
+    /** Path used by the cluster controller, i.e. {@code <rootPath>/controller}. */
+    public static String getControllerPath(String rootPath) {
+        return childPath(rootPath, CONTROLLER_PATH);
+    }
+
+    /** Root of the server registrations, i.e. {@code <rootPath>/servers}. */
+    public static String getServersPath(String rootPath) {
+        return childPath(rootPath, SERVERS_PATH);
+    }
+
+    /** Writable (available) servers, i.e. {@code <rootPath>/servers/<AVAILABLE_NODE>}. */
+    public static String getWritableServersPath(String rootPath) {
+        return childPath(getServersPath(rootPath), AVAILABLE_NODE);
+    }
+
+    /** Cluster metadata node, i.e. {@code <rootPath>/metadata}. */
+    public static String getClusterMetadataPath(String rootPath) {
+        return childPath(rootPath, CLUSTER_METADATA_PATH);
+    }
+
+    /** Cluster assignment node, i.e. {@code <rootPath>/assignment}. */
+    public static String getClusterAssignmentPath(String rootPath) {
+        return childPath(rootPath, CLUSTER_ASSIGNMENT_PATH);
+    }
+
+    /** Segments root, i.e. {@code <rootPath>/segments}. */
+    public static String getSegmentsRootPath(String rootPath) {
+        return childPath(rootPath, SEGMENTS_PATH);
+    }
+
+    /** Storage node, i.e. {@code <rootPath>/storage}. */
+    public static String getStoragePath(String rootPath) {
+        return childPath(rootPath, STORAGE_PATH);
+    }
+
+    /** Joins a parent path and a child node name with a single '/' separator. */
+    private static String childPath(String parent, String child) {
+        return parent + "/" + child;
+    }
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerImpl.java
new file mode 100644
index 0000000..598e9b2
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerImpl.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import java.time.Duration;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterController;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeaderSelector;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerController;
+
+/**
+ * A service that elects a cluster controller leader for performing cluster actions,
+ * such as assigning containers to servers.
+ *
+ * <p>The leader logic is a {@link ClusterControllerLeaderImpl}. It is handed to the supplied
+ * {@link ClusterControllerLeaderSelector}, which decides when this instance acts as the leader.
+ */
+public class ClusterControllerImpl implements ClusterController {
+
+    // runs the leader election on behalf of this controller
+    private final ClusterControllerLeaderSelector controllerLeaderSelector;
+
+    public ClusterControllerImpl(ClusterMetadataStore clusterMetadataStore,
+                                 RegistrationClient registrationClient,
+                                 StorageContainerController scController,
+                                 ClusterControllerLeaderSelector clusterControllerLeaderSelector,
+                                 StorageConfiguration conf) {
+        Duration scheduleInterval = Duration.ofMillis(conf.getClusterControllerScheduleIntervalMs());
+        ClusterControllerLeader leader =
+            new ClusterControllerLeaderImpl(clusterMetadataStore, scController, registrationClient, scheduleInterval);
+
+        this.controllerLeaderSelector = clusterControllerLeaderSelector;
+        this.controllerLeaderSelector.initialize(leader);
+    }
+
+    /** Starts the leader selector. */
+    @Override
+    public void start() {
+        controllerLeaderSelector.start();
+    }
+
+    /** Closes the leader selector. */
+    @Override
+    public void stop() {
+        controllerLeaderSelector.close();
+    }
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImpl.java
new file mode 100644
index 0000000..807bf69
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImpl.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerController;
+import org.apache.bookkeeper.versioning.Versioned;
+
+/**
+ * The cluster controller leader.
+ *
+ * <p>While it holds leadership, it watches the writable servers through a {@link RegistrationClient}.
+ * Whenever the server set changes, it computes the ideal container assignment with a
+ * {@link StorageContainerController}. If that assignment differs from the current one, it writes it to the
+ * {@link ClusterMetadataStore}. A new assignment round starts no earlier than {@code scheduleDuration}
+ * after the last successful assignment.
+ */
+@Slf4j
+public class ClusterControllerLeaderImpl implements ClusterControllerLeader, RegistrationListener {
+
+    // the metadata store for reading and writing cluster metadata.
+    private final ClusterMetadataStore clusterMetadataStore;
+
+    // the controller logic for assigning containers
+    private final StorageContainerController scController;
+
+    // permits that the controller leader can perform server changes.
+    // each permit signals "the assignment should be re-evaluated".
+    @Getter(AccessLevel.PACKAGE)
+    private final Semaphore performServerChangesPermits;
+
+    // registration client that watch/unwatch registered servers.
+    private final RegistrationClient regClient;
+
+    // keep a reference to a set of available servers; replaced from the registration callback
+    private volatile Set<BookieSocketAddress> availableServers;
+
+    // variables for suspending controller
+    @Getter(AccessLevel.PACKAGE)
+    private final Object suspensionLock = new Object();
+    private volatile boolean suspended = false;
+
+    // last successful assignment happened at (timestamp); -1 until the first assignment is written.
+    // volatile like the other cross-thread state, since it is exposed through a package-level getter.
+    @Getter(AccessLevel.PACKAGE)
+    private volatile long lastSuccessfulAssigmentAt;
+
+    // the min interval that controller is scheduled to assign containers
+    private final Duration scheduleDuration;
+
+    ClusterControllerLeaderImpl(ClusterMetadataStore clusterMetadataStore,
+                                StorageContainerController scController,
+                                RegistrationClient regClient,
+                                Duration scheduleDuration) {
+        this.clusterMetadataStore = clusterMetadataStore;
+        this.scController = scController;
+        this.regClient = regClient;
+        this.performServerChangesPermits = new Semaphore(0);
+        this.lastSuccessfulAssigmentAt = -1L;
+        this.scheduleDuration = scheduleDuration;
+    }
+
+    /**
+     * Suspend the controller if the leader disconnects from zookeeper
+     */
+    @Override
+    public void suspend() {
+        synchronized (suspensionLock) {
+            suspended = true;
+            suspensionLock.notifyAll();
+        }
+    }
+
+    boolean isSuspended() {
+        return suspended;
+    }
+
+    /**
+     * Resume the controller.
+     */
+    @Override
+    public void resume() {
+        synchronized (suspensionLock) {
+            suspended = false;
+            suspensionLock.notifyAll();
+        }
+    }
+
+    /**
+     * Run the controller loop for one leadership term.
+     *
+     * <p>This method only exits by throwing. It throws when it fails to watch the servers. It rethrows an
+     * {@link InterruptedException} when leadership is interrupted. It also rethrows the first processing
+     * failure that happens while the controller is not suspended. Server watching is stopped on every exit.
+     *
+     * @throws Exception the failure that ended this leadership term
+     */
+    @Override
+    public void processAsLeader() throws Exception {
+        log.info("Become controller leader to monitor servers for assigning storage containers.");
+
+        // Gaining leadership implies a live zookeeper session. Clear a suspension left over from a
+        // previous term (a SUSPENDED -> LOST sequence never resumes the leader), otherwise
+        // checkSuspension() would block this new term indefinitely.
+        resume();
+
+        performServerChangesPermits.release();
+
+        // monitoring the servers
+        try {
+            this.regClient.watchWritableBookies(this).get();
+        } catch (Exception e) {
+            log.warn("Controller leader fails to watch servers : {}, giving up leadership", e.getMessage());
+            throw e;
+        }
+
+        try {
+            // the leader is looping here to react to changes
+            while (true) {
+                try {
+                    checkSuspension();
+
+                    processServerChange();
+                } catch (InterruptedException ie) {
+                    log.warn("Controller leader is interrupted, giving up leadership");
+                    throw ie;
+                } catch (Exception e) {
+                    // if the leader is suspended due to losing connection to zookeeper
+                    // we don't give leadership until it becomes leader again or being interrupted by curator
+                    if (!suspended) {
+                        log.warn("Controller leader encountered exceptions on processing server changes,"
+                            + " giving up leadership", e);
+                        throw e;
+                    }
+                    // The failed round already drained the permits. Put one back so the round is retried
+                    // once the controller is resumed, instead of waiting for the next server change.
+                    log.info("Controller leader failed to process server changes while suspended,"
+                        + " will retry after it is resumed", e);
+                    performServerChangesPermits.release();
+                }
+            }
+        } finally {
+            // stop monitoring the servers
+            this.regClient.unwatchWritableBookies(this);
+        }
+    }
+
+    /**
+     * Block while the controller is suspended.
+     *
+     * @throws InterruptedException if interrupted while waiting to be resumed
+     */
+    private void checkSuspension() throws InterruptedException {
+        synchronized (suspensionLock) {
+            while (suspended) {
+                log.info("Controller leader is suspended, waiting for to be resumed");
+                suspensionLock.wait();
+                log.info("Controller leader is woke up from suspension");
+            }
+        }
+    }
+
+    /**
+     * Run one assignment round.
+     *
+     * <p>It waits for a change signal and throttles against the last successful assignment. It then
+     * writes a new assignment if the ideal state differs from the current one.
+     *
+     * @throws InterruptedException if interrupted while waiting for a signal or while sleeping
+     */
+    private void processServerChange() throws InterruptedException {
+        // check if the leader can perform server changes
+        performServerChangesPermits.acquire();
+
+        long elapsedMs = System.currentTimeMillis() - lastSuccessfulAssigmentAt;
+        long remainingMs = scheduleDuration.toMillis() - elapsedMs;
+        if (remainingMs > 0) {
+            log.info("Waiting {} milliseconds for controller to assign containers", remainingMs);
+            TimeUnit.MILLISECONDS.sleep(remainingMs);
+        }
+
+        // now, the controller has permits and meet the time requirement for assigning containers.
+        // any change signalled while waiting is folded into this round.
+        performServerChangesPermits.drainPermits();
+
+        Set<BookieSocketAddress> availableServersSnapshot = availableServers;
+        if (null == availableServersSnapshot || availableServersSnapshot.isEmpty()) {
+            if (lastSuccessfulAssigmentAt < 0) {
+                // haven't received any servers from registration service, wait for 200ms and retry.
+                log.info("No servers is alive yet. Backoff 200ms and retry.");
+                TimeUnit.MILLISECONDS.sleep(200);
+                performServerChangesPermits.release();
+            }
+            // Otherwise there was already a successful assignment but all the servers are gone.
+            // It can be a registration service issue, so don't attempt to reassign the containers;
+            // wait for the next server change instead.
+            return;
+        }
+
+        ClusterMetadata clusterMetadata = clusterMetadataStore.getClusterMetadata();
+        ClusterAssignmentData currentState = clusterMetadataStore.getClusterAssignmentData();
+
+        // servers are changed, process the change.
+        ClusterAssignmentData newState = scController.computeIdealState(
+            clusterMetadata,
+            currentState,
+            availableServersSnapshot);
+
+        if (newState.equals(currentState)) {
+            // no assignment state is changed, so do nothing
+            if (log.isDebugEnabled()) {
+                log.debug("Assignment state is unchanged - {}", newState);
+            }
+        } else {
+            // Update the assignment state, and record the timestamp only after the write succeeded.
+            // A failed write must neither count as a successful assignment nor delay its retry.
+            clusterMetadataStore.updateClusterAssignmentData(newState);
+            lastSuccessfulAssigmentAt = System.currentTimeMillis();
+        }
+    }
+
+    /**
+     * Registration callback: remember the new writable server set and signal the leader loop.
+     */
+    @Override
+    public void onBookiesChanged(Versioned<Set<BookieSocketAddress>> bookies) {
+        log.info("Cluster topology is changed - new cluster : {}", bookies);
+        // when bookies are changed, notify the leader to take actions
+        this.availableServers = bookies.getValue();
+        performServerChangesPermits.release();
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
new file mode 100644
index 0000000..da74867
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import lombok.AccessLevel;
+import lombok.Data;
+import lombok.Getter;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
+
+/**
+ * An in-memory implementation of {@link ClusterMetadataStore}.
+ */
+public class InMemClusterMetadataStore implements ClusterMetadataStore {
+
+    @Data
+    private static class WatcherAndExecutor {
+        private final Consumer<Void> watcher;
+        private final Executor executor;
+    }
+
+    private final Map<Consumer<Void>, WatcherAndExecutor> watchers;
+
+    private ClusterMetadata metadata;
+    private ClusterAssignmentData assignmentData;
+
+    InMemClusterMetadataStore(int numStorageContainers) {
+        this.watchers = Maps.newHashMap();
+        initializeCluster(numStorageContainers);
+    }
+
+    synchronized int getNumWatchers() {
+        return watchers.size();
+    }
+
+    @Override
+    public synchronized void initializeCluster(int numStorageContainers) {
+        this.metadata = ClusterMetadata.newBuilder()
+            .setNumStorageContainers(numStorageContainers)
+            .build();
+        this.assignmentData = ClusterAssignmentData.newBuilder().build();
+    }
+
+    @Override
+    public synchronized ClusterAssignmentData getClusterAssignmentData() {
+        return assignmentData;
+    }
+
+    @Override
+    public synchronized void updateClusterAssignmentData(ClusterAssignmentData assignmentData) {
+        this.assignmentData = assignmentData;
+        watchers.values().forEach(wae -> wae.executor.execute(() -> wae.watcher.accept(null)));
+    }
+
+    @Override
+    public synchronized void watchClusterAssignmentData(Consumer<Void> watcher, Executor executor) {
+        WatcherAndExecutor wae = watchers.get(watcher);
+        if (null == wae) {
+            wae = new WatcherAndExecutor(watcher, executor);
+            watchers.put(watcher, wae);
+        }
+    }
+
+    @Override
+    public synchronized void unwatchClusterAssignmentData(Consumer<Void> watcher) {
+        watchers.remove(watcher);
+    }
+
+    @Override
+    public synchronized ClusterMetadata getClusterMetadata() {
+        return metadata;
+    }
+
+    @Override
+    public synchronized void updateClusterMetadata(ClusterMetadata clusterMetadata) {
+        this.metadata = clusterMetadata;
+    }
+
+    @Override
+    public void close() {
+        // no-op
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelector.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelector.java
new file mode 100644
index 0000000..4f4a096
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelector.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getControllerPath;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeaderSelector;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+
+/**
+ * A controller leader selector implemented using zookeeper.
+ *
+ * <p>It runs a Curator {@link LeaderSelector} on the controller path. It also listens to the client's
+ * connection state, so that the current leader is suspended while disconnected, resumed on reconnection,
+ * and interrupted when the connection is lost.
+ */
+@Slf4j
+public class ZkClusterControllerLeaderSelector implements ClusterControllerLeaderSelector, ConnectionStateListener {
+
+    private final CuratorFramework client;
+    // the zookeeper path that controller is using for leader election
+    private final String controllerZkPath;
+
+    // leader selector to select leader
+    private ClusterControllerLeader leader;
+    private LeaderSelector leaderSelector;
+
+    public ZkClusterControllerLeaderSelector(CuratorFramework client,
+                                             String zkRootPath) {
+        this.client = client;
+        this.controllerZkPath = getControllerPath(zkRootPath);
+    }
+
+    /**
+     * Create the underlying leader selector for {@code leader} and subscribe to connection state changes.
+     */
+    @Override
+    public void initialize(ClusterControllerLeader leader) {
+        this.leader = leader;
+        ZkClusterControllerLeaderSelectorListener zkLeader = new ZkClusterControllerLeaderSelectorListener(leader);
+        this.leaderSelector = new LeaderSelector(client, controllerZkPath, zkLeader);
+        client.getConnectionStateListenable().addListener(this);
+    }
+
+    /**
+     * Start competing for leadership; leadership is requeued automatically after it is relinquished.
+     */
+    @Override
+    public void start() {
+        checkNotNull(leaderSelector, "leader selector is not initialized");
+        leaderSelector.autoRequeue();
+        leaderSelector.start();
+    }
+
+    /**
+     * Stop listening to connection state changes, interrupt any current leadership and close the selector.
+     */
+    @Override
+    public void close() {
+        if (null != leaderSelector) {
+            // Unsubscribe first: the listener was registered in initialize(), and without removing it,
+            // later connection events would keep calling into the closed selector and the leader.
+            client.getConnectionStateListenable().removeListener(this);
+            leaderSelector.interruptLeadership();
+            leaderSelector.close();
+        }
+    }
+
+    /**
+     * React to zookeeper connection changes.
+     *
+     * <p>LOST interrupts the current leadership. SUSPENDED suspends the leader and RECONNECTED resumes
+     * it, but both only act while this instance holds leadership.
+     */
+    @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState) {
+        switch (newState) {
+            case LOST:
+                log.warn("Connection to zookeeper is lost. So interrupt my current leadership.");
+                leaderSelector.interruptLeadership();
+                break;
+            case SUSPENDED:
+                if (leaderSelector.hasLeadership()) {
+                    log.info("Connection to zookeeper is disconnected, suspend the leader until it is reconnected.");
+                    leader.suspend();
+                }
+                break;
+            case RECONNECTED:
+                if (leaderSelector.hasLeadership()) {
+                    log.info("Connection to zookeeper is reconnected, resume the leader");
+                    leader.resume();
+                }
+                break;
+            default:
+                break;
+        }
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorListener.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorListener.java
new file mode 100644
index 0000000..87af0f4
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorListener.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
+import org.apache.curator.framework.state.ConnectionState;
+
+/**
+ * Curator {@link LeaderSelectorListener} that hands the controller leadership to a {@link ClusterControllerLeader}.
+ *
+ * <p>The controller leader watches the servers in the cluster and handles server level failures. It is
+ * responsible for making sure all storage containers are assigned to servers, and for load balancing
+ * storage containers if necessary.
+ *
+ * <p>Connection state changes are only logged here. Suspension and loss are handled by
+ * {@link ZkClusterControllerLeaderSelector}.
+ */
+@Slf4j
+class ZkClusterControllerLeaderSelectorListener implements LeaderSelectorListener {
+
+    // the leader logic that runs while this process holds leadership
+    private final ClusterControllerLeader leader;
+
+    ZkClusterControllerLeaderSelectorListener(ClusterControllerLeader controller) {
+        this.leader = controller;
+    }
+
+    /**
+     * Invoked by Curator once leadership is granted; leadership is held until this call returns or throws.
+     */
+    @Override
+    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
+        leader.processAsLeader();
+    }
+
+    /**
+     * Log the new connection state.
+     */
+    @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState) {
+        log.info("zookeeper connection state changed to {} for cluster controller", newState);
+    }
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
new file mode 100644
index 0000000..bcf70c3
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getClusterAssignmentPath;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getClusterMetadataPath;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getSegmentsRootPath;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getServersPath;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getStoragePath;
+import static org.apache.bookkeeper.stream.storage.StorageConstants.getWritableServersPath;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.metadata.DLMetadata;
+
+/**
+ * A zookeeper based implementation of cluster metadata store.
+ *
+ * <p>The cluster metadata and the cluster assignment data are stored as serialized protobuf messages in
+ * zookeeper under the given root path. All assignment data watchers share a single {@link NodeCache}, which is
+ * started when the first watcher is registered and closed when the last watcher is removed or the store is
+ * closed.
+ */
+@Slf4j
+public class ZkClusterMetadataStore implements ClusterMetadataStore {
+
+    private final CuratorFramework client;
+
+    private final String zkServers;
+    private final String zkRootPath;
+    private final String zkClusterMetadataPath;
+    private final String zkClusterAssignmentPath;
+
+    // watcher -> curator listener forwarding assignment data changes to that watcher. guarded by `this`.
+    private final Map<Consumer<Void>, NodeCacheListener> assignmentDataConsumers;
+    // shared cache on the assignment path; null while no cache is running. guarded by `this`.
+    private NodeCache assignmentDataCache;
+
+    private volatile boolean closed = false;
+
+    public ZkClusterMetadataStore(CuratorFramework client, String zkServers, String zkRootPath) {
+        this.client = client;
+        this.zkServers = zkServers;
+        this.zkRootPath = zkRootPath;
+        this.zkClusterMetadataPath = getClusterMetadataPath(zkRootPath);
+        this.zkClusterAssignmentPath = getClusterAssignmentPath(zkRootPath);
+        this.assignmentDataConsumers = new HashMap<>();
+    }
+
+    synchronized int getNumWatchers() {
+        return assignmentDataConsumers.size();
+    }
+
+    @Override
+    public void close() {
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+
+            closeAssignmentDataCache("Failed to close assignment data cache");
+        }
+    }
+
+    /**
+     * Close the shared assignment data cache, if any, and forget it.
+     *
+     * <p>Forgetting the reference matters: a later watcher must start a fresh cache instead of attaching to a
+     * closed one that never delivers events. Callers must hold the lock on {@code this}.
+     *
+     * @param errorMsg message logged if closing the cache fails
+     */
+    private void closeAssignmentDataCache(String errorMsg) {
+        if (null == assignmentDataCache) {
+            return;
+        }
+        NodeCache cache = assignmentDataCache;
+        assignmentDataCache = null;
+        try {
+            cache.close();
+        } catch (IOException e) {
+            log.warn(errorMsg, e);
+        }
+    }
+
+    /**
+     * Initialize the storage cluster metadata in zookeeper.
+     *
+     * <p>The root, cluster metadata, cluster assignment, servers, writable servers and storage (distributedlog
+     * namespace metadata) znodes are created in a single zookeeper transaction.
+     *
+     * @param numStorageContainers number of storage containers in the cluster
+     * @throws StorageRuntimeException if the cluster fails to be initialized
+     */
+    @Override
+    public void initializeCluster(int numStorageContainers) {
+        ClusterMetadata metadata = ClusterMetadata.newBuilder()
+            .setNumStorageContainers(numStorageContainers)
+            .build();
+        ClusterAssignmentData assignmentData = ClusterAssignmentData.newBuilder()
+            .build();
+        try {
+            // we are using dlog for the storage backend, so we need to initialize the dlog namespace
+            BKDLConfig dlogConfig = new BKDLConfig(
+                zkServers, getSegmentsRootPath(zkRootPath));
+            DLMetadata dlogMetadata = DLMetadata.create(dlogConfig);
+
+            client.transaction()
+                .forOperations(
+                    client.transactionOp().create().forPath(zkRootPath),
+                    client.transactionOp().create().forPath(zkClusterMetadataPath, metadata.toByteArray()),
+                    client.transactionOp().create().forPath(zkClusterAssignmentPath, assignmentData.toByteArray()),
+                    client.transactionOp().create().forPath(getServersPath(zkRootPath)),
+                    client.transactionOp().create().forPath(getWritableServersPath(zkRootPath)),
+                    client.transactionOp().create().forPath(getStoragePath(zkRootPath), dlogMetadata.serialize()));
+        } catch (Exception e) {
+            throw new StorageRuntimeException("Failed to initialize storage cluster with "
+                + numStorageContainers + " storage containers", e);
+        }
+    }
+
+    /**
+     * Read and parse the cluster assignment data from zookeeper.
+     *
+     * @throws StorageRuntimeException if the data cannot be read or is corrupted
+     */
+    @Override
+    public ClusterAssignmentData getClusterAssignmentData() {
+        try {
+            byte[] data = client.getData().forPath(zkClusterAssignmentPath);
+            return ClusterAssignmentData.parseFrom(data);
+        } catch (InvalidProtocolBufferException ie) {
+            throw new StorageRuntimeException("The cluster assignment data from zookeeper @"
+                + zkClusterAssignmentPath + " is corrupted", ie);
+        } catch (Exception e) {
+            throw new StorageRuntimeException("Failed to fetch cluster assignment data from zookeeper @"
+                + zkClusterAssignmentPath, e);
+        }
+    }
+
+    /**
+     * Overwrite the cluster assignment data in zookeeper.
+     *
+     * @throws StorageRuntimeException if the data cannot be written
+     */
+    @Override
+    public void updateClusterAssignmentData(ClusterAssignmentData assignmentData) {
+        byte[] data = assignmentData.toByteArray();
+        try {
+            client.setData().forPath(zkClusterAssignmentPath, data);
+        } catch (Exception e) {
+            throw new StorageRuntimeException("Failed to update cluster assignment data to zookeeper @"
+                + zkClusterAssignmentPath, e);
+        }
+    }
+
+    /**
+     * Register a watcher that is invoked (with {@code null}) on the given executor whenever the shared node
+     * cache reports a change of the assignment data. Registering the same watcher twice is a no-op.
+     *
+     * @throws IllegalStateException if the store is already closed
+     * @throws StorageRuntimeException if the node cache cannot be started
+     */
+    @Override
+    public void watchClusterAssignmentData(Consumer<Void> watcher, Executor executor) {
+        synchronized (this) {
+            if (closed) {
+                throw new IllegalStateException("Cluster metadata store is already closed");
+            }
+            if (null == assignmentDataCache) {
+                assignmentDataCache = startAssignmentDataCache();
+            }
+            NodeCacheListener listener = assignmentDataConsumers.get(watcher);
+            if (null == listener) {
+                listener = () -> watcher.accept(null);
+                assignmentDataConsumers.put(watcher, listener);
+                assignmentDataCache.getListenable().addListener(listener, executor);
+            }
+        }
+    }
+
+    /**
+     * Create and start a node cache on the assignment path.
+     *
+     * <p>A cache that fails to start is closed and never published, so later watchers retry starting a new
+     * one instead of attaching to a broken cache.
+     */
+    private NodeCache startAssignmentDataCache() {
+        NodeCache cache = new NodeCache(client, zkClusterAssignmentPath);
+        try {
+            cache.start();
+            return cache;
+        } catch (Exception e) {
+            try {
+                cache.close();
+            } catch (IOException ioe) {
+                log.warn("Failed to close assignment data cache", ioe);
+            }
+            throw new StorageRuntimeException("Failed to watch cluster assignment data", e);
+        }
+    }
+
+    /**
+     * Remove a previously registered watcher. The shared node cache is closed once no watcher remains.
+     */
+    @Override
+    public void unwatchClusterAssignmentData(Consumer<Void> watcher) {
+        synchronized (this) {
+            NodeCacheListener listener = assignmentDataConsumers.remove(watcher);
+            if (null != listener && null != assignmentDataCache) {
+                assignmentDataCache.getListenable().removeListener(listener);
+            }
+            if (assignmentDataConsumers.isEmpty()) {
+                closeAssignmentDataCache("Failed to close assignment data cache when there is no watcher");
+            }
+        }
+    }
+
+    /**
+     * Read and parse the cluster metadata from zookeeper.
+     *
+     * @throws StorageRuntimeException if the data cannot be read or is corrupted
+     */
+    @Override
+    public ClusterMetadata getClusterMetadata() {
+        try {
+            byte[] data = client.getData().forPath(zkClusterMetadataPath);
+            return ClusterMetadata.parseFrom(data);
+        } catch (InvalidProtocolBufferException ie) {
+            throw new StorageRuntimeException("The cluster metadata from zookeeper @"
+                + zkClusterMetadataPath + " is corrupted", ie);
+        } catch (Exception e) {
+            throw new StorageRuntimeException("Failed to fetch cluster metadata from zookeeper @"
+                + zkClusterMetadataPath, e);
+        }
+    }
+
+    /**
+     * Overwrite the cluster metadata in zookeeper.
+     *
+     * @throws StorageRuntimeException if the data cannot be written
+     */
+    @Override
+    public void updateClusterMetadata(ClusterMetadata metadata) {
+        byte[] data = metadata.toByteArray();
+        try {
+            client.setData().forPath(zkClusterMetadataPath, data);
+        } catch (Exception e) {
+            throw new StorageRuntimeException("Failed to update cluster metadata to zookeeper @"
+                + zkClusterMetadataPath, e);
+        }
+    }
+
+}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/package-info.java
similarity index 51%
copy from bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
copy to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/package-info.java
index 938bf49..b843c8e 100644
--- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/MoreAsserts.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/package-info.java
@@ -16,30 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bookkeeper.common.testing;
-
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
-import java.util.Set;
 
 /**
- * Assertion utils.
+ * Cluster operation implementation.
  */
-public final class MoreAsserts {
-
-    private MoreAsserts() {}
-
-    public static <T> void assertSetEquals(Set<T> expected, Set<T> actual) {
-        SetView<T> diff = Sets.difference(expected, actual);
-        assertTrue(
-            "Expected set contains items not exist at actual set : " + diff.immutableCopy(),
-            diff.isEmpty());
-        diff = Sets.difference(actual, expected);
-        assertTrue(
-            "Actual set contains items not exist at expected set : " + diff.immutableCopy(),
-            diff.isEmpty());
-    }
-
-}
+package org.apache.bookkeeper.stream.storage.impl.cluster;
\ No newline at end of file
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerController.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerController.java
new file mode 100644
index 0000000..2a3d704
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerController.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.sc;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * The default implementation of storage container controller.
+ *
+ * <p>The goal of this controller is uniformly distributing storage containers across all alive servers in
+ * the cluster.
+ *
+ * <p>The algorithm here is based on the count-based stream balancer in distributedlog-proxy-server.
+ */
+@Slf4j
+public class DefaultStorageContainerController implements StorageContainerController {
+
+    static final class ServerAssignmentDataComparator
+        implements Comparator<Pair<BookieSocketAddress, LinkedList<Long>>> {
+
+        @Override
+        public int compare(Pair<BookieSocketAddress, LinkedList<Long>> o1, Pair<BookieSocketAddress, LinkedList<Long>> o2) {
+            int res = Integer.compare(o1.getValue().size(), o2.getValue().size());
+            if (0 == res) {
+                // two servers have same number of container
+                // the order of these two servers doesn't matter, so use any attribute than can provide deterministic
+                // ordering during state computation is good enough
+                return String.CASE_INSENSITIVE_ORDER.compare(
+                    o1.getKey().toString(),
+                    o2.getKey().toString());
+            } else {
+                return res;
+            }
+        }
+    }
+
+    @Override
+    public ClusterAssignmentData computeIdealState(ClusterMetadata clusterMetadata,
+                                                   ClusterAssignmentData currentState,
+                                                   Set<BookieSocketAddress> currentCluster) {
+
+        if (currentCluster.isEmpty()) {
+            log.info("Current cluster is empty. No alive server is found.");
+            return currentState;
+        }
+
+        // 1. get current server assignments
+        Map<BookieSocketAddress, Set<Long>> currentServerAssignments;
+        try {
+            currentServerAssignments = currentState.getServersMap()
+                .entrySet()
+                .stream()
+                .collect(Collectors.toMap(
+                    e1 -> {
+                        try {
+                            return new BookieSocketAddress(e1.getKey());
+                        } catch (UnknownHostException uhe) {
+                            log.error("Invalid cluster ");
+                            throw new UncheckedExecutionException("Invalid server found in current assignment map"
+                                + e1.getKey(), uhe);
+                        }
+                    },
+                    e2 -> e2.getValue().getContainersList().stream().collect(Collectors.toSet())
+                ));
+        } catch (UncheckedExecutionException uee) {
+            log.warn("Invalid cluster assignment data is found : {} - {}. Recompute assignment from empty state",
+                currentState, uee.getCause().getMessage());
+            currentServerAssignments = Maps.newHashMap();
+        }
+        Set<BookieSocketAddress> currentServersAssigned = currentServerAssignments.keySet();
+
+        // 2. if no servers is assigned, initialize the ideal state
+        if (currentServersAssigned.isEmpty()) {
+            return initializeIdealState(clusterMetadata, currentCluster);
+        }
+
+        // 3. get the cluster diffs
+        Set<BookieSocketAddress> serversAdded =
+            Sets.difference(currentCluster, currentServersAssigned).immutableCopy();
+        Set<BookieSocketAddress> serversRemoved =
+            Sets.difference(currentServersAssigned, currentCluster).immutableCopy();
+
+        if (serversAdded.isEmpty() && serversRemoved.isEmpty()) {
+            // cluster is unchanged, assuming the current state is ideal, no re-assignment is required.
+            return currentState;
+        }
+
+        log.info("Storage container controller detects cluster changed:\n"
+                + "\t {} servers added: {}\n\t {} servers removed: {}",
+            serversAdded.size(), serversAdded, serversRemoved.size(), serversRemoved);
+
+        // 4. compute the containers that owned by servers removed. these containers are needed to be reassigned.
+        Set<Long> containersToReassign = currentServerAssignments.entrySet().stream()
+            .filter(serverEntry -> !currentCluster.contains(serverEntry.getKey()))
+            .flatMap(serverEntry -> serverEntry.getValue().stream())
+            .collect(Collectors.toSet());
+
+        // 5. use an ordered set as priority deque to sort the servers by the number of assigned containers
+        TreeSet<Pair<BookieSocketAddress, LinkedList<Long>>> assignmentQueue
+            = new TreeSet<>(new ServerAssignmentDataComparator());
+        for (Map.Entry<BookieSocketAddress, Set<Long>> entry : currentServerAssignments.entrySet()) {
+            BookieSocketAddress host = entry.getKey();
+
+            if (!currentCluster.contains(host)) {
+                if (log.isTraceEnabled()) {
+                    log.trace("Host {} is not in current cluster anymore", host);
+                }
+                continue;
+            } else {
+                if (log.isTraceEnabled()) {
+                    log.trace("Adding host {} to assignment queue", host);
+                }
+                assignmentQueue.add(Pair.of(host, Lists.newLinkedList(entry.getValue())));
+            }
+        }
+
+        // 6. add new servers
+        for (BookieSocketAddress server : serversAdded) {
+            assignmentQueue.add(Pair.of(server, Lists.newLinkedList()));
+        }
+
+        // 7. assign the containers that are needed to be reassigned.
+        for (Long containerId : containersToReassign) {
+            Pair<BookieSocketAddress, LinkedList<Long>> leastLoadedServer = assignmentQueue.pollFirst();
+            leastLoadedServer.getValue().add(containerId);
+            assignmentQueue.add(leastLoadedServer);
+        }
+
+        // 8. rebalance the containers if needed
+        int diffAllowed;
+        if (assignmentQueue.size() > clusterMetadata.getNumStorageContainers()) {
+            diffAllowed = 1;
+        } else {
+            diffAllowed = clusterMetadata.getNumStorageContainers() % assignmentQueue.size() == 0 ? 0 : 1;
+        }
+
+        Pair<BookieSocketAddress, LinkedList<Long>> leastLoaded = assignmentQueue.first();
+        Pair<BookieSocketAddress, LinkedList<Long>> mostLoaded = assignmentQueue.last();
+        while (mostLoaded.getValue().size() - leastLoaded.getValue().size() > diffAllowed) {
+            leastLoaded = assignmentQueue.pollFirst();
+            mostLoaded = assignmentQueue.pollLast();
+
+            // move container from mostLoaded to leastLoaded
+            Long containerId = mostLoaded.getValue().removeFirst();
+            // add the container to the end to avoid balancing this container again.
+            leastLoaded.getValue().addLast(containerId);
+
+            assignmentQueue.add(leastLoaded);
+            assignmentQueue.add(mostLoaded);
+
+            leastLoaded = assignmentQueue.first();
+            mostLoaded = assignmentQueue.last();
+        }
+
+        // 9. the new ideal state is computed, finalize it
+        Map<String, ServerAssignmentData> newAssignmentMap = Maps.newHashMap();
+        assignmentQueue.forEach(assignment -> newAssignmentMap.put(
+            assignment.getKey().toString(),
+            ServerAssignmentData.newBuilder()
+                .addAllContainers(assignment.getValue())
+                .build()));
+        return ClusterAssignmentData.newBuilder()
+            .putAllServers(newAssignmentMap)
+            .build();
+    }
+
+    static ClusterAssignmentData initializeIdealState(ClusterMetadata clusterMetadata,
+                                                      Set<BookieSocketAddress> currentCluster) {
+        List<BookieSocketAddress> serverList = Lists.newArrayListWithExpectedSize(currentCluster.size());
+        serverList.addAll(currentCluster);
+        Collections.shuffle(serverList);
+
+        int numServers = currentCluster.size();
+        int numTotalContainers = (int) clusterMetadata.getNumStorageContainers();
+        int numContainersPerServer = numTotalContainers / currentCluster.size();
+
+        Map<String, ServerAssignmentData> assignmentMap = Maps.newHashMap();
+        for (int serverIdx = 0; serverIdx < serverList.size(); serverIdx++) {
+            BookieSocketAddress server = serverList.get(serverIdx);
+
+            int finalServerIdx = serverIdx;
+            ServerAssignmentData assignmentData = ServerAssignmentData.newBuilder()
+                .addAllContainers(
+                    LongStream.rangeClosed(0, numContainersPerServer).boxed()
+                        .map(j -> j * numServers + finalServerIdx)
+                        .filter(containerId -> containerId < numTotalContainers)
+                        .collect(Collectors.toSet()))
+                .build();
+            assignmentMap.put(server.toString(), assignmentData);
+        }
+
+        return ClusterAssignmentData.newBuilder()
+            .putAllServers(assignmentMap)
+            .build();
+    }
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
index 39e00c0..8217e89 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java
@@ -63,8 +63,8 @@ public final class FailRequestStorageContainer implements StorageContainer {
     }
 
     @Override
-    public CompletableFuture<Void> start() {
-        return CompletableFuture.completedFuture(null);
+    public CompletableFuture<StorageContainer> start() {
+        return CompletableFuture.completedFuture(this);
     }
 
     @Override
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/LocalStorageContainerManager.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/LocalStorageContainerManager.java
index edb1e5f..10469ae 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/LocalStorageContainerManager.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/LocalStorageContainerManager.java
@@ -22,6 +22,7 @@ import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
@@ -54,7 +55,7 @@ public class LocalStorageContainerManager
 
     @Override
     protected void doStart() {
-        List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(numStorageContainers);
+        List<CompletableFuture<StorageContainer>> futures = Lists.newArrayListWithExpectedSize(numStorageContainers);
         for (int scId = 0; scId < numStorageContainers; scId++) {
             futures.add(this.registry.startStorageContainer(scId));
         }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerController.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerController.java
new file mode 100644
index 0000000..246f52c
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerController.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.sc;
+
+import java.util.Set;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+
+/**
+ * Storage container controller is used for assigning storage containers to servers.
+ */
+public interface StorageContainerController {
+
+    /**
+     * Compute the ideal container assignment state based on the servers alive in the cluster.
+     *
+     * @param clusterMetadata cluster metadata, including the number of storage containers in the cluster
+     * @param currentState current container assignment state
+     * @param currentCluster current servers alive in the cluster
+     * @return the computed ideal assignment state
+     */
+    ClusterAssignmentData computeIdealState(ClusterMetadata clusterMetadata,
+                                            ClusterAssignmentData currentState,
+                                            Set<BookieSocketAddress> currentCluster);
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
index 0905ce9..52fa060 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
@@ -172,7 +172,7 @@ public class StorageContainerImpl
     }
 
     @Override
-    public CompletableFuture<Void> start() {
+    public CompletableFuture<StorageContainer> start() {
         log.info("Starting storage container ({}) ...", getId());
 
         List<CompletableFuture<Void>> futures = Lists.newArrayList(
@@ -181,7 +181,7 @@ public class StorageContainerImpl
 
         return FutureUtils.collect(futures).thenApply(ignored -> {
             log.info("Successfully started storage container ({}).", getId());
-            return null;
+            return StorageContainerImpl.this;
         });
     }
 
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
index 6d27f66..4499801 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
@@ -37,7 +37,7 @@ public class StorageContainerRegistryImpl implements StorageContainerRegistry {
     private static final String COMPONENT_NAME = StorageContainerRegistry.class.getSimpleName();
 
     private final StorageContainerFactory scFactory;
-    private final ConcurrentMap<Long, StorageContainer> groups;
+    private final ConcurrentMap<Long, StorageContainer> containers;
     private final ReentrantReadWriteLock closeLock;
     private final StorageContainer failRequestStorageContainer;
     private boolean closed = false;
@@ -46,27 +46,27 @@ public class StorageContainerRegistryImpl implements StorageContainerRegistry {
                                         OrderedScheduler scheduler) {
         this.scFactory = factory;
         this.failRequestStorageContainer = FailRequestStorageContainer.of(scheduler);
-        this.groups = Maps.newConcurrentMap();
+        this.containers = Maps.newConcurrentMap();
         this.closeLock = new ReentrantReadWriteLock();
     }
 
     @VisibleForTesting
     public void setStorageContainer(long scId, StorageContainer group) {
-        groups.put(scId, group);
+        containers.put(scId, group);
     }
 
     @Override
     public int getNumStorageContainers() {
-        return groups.size();
+        return containers.size();
     }
 
     @Override
     public StorageContainer getStorageContainer(long storageContainerId) {
-        return groups.getOrDefault(storageContainerId, failRequestStorageContainer);
+        return containers.getOrDefault(storageContainerId, failRequestStorageContainer);
     }
 
     @Override
-    public CompletableFuture<Void> startStorageContainer(long scId) {
+    public CompletableFuture<StorageContainer> startStorageContainer(long scId) {
         closeLock.readLock().lock();
         try {
             return unsafeStartStorageContainer(scId);
@@ -75,49 +75,77 @@ public class StorageContainerRegistryImpl implements StorageContainerRegistry {
         }
     }
 
-    private CompletableFuture<Void> unsafeStartStorageContainer(long scId) {
+    private CompletableFuture<StorageContainer> unsafeStartStorageContainer(long scId) {
         if (closed) {
             return FutureUtils.exception(new ObjectClosedException(COMPONENT_NAME));
         }
 
-        if (groups.containsKey(scId)) {
+        if (containers.containsKey(scId)) {
             return FutureUtils.exception(new StorageException("StorageContainer " + scId + " already registered"));
         }
 
         StorageContainer newStorageContainer = scFactory.createStorageContainer(scId);
-        StorageContainer oldStorageContainer = groups.putIfAbsent(scId, newStorageContainer);
+        StorageContainer oldStorageContainer = containers.putIfAbsent(scId, newStorageContainer);
         if (null != oldStorageContainer) {
             newStorageContainer.close();
             return FutureUtils.exception(new StorageException("StorageContainer " + scId + " already registered"));
         }
 
         log.info("Registered StorageContainer ('{}').", scId);
-        return newStorageContainer.start();
+        return newStorageContainer.start()
+            .whenComplete((container, cause) -> {
+                if (null != cause) {
+                    if (containers.remove(scId, newStorageContainer)) {
+                        log.warn("De-registered StorageContainer ('{}') when failed to start", scId, cause);
+                    } else {
+                        log.warn("Fail to de-register StorageContainer ('{}') when failed to start", scId, cause);
+                    }
+                    // the failed container is no longer reachable through the registry, so release
+                    // whatever it acquired while starting; nobody else is going to stop it.
+                    newStorageContainer.stop().exceptionally(stopCause -> {
+                        log.error("Failed to stop StorageContainer ('{}') after start failure", scId, stopCause);
+                        return null;
+                    });
+                } else {
+                    log.info("Successfully started registered StorageContainer ('{}').", scId);
+                }
+            });
     }
 
     @Override
-    public CompletableFuture<Void> stopStorageContainer(long scId) {
+    public CompletableFuture<Void> stopStorageContainer(long scId, StorageContainer container) {
         closeLock.readLock().lock();
         try {
-            return unsafeStopStorageContainer(scId);
+            return unsafeStopStorageContainer(scId, container);
         } finally {
             closeLock.readLock().unlock();
         }
     }
 
-    private CompletableFuture<Void> unsafeStopStorageContainer(long scId) {
+    private CompletableFuture<Void> unsafeStopStorageContainer(long scId, StorageContainer container) {
         if (closed) {
             return FutureUtils.exception(new ObjectClosedException(COMPONENT_NAME));
         }
 
-        StorageContainer group = groups.remove(scId);
+        if (null == container) {
+            StorageContainer existingContainer = containers.remove(scId);
+            if (null != existingContainer) {
+                log.info("Unregistered StorageContainer ('{}').", scId);
+                return existingContainer.stop();
+            } else {
+                return FutureUtils.Void();
+            }
+        } else {
+            boolean removed = containers.remove(scId, container);
 
-        if (null == group) {
-            return FutureUtils.value(null);
-        }
+            if (removed) {
+                log.info("Unregistered StorageContainer ('{}').", scId);
+            }
 
-        log.info("Unregistered StorageContainer ('{}').", scId);
-        return group.stop();
+            // whether or not the container was still registered, stop the passed-in container so that
+            // the resources held by it are released.
+            return container.stop();
+        }
     }
 
     @Override
@@ -133,7 +161,7 @@ public class StorageContainerRegistryImpl implements StorageContainerRegistry {
         }
 
         // close all the ranges
-        groups.values().forEach(StorageContainer::close);
-        groups.clear();
+        containers.values().forEach(StorageContainer::close);
+        containers.clear();
     }
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java
new file mode 100644
index 0000000..f77244c
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManager.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.sc;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.utils.NetUtils;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
+import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
+import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
+
+/**
+ * A zookeeper based implementation of {@link StorageContainerManager}.
+ *
+ * <p>The manager watches the cluster assignment data in the {@link ClusterMetadataStore}, maintains a
+ * routing table from storage container id to the endpoint assigned to serve it, and starts/stops the
+ * containers assigned to this server through the {@link StorageContainerRegistry}. Assignment refreshes
+ * are triggered both by a periodic probe task and by assignment-change notifications; both run on a
+ * single-threaded scheduler.
+ */
+@Slf4j
+public class ZkStorageContainerManager
+    extends AbstractLifecycleComponent<StorageConfiguration>
+    implements StorageContainerManager, Consumer<Void> {
+
+    private final Endpoint endpoint;
+    private final ClusterMetadataStore metadataStore;
+    private final StorageContainerRegistry registry;
+    private final ScheduledExecutorService executor;
+
+    // for routing purpose
+    private volatile Map<Endpoint, ServerAssignmentData> clusterAssignmentMap;
+    private volatile ServerAssignmentData myAssignmentData;
+    private volatile ConcurrentLongHashMap<Endpoint> containerAssignmentMap;
+
+    // a probe task to probe containers and make sure this manager runs the containers assigned to it
+    private ScheduledFuture<?> containerProbeTask;
+    private final Duration probeInterval;
+
+    @Getter(AccessLevel.PACKAGE)
+    private final Map<Long, StorageContainer> liveContainers;
+    @Getter(AccessLevel.PACKAGE)
+    private final Set<Long> pendingStartStopContainers;
+
+    public ZkStorageContainerManager(Endpoint myEndpoint,
+                                     StorageConfiguration conf,
+                                     ClusterMetadataStore clusterMetadataStore,
+                                     StorageContainerRegistry registry,
+                                     StatsLogger statsLogger) {
+        super("zk-storage-container-manager", conf, statsLogger);
+        this.endpoint = myEndpoint;
+        this.metadataStore = clusterMetadataStore;
+        this.registry = registry;
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder().setNameFormat("zk-storage-container-manager").build());
+        this.liveContainers = Collections.synchronizedMap(Maps.newConcurrentMap());
+        this.pendingStartStopContainers = Collections.synchronizedSet(Sets.newConcurrentHashSet());
+        this.containerAssignmentMap = new ConcurrentLongHashMap<>();
+        this.clusterAssignmentMap = Maps.newHashMap();
+        // probe the containers every 1/2 of controller scheduling interval. this ensures the manager
+        // can attempt to start containers before controller reassign them. the interval is clamped to
+        // at least 1 ms because scheduleAtFixedRate rejects non-positive periods.
+        this.probeInterval = Duration.ofMillis(Math.max(1L, conf.getClusterControllerScheduleIntervalMs() / 2));
+    }
+
+    @Override
+    protected void doStart() {
+        // watch the cluster assignment data
+        metadataStore.watchClusterAssignmentData(this, executor);
+        log.info("Watched cluster assignment data.");
+
+        // schedule the container probe task. it runs safeProbeContainers() rather than probeContainers()
+        // because scheduleAtFixedRate suppresses all subsequent runs once a single run throws.
+        containerProbeTask = executor.scheduleAtFixedRate(
+            this::safeProbeContainers, 0, probeInterval.toMillis(), TimeUnit.MILLISECONDS);
+        log.info("Scheduled storage container probe task at every {} ms", probeInterval.toMillis());
+    }
+
+    @Override
+    protected void doStop() {
+        // unwatch the cluster assignment data
+        metadataStore.unwatchClusterAssignmentData(this);
+
+        // cancel the probe task; it is null if doStart() never ran
+        if (null != containerProbeTask && !containerProbeTask.cancel(true)) {
+            log.warn("Failed to cancel the container probe task.");
+        }
+
+        stopContainers();
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        // close the registry to shutdown the containers
+        registry.close();
+        // shutdown the scheduler
+        executor.shutdown();
+    }
+
+    /**
+     * Returns the endpoint currently assigned to serve the given storage container.
+     *
+     * @param scId storage container id
+     * @return the assigned endpoint, or null if the container is not in the current routing table
+     */
+    @Override
+    public Endpoint getStorageContainer(long scId) {
+        return containerAssignmentMap.get(scId);
+    }
+
+    /**
+     * Refreshes the cluster assignment and reconciles the containers running on this server with the
+     * containers assigned to it. Failures propagate to the caller; the probe task and the
+     * assignment-change callback run it through {@link #safeProbeContainers()}.
+     */
+    void probeContainers() {
+        boolean isMyAssignmentRefreshed = refreshMyAssignment();
+        if (!isMyAssignmentRefreshed) {
+            // no cluster assignment data available yet, quitting
+            return;
+        }
+
+        if (myAssignmentData == null) {
+            // I don't have any containers assigned to me, so stop containers that I am running.
+            stopContainers();
+        } else {
+            processMyAssignment(myAssignmentData);
+        }
+    }
+
+    /**
+     * Runs {@link #probeContainers()}, logging instead of propagating any failure, so that a single bad
+     * assignment snapshot neither stops the periodic probe task nor gets lost in an unobserved future.
+     */
+    private void safeProbeContainers() {
+        try {
+            probeContainers();
+        } catch (Exception e) {
+            log.error("Failed to probe storage containers, will retry on the next probe", e);
+        }
+    }
+
+    /**
+     * Refreshes the routing table and this server's assignment from the latest cluster assignment data.
+     *
+     * @return false if there is no cluster assignment data yet; true otherwise, even if nothing changed,
+     *     so that the caller re-checks that every assigned container is actually running.
+     */
+    private boolean refreshMyAssignment() {
+        ClusterAssignmentData clusterAssignmentData = metadataStore.getClusterAssignmentData();
+
+        if (null == clusterAssignmentData) {
+            log.info("Cluster assignment data is empty, so skip refreshing");
+            return false;
+        }
+
+        Map<Endpoint, ServerAssignmentData> newAssignmentMap = clusterAssignmentData.getServersMap().entrySet()
+            .stream()
+            .collect(Collectors.toMap(
+                e -> NetUtils.parseEndpoint(e.getKey()),
+                e -> e.getValue()));
+
+        Set<Endpoint> oldAssignedServers = clusterAssignmentMap.keySet();
+        Set<Endpoint> newAssignedServers = newAssignmentMap.keySet();
+
+        Set<Endpoint> serversJoined = Sets.difference(newAssignedServers, oldAssignedServers).immutableCopy();
+        Set<Endpoint> serversLeft = Sets.difference(oldAssignedServers, newAssignedServers).immutableCopy();
+        Set<Endpoint> commonServers = Sets.intersection(newAssignedServers, oldAssignedServers).immutableCopy();
+
+        processServersLeft(serversLeft, clusterAssignmentMap);
+        processServersJoined(serversJoined, newAssignmentMap);
+        processServersAssignmentChanged(commonServers, clusterAssignmentMap, newAssignmentMap);
+
+        this.clusterAssignmentMap = newAssignmentMap;
+        myAssignmentData = newAssignmentMap.get(endpoint);
+        return true;
+    }
+
+    private void processServersJoined(Set<Endpoint> serversJoined,
+                                      Map<Endpoint, ServerAssignmentData> newAssignmentMap) {
+        log.info("Servers joined : {}", serversJoined);
+        serversJoined.forEach(ep -> {
+            ServerAssignmentData sad = newAssignmentMap.get(ep);
+            if (null != sad) {
+                sad.getContainersList().forEach(container -> containerAssignmentMap.put(container, ep));
+            }
+        });
+    }
+
+    private void processServersLeft(Set<Endpoint> serversLeft,
+                                    Map<Endpoint, ServerAssignmentData> oldAssignmentMap) {
+        log.info("Servers left : {}", serversLeft);
+        serversLeft.forEach(ep -> {
+            ServerAssignmentData sad = oldAssignmentMap.get(ep);
+            if (null != sad) {
+                sad.getContainersList().forEach(container -> containerAssignmentMap.remove(container, ep));
+            }
+        });
+    }
+
+    private void processServersAssignmentChanged(Set<Endpoint> commonServers,
+                                                 Map<Endpoint, ServerAssignmentData> oldAssignmentMap,
+                                                 Map<Endpoint, ServerAssignmentData> newAssignmentMap) {
+        commonServers.forEach(ep -> {
+
+            ServerAssignmentData oldSad = oldAssignmentMap.getOrDefault(ep, ServerAssignmentData.getDefaultInstance());
+            ServerAssignmentData newSad = newAssignmentMap.getOrDefault(ep, ServerAssignmentData.getDefaultInstance());
+
+            if (oldSad.equals(newSad)) {
+                return;
+            } else {
+                log.info("Server assignment is changed for {}:\nold assignment: {}\nnew assignment: {}",
+                    NetUtils.endpointToString(ep), oldSad, newSad);
+                oldSad.getContainersList().forEach(container -> containerAssignmentMap.remove(container, ep));
+                newSad.getContainersList().forEach(container -> containerAssignmentMap.put(container, ep));
+            }
+
+        });
+    }
+
+
+    private void stopContainers() {
+        Set<Long> liveContainerSet = ImmutableSet.copyOf(liveContainers.keySet());
+        liveContainerSet.forEach(this::stopStorageContainer);
+    }
+
+    private void processMyAssignment(ServerAssignmentData myAssignment) {
+        Set<Long> assignedContainerSet = myAssignment.getContainersList().stream().collect(Collectors.toSet());
+        Set<Long> liveContainerSet = Sets.newHashSet(liveContainers.keySet());
+
+        // if the containers are already in the pending start/stop list, we don't touch them until they are
+        // completed. the filtered sets are snapshotted so that the sets logged below are exactly the sets
+        // acted on, even if pending start/stop operations complete concurrently.
+        Set<Long> containersToStart = ImmutableSet.copyOf(Sets.filter(
+            Sets.difference(assignedContainerSet, liveContainerSet),
+            container -> !pendingStartStopContainers.contains(container)));
+        Set<Long> containersToStop = ImmutableSet.copyOf(Sets.filter(
+            Sets.difference(liveContainerSet, assignedContainerSet),
+            container -> !pendingStartStopContainers.contains(container)));
+
+        log.info("Process container changes:\n\tIdeal = {}\n\tLive = {}\n\t"
+            + "Pending = {}\n\tToStart = {}\n\tToStop = {}",
+            assignedContainerSet, liveContainerSet, pendingStartStopContainers, containersToStart, containersToStop);
+
+        containersToStart.forEach(this::startStorageContainer);
+        containersToStop.forEach(this::stopStorageContainer);
+    }
+
+    private CompletableFuture<StorageContainer> startStorageContainer(long scId) {
+        log.info("Starting storage container ({})", scId);
+        StorageContainer sc = liveContainers.get(scId);
+        if (null != sc) {
+            log.warn("Storage container ({}) is already started", scId);
+            return FutureUtils.value(sc);
+        }
+
+        // register the container to pending list
+        pendingStartStopContainers.add(scId);
+        CompletableFuture<StorageContainer> startFuture;
+        try {
+            startFuture = registry.startStorageContainer(scId);
+        } catch (RuntimeException e) {
+            // a synchronous failure would skip the completion callback below and leave the container
+            // in the pending set forever, so route it through the same callback as an async failure.
+            startFuture = FutureUtils.exception(e);
+        }
+        return startFuture.whenComplete((container, cause) -> {
+            try {
+                if (null != cause) {
+                    log.warn("Failed to start storage container ({})", scId, cause);
+                } else {
+                    log.info("Successfully started storage container ({})", scId);
+                    addStorageContainer(scId, container);
+                }
+            } finally {
+                pendingStartStopContainers.remove(scId);
+            }
+        });
+    }
+
+    private CompletableFuture<Void> stopStorageContainer(long scId) {
+        log.info("Stopping storage container ({})", scId);
+        StorageContainer sc = liveContainers.get(scId);
+        if (null == sc) {
+            log.warn("Storage container ({}) is not alive anymore", scId);
+            return FutureUtils.Void();
+        }
+
+        // register the container to pending list
+        pendingStartStopContainers.add(scId);
+        CompletableFuture<Void> stopFuture;
+        try {
+            stopFuture = registry.stopStorageContainer(scId, sc);
+        } catch (RuntimeException e) {
+            // same as startStorageContainer: never leave the container stuck in the pending set
+            stopFuture = FutureUtils.exception(e);
+        }
+        return stopFuture.whenComplete((ignored, cause) -> {
+            try {
+                if (cause != null) {
+                    log.warn("Failed to stop storage container ({})", scId, cause);
+                } else {
+                    log.info("Successfully stopped storage container ({})", scId);
+                    removeStorageContainer(scId, sc);
+                }
+            } finally {
+                pendingStartStopContainers.remove(scId);
+            }
+        });
+    }
+
+    private StorageContainer addStorageContainer(long scId, StorageContainer sc) {
+        StorageContainer oldSc = liveContainers.putIfAbsent(scId, sc);
+        if (null == oldSc) {
+            log.info("Storage container ({}) is added to live set.", scId);
+            return sc;
+        } else {
+            log.warn("Storage container ({}) has already been added to live set", scId);
+            // release the duplicate through the registry, which de-registers it (if it is still the
+            // registered instance) and stops it, rather than stopping it behind the registry's back.
+            registry.stopStorageContainer(scId, sc);
+            return oldSc;
+        }
+    }
+
+    private void removeStorageContainer(long scId, StorageContainer sc) {
+        if (liveContainers.remove(scId, sc)) {
+            log.info("Storage container ({}) is removed from live set.", scId);
+        } else {
+            log.warn("Storage container ({}) can't be removed from live set.", scId);
+        }
+    }
+
+    //
+    // Callback on cluster assignment data changed.
+    //
+
+    @Override
+    public void accept(Void aVoid) {
+        // any time if the cluster assignment data is changed, schedule a probe task. it goes through
+        // safeProbeContainers() because a failure inside a submitted task is only captured by the
+        // returned future, which nobody observes.
+        this.executor.submit(this::safeProbeContainers);
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/HelixStorageContainerManager.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/HelixStorageContainerManager.java
deleted file mode 100644
index 62a0f25..0000000
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/HelixStorageContainerManager.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.sc.helix;
-
-import static org.apache.bookkeeper.stream.storage.impl.sc.helix.HelixStorageController.RESOURCE_NAME;
-import static org.apache.bookkeeper.stream.storage.impl.sc.helix.HelixStorageController.getEndpointName;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Optional;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.spectator.RoutingTableProvider;
-
-/**
- * A storage container manager implemented using Helix.
- */
-@Slf4j
-public class HelixStorageContainerManager
-    extends AbstractLifecycleComponent<StorageConfiguration>
-    implements StorageContainerManager {
-
-    private final String zkServers;
-    private final String clusterName;
-    private final StorageContainerRegistry registry;
-    private final Endpoint endpoint;
-    private final Optional<String> endpointName;
-    private final HelixManager manager;
-    private final RoutingTableProvider rtProvider;
-
-    public HelixStorageContainerManager(String zkServers,
-                                        String clusterName,
-                                        StorageConfiguration conf,
-                                        StorageContainerRegistry registry,
-                                        Endpoint endpoint,
-                                        Optional<String> endpointName,
-                                        StatsLogger statsLogger) {
-        super("helix-storage-container-manager", conf, statsLogger);
-        this.zkServers = zkServers;
-        this.clusterName = clusterName;
-        this.registry = registry;
-        this.endpoint = endpoint;
-        this.endpointName = endpointName;
-        // create the helix manager
-        this.manager = HelixManagerFactory.getZKHelixManager(
-            clusterName,
-            endpointName.orElse(getEndpointName(endpoint)),
-            InstanceType.CONTROLLER_PARTICIPANT,
-            zkServers);
-        this.rtProvider = new RoutingTableProvider();
-    }
-
-    @Override
-    public Endpoint getStorageContainer(long scId) {
-        List<InstanceConfig> instances = this.rtProvider.getInstances(
-            RESOURCE_NAME, RESOURCE_NAME + "_" + scId, "WRITE");
-        if (instances.isEmpty()) {
-            return null;
-        } else {
-            InstanceConfig instance = instances.get(0);
-            return Endpoint.newBuilder()
-                .setHostname(instance.getHostName())
-                .setPort(Integer.parseInt(instance.getPort()))
-                .build();
-        }
-    }
-
-    @Override
-    protected void doStart() {
-        // create the controller
-        try (HelixStorageController controller = new HelixStorageController(zkServers)) {
-            controller.addNode(clusterName, endpoint, endpointName);
-        }
-        StateMachineEngine sme = this.manager.getStateMachineEngine();
-        StateModelFactory<StateModel> smFactory = new WriteReadStateModelFactory(registry);
-        sme.registerStateModelFactory(WriteReadSMD.NAME, smFactory);
-        try {
-            manager.connect();
-            manager.addExternalViewChangeListener(rtProvider);
-        } catch (Exception e) {
-            throw new StorageRuntimeException(e);
-        }
-    }
-
-    @Override
-    protected void doStop() {
-        manager.disconnect();
-    }
-
-    @Override
-    protected void doClose() throws IOException {
-
-    }
-}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/HelixStorageController.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/HelixStorageController.java
deleted file mode 100644
index f76c21d..0000000
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/HelixStorageController.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.sc.helix;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.List;
-import java.util.Optional;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
-import org.apache.bookkeeper.stream.storage.api.controller.StorageController;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.model.InstanceConfig;
-
-/**
- * A storage controller client based on Helix.
- */
-@Slf4j
-public class HelixStorageController implements StorageController {
-
-    static final String RESOURCE_NAME = "storagecontainers";
-
-    private final ZKHelixAdmin admin;
-
-    public HelixStorageController(String zkServers) {
-        this.admin = new ZKHelixAdmin(zkServers);
-    }
-
-    @VisibleForTesting
-    ZKHelixAdmin getAdmin() {
-        return admin;
-    }
-
-    static long getStorageContainerFromPartitionName(String partitionName) {
-        return Long.parseLong(partitionName.replace(RESOURCE_NAME + "_", ""));
-    }
-
-    @Override
-    public void createCluster(String clusterName,
-                              int numStorageContainers,
-                              int numReplicas) {
-        List<String> clusters = admin.getClusters();
-        if (clusters.contains(clusterName)) {
-            return;
-        }
-        log.info("Creating new cluster : {}", clusterName);
-
-        admin.addCluster(clusterName);
-        admin.addStateModelDef(clusterName, "WriteRead", WriteReadSMD.build());
-        admin.addResource(
-            clusterName,
-            RESOURCE_NAME,
-            numStorageContainers,
-            WriteReadSMD.NAME,
-            "FULL_AUTO");
-        admin.rebalance(clusterName, RESOURCE_NAME, 3);
-
-        log.info("Created new cluster : {}", clusterName);
-    }
-
-    static String getEndpointName(Endpoint endpoint) {
-        return endpoint.getHostname() + "_" + endpoint.getPort();
-    }
-
-    @Override
-    public void addNode(String clusterName,
-                        Endpoint endpoint,
-                        Optional<String> endpointNameOptional) {
-        String endpointName = endpointNameOptional.orElse(getEndpointName(endpoint));
-        if (admin.getInstancesInCluster(clusterName)
-            .contains(endpointName)) {
-            log.info("Instance {} already exists in cluster {}, skip creating the instance",
-                endpointName, clusterName);
-            return;
-        }
-        log.info("Adding a new instance {} ({}) to the cluster {}.",
-            new Object[]{endpointName, endpoint, clusterName});
-        InstanceConfig config = new InstanceConfig(endpointName);
-        config.setHostName(endpoint.getHostname());
-        config.setPort(Integer.toString(endpoint.getPort()));
-        admin.addInstance(clusterName, config);
-    }
-
-    @Override
-    public void close() {
-        admin.close();
-    }
-}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/WriteReadSMD.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/WriteReadSMD.java
deleted file mode 100644
index 6104919..0000000
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/WriteReadSMD.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.sc.helix;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.model.StateModelDefinition;
-
-/**
- * The state model definition for write-read transition.
- */
-public final class WriteReadSMD extends StateModelDefinition {
-
-    public static final String NAME = "WriteRead";
-
-    /**
-     * The state for write and read.
-     */
-    public enum States {
-        WRITE,
-        READ,
-        OFFLINE
-    }
-
-    public WriteReadSMD() {
-        super(generateConfigForWriteRead());
-    }
-
-    /**
-     * Build Write-Read state model definition.
-     *
-     * @return State model definition.
-     */
-    public static StateModelDefinition build() {
-        StateModelDefinition.Builder builder = new StateModelDefinition.Builder(NAME);
-        // init state
-        builder.initialState(States.OFFLINE.name());
-
-        // add states
-        builder.addState(States.WRITE.name(), 0);
-        builder.addState(States.READ.name(), 1);
-        builder.addState(States.OFFLINE.name(), 2);
-        for (HelixDefinedState state : HelixDefinedState.values()) {
-            builder.addState(state.name());
-        }
-
-        // add transitions
-        builder.addTransition(States.WRITE.name(), States.READ.name(), 0);
-        builder.addTransition(States.READ.name(), States.WRITE.name(), 1);
-        builder.addTransition(States.OFFLINE.name(), States.READ.name(), 2);
-        builder.addTransition(States.READ.name(), States.OFFLINE.name(), 3);
-        builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());
-
-        // bounds
-        builder.upperBound(States.WRITE.name(), 1);
-        builder.dynamicUpperBound(States.READ.name(), "R");
-
-        return builder.build();
-    }
-
-    /**
-     * Generate Write-slave state model definition.
-     *
-     * @return ZNRecord.
-     */
-    public static ZNRecord generateConfigForWriteRead() {
-        ZNRecord record = new ZNRecord("WriteRead");
-        record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(), "OFFLINE");
-        List<String> statePriorityList = new ArrayList<String>();
-        statePriorityList.add("WRITE");
-        statePriorityList.add("READ");
-        statePriorityList.add("OFFLINE");
-        statePriorityList.add("DROPPED");
-        statePriorityList.add("ERROR");
-        record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
-            statePriorityList);
-        for (String state : statePriorityList) {
-            String key = state + ".meta";
-            Map<String, String> metadata = new HashMap<String, String>();
-            if (state.equals("WRITE")) {
-                metadata.put("count", "1");
-                record.setMapField(key, metadata);
-            } else if (state.equals("READ")) {
-                metadata.put("count", "R");
-                record.setMapField(key, metadata);
-            } else if (state.equals("OFFLINE")) {
-                metadata.put("count", "-1");
-                record.setMapField(key, metadata);
-            } else if (state.equals("DROPPED")) {
-                metadata.put("count", "-1");
-                record.setMapField(key, metadata);
-            } else if (state.equals("ERROR")) {
-                metadata.put("count", "-1");
-                record.setMapField(key, metadata);
-            }
-        }
-        for (String state : statePriorityList) {
-            String key = state + ".next";
-            if (state.equals("WRITE")) {
-                Map<String, String> metadata = new HashMap<String, String>();
-                metadata.put("READ", "READ");
-                metadata.put("OFFLINE", "READ");
-                metadata.put("DROPPED", "READ");
-                record.setMapField(key, metadata);
-            } else if (state.equals("READ")) {
-                Map<String, String> metadata = new HashMap<String, String>();
-                metadata.put("WRITE", "WRITE");
-                metadata.put("OFFLINE", "OFFLINE");
-                metadata.put("DROPPED", "OFFLINE");
-                record.setMapField(key, metadata);
-            } else if (state.equals("OFFLINE")) {
-                Map<String, String> metadata = new HashMap<String, String>();
-                metadata.put("READ", "READ");
-                metadata.put("WRITE", "READ");
-                metadata.put("DROPPED", "DROPPED");
-                record.setMapField(key, metadata);
-            } else if (state.equals("ERROR")) {
-                Map<String, String> metadata = new HashMap<String, String>();
-                metadata.put("OFFLINE", "OFFLINE");
-                record.setMapField(key, metadata);
-            }
-        }
-        List<String> stateTransitionPriorityList = new ArrayList<String>();
-        stateTransitionPriorityList.add("WRITE-READ");
-        stateTransitionPriorityList.add("READ-WRITE");
-        stateTransitionPriorityList.add("OFFLINE-READ");
-        stateTransitionPriorityList.add("READ-OFFLINE");
-        stateTransitionPriorityList.add("OFFLINE-DROPPED");
-        record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
-            stateTransitionPriorityList);
-        return record;
-    }
-}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/WriteReadStateModelFactory.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/WriteReadStateModelFactory.java
deleted file mode 100644
index eb6e86b..0000000
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/WriteReadStateModelFactory.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.sc.helix;
-
-import static org.apache.bookkeeper.stream.storage.impl.sc.helix.HelixStorageController.getStorageContainerFromPartitionName;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.participant.statemachine.StateModelInfo;
-import org.apache.helix.participant.statemachine.Transition;
-
-/**
- * Storage Container State Model Factory.
- *
- * <p>It is used by helix for managing the state transition.
- */
-@Slf4j
-public class WriteReadStateModelFactory extends StateModelFactory<StateModel> {
-
-    private final StorageContainerRegistry registry;
-
-    public WriteReadStateModelFactory(StorageContainerRegistry registry) {
-        this.registry = registry;
-    }
-
-    @Override
-    public StateModel createNewStateModel(String resourceName, String partitionName) {
-        return new WriteReadStateModel(registry);
-    }
-
-    /**
-     * The state model for storage container.
-     */
-    @StateModelInfo(states = "{'OFFLINE','READ','WRITE'}", initialState = "OFFLINE")
-    public static class WriteReadStateModel extends StateModel {
-
-        private final StorageContainerRegistry registry;
-
-        WriteReadStateModel(StorageContainerRegistry registry) {
-            this.registry = registry;
-        }
-
-        @Transition(from = "OFFLINE", to = "READ")
-        public void offlineToRead(Message msg,
-                                  NotificationContext ctx) {
-            log.info("----- [OFFLINE --> READ] {} / {}", msg.getResourceName(), msg.getPartitionName());
-            // do nothing now
-        }
-
-        @Transition(from = "READ", to = "WRITE")
-        public void readToWrite(Message msg,
-                                NotificationContext ctx) throws Exception {
-            log.info("----- [READ --> WRITE] {} / {}", msg.getResourceName(), msg.getPartitionName());
-            try {
-                FutureUtils.result(registry.startStorageContainer(
-                    getStorageContainerFromPartitionName(msg.getPartitionName())));
-            } catch (Exception e) {
-                log.error("----- [READ --> WRITE] {} / {} failed",
-                    new Object[]{msg.getResourceName(), msg.getPartitionName(), e});
-                throw e;
-            }
-        }
-
-        @Transition(from = "WRITE", to = "READ")
-        public void writeToRead(Message msg,
-                                NotificationContext ctx) throws Exception {
-            log.info("----- [WRITE --> READ] {} / {}", msg.getResourceName(), msg.getPartitionName());
-            try {
-                FutureUtils.result(registry.stopStorageContainer(
-                    getStorageContainerFromPartitionName(msg.getPartitionName())));
-            } catch (Exception e) {
-                log.error("----- [WRITE --> READ] {} / {} failed",
-                    new Object[]{msg.getResourceName(), msg.getPartitionName(), e});
-                throw e;
-            }
-        }
-
-        @Transition(from = "READ", to = "OFFLINE")
-        public void readToOffline(Message msg,
-                                  NotificationContext ctx) {
-            log.info("----- [READ --> OFFLINE] {} / {}", msg.getResourceName(), msg.getPartitionName());
-            // do nothing now
-        }
-
-    }
-}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/package-info.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/package-info.java
deleted file mode 100644
index 018df17..0000000
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Helix based storage container management.
- */
-package org.apache.bookkeeper.stream.storage.impl.sc.helix;
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerImplTest.java
new file mode 100644
index 0000000..63967a6
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerImplTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterController;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeaderSelector;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerController;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ClusterControllerImpl}: lifecycle calls are forwarded to the leader selector.
+ */
+public class ClusterControllerImplTest {
+
+    private ClusterControllerLeaderSelector selector;
+    private ClusterController controller;
+
+    @Before
+    public void setup() {
+        selector = mock(ClusterControllerLeaderSelector.class);
+        StorageConfiguration conf = new StorageConfiguration(new CompositeConfiguration());
+        controller = new ClusterControllerImpl(
+            mock(ClusterMetadataStore.class),
+            mock(RegistrationClient.class),
+            mock(StorageContainerController.class),
+            selector,
+            conf);
+    }
+
+    @Test
+    public void testInitialize() {
+        // building the controller in setup() must already have initialized the selector with a leader
+        verify(selector, times(1)).initialize(any(ClusterControllerLeader.class));
+    }
+
+    @Test
+    public void testStart() {
+        controller.start();
+        verify(selector, times(1)).start();
+    }
+
+    @Test
+    public void testStop() {
+        controller.start();
+        controller.stop();
+        // stopping the controller is expected to close the selector
+        verify(selector, times(1)).close();
+    }
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
new file mode 100644
index 0000000..4d2828b
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerController;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.collections.Sets;
+
+/**
+ * Unit test {@link ClusterControllerLeaderImpl}.
+ */
+@Slf4j
+public class ClusterControllerLeaderImplTest {
+
+    private static final int NUM_STORAGE_CONTAINERS = 32;
+
+    private ClusterMetadataStore metadataStore;
+    private ClusterControllerLeaderImpl clusterController;
+    private ExecutorService leaderExecutor;
+
+    private final Semaphore coordSem = new Semaphore(0);
+    private final AtomicReference<RegistrationListener> regListenerRef
+        = new AtomicReference<>(null);
+    private final CompletableFuture<Void> watchFuture = new CompletableFuture<>();
+
+    @Before
+    public void setup() {
+        this.metadataStore = spy(new InMemClusterMetadataStore(NUM_STORAGE_CONTAINERS));
+        this.metadataStore.initializeCluster(NUM_STORAGE_CONTAINERS);
+        // wrap the metadata store so each assignment update releases coordSem for the test to wait on
+        ClusterMetadataStore originalStore = metadataStore;
+        this.metadataStore = new ClusterMetadataStore() {
+            @Override
+            public void initializeCluster(int numStorageContainers) {
+                originalStore.initializeCluster(numStorageContainers);
+            }
+
+            @Override
+            public ClusterAssignmentData getClusterAssignmentData() {
+                return originalStore.getClusterAssignmentData();
+            }
+
+            @Override
+            public void updateClusterAssignmentData(ClusterAssignmentData assignmentData) {
+                originalStore.updateClusterAssignmentData(assignmentData);
+
+                // notify when cluster assignment data is updated.
+                coordSem.release();
+            }
+
+            @Override
+            public void watchClusterAssignmentData(Consumer<Void> watcher, Executor executor) {
+                originalStore.watchClusterAssignmentData(watcher, executor);
+            }
+
+            @Override
+            public void unwatchClusterAssignmentData(Consumer<Void> watcher) {
+                originalStore.unwatchClusterAssignmentData(watcher);
+            }
+
+            @Override
+            public ClusterMetadata getClusterMetadata() {
+                return originalStore.getClusterMetadata();
+            }
+
+            @Override
+            public void updateClusterMetadata(ClusterMetadata clusterMetadata) {
+                originalStore.updateClusterMetadata(clusterMetadata);
+            }
+
+            @Override
+            public void close() {
+                originalStore.close();
+            }
+        };
+
+        StorageContainerController scController = spy(new DefaultStorageContainerController());
+        RegistrationClient mockRegClient = mock(RegistrationClient.class);
+        when(mockRegClient.watchWritableBookies(any(RegistrationListener.class)))
+            .thenAnswer(invocationOnMock -> {
+                RegistrationListener listener = invocationOnMock.getArgument(0);
+                regListenerRef.set(listener);
+                return watchFuture;
+            });
+        doAnswer(invocationOnMock -> {
+            RegistrationListener listener = invocationOnMock.getArgument(0);
+            regListenerRef.compareAndSet(listener, null);
+            return null;
+        }).when(mockRegClient).unwatchWritableBookies(any(RegistrationListener.class));
+
+        this.clusterController = new ClusterControllerLeaderImpl(
+            metadataStore,
+            scController,
+            mockRegClient,
+            Duration.ofMillis(10));
+        this.leaderExecutor = Executors.newSingleThreadExecutor();
+    }
+
+    @After
+    public void teardown() {
+        // stop the leader task first (shutdownNow interrupts processAsLeader()) before closing the store it uses
+        if (null != leaderExecutor) {
+            leaderExecutor.shutdownNow();
+        }
+        if (null != metadataStore) {
+            metadataStore.close();
+        }
+    }
+
+    @Test
+    public void testProcessAsLeader() throws Exception {
+        clusterController.suspend();
+        assertTrue(clusterController.isSuspended());
+
+        // start the leader controller
+        leaderExecutor.submit(() -> {
+            try {
+                clusterController.processAsLeader();
+            } catch (Exception e) {
+                log.info("Encountered exception when cluster controller processes as a leader", e);
+            }
+        });
+
+        // resume the controller
+        clusterController.resume();
+        assertFalse(clusterController.isSuspended());
+
+        // simulate `watchWritableBookies` completing (NOTE: the assertion checks the holder, not its listener)
+        FutureUtils.complete(watchFuture, null);
+        assertNotNull(regListenerRef);
+
+        // once the controller is resumed, it should start processing server change
+        // but since there are no servers available, the storage controller will not compute any ideal state
+        // for the assignment and `lastSuccessfulAssignmentAt` will remain negative.
+        assertFalse(coordSem.tryAcquire(1, TimeUnit.SECONDS));
+        assertTrue(clusterController.getLastSuccessfulAssigmentAt() < 0);
+
+        // notify the registration client that a new host is added
+        Set<BookieSocketAddress> cluster = Sets.newSet(
+            new BookieSocketAddress("127.0.0.1", 4181));
+        Version version = new LongVersion(0L);
+
+        regListenerRef.get().onBookiesChanged(new Versioned<>(cluster, version));
+        // the cluster controller will be notified with cluster change and storage controller will compute
+        // the assignment state. cluster metadata store should be used for updating cluster assignment data.
+        coordSem.acquire();
+        assertTrue(clusterController.getLastSuccessfulAssigmentAt() > 0);
+        long lastSuccessfulAssignmentAt = clusterController.getLastSuccessfulAssigmentAt();
+
+        // notify the cluster controller with same cluster, cluster controller should not attempt to update
+        // the assignment
+        regListenerRef.get().onBookiesChanged(new Versioned<>(cluster, version));
+        assertFalse(coordSem.tryAcquire(200, TimeUnit.MILLISECONDS));
+        assertEquals(lastSuccessfulAssignmentAt, clusterController.getLastSuccessfulAssigmentAt());
+
+        // multiple hosts added: build a fresh set rather than mutating the one already handed to the listener
+        cluster = Sets.newSet(
+            new BookieSocketAddress("127.0.0.1", 4181), new BookieSocketAddress("127.0.0.1", 4182),
+            new BookieSocketAddress("127.0.0.1", 4183), new BookieSocketAddress("127.0.0.1", 4184),
+            new BookieSocketAddress("127.0.0.1", 4185));
+        version = new LongVersion(1L);
+
+        regListenerRef.get().onBookiesChanged(new Versioned<>(cluster, version));
+        // the cluster controller should update assignment data if cluster is changed
+        coordSem.acquire();
+        assertTrue(clusterController.getLastSuccessfulAssigmentAt() > lastSuccessfulAssignmentAt);
+        lastSuccessfulAssignmentAt = clusterController.getLastSuccessfulAssigmentAt();
+
+        // if cluster information is changed to empty, cluster controller should not be eager to change
+        // the assignment.
+        regListenerRef.get().onBookiesChanged(new Versioned<>(Collections.emptySet(), new LongVersion(2L)));
+        assertFalse(coordSem.tryAcquire(1, TimeUnit.SECONDS));
+        assertEquals(lastSuccessfulAssignmentAt, clusterController.getLastSuccessfulAssigmentAt());
+    }
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStoreTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStoreTest.java
new file mode 100644
index 0000000..3da8791
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStoreTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import lombok.Cleanup;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link InMemClusterMetadataStore}.
+ */
+public class InMemClusterMetadataStoreTest {
+
+    private static final int NUM_STORAGE_CONTAINERS = 1024;
+
+    private InMemClusterMetadataStore store;
+
+    @Before
+    public void setup() {
+        store = new InMemClusterMetadataStore(NUM_STORAGE_CONTAINERS);
+    }
+
+    @After
+    public void teardown() {
+        if (null != store) {
+            store.close();
+        }
+    }
+
+    @Test
+    public void testUninitialized() {
+        assertEquals(
+            ClusterMetadata.newBuilder().setNumStorageContainers(NUM_STORAGE_CONTAINERS).build(),
+            store.getClusterMetadata());
+        assertEquals(
+            ClusterAssignmentData.newBuilder().build(),
+            store.getClusterAssignmentData());
+    }
+
+    @Test
+    public void testInitialize() {
+        int numStorageContainers = 2048;
+        store.initializeCluster(numStorageContainers);
+        assertEquals(
+            ClusterMetadata.newBuilder().setNumStorageContainers(numStorageContainers).build(),
+            store.getClusterMetadata());
+        assertEquals(
+            ClusterAssignmentData.newBuilder().build(),
+            store.getClusterAssignmentData());
+    }
+
+    @Test
+    public void testUpdateClusterMetadata() {
+        int numStorageContainers = 4096;
+        ClusterMetadata metadata = ClusterMetadata.newBuilder()
+            .setNumStorageContainers(numStorageContainers)
+            .build();
+        store.updateClusterMetadata(metadata);
+        assertEquals(metadata, store.getClusterMetadata());
+    }
+
+    @Test
+    public void testUpdateClusterAssignmentData() {
+        ClusterAssignmentData assignmentData = ClusterAssignmentData.newBuilder()
+            .putServers(
+                "server-0",
+                ServerAssignmentData.newBuilder()
+                    .addContainers(1L)
+                    .addContainers(2L)
+                    .build())
+            .build();
+        store.updateClusterAssignmentData(assignmentData);
+        assertEquals(assignmentData, store.getClusterAssignmentData());
+    }
+
+    @Test
+    public void testWatchClusterAssignmentData() {
+        ClusterAssignmentData assignmentData = ClusterAssignmentData.newBuilder()
+            .putServers(
+                "server-0",
+                ServerAssignmentData.newBuilder()
+                    .addContainers(1L)
+                    .addContainers(2L)
+                    .build())
+            .build();
+
+        @Cleanup("shutdown")
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        CompletableFuture<Void> watchFuture = new CompletableFuture<>();
+
+        store.watchClusterAssignmentData(data -> {
+            FutureUtils.complete(watchFuture, null);
+        }, executor);
+
+        store.updateClusterAssignmentData(assignmentData);
+
+        watchFuture.join();
+        assertEquals(assignmentData, store.getClusterAssignmentData());
+    }
+
+    @Test
+    public void testUnwatchClusterAssignmentData() throws Exception {
+        ClusterAssignmentData assignmentData = ClusterAssignmentData.newBuilder()
+            .putServers(
+                "server-0",
+                ServerAssignmentData.newBuilder()
+                    .addContainers(1L)
+                    .addContainers(2L)
+                    .build())
+            .build();
+
+        @Cleanup("shutdown")
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        CompletableFuture<Void> watchFuture = new CompletableFuture<>();
+        CountDownLatch latch = new CountDownLatch(2);
+
+        Consumer<Void> dataConsumer = ignored -> {
+            latch.countDown();
+            FutureUtils.complete(watchFuture, null);
+        };
+
+        assertEquals(0, store.getNumWatchers());
+        store.watchClusterAssignmentData(dataConsumer, executor);
+        assertEquals(1, store.getNumWatchers());
+        store.updateClusterAssignmentData(assignmentData);
+
+        watchFuture.join();
+        assertEquals(1, latch.getCount());
+        assertEquals(assignmentData, store.getClusterAssignmentData());
+
+        store.unwatchClusterAssignmentData(dataConsumer);
+        assertEquals(0, store.getNumWatchers());
+        store.updateClusterAssignmentData(assignmentData);
+        // watchFuture is already complete, so wait on the latch to give a stray callback time to fire
+        latch.await(100, TimeUnit.MILLISECONDS);
+        assertEquals(1, latch.getCount());
+    }
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorListenerTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorListenerTest.java
new file mode 100644
index 0000000..bebed2b
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorListenerTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ZkClusterControllerLeaderSelectorListener}.
+ *
+ * <p>Checks that gaining leadership through Curator hands control to the
+ * {@link ClusterControllerLeader} leader logic.
+ */
+public class ZkClusterControllerLeaderSelectorListenerTest {
+
+    @Test
+    public void testTakeLeadership() throws Exception {
+        ClusterControllerLeader leader = mock(ClusterControllerLeader.class);
+        CuratorFramework client = mock(CuratorFramework.class);
+
+        new ZkClusterControllerLeaderSelectorListener(leader).takeLeadership(client);
+
+        // taking leadership must run the leader logic exactly once
+        verify(leader, times(1)).processAsLeader();
+    }
+
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorTest.java
new file mode 100644
index 0000000..c072d00
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterControllerLeaderSelectorTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterControllerLeader;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.distributedlog.ZooKeeperClusterTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Unit test of {@link ZkClusterControllerLeaderSelector}.
+ */
+@Slf4j
+public class ZkClusterControllerLeaderSelectorTest extends ZooKeeperClusterTestCase {
+
+    /**
+     * Upper bound for waiting on a leader callback, so that a broken selector fails
+     * the test instead of hanging the build forever.
+     */
+    private static final long CALLBACK_TIMEOUT_SECONDS = 30;
+
+    @Rule
+    public final TestName runtime = new TestName();
+
+    private CuratorFramework curatorClient;
+    private String zkRootPath;
+    private ZkClusterControllerLeaderSelector selector;
+
+    @Before
+    public void setup() throws Exception {
+        curatorClient = CuratorFrameworkFactory.newClient(
+            zkServers,
+            new ExponentialBackoffRetry(200, 10, 5000));
+        curatorClient.start();
+
+        // every test runs under its own root path
+        zkRootPath = "/" + runtime.getMethodName();
+        curatorClient.create().forPath(zkRootPath);
+
+        selector = new ZkClusterControllerLeaderSelector(curatorClient, zkRootPath);
+    }
+
+    @After
+    public void teardown() {
+        if (null != selector) {
+            selector.close();
+        }
+        if (null != curatorClient) {
+            curatorClient.close();
+        }
+    }
+
+    /**
+     * Waits for {@code latch} to reach zero and fails with {@code message} if it does not
+     * within {@link #CALLBACK_TIMEOUT_SECONDS}.
+     */
+    private static void awaitLatch(CountDownLatch latch, String message) throws InterruptedException {
+        assertTrue(message, latch.await(CALLBACK_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testStartBeforeInitialize() {
+        selector.start();
+    }
+
+    @Test
+    public void testLeaderElection() throws InterruptedException {
+        CountDownLatch leaderLatch = new CountDownLatch(1);
+        selector.initialize(new ClusterControllerLeader() {
+            @Override
+            public void processAsLeader() throws Exception {
+                log.info("Become leader");
+                leaderLatch.countDown();
+                try {
+                    TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
+                } catch (InterruptedException ie) {
+                    log.info("Leadership is interrupted");
+                    Thread.currentThread().interrupt();
+                }
+                log.info("Ended leadership");
+            }
+
+            @Override
+            public void suspend() {
+                // not exercised by this test
+            }
+
+            @Override
+            public void resume() {
+                // not exercised by this test
+            }
+        });
+
+        selector.start();
+        awaitLatch(leaderLatch, "Should successfully become leader");
+
+        log.info("Ended test");
+    }
+
+    @Test
+    public void testStateChangedToLost() throws InterruptedException {
+        CountDownLatch leaderLatch = new CountDownLatch(1);
+        CountDownLatch interruptedLatch = new CountDownLatch(1);
+        selector.initialize(new ClusterControllerLeader() {
+            @Override
+            public void processAsLeader() throws Exception {
+                log.info("Become leader");
+                leaderLatch.countDown();
+                try {
+                    TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
+                } catch (InterruptedException ie) {
+                    log.info("Leader is interrupted", ie);
+                    Thread.currentThread().interrupt();
+                    interruptedLatch.countDown();
+                }
+            }
+
+            @Override
+            public void suspend() {
+            }
+
+            @Override
+            public void resume() {
+            }
+        });
+
+        selector.start();
+        awaitLatch(leaderLatch, "Should successfully become leader");
+
+        // a LOST connection is expected to interrupt the running leader logic
+        selector.stateChanged(curatorClient, ConnectionState.LOST);
+        awaitLatch(interruptedLatch, "Leader should be interrupted");
+    }
+
+    @Test
+    public void testStateChangedToSuspendedResumed() throws InterruptedException {
+        CountDownLatch leaderLatch = new CountDownLatch(1);
+        CountDownLatch suspendedLatch = new CountDownLatch(1);
+        CountDownLatch resumeLatch = new CountDownLatch(1);
+        selector.initialize(new ClusterControllerLeader() {
+            @Override
+            public void processAsLeader() throws Exception {
+                log.info("Become leader");
+                leaderLatch.countDown();
+                try {
+                    TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
+                } catch (InterruptedException ie) {
+                    log.info("Leader is interrupted", ie);
+                    Thread.currentThread().interrupt();
+                }
+            }
+
+            @Override
+            public void suspend() {
+                suspendedLatch.countDown();
+            }
+
+            @Override
+            public void resume() {
+                resumeLatch.countDown();
+            }
+        });
+
+        selector.start();
+        awaitLatch(leaderLatch, "Should successfully become leader");
+
+        // SUSPENDED is expected to suspend the leader, RECONNECTED to resume it
+        selector.stateChanged(curatorClient, ConnectionState.SUSPENDED);
+        awaitLatch(suspendedLatch, "Leader should be suspended");
+
+        selector.stateChanged(curatorClient, ConnectionState.RECONNECTED);
+        awaitLatch(resumeLatch, "Leader should be resumed");
+    }
+
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStoreTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStoreTest.java
new file mode 100644
index 0000000..1c224db
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStoreTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
+import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.distributedlog.ZooKeeperClusterTestCase;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Unit test {@link ZkClusterMetadataStore}.
+ */
+@Slf4j
+public class ZkClusterMetadataStoreTest extends ZooKeeperClusterTestCase {
+
+    private static final int NUM_STORAGE_CONTAINERS = 1024;
+
+    @Rule
+    public final TestName runtime = new TestName();
+
+    private CuratorFramework curatorClient;
+    private ZkClusterMetadataStore store;
+
+    @Before
+    public void setup() {
+        curatorClient = CuratorFrameworkFactory.newClient(
+            zkServers,
+            new ExponentialBackoffRetry(200, 10, 5000));
+        curatorClient.start();
+        // every test gets its own root path, initialized with NUM_STORAGE_CONTAINERS containers
+        store = new ZkClusterMetadataStore(curatorClient, zkServers, "/" + runtime.getMethodName());
+        store.initializeCluster(NUM_STORAGE_CONTAINERS);
+    }
+
+    @After
+    public void teardown() {
+        if (null != store) {
+            store.close();
+        }
+        if (null != curatorClient) {
+            curatorClient.close();
+        }
+    }
+
+    /**
+     * Builds the single-server assignment ({@code server-0 -> [1, 2]}) used by the update/watch tests.
+     */
+    private static ClusterAssignmentData newAssignmentData() {
+        return ClusterAssignmentData.newBuilder()
+            .putServers(
+                "server-0",
+                ServerAssignmentData.newBuilder()
+                    .addContainers(1L)
+                    .addContainers(2L)
+                    .build())
+            .build();
+    }
+
+    @Test
+    public void testUnitialized() {
+        // a store over a root path that was never initialized has no znodes to read
+        ZkClusterMetadataStore newStore = new ZkClusterMetadataStore(
+            curatorClient, zkServers, "/" + runtime.getMethodName() + "-new");
+
+        try {
+            newStore.getClusterMetadata();
+            fail("Should fail to get cluster metadata if not initialized");
+        } catch (StorageRuntimeException sre) {
+            assertTrue(sre.getCause() instanceof KeeperException);
+            KeeperException cause = (KeeperException) sre.getCause();
+            assertEquals(Code.NONODE, cause.code());
+        }
+
+        try {
+            newStore.getClusterAssignmentData();
+            fail("Should fail to get cluster assignment data if not initialized");
+        } catch (StorageRuntimeException sre) {
+            assertTrue(sre.getCause() instanceof KeeperException);
+            KeeperException cause = (KeeperException) sre.getCause();
+            assertEquals(Code.NONODE, cause.code());
+        }
+    }
+
+    @Test
+    public void testInitialize() {
+        // setup() already initialized the cluster, so a second initialization must be rejected
+        int numStorageContainers = 2048;
+        try {
+            store.initializeCluster(numStorageContainers);
+            fail("Should fail to initialize cluster if already initialized");
+        } catch (StorageRuntimeException sre) {
+            assertTrue(sre.getCause() instanceof KeeperException);
+            KeeperException cause = (KeeperException) sre.getCause();
+            assertEquals(Code.NODEEXISTS, cause.code());
+        }
+    }
+
+    @Test
+    public void testUpdateClusterMetadata() {
+        int numStorageContainers = 4096;
+        ClusterMetadata metadata = ClusterMetadata.newBuilder()
+            .setNumStorageContainers(numStorageContainers)
+            .build();
+        store.updateClusterMetadata(metadata);
+        assertEquals(metadata, store.getClusterMetadata());
+    }
+
+    @Test
+    public void testUpdateClusterAssignmentData() {
+        ClusterAssignmentData assignmentData = newAssignmentData();
+        store.updateClusterAssignmentData(assignmentData);
+        assertEquals(assignmentData, store.getClusterAssignmentData());
+    }
+
+    @Test
+    public void testWatchClusterAssignmentData() {
+        ClusterAssignmentData assignmentData = newAssignmentData();
+
+        @Cleanup("shutdown")
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        CompletableFuture<Void> watchFuture = new CompletableFuture<>();
+
+        store.watchClusterAssignmentData(data -> {
+            FutureUtils.complete(watchFuture, null);
+        }, executor);
+
+        store.updateClusterAssignmentData(assignmentData);
+
+        watchFuture.join();
+        assertEquals(assignmentData, store.getClusterAssignmentData());
+    }
+
+    @Test
+    public void testUnwatchClusterAssignmentData() throws Exception {
+        ClusterAssignmentData assignmentData = newAssignmentData();
+
+        @Cleanup("shutdown")
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        CompletableFuture<Void> watchFuture = new CompletableFuture<>();
+        // starts at 2 so that an unexpected second notification is observable as a drop to 0
+        CountDownLatch latch = new CountDownLatch(2);
+
+        Consumer<Void> dataConsumer = ignored -> {
+            log.info("Notify cluster assignment data changed");
+            latch.countDown();
+            FutureUtils.complete(watchFuture, null);
+        };
+
+        assertEquals(0, store.getNumWatchers());
+        store.watchClusterAssignmentData(dataConsumer, executor);
+        assertEquals(1, store.getNumWatchers());
+        store.updateClusterAssignmentData(assignmentData);
+
+        watchFuture.join();
+        assertEquals(1, latch.getCount());
+        assertEquals(assignmentData, store.getClusterAssignmentData());
+
+        store.unwatchClusterAssignmentData(dataConsumer);
+        assertEquals(0, store.getNumWatchers());
+
+        // an update after unwatching must not reach the consumer; the latch only returns
+        // early if the consumer is (wrongly) notified again, dropping the count to 0
+        store.updateClusterAssignmentData(assignmentData);
+        latch.await(100, TimeUnit.MILLISECONDS);
+        assertEquals(1, latch.getCount());
+    }
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerControllerTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerControllerTest.java
new file mode 100644
index 0000000..e152998
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/DefaultStorageContainerControllerTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.sc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
+import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController.ServerAssignmentDataComparator;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Test;
+
+/**
+ * Unit test {@link DefaultStorageContainerController}.
+ */
+@Slf4j
+public class DefaultStorageContainerControllerTest {
+
+    private static final int NUM_STORAGE_CONTAINERS = 32;
+
+    private final ClusterMetadata clusterMetadata;
+    private final StorageContainerController controller;
+    private final ClusterAssignmentData currentAssignment;
+
+    public DefaultStorageContainerControllerTest() {
+        this.controller = new DefaultStorageContainerController();
+        this.clusterMetadata = ClusterMetadata.newBuilder()
+            .setNumStorageContainers(NUM_STORAGE_CONTAINERS)
+            .build();
+        this.currentAssignment = ClusterAssignmentData.newBuilder()
+            .putServers("default-server", ServerAssignmentData.newBuilder()
+                .addContainers(0L)
+                .addContainers(1L)
+                .addContainers(3L)
+                .build())
+            .build();
+    }
+
+    @Test
+    public void testServerAssignmentDataComparator() {
+        ServerAssignmentDataComparator comparator = new ServerAssignmentDataComparator();
+
+        LinkedList<Long> serverList1 = new LinkedList<>();
+        serverList1.add(1L);
+        LinkedList<Long> serverList2 = new LinkedList<>();
+        serverList2.add(2L);
+        serverList2.add(3L);
+
+        BookieSocketAddress address1 = new BookieSocketAddress("127.0.0.1", 4181);
+        BookieSocketAddress address2 = new BookieSocketAddress("127.0.0.1", 4182);
+
+        Pair<BookieSocketAddress, LinkedList<Long>> pair1 = Pair.of(address1, serverList1);
+        Pair<BookieSocketAddress, LinkedList<Long>> pair2 = Pair.of(address1, serverList2);
+        Pair<BookieSocketAddress, LinkedList<Long>> pair3 = Pair.of(address2, serverList2);
+
+        // the server with fewer containers orders first, whichever argument position it is in
+        assertEquals(-1, comparator.compare(pair1, pair2));
+        assertTrue(comparator.compare(pair2, pair1) > 0);
+        // with equal container counts the order falls back to the address hash code
+        assertEquals(
+            Integer.compare(address1.hashCode(), address2.hashCode()),
+            comparator.compare(pair2, pair3));
+    }
+
+    @Test
+    public void testComputeIdealStateEmptyCluster() {
+        // with no available servers the current assignment is returned unchanged
+        assertSame(
+            currentAssignment,
+            controller.computeIdealState(
+                clusterMetadata,
+                currentAssignment,
+                Collections.emptySet()));
+    }
+
+    /**
+     * Creates {@code numServers} server addresses on 127.0.0.1 with ports starting at 4181.
+     */
+    private static Set<BookieSocketAddress> newCluster(int numServers) {
+        return newCluster(numServers, 0);
+    }
+
+    /**
+     * Creates {@code numServers} server addresses on 127.0.0.1 with ports starting at
+     * {@code 4181 + startServerIdx}.
+     */
+    private static Set<BookieSocketAddress> newCluster(int numServers, int startServerIdx) {
+        Set<BookieSocketAddress> cluster = IntStream.range(0, numServers)
+            .mapToObj(idx -> new BookieSocketAddress("127.0.0.1", 4181 + startServerIdx + idx))
+            .collect(Collectors.toSet());
+        return ImmutableSet.copyOf(cluster);
+    }
+
+    /**
+     * Returns the ids of every storage container in the cluster: {@code [0, NUM_STORAGE_CONTAINERS)}.
+     */
+    private static Set<Long> allContainerIds() {
+        return LongStream.range(0L, NUM_STORAGE_CONTAINERS)
+            .boxed()
+            .collect(Collectors.toSet());
+    }
+
+    /**
+     * Verifies that {@code newAssignment} spreads all containers evenly over {@code currentCluster}.
+     *
+     * @param newAssignment the computed assignment
+     * @param currentCluster the servers that must appear in the assignment
+     * @param isInitialIdealState when true, additionally verify that each server owns container ids
+     *                            spaced {@code currentCluster.size()} apart
+     */
+    private static void verifyAssignmentData(ClusterAssignmentData newAssignment,
+                                             Set<BookieSocketAddress> currentCluster,
+                                             boolean isInitialIdealState)
+            throws Exception {
+        int numServers = currentCluster.size();
+
+        assertEquals(numServers, newAssignment.getServersCount());
+        Set<Long> assignedContainers = Sets.newHashSet();
+        Set<BookieSocketAddress> assignedServers = Sets.newHashSet();
+
+        int numContainersPerServer = NUM_STORAGE_CONTAINERS / numServers;
+        int serverIdx = 0;
+        for (Map.Entry<String, ServerAssignmentData> entry : newAssignment.getServersMap().entrySet()) {
+            log.info("Check assignment for server {} = {}", entry.getKey(), entry.getValue());
+
+            BookieSocketAddress address = new BookieSocketAddress(entry.getKey());
+            assignedServers.add(address);
+            // every entry must resolve to a distinct server address
+            assertEquals(serverIdx + 1, assignedServers.size());
+
+            // each server should be assigned with equal number of containers
+            ServerAssignmentData serverData = entry.getValue();
+            assertEquals(numContainersPerServer, serverData.getContainersCount());
+            List<Long> containers = Lists.newArrayList(serverData.getContainersList());
+            Collections.sort(containers);
+            assignedContainers.addAll(containers);
+
+            if (isInitialIdealState) {
+                long startContainerId = containers.get(0);
+                for (int i = 0; i < containers.size(); i++) {
+                    assertEquals(startContainerId + i * numServers, containers.get(i).longValue());
+                }
+            }
+            ++serverIdx;
+        }
+
+        // every server of the cluster should be part of the assignment
+        assertEquals(currentCluster, assignedServers);
+        // all containers should be assigned
+        assertEquals(allContainerIds(), assignedContainers);
+    }
+
+    /**
+     * Verifies an assignment for a cluster with more servers than containers: each container
+     * sits alone on its own server and the surplus servers are left empty.
+     */
+    private static void verifyAssignmentDataWhenHasMoreServers(ClusterAssignmentData newAssignment,
+                                                               Set<BookieSocketAddress> currentCluster)
+            throws Exception {
+        int numServers = currentCluster.size();
+
+        assertEquals(numServers, newAssignment.getServersCount());
+        Set<Long> assignedContainers = Sets.newHashSet();
+        Set<BookieSocketAddress> assignedServers = Sets.newHashSet();
+
+        int numEmptyServers = 0;
+        int numAssignedServers = 0;
+        int serverIdx = 0;
+        for (Map.Entry<String, ServerAssignmentData> entry : newAssignment.getServersMap().entrySet()) {
+            log.info("Check assignment for server {} = {}", entry.getKey(), entry.getValue());
+
+            BookieSocketAddress address = new BookieSocketAddress(entry.getKey());
+            assignedServers.add(address);
+            // every entry must resolve to a distinct server address
+            assertEquals(serverIdx + 1, assignedServers.size());
+
+            ServerAssignmentData serverData = entry.getValue();
+            if (serverData.getContainersCount() > 0) {
+                // no server owns more than one container when servers outnumber containers
+                assertEquals(1, serverData.getContainersCount());
+                ++numAssignedServers;
+            } else {
+                ++numEmptyServers;
+            }
+            assignedContainers.addAll(serverData.getContainersList());
+
+            ++serverIdx;
+        }
+
+        // exactly one server per container; the remaining servers stay empty
+        assertEquals(NUM_STORAGE_CONTAINERS, numAssignedServers);
+        assertEquals(numServers - NUM_STORAGE_CONTAINERS, numEmptyServers);
+
+        // every server of the cluster should be part of the assignment
+        assertEquals(currentCluster, assignedServers);
+        // all containers should be assigned
+        assertEquals(allContainerIds(), assignedContainers);
+    }
+
+    @Test
+    public void testComputeIdealStateFromEmptyAssignment() throws Exception {
+        ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build();
+
+        int numServers = 8;
+        Set<BookieSocketAddress> currentCluster = newCluster(numServers);
+
+        ClusterAssignmentData newAssignment = controller.computeIdealState(
+            clusterMetadata,
+            emptyAssignment,
+            currentCluster);
+
+        verifyAssignmentData(newAssignment, currentCluster, true);
+    }
+
+    @Test
+    public void testComputeIdealStateIfClusterUnchanged() throws Exception {
+        ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build();
+
+        int numServers = 8;
+        Set<BookieSocketAddress> currentCluster = newCluster(numServers);
+        ClusterAssignmentData newAssignment = controller.computeIdealState(
+            clusterMetadata,
+            emptyAssignment,
+            currentCluster);
+        verifyAssignmentData(newAssignment, currentCluster, true);
+
+        ClusterAssignmentData newAssignment2 = controller.computeIdealState(
+            clusterMetadata,
+            newAssignment,
+            currentCluster);
+
+        // the state should not change if cluster is unchanged.
+        assertSame(newAssignment, newAssignment2);
+    }
+
+    @Test
+    public void testComputeIdealStateWhenHostsRemoved() throws Exception {
+        ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build();
+
+        int numServers = 8;
+        Set<BookieSocketAddress> currentCluster = newCluster(numServers);
+
+        ClusterAssignmentData assignmentData = controller.computeIdealState(
+            clusterMetadata,
+            emptyAssignment,
+            currentCluster);
+        verifyAssignmentData(assignmentData, currentCluster, true);
+
+        int newNumServers = 4;
+        Set<BookieSocketAddress> newCluster = newCluster(newNumServers);
+
+        ClusterAssignmentData newAssignmentData = controller.computeIdealState(
+            clusterMetadata,
+            assignmentData,
+            newCluster);
+        verifyAssignmentData(newAssignmentData, newCluster, false);
+    }
+
+    @Test
+    public void testComputeIdealStateWhenHostsAdded() throws Exception {
+        ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build();
+
+        int numServers = 4;
+        Set<BookieSocketAddress> currentCluster = newCluster(numServers);
+
+        ClusterAssignmentData assignmentData = controller.computeIdealState(
+            clusterMetadata,
+            emptyAssignment,
+            currentCluster);
+        verifyAssignmentData(assignmentData, currentCluster, true);
+
+        int newNumServers = 8;
+        Set<BookieSocketAddress> newCluster = newCluster(newNumServers);
+
+        ClusterAssignmentData newAssignmentData = controller.computeIdealState(
+            clusterMetadata,
+            assignmentData,
+            newCluster);
+        verifyAssignmentData(newAssignmentData, newCluster, false);
+    }
+
+    @Test
+    public void testComputeIdealStateWhenHostsRemovedAdded() throws Exception {
+        ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build();
+
+        int numServers = 4;
+        Set<BookieSocketAddress> currentCluster = newCluster(numServers);
+
+        ClusterAssignmentData assignmentData = controller.computeIdealState(
+            clusterMetadata,
+            emptyAssignment,
+            currentCluster);
+        verifyAssignmentData(assignmentData, currentCluster, true);
+
+        // add 6 new servers and drop 2 existing ones: 4 + 6 - 2 = 8 servers
+        Set<BookieSocketAddress> serversToAdd = newCluster(6, numServers);
+        Set<BookieSocketAddress> serversToRemove = newCluster(2);
+
+        Set<BookieSocketAddress> newCluster = Sets.newHashSet(currentCluster);
+        newCluster.addAll(serversToAdd);
+        serversToRemove.forEach(newCluster::remove);
+
+        ClusterAssignmentData newAssignmentData = controller.computeIdealState(
+            clusterMetadata,
+            assignmentData,
+            newCluster);
+        verifyAssignmentData(newAssignmentData, newCluster, false);
+    }
+
+    @Test
+    public void testComputeIdealStateWhenHasMoreServers() throws Exception {
+        ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build();
+
+        int numServers = 2 * NUM_STORAGE_CONTAINERS;
+        Set<BookieSocketAddress> currentCluster = newCluster(numServers);
+
+        ClusterAssignmentData assignmentData = controller.computeIdealState(
+            clusterMetadata,
+            emptyAssignment,
+            currentCluster);
+        verifyAssignmentDataWhenHasMoreServers(assignmentData, currentCluster);
+    }
+
+    @Test
+    public void testComputeIdealStateWhenScaleToMoreServers() throws Exception {
+        ClusterAssignmentData emptyAssignment = ClusterAssignmentData.newBuilder().build();
+
+        int numServers = 4;
+        Set<BookieSocketAddress> currentCluster = newCluster(numServers);
+
+        ClusterAssignmentData assignmentData = controller.computeIdealState(
+            clusterMetadata,
+            emptyAssignment,
+            currentCluster);
+        verifyAssignmentData(assignmentData, currentCluster, true);
+
+        numServers = 2 * NUM_STORAGE_CONTAINERS;
+        Set<BookieSocketAddress> newCluster = newCluster(numServers);
+        ClusterAssignmentData newAssignment = controller.computeIdealState(
+            clusterMetadata,
+            assignmentData,
+            newCluster);
+        verifyAssignmentDataWhenHasMoreServers(newAssignment, newCluster);
+    }
+
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java
index 34a1474..7031629 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/TestStorageContainerRegistryImpl.java
@@ -69,7 +69,7 @@ public class TestStorageContainerRegistryImpl {
 
     private StorageContainer createStorageContainer() {
         StorageContainer sc = mock(StorageContainer.class);
-        when(sc.start()).thenReturn(FutureUtils.value(null));
+        // start() returns a future of StorageContainer; complete it with this mock rather than null
+        when(sc.start()).thenReturn(FutureUtils.value(sc));
         when(sc.stop()).thenReturn(FutureUtils.value(null));
         return sc;
     }
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java
new file mode 100644
index 0000000..193503b
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.stream.storage.impl.sc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.clients.utils.NetUtils;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.testing.MoreAsserts;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
+import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
+import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerFactory;
+import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
+import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.distributedlog.ZooKeeperClusterTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Unit test {@link ZkStorageContainerManager}.
+ */
+public class ZkStorageContainerManagerTest extends ZooKeeperClusterTestCase {
+
+    private static final int NUM_STORAGE_CONTAINERS = 32;
+
+    @Rule
+    public final TestName runtime = new TestName();
+
+    private final Endpoint myEndpoint = Endpoint.newBuilder()
+        .setHostname("127.0.0.1")
+        .setPort(4181)
+        .build();
+
+    private CuratorFramework curatorClient;
+    private StorageContainerFactory mockScFactory;
+    private StorageContainerRegistry scRegistry;
+    private ZkClusterMetadataStore clusterMetadataStore;
+    private ZkStorageContainerManager scManager;
+    private OrderedScheduler scheduler;
+
+    @Before
+    public void setup() {
+        curatorClient = CuratorFrameworkFactory.newClient(
+            zkServers,
+            new ExponentialBackoffRetry(200, 10, 5000));
+        curatorClient.start();
+
+        clusterMetadataStore = spy(new ZkClusterMetadataStore(
+            curatorClient, zkServers, "/" + runtime.getMethodName()));
+        clusterMetadataStore.initializeCluster(NUM_STORAGE_CONTAINERS);
+
+        scheduler = OrderedScheduler.newSchedulerBuilder()
+            .name("test-scheduler")
+            .numThreads(1)
+            .build();
+
+        mockScFactory = mock(StorageContainerFactory.class);
+        scRegistry = spy(new StorageContainerRegistryImpl(mockScFactory, scheduler));
+
+        scManager = new ZkStorageContainerManager(
+            myEndpoint,
+            new StorageConfiguration(new CompositeConfiguration())
+                .setClusterControllerScheduleInterval(1, TimeUnit.SECONDS),
+            clusterMetadataStore,
+            scRegistry,
+            NullStatsLogger.INSTANCE);
+    }
+
+    @After
+    public void teardown() {
+        if (null != scManager) {
+            scManager.close();
+        }
+
+        if (null != scheduler) {
+            scheduler.shutdown();
+        }
+
+        if (null != curatorClient) {
+            curatorClient.close();
+        }
+
+        if (null != clusterMetadataStore) {
+            clusterMetadataStore.close();
+        }
+    }
+
+    private static StorageContainer createStorageContainer(long scId,
+                                                           CompletableFuture<StorageContainer> startFuture,
+                                                           CompletableFuture<Void> stopFuture) {
+        StorageContainer sc = mock(StorageContainer.class);
+        when(sc.getId()).thenReturn(scId);
+        when(sc.start()).thenReturn(startFuture);
+        when(sc.stop()).thenReturn(stopFuture);
+        return sc;
+    }
+
+    /**
+     * Test basic operations such as starting or stopping containers.
+     */
+    @Test
+    public void testBasicOps() throws Exception {
+        // start the storage container manager
+        scManager.start();
+
+        long containerId = 11L;
+        long containerId2 = 22L;
+
+        // mock a container and start it in the registry
+        CompletableFuture<StorageContainer> startFuture = new CompletableFuture<>();
+        CompletableFuture<Void> stopFuture = new CompletableFuture<>();
+        CompletableFuture<StorageContainer> startFuture2 = new CompletableFuture<>();
+        CompletableFuture<Void> stopFuture2 = new CompletableFuture<>();
+
+        StorageContainer mockSc = createStorageContainer(containerId, startFuture, stopFuture);
+        when(mockScFactory.createStorageContainer(eq(containerId)))
+            .thenReturn(mockSc);
+
+        StorageContainer mockSc2 = createStorageContainer(containerId2, startFuture2, stopFuture2);
+        when(mockScFactory.createStorageContainer(eq(containerId2)))
+            .thenReturn(mockSc2);
+
+        // update assignment map
+        ClusterAssignmentData cad = ClusterAssignmentData.newBuilder()
+            .putServers(
+                NetUtils.endpointToString(myEndpoint),
+                ServerAssignmentData.newBuilder()
+                    .addContainers(containerId)
+                    .build())
+            .build();
+        clusterMetadataStore.updateClusterAssignmentData(cad);
+
+        // notify the container to complete startup
+        startFuture.complete(mockSc);
+        verify(scRegistry, timeout(10000).times(1)).startStorageContainer(eq(containerId));
+        MoreAsserts.assertUtil(
+            ignored -> scManager.getLiveContainers().size() >= 1,
+            () -> null);
+        assertEquals(1, scManager.getLiveContainers().size());
+        assertTrue(scManager.getLiveContainers().containsKey(containerId));
+
+
+        // update assignment map to remove containerId and add containerId2
+        ClusterAssignmentData newCad = ClusterAssignmentData.newBuilder()
+            .putServers(
+                NetUtils.endpointToString(myEndpoint),
+                ServerAssignmentData.newBuilder()
+                    .addContainers(22L)
+                    .build())
+            .build();
+        clusterMetadataStore.updateClusterAssignmentData(newCad);
+
+        // notify the container1 to stop and container2 to start
+        FutureUtils.complete(stopFuture, null);
+        startFuture2.complete(mockSc2);
+        verify(scRegistry, timeout(10000).times(1)).stopStorageContainer(eq(containerId), same(mockSc));
+        verify(scRegistry, timeout(10000).times(1)).startStorageContainer(eq(containerId2));
+        MoreAsserts.assertUtil(
+            ignored -> !scManager.getLiveContainers().containsKey(containerId)
+                && scManager.getLiveContainers().containsKey(containerId2),
+            () -> null);
+        assertEquals(1, scManager.getLiveContainers().size());
+        assertFalse(scManager.getLiveContainers().containsKey(containerId));
+        assertTrue(scManager.getLiveContainers().containsKey(containerId2));
+    }
+
+    @Test
+    public void testShutdownPendingStartStorageContainer() throws Exception {
+        // start the storage container manager
+        scManager.start();
+
+        long containerId = 11L;
+
+        // mock a container and start it in the registry
+        CompletableFuture<StorageContainer> startFuture = new CompletableFuture<>();
+        CompletableFuture<Void> stopFuture = new CompletableFuture<>();
+
+        StorageContainer mockSc = createStorageContainer(
+            containerId, startFuture, stopFuture);
+        when(mockScFactory.createStorageContainer(eq(containerId)))
+            .thenReturn(mockSc);
+
+        // update assignment map
+        ClusterAssignmentData cad = ClusterAssignmentData.newBuilder()
+            .putServers(
+                NetUtils.endpointToString(myEndpoint),
+                ServerAssignmentData.newBuilder()
+                    .addContainers(containerId)
+                    .build())
+            .build();
+        clusterMetadataStore.updateClusterAssignmentData(cad);
+
+        // wait until container start is called
+        verify(scRegistry, timeout(10000).times(1)).startStorageContainer(eq(containerId));
+        assertEquals(0, scManager.getLiveContainers().size());
+        assertEquals(1, scManager.getPendingStartStopContainers().size());
+        assertTrue(scManager.getPendingStartStopContainers().contains(containerId));
+
+        // now shutting the manager down
+        cad = ClusterAssignmentData.newBuilder().build();
+        clusterMetadataStore.updateClusterAssignmentData(cad);
+
+        // the container should not be stopped since it is pending starting.
+        Thread.sleep(200);
+        verify(scRegistry, timeout(10000).times(0)).stopStorageContainer(eq(containerId), same(mockSc));
+        assertEquals(1, scManager.getPendingStartStopContainers().size());
+        assertTrue(scManager.getPendingStartStopContainers().contains(containerId));
+
+        // now complete the start future and the container is eventually going to shutdown
+        FutureUtils.complete(startFuture, mockSc);
+        FutureUtils.complete(stopFuture, null);
+
+        verify(scRegistry, timeout(10000).times(1)).stopStorageContainer(eq(containerId), same(mockSc));
+        MoreAsserts.assertUtil(
+            ignored -> scManager.getPendingStartStopContainers().size() == 0,
+            () -> null);
+        assertEquals(0, scManager.getLiveContainers().size());
+        assertEquals(0, scManager.getPendingStartStopContainers().size());
+    }
+
+    @Test
+    public void testStartContainerOnFailures() throws Exception {
+        scManager.close();
+
+        long containerId = 11L;
+        AtomicBoolean returnGoodContainer = new AtomicBoolean(false);
+
+        CompletableFuture<StorageContainer> startFuture = new CompletableFuture<>();
+        StorageContainer goodSc = createStorageContainer(containerId, startFuture, FutureUtils.Void());
+        mockScFactory = (scId) -> {
+            if (returnGoodContainer.get()) {
+                return goodSc;
+            } else {
+                return createStorageContainer(
+                    scId,
+                    FutureUtils.exception(new Exception("Failed to start")),
+                    FutureUtils.Void()
+                );
+            }
+        };
+        scRegistry = spy(new StorageContainerRegistryImpl(mockScFactory, scheduler));
+
+        scManager = new ZkStorageContainerManager(
+            myEndpoint,
+            new StorageConfiguration(new CompositeConfiguration())
+                .setClusterControllerScheduleInterval(1, TimeUnit.SECONDS),
+            clusterMetadataStore,
+            scRegistry,
+            NullStatsLogger.INSTANCE);
+
+
+        // start the storage container manager
+        scManager.start();
+
+        // update assignment map
+        ClusterAssignmentData cad = ClusterAssignmentData.newBuilder()
+            .putServers(
+                NetUtils.endpointToString(myEndpoint),
+                ServerAssignmentData.newBuilder()
+                    .addContainers(containerId)
+                    .build())
+            .build();
+        clusterMetadataStore.updateClusterAssignmentData(cad);
+
+        // wait until container start is called and verify it is not started.
+        verify(scRegistry, timeout(10000).atLeastOnce()).startStorageContainer(eq(containerId));
+        assertEquals(0, scManager.getLiveContainers().size());
+
+        // flip the flag to return a good container to simulate successful startup
+        returnGoodContainer.set(true);
+        FutureUtils.complete(startFuture, goodSc);
+
+        // wait until container start is called again and the container is started
+        MoreAsserts.assertUtil(
+            ignored -> scManager.getLiveContainers().size() >= 1,
+            () -> null);
+        assertEquals(1, scManager.getLiveContainers().size());
+        assertTrue(scManager.getLiveContainers().containsKey(containerId));
+    }
+
+
+}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/TestHelixStorageContainerManager.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/TestHelixStorageContainerManager.java
deleted file mode 100644
index e3efe94..0000000
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/sc/helix/TestHelixStorageContainerManager.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.sc.helix;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
-import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
-import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
-import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.distributedlog.ZooKeeperClusterTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Unit test for {@link HelixStorageContainerManager}.
- */
-@Slf4j
-public class TestHelixStorageContainerManager extends ZooKeeperClusterTestCase {
-
-    @Rule
-    public TestName runtime = new TestName();
-
-    private HelixStorageController controller;
-    private OrderedScheduler scheduler;
-
-    @Before
-    public void setup() {
-        controller = new HelixStorageController(zkServers);
-        scheduler = OrderedScheduler.newSchedulerBuilder()
-            .numThreads(1)
-            .name("test-helix-storagecontainer-manager")
-            .build();
-    }
-
-    @After
-    public void tearDown() {
-        if (null != controller) {
-            controller.close();
-        }
-        if (null != scheduler) {
-            scheduler.shutdown();
-        }
-    }
-
-    private static Endpoint createEndpoint(String hostname,
-                                           int port) {
-        return Endpoint.newBuilder()
-            .setHostname(hostname)
-            .setPort(port)
-            .build();
-    }
-
-    @Test
-    public void testCreateCluster() {
-        String clusterName = runtime.getMethodName();
-
-        controller.createCluster(clusterName, 1024, 3);
-        List<String> clusters = controller.getAdmin().getClusters();
-        assertTrue(clusters.contains(clusterName));
-
-        // create an existing cluster
-        controller.createCluster(clusterName, 1024, 3);
-        clusters = controller.getAdmin().getClusters();
-        assertTrue(clusters.contains(clusterName));
-    }
-
-    @Test
-    public void testAddNode() {
-        String clusterName = runtime.getMethodName();
-        controller.createCluster(clusterName, 1024, 3);
-        List<String> instances = controller.getAdmin().getInstancesInCluster(clusterName);
-        assertEquals(0, instances.size());
-
-        Endpoint endpoint = createEndpoint("127.0.0.1", 4181);
-        controller.addNode(clusterName, endpoint, Optional.empty());
-        instances = controller.getAdmin().getInstancesInCluster(clusterName);
-        assertEquals(1, instances.size());
-        assertTrue(instances.contains(HelixStorageController.getEndpointName(endpoint)));
-
-        // add the instance again
-        controller.addNode(clusterName, endpoint, Optional.empty());
-        instances = controller.getAdmin().getInstancesInCluster(clusterName);
-        assertEquals(1, instances.size());
-        assertTrue(instances.contains(HelixStorageController.getEndpointName(endpoint)));
-
-        // add a different instance
-        Endpoint endpoint2 = createEndpoint("127.0.0.1", 4481);
-        controller.addNode(clusterName, endpoint2, Optional.empty());
-        instances = controller.getAdmin().getInstancesInCluster(clusterName);
-        assertEquals(2, instances.size());
-        assertTrue(instances.contains(HelixStorageController.getEndpointName(endpoint)));
-        assertTrue(instances.contains(HelixStorageController.getEndpointName(endpoint2)));
-    }
-
-    private StorageContainerRegistryImpl createRegistry() {
-        return new StorageContainerRegistryImpl(
-            (scId) -> {
-                StorageContainer sc = mock(StorageContainer.class);
-                when(sc.getId()).thenReturn(scId);
-                when(sc.start()).thenReturn(FutureUtils.value(null));
-                when(sc.stop()).thenReturn(FutureUtils.value(null));
-                return sc;
-            },
-            scheduler);
-    }
-
-    private HelixStorageContainerManager createManager(String clusterName,
-                                                       StorageConfiguration conf,
-                                                       StorageContainerRegistryImpl registry,
-                                                       Endpoint endpoint) {
-        return new HelixStorageContainerManager(
-            zkServers,
-            clusterName,
-            conf,
-            registry,
-            endpoint,
-            Optional.empty(),
-            NullStatsLogger.INSTANCE);
-    }
-
-    @Ignore
-    @Test
-    public void testStorageContainerManager() throws Exception {
-        String clusterName = runtime.getMethodName();
-        int numStorageContainers = 12;
-        int numHosts = 3;
-        controller.createCluster(clusterName, numStorageContainers, 3);
-
-        StorageConfiguration conf = new StorageConfiguration(new CompositeConfiguration());
-        Endpoint[] endpoints = new Endpoint[numHosts];
-        StorageContainerRegistryImpl[] registries = new StorageContainerRegistryImpl[numHosts];
-        HelixStorageContainerManager[] managers = new HelixStorageContainerManager[numHosts];
-
-        int basePort = 80;
-        for (int i = 0; i < numHosts; i++) {
-            endpoints[i] = createEndpoint("127.0.0.1", basePort + i);
-            registries[i] = createRegistry();
-            managers[i] = createManager(
-                clusterName, conf, registries[i], endpoints[i]);
-        }
-
-        managers[0].start();
-        while (registries[0].getNumStorageContainers() < numStorageContainers) {
-            TimeUnit.MILLISECONDS.sleep(20);
-        }
-
-        assertEquals(numStorageContainers, registries[0].getNumStorageContainers());
-        assertEquals(0, registries[1].getNumStorageContainers());
-        assertEquals(0, registries[2].getNumStorageContainers());
-
-        // start the second node
-        managers[1].start();
-        while (registries[0].getNumStorageContainers() > numStorageContainers / 2) {
-            TimeUnit.MILLISECONDS.sleep(20);
-        }
-        while (registries[1].getNumStorageContainers() < numStorageContainers / 2) {
-            TimeUnit.MILLISECONDS.sleep(20);
-        }
-
-        assertEquals(numStorageContainers / 2, registries[0].getNumStorageContainers());
-        assertEquals(numStorageContainers / 2, registries[1].getNumStorageContainers());
-        assertEquals(0, registries[2].getNumStorageContainers());
-
-        // start the third node
-        managers[2].start();
-        while (registries[0].getNumStorageContainers() > numStorageContainers / 3) {
-            TimeUnit.MILLISECONDS.sleep(20);
-        }
-        while (registries[1].getNumStorageContainers() > numStorageContainers / 3) {
-            TimeUnit.MILLISECONDS.sleep(20);
-        }
-        while (registries[2].getNumStorageContainers() < numStorageContainers / 3) {
-            TimeUnit.MILLISECONDS.sleep(20);
-        }
-
-        int totalStorageContainers = registries[0].getNumStorageContainers()
-            + registries[1].getNumStorageContainers()
-            + registries[2].getNumStorageContainers();
-        assertEquals("Expected " + numStorageContainers + "But " + totalStorageContainers + " found",
-            numStorageContainers, totalStorageContainers);
-        assertEquals(numStorageContainers / 3, registries[0].getNumStorageContainers());
-        assertEquals(numStorageContainers / 3, registries[1].getNumStorageContainers());
-        assertEquals(numStorageContainers / 3, registries[2].getNumStorageContainers());
-
-        for (int i = 0; i < 10; i++) {
-            int nid = ThreadLocalRandom.current().nextInt(numHosts);
-            long scId = ThreadLocalRandom.current().nextLong(numStorageContainers);
-            Endpoint endpoint = managers[nid].getStorageContainer(scId);
-            if (null != endpoint) {
-                assertTrue(endpoint.equals(endpoints[0])
-                    || endpoint.equals(endpoints[1])
-                    || endpoint.equals(endpoints[2]));
-            }
-        }
-
-        for (HelixStorageContainerManager manager : managers) {
-            manager.close();
-        }
-
-    }
-
-
-}
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java
index 03bd10f..03edfb8 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java
+++ b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.net.URI;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
@@ -54,7 +53,6 @@ public class StorageAdminClientTest extends StorageServerTestBase {
 
     private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
-    private URI defaultBackendUri;
 
     @Override
     protected void doSetup() throws Exception {
@@ -69,7 +67,6 @@ public class StorageAdminClientTest extends StorageServerTestBase {
         adminClient = StorageClientBuilder.newBuilder()
             .withSettings(settings)
             .buildAdmin();
-        defaultBackendUri = URI.create("distributedlog://" + cluster.getZkServers() + "/stream/storage");
     }
 
     @Override
@@ -157,7 +154,7 @@ public class StorageAdminClientTest extends StorageServerTestBase {
         assertEquals(streamName, streamProps.getStreamName());
         assertEquals(
             StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(defaultBackendUri.toString())
+                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
                 .build(),
             streamProps.getStreamConf());
 
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
index 8a8eee4..ebe99f0 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
+++ b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
@@ -19,7 +19,6 @@ import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_SE
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_SPLIT_POLICY;
 import static org.junit.Assert.assertEquals;
 
-import java.net.URI;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
@@ -59,11 +58,9 @@ public class StorageClientTest extends StorageServerTestBase {
     private final NamespaceConfiguration colConf = NamespaceConfiguration.newBuilder()
         .setDefaultStreamConf(streamConf)
         .build();
-    private URI defaultBackendUri;
 
     @Override
     protected void doSetup() throws Exception {
-        defaultBackendUri = URI.create("distributedlog://" + cluster.getZkServers() + "/stream/storage");
         StorageClientSettings settings = StorageClientSettings.newBuilder()
             .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
             .usePlaintext(true)
@@ -106,7 +103,7 @@ public class StorageClientTest extends StorageServerTestBase {
             FutureUtils.result(adminClient.getStream(nsName, streamName));
         assertEquals(
             StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(defaultBackendUri.toString())
+                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
                 .build()
             , properties.getStreamConf());
     }
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
index e5a9a7d..21dc004 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
+++ b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
@@ -43,7 +43,7 @@ public abstract class StorageServerTestBase {
     @Rule
     public final TemporaryFolder testDir = new TemporaryFolder();
 
-    protected final StreamClusterSpec spec;
+    protected StreamClusterSpec spec;
     protected StreamCluster cluster;
 
     protected StorageServerTestBase() {
@@ -59,7 +59,7 @@ public abstract class StorageServerTestBase {
 
     @Before
     public void setUp() throws Exception {
-        spec.storageRootDir(testDir.newFolder("tests"));
+        spec = spec.storageRootDir(testDir.newFolder("tests"));
         this.cluster = StreamCluster.build(spec);
         this.cluster.start();
         doSetup();
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java
index 00813ce..7add6d7 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java
+++ b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java
@@ -31,7 +31,6 @@ import static org.junit.Assert.fail;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
-import java.net.URI;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.api.StorageClient;
@@ -65,11 +64,9 @@ public class TableClientSimpleTest extends StorageServerTestBase {
     private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
     private StorageClient storageClient;
-    private URI defaultBackendUri;
 
     @Override
     protected void doSetup() throws Exception {
-        defaultBackendUri = URI.create("distributedlog://" + cluster.getZkServers() + "/stream/storage");
         scheduler = OrderedScheduler.newSchedulerBuilder()
             .name("table-client-test")
             .numThreads(1)
@@ -128,7 +125,7 @@ public class TableClientSimpleTest extends StorageServerTestBase {
         assertEquals(streamName, streamProps.getStreamName());
         assertEquals(
             StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(defaultBackendUri.toString())
+                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
                 .build(),
             streamProps.getStreamConf());
 
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java
index d5fcde6..7db265b 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java
+++ b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java
@@ -28,7 +28,6 @@ import static org.junit.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
-import java.net.URI;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.api.StorageClient;
@@ -76,11 +75,9 @@ public class TableClientTest extends StorageServerTestBase {
     private StorageAdminClient adminClient;
     private StorageClient storageClient;
     private final OptionFactory<ByteBuf> optionFactory = new OptionFactoryImpl<>();
-    private URI defaultBackendUri;
 
     @Override
     protected void doSetup() throws Exception {
-        defaultBackendUri = URI.create("distributedlog://" + cluster.getZkServers() + "/stream/storage");
         scheduler = OrderedScheduler.newSchedulerBuilder()
             .name("table-client-test")
             .numThreads(1)
@@ -139,7 +136,7 @@ public class TableClientTest extends StorageServerTestBase {
         assertEquals(streamName, streamProps.getStreamName());
         assertEquals(
             StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(defaultBackendUri.toString())
+                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
                 .build(),
             streamProps.getStreamConf());
 

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.