You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 21:01:24 UTC

[bookkeeper] 03/10: [TABLE SERVICE] start table service as an extra component of bookie

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit b25fc5f3414622b4179e8ea24013b8146d7cf9f4
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue May 22 11:43:31 2018 -0700

    [TABLE SERVICE] start table service as an extra component of bookie
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    table service was built over bookkeeper ledgers. it is an extension to bookies' functionalities, so it is much convenient to start the service as one additional component with bookie, just like how we start `AutoRecovery` along with bookie.
    
    *Solution*
    
    - include `stream/server` module as part of bookkeeper server/all binary distributions
    - abstract `StorageServer` to allow controlling whether to start bookie or not
    - provide a ServerLifecycleComponent of `StorageServer`, so it can be used as an extra component of bookie
    
    *Tests*
    
    - improve the integration tests to start table service as part of bookie containers
    - move `LocationClientTest` to container based integration tests
    
    Master Issue: #1205
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
    
    This closes #1422 from sijie/start_table_service
---
 bookkeeper-dist/all/pom.xml                        |   7 ++
 bookkeeper-dist/server/pom.xml                     |   7 ++
 bookkeeper-dist/src/assemble/bin-all.xml           |   3 +
 bookkeeper-dist/src/assemble/bin-server.xml        |   4 +
 .../src/main/resources/LICENSE-all.bin.txt         |  47 +++++++-
 .../src/main/resources/LICENSE-server.bin.txt      |  44 +++++++-
 .../google-auth-library-credentials-0.4.0/LICENSE  |  28 +++++
 .../src/main/resources/deps/protobuf-3.0.0/LICENSE |  32 ++++++
 .../src/main/resources/deps/protobuf-3.3.1/LICENSE |  32 ++++++
 conf/bk_server.conf                                |  28 ++++-
 pom.xml                                            |   8 +-
 .../bookkeeper/stream/cluster/StreamCluster.java   |  38 ++-----
 .../bookkeeper/stream/server/StorageServer.java    | 118 ++++++++++++++-------
 .../server/StreamStorageLifecycleComponent.java    |  64 +++++++++++
 .../stream/server/conf/BookieConfiguration.java    |   4 +-
 .../server/conf/StorageServerConfiguration.java    |  11 ++
 .../stream/server/service/BookieWatchService.java  |  95 +++++++++++++++++
 .../server/service/CuratorProviderService.java     |   3 +
 .../server/service/DLNamespaceProviderService.java |   1 +
 .../service/RegistrationServiceProvider.java       |   2 +
 .../storage/api/cluster/ClusterInitializer.java    |  47 ++++++++
 .../storage/api/cluster/ClusterMetadataStore.java  |   9 +-
 stream/storage/impl/pom.xml                        |  10 --
 .../impl/cluster/InMemClusterMetadataStore.java    |   6 +-
 .../storage/impl/cluster/ZkClusterInitializer.java |  95 +++++++++++++++++
 .../impl/cluster/ZkClusterMetadataStore.java       |   5 +-
 .../cluster/ClusterControllerLeaderImplTest.java   |   3 +-
 .../tests/containers/BookieContainer.java          |  34 ++++--
 .../cluster/BookKeeperClusterTestBase.java         |  23 +++-
 .../integration/stream}/LocationClientTest.java    |  26 +++--
 .../integration/stream/StreamClusterTestBase.java  |  73 +++++++++++++
 .../tests/integration/topologies/BKCluster.java    |  77 ++++++++++++--
 .../integration/topologies/BKClusterSpec.java      |  67 ++++++++++++
 33 files changed, 924 insertions(+), 127 deletions(-)

diff --git a/bookkeeper-dist/all/pom.xml b/bookkeeper-dist/all/pom.xml
index 253b356..d1f5b25 100644
--- a/bookkeeper-dist/all/pom.xml
+++ b/bookkeeper-dist/all/pom.xml
@@ -92,6 +92,13 @@
       <version>${project.version}</version>
     </dependency>
 
+    <!-- stream.storage -->
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-server</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <!-- bookkeeper benchmark -->
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
diff --git a/bookkeeper-dist/server/pom.xml b/bookkeeper-dist/server/pom.xml
index 75e70e9..abcd0ef 100644
--- a/bookkeeper-dist/server/pom.xml
+++ b/bookkeeper-dist/server/pom.xml
@@ -76,6 +76,13 @@
       <version>${project.version}</version>
     </dependency>
 
+    <!-- stream.storage -->
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-server</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <!-- slf4j binding -->
     <dependency>
       <groupId>org.slf4j</groupId>
diff --git a/bookkeeper-dist/src/assemble/bin-all.xml b/bookkeeper-dist/src/assemble/bin-all.xml
index fd45d67..4a7c4dc 100644
--- a/bookkeeper-dist/src/assemble/bin-all.xml
+++ b/bookkeeper-dist/src/assemble/bin-all.xml
@@ -54,11 +54,14 @@
       <directory>../src/main/resources/deps</directory>
       <outputDirectory>/deps</outputDirectory>
       <includes>
+        <include>google-auth-library-credentials-0.4.0/LICENSE</include>
         <include>javax.servlet-api-3.1.0/CDDL+GPL-1.1</include>
         <include>jsr-305/LICENSE</include>
         <include>netty-3.10.1.Final/*</include>
         <include>netty-4.1.12.Final/*</include>
         <include>paranamer-2.8/LICENSE.txt</include>
+        <include>protobuf-3.0.0/LICENSE</include>
+        <include>protobuf-3.3.1/LICENSE</include>
         <include>protobuf-3.4.0/LICENSE</include>
         <include>scala-library-2.11.7/LICENSE.md</include>
         <include>scala-parser-combinators_2.11-1.0.4/LICENSE.md</include>
diff --git a/bookkeeper-dist/src/assemble/bin-server.xml b/bookkeeper-dist/src/assemble/bin-server.xml
index 7bbd3b2..ce5a646 100644
--- a/bookkeeper-dist/src/assemble/bin-server.xml
+++ b/bookkeeper-dist/src/assemble/bin-server.xml
@@ -49,8 +49,11 @@
       <directory>../src/main/resources/deps</directory>
       <outputDirectory>/deps</outputDirectory>
       <includes>
+        <include>google-auth-library-credentials-0.4.0/LICENSE</include>
         <include>javax.servlet-api-3.1.0/CDDL+GPL-1.1</include>
         <include>netty-4.1.12.Final/*</include>
+        <include>protobuf-3.0.0/LICENSE</include>
+        <include>protobuf-3.3.1/LICENSE</include>
         <include>protobuf-3.4.0/LICENSE</include>
         <include>slf4j-1.7.25/LICENSE.txt</include>
       </includes>
@@ -85,6 +88,7 @@
       <!-- Include 'groupId' in the dependencies Jar names to better identify the provenance of the jar -->
       <outputFileNameMapping>${artifact.groupId}-${artifact.artifactId}-${artifact.version}${dashClassifier?}.${artifact.extension}</outputFileNameMapping>
       <excludes>
+        <exclude>com.google.code.findbugs:jsr305</exclude>
         <!-- All these dependencies are already included in netty-all -->
         <exclude>io.netty:netty-buffer</exclude>
         <exclude>io.netty:netty-codec</exclude>
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
index 7f523db..289916d 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
@@ -276,6 +276,25 @@ Apache Software License, Version 2.
 - lib/net.jpountz.lz4-lz4-1.3.0.jar [38]
 - lib/org.codehaus.jackson-jackson-core-asl-1.9.11.jar [39]
 - lib/org.codehaus.jackson-jackson-mapper-asl-1.9.11.jar [40]
+- lib/com.google.api.grpc-proto-google-common-protos-0.1.9.jar [41]
+- lib/com.google.code.gson-gson-2.7.jar [42]
+- lib/com.google.instrumentation-instrumentation-api-0.4.3.jar [43]
+- lib/com.squareup.okhttp-okhttp-2.5.0.jar [44]
+- lib/com.squareup.okio-okio-1.6.0.jar [45]
+- lib/io.grpc-grpc-all-1.5.0.jar [46]
+- lib/io.grpc-grpc-auth-1.5.0.jar [46]
+- lib/io.grpc-grpc-context-1.5.0.jar [46]
+- lib/io.grpc-grpc-core-1.5.0.jar [46]
+- lib/io.grpc-grpc-netty-1.5.0.jar [46]
+- lib/io.grpc-grpc-okhttp-1.5.0.jar [46]
+- lib/io.grpc-grpc-protobuf-1.5.0.jar [46]
+- lib/io.grpc-grpc-protobuf-lite-1.5.0.jar [46]
+- lib/io.grpc-grpc-protobuf-nano-1.5.0.jar [46]
+- lib/io.grpc-grpc-stub-1.5.0.jar [46]
+- lib/org.apache.curator-curator-client-4.0.1.jar [47]
+- lib/org.apache.curator-curator-framework-4.0.1.jar [47]
+- lib/org.apache.curator-curator-recipes-4.0.1.jar [47]
+- lib/org.inferred-freebuilder-1.14.9.jar [48]
 
 [1] Source available at https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
 [2] Source available at https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -316,6 +335,15 @@ Apache Software License, Version 2.
 [38] Source available at https://github.com/lz4/lz4-java/tree/1.3.0
 [39] Source available at https://github.com/codehaus/jackson/tree/1.9
 [40] Source available at https://github.com/codehaus/jackson/tree/1.9
+[41] Source available at https://github.com/googleapis/googleapis
+[42] Source available at https://github.com/google/gson/tree/gson-parent-2.7
+[43] Source available at https://github.com/census-instrumentation/opencensus-java/tree/v0.4.3
+[44] Source available at https://github.com/square/okhttp/tree/parent-2.5.0
+[45] Source available at https://github.com/square/okio/tree/okio-parent-1.6.0
+[46] Source available at https://github.com/grpc/grpc-java/tree/v1.5.0
+[47] Source available at https://github.com/apache/curator/tree/apache-curator-4.0.1
+[48] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
+
 
 ------------------------------------------------------------------------------------
 lib/io.netty-netty-3.10.1.Final.jar contains the extensions to Java Collections Framework which has
@@ -459,10 +487,20 @@ Bundled as lib/com.google.code.findbugs-jsr305-3.0.2.jar
 Source available at https://storage.googleapis.com/google-code-archive-source/v2/code.google.com/jsr-305/source-archive.zip
 ------------------------------------------------------------------------------------
 This product bundles Google Protocal Buffers, which is available under a "3-clause BSD"
-license. For details, see deps/protobuf-3.4.0/LICENSE.
+license.
 
 Bundled as lib/com.google.protobuf-protobuf-java-3.4.0.jar
 Source available at https://github.com/google/protobuf/tree/v3.4.0
+For details, see deps/protobuf-3.4.0/LICENSE.
+
+Bundled as lib/com.google.protobuf.nano-protobuf-javanano-3.0.0-alpha-5.jar
+Source available at https://github.com/google/protobuf/tree/3.0.0-pre
+For details, see deps/protobuf-3.0.0/LICENSE.
+
+Bundled as com.google.protobuf-protobuf-java-util-3.3.1.jar
+Source available at https://github.com/google/protobuf/tree/v3.3.1
+For details, see deps/protobuf-3.3.1/LICENSE.
+
 ------------------------------------------------------------------------------------
 This product bundles Paranamer, which is available under a "3-clause BSD" license.
 For details, see deps/paranamer-2.8/LICENSE.txt.
@@ -501,4 +539,11 @@ Bundled as
   - lib/org.slf4j-slf4j-api-1.7.25.jar
   - lib/org.slf4j-slf4j-log4j12-1.7.25.jar
 Source available at https://github.com/qos-ch/slf4j/tree/v_1.7.25
+------------------------------------------------------------------------------------
+This product bundles the Google Auth Library, which is available under a "3-clause BSD"
+license. For details, see deps/google-auth-library-credentials-0.4.0/LICENSE
+
+Bundled as
+  - lib/com.google.auth-google-auth-library-credentials-0.4.0.jar
+Source available at https://github.com/google/google-auth-library-java/tree/0.4.0
 
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
index 1f437ab..168f8fd 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
@@ -241,6 +241,25 @@ Apache Software License, Version 2.
 - lib/net.jpountz.lz4-lz4-1.3.0.jar [25]
 - lib/org.codehaus.jackson-jackson-core-asl-1.9.11.jar [26]
 - lib/org.codehaus.jackson-jackson-mapper-asl-1.9.11.jar [27]
+- lib/com.google.api.grpc-proto-google-common-protos-0.1.9.jar [28]
+- lib/com.google.code.gson-gson-2.7.jar [29]
+- lib/com.google.instrumentation-instrumentation-api-0.4.3.jar [30]
+- lib/com.squareup.okhttp-okhttp-2.5.0.jar [31]
+- lib/com.squareup.okio-okio-1.6.0.jar [32]
+- lib/io.grpc-grpc-all-1.5.0.jar [33]
+- lib/io.grpc-grpc-auth-1.5.0.jar [33]
+- lib/io.grpc-grpc-context-1.5.0.jar [33]
+- lib/io.grpc-grpc-core-1.5.0.jar [33]
+- lib/io.grpc-grpc-netty-1.5.0.jar [33]
+- lib/io.grpc-grpc-okhttp-1.5.0.jar [33]
+- lib/io.grpc-grpc-protobuf-1.5.0.jar [33]
+- lib/io.grpc-grpc-protobuf-lite-1.5.0.jar [33]
+- lib/io.grpc-grpc-protobuf-nano-1.5.0.jar [33]
+- lib/io.grpc-grpc-stub-1.5.0.jar [33]
+- lib/org.apache.curator-curator-client-4.0.1.jar [34]
+- lib/org.apache.curator-curator-framework-4.0.1.jar [34]
+- lib/org.apache.curator-curator-recipes-4.0.1.jar [34]
+- lib/org.inferred-freebuilder-1.14.9.jar [35]
 
 [1] Source available at https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
 [2] Source available at https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -269,6 +288,14 @@ Apache Software License, Version 2.
 [25] Source available at https://github.com/lz4/lz4-java/tree/1.3.0
 [26] Source available at https://github.com/codehaus/jackson/tree/1.9
 [27] Source available at https://github.com/codehaus/jackson/tree/1.9
+[28] Source available at https://github.com/googleapis/googleapis
+[29] Source available at https://github.com/google/gson/tree/gson-parent-2.7
+[30] Source available at https://github.com/census-instrumentation/opencensus-java/tree/v0.4.3
+[31] Source available at https://github.com/square/okhttp/tree/parent-2.5.0
+[32] Source available at https://github.com/square/okio/tree/okio-parent-1.6.0
+[33] Source available at https://github.com/grpc/grpc-java/tree/v1.5.0
+[34] Source available at https://github.com/apache/curator/tree/apache-curator-4.0.1
+[35] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
 
 ------------------------------------------------------------------------------------
 lib/io.netty-netty-all-4.1.12.Final.jar bundles some 3rd party dependencies
@@ -372,10 +399,19 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 ------------------------------------------------------------------------------------
 This product bundles Google Protocal Buffers, which is available under a "3-clause BSD"
-license. For details, see deps/protobuf-3.4.0/LICENSE.
+license.
 
 Bundled as lib/com.google.protobuf-protobuf-java-3.4.0.jar
 Source available at https://github.com/google/protobuf/tree/v3.4.0
+For details, see deps/protobuf-3.4.0/LICENSE.
+
+Bundled as lib/com.google.protobuf.nano-protobuf-javanano-3.0.0-alpha-5.jar
+Source available at https://github.com/google/protobuf/tree/3.0.0-pre
+For details, see deps/protobuf-3.0.0/LICENSE.
+
+Bundled as com.google.protobuf-protobuf-java-util-3.3.1.jar
+Source available at https://github.com/google/protobuf/tree/v3.3.1
+For details, see deps/protobuf-3.3.1/LICENSE.
 ------------------------------------------------------------------------------------
 This product bundles the JCP Standard Java Servlet API, which is available under a
 CDDL 1.1 license. For details, see deps/javax.servlet-api-3.1.0/CDDL+GPL-1.1.
@@ -390,4 +426,10 @@ Bundled as
   - lib/org.slf4j-slf4j-api-1.7.25.jar
   - lib/org.slf4j-slf4j-log4j12-1.7.25.jar
 Source available at https://github.com/qos-ch/slf4j/tree/v_1.7.25
+------------------------------------------------------------------------------------
+This product bundles the Google Auth Library, which is available under a "3-clause BSD"
+license. For details, see deps/google-auth-library-credentials-0.4.0/LICENSE
 
+Bundled as
+  - lib/com.google.auth-google-auth-library-credentials-0.4.0.jar
+Source available at https://github.com/google/google-auth-library-java/tree/0.4.0
diff --git a/bookkeeper-dist/src/main/resources/deps/google-auth-library-credentials-0.4.0/LICENSE b/bookkeeper-dist/src/main/resources/deps/google-auth-library-credentials-0.4.0/LICENSE
new file mode 100644
index 0000000..12edf23
--- /dev/null
+++ b/bookkeeper-dist/src/main/resources/deps/google-auth-library-credentials-0.4.0/LICENSE
@@ -0,0 +1,28 @@
+Copyright 2014, Google Inc. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+   * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+   * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+
+   * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/bookkeeper-dist/src/main/resources/deps/protobuf-3.0.0/LICENSE b/bookkeeper-dist/src/main/resources/deps/protobuf-3.0.0/LICENSE
new file mode 100644
index 0000000..2dcab42
--- /dev/null
+++ b/bookkeeper-dist/src/main/resources/deps/protobuf-3.0.0/LICENSE
@@ -0,0 +1,32 @@
+Copyright 2014, Google Inc.  All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+    * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+Code generated by the Protocol Buffer compiler is owned by the owner
+of the input file used when generating it.  This code is not
+standalone and requires a support library to be linked with it.  This
+support library is itself covered by the above license.
diff --git a/bookkeeper-dist/src/main/resources/deps/protobuf-3.3.1/LICENSE b/bookkeeper-dist/src/main/resources/deps/protobuf-3.3.1/LICENSE
new file mode 100644
index 0000000..2dcab42
--- /dev/null
+++ b/bookkeeper-dist/src/main/resources/deps/protobuf-3.3.1/LICENSE
@@ -0,0 +1,32 @@
+Copyright 2014, Google Inc.  All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+    * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+Code generated by the Protocol Buffer compiler is owned by the owner
+of the input file used when generating it.  This code is not
+standalone and requires a support library to be linked with it.  This
+support library is itself covered by the above license.
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 2080b68..6e3c1b4 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -92,7 +92,10 @@ bookiePort=3181
 # Configure a list of server components to enable and load on a bookie server.
 # This provides the plugin run extra services along with a bookie server.
 #
-# extraServerComponents=
+# NOTE: if bookie fails to load any of extra components configured below, bookie will continue
+#       function by ignoring the components configured below.
+# extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
+extraServerComponents=
 
 #############################################################################
 ## Thread settings
@@ -878,3 +881,26 @@ zkEnableSecurity=false
 
 # The time to backoff when replication worker encounters exceptions on replicating a ledger, in milliseconds.
 # rwRereplicateBackoffMs=5000
+
+
+##################################################################
+##################################################################
+# Settings below are used by stream/table service
+##################################################################
+##################################################################
+
+### Grpc Server ###
+
+# the grpc server port to listen on. default is 4181
+storageserver.grpc.port=4181
+
+### Storage ###
+
+# local storage directories for storing table ranges data (e.g. rocksdb sst files)
+storage.range.store.dirs=data/bookkeeper/ranges
+
+# whether the storage server capable of serving readonly tables. default is false.
+storage.serve.readonly.tables=false
+
+# the cluster controller schedule interval, in milliseconds. default is 30 seconds.
+storage.cluster.controller.schedule.interval.ms=30000
diff --git a/pom.xml b/pom.xml
index 3c35d8d..8a4ef12 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
     <curator.version>4.0.1</curator.version>
     <dropwizard.version>3.1.0</dropwizard.version>
     <finagle.version>6.44.0</finagle.version>
-    <freebuilder.version>1.12.3</freebuilder.version>
+    <freebuilder.version>1.14.9</freebuilder.version>
     <google.code.version>3.0.2</google.code.version>
     <google.errorprone.version>2.1.2</google.errorprone.version>
     <grpc.version>1.5.0</grpc.version>
@@ -362,6 +362,12 @@
         <groupId>io.grpc</groupId>
         <artifactId>grpc-all</artifactId>
         <version>${grpc.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>com.google.errorprone</groupId>
+            <artifactId>error_prone_annotations</artifactId>
+          </exclusion>
+        </exclusions>
       </dependency>
 
       <!-- rocksdb dependencies -->
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index 63e2679..11bc8af 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.net.BindException;
 import java.net.URI;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -43,20 +42,15 @@ import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
-import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.StorageServer;
 import org.apache.bookkeeper.stream.storage.StorageConstants;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
-import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.LocalDLMEmulator;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * A Cluster that runs a few storage nodes.
@@ -141,28 +135,9 @@ public class StreamCluster
     }
 
     private void initializeCluster() throws Exception {
-        try (CuratorFramework client = CuratorFrameworkFactory.newClient(
-            zkEnsemble,
-            new ExponentialBackoffRetry(100, Integer.MAX_VALUE, 10000)
-        )) {
-            client.start();
-
-            ZkClusterMetadataStore store = new ZkClusterMetadataStore(client, zkEnsemble, ZK_METADATA_ROOT_PATH);
-
-            ClusterMetadata metadata;
-            try {
-                metadata = store.getClusterMetadata();
-                log.info("Loaded cluster metadata : \n{}", metadata);
-            } catch (StorageRuntimeException sre) {
-                if (sre.getCause() instanceof KeeperException.NoNodeException) {
-                    log.info("Initializing the stream cluster.");
-                    store.initializeCluster(spec.numServers() * 2);
-                    log.info("Successfully initialized the stream cluster : \n{}", store.getClusterMetadata());
-                } else {
-                    throw sre;
-                }
-            }
-        }
+        new ZkClusterInitializer(zkEnsemble).initializeCluster(
+            URI.create("zk://" + zkEnsemble),
+            spec.numServers() * 2);
 
         // format the bookkeeper cluster
         MetadataDrivers.runFunctionWithMetadataBookieDriver(newBookieConfiguration(zkEnsemble), driver -> {
@@ -210,11 +185,10 @@ public class StreamCluster
                 log.info("Attempting to start storage server at (bookie port = {}, grpc port = {})"
                         + " : bkDir = {}, rangesStoreDir = {}, serveReadOnlyTables = {}",
                     bookiePort, grpcPort, bkDir, rangesStoreDir, spec.serveReadOnlyTable);
-                server = StorageServer.startStorageServer(
+                server = StorageServer.buildStorageServer(
                     serverConf,
                     grpcPort,
-                    spec.numServers() * 2,
-                    Optional.empty());
+                    spec.numServers() * 2);
                 server.start();
                 log.info("Started storage server at (bookie port = {}, grpc port = {})",
                     bookiePort, grpcPort);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index d5ca397..e0b87ef 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -13,6 +13,7 @@
  */
 package org.apache.bookkeeper.stream.server;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH;
 
 import com.beust.jcommander.JCommander;
@@ -21,15 +22,16 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.UnknownHostException;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.component.ComponentStarter;
 import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponentStack;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog.DLCheckpointStore;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
@@ -37,6 +39,7 @@ import org.apache.bookkeeper.stream.server.conf.DLConfiguration;
 import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
 import org.apache.bookkeeper.stream.server.grpc.GrpcServerSpec;
 import org.apache.bookkeeper.stream.server.service.BookieService;
+import org.apache.bookkeeper.stream.server.service.BookieWatchService;
 import org.apache.bookkeeper.stream.server.service.ClusterControllerService;
 import org.apache.bookkeeper.stream.server.service.CuratorProviderService;
 import org.apache.bookkeeper.stream.server.service.DLNamespaceProviderService;
@@ -58,6 +61,7 @@ import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.distributedlog.DistributedLogConfiguration;
 
 /**
  * A storage server is a server that run storage service and serving rpc requests.
@@ -71,7 +75,7 @@ public class StorageServer {
         private String serverConfigFile;
 
         @Parameter(names = {"-p", "--port"}, description = "Port to listen on for gPRC server")
-        private int port = 3182;
+        private int port = 4181;
 
         @Parameter(names = {"-h", "--help"}, description = "Show this help message")
         private boolean help = false;
@@ -137,11 +141,10 @@ public class StorageServer {
 
         LifecycleComponent storageServer;
         try {
-            storageServer = startStorageServer(
+            storageServer = buildStorageServer(
                 conf,
                 grpcPort,
-                1024,
-                Optional.empty());
+                1024);
         } catch (ConfigurationException e) {
             log.error("Invalid storage configuration", e);
             return ExitCode.INVALID_CONF.code();
@@ -164,11 +167,23 @@ public class StorageServer {
         return ExitCode.OK.code();
     }
 
-    public static LifecycleComponent startStorageServer(CompositeConfiguration conf,
+    public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
+                                                        int grpcPort,
+                                                        int numStorageContainers)
+            throws UnknownHostException, ConfigurationException {
+        return buildStorageServer(conf, grpcPort, numStorageContainers, true, NullStatsLogger.INSTANCE);
+    }
+
+    public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
                                                         int grpcPort,
                                                         int numStorageContainers,
-                                                        Optional<String> instanceName)
+                                                        boolean startBookieAndStartProvider,
+                                                        StatsLogger externalStatsLogger)
         throws ConfigurationException, UnknownHostException {
+
+        LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder()
+            .withName("storage-server");
+
         BookieConfiguration bkConf = BookieConfiguration.of(conf);
         bkConf.validate();
 
@@ -188,42 +203,47 @@ public class StorageServer {
         StorageResources storageResources = StorageResources.create();
 
         // Create the stats provider
-        StatsProviderService statsProviderService = new StatsProviderService(bkConf);
-        StatsLogger rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");
+        StatsLogger rootStatsLogger;
+        if (startBookieAndStartProvider) {
+            StatsProviderService statsProviderService = new StatsProviderService(bkConf);
+            rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");
+            serverBuilder.addComponent(statsProviderService);
+        } else {
+            rootStatsLogger = checkNotNull(externalStatsLogger,
+                "External stats logger is not provided while not starting stats provider");
+        }
 
         // Create the bookie service
-        BookieService bookieService = new BookieService(bkConf, rootStatsLogger);
+        ServerConfiguration bkServerConf;
+        if (startBookieAndStartProvider) {
+            BookieService bookieService = new BookieService(bkConf, rootStatsLogger);
+            serverBuilder.addComponent(bookieService);
+            bkServerConf = bookieService.serverConf();
+        } else {
+            bkServerConf = new ServerConfiguration();
+            bkServerConf.loadConf(bkConf.getUnderlyingConf());
+        }
+
+        // Create the bookie watch service
+        BookieWatchService bkWatchService;
+        {
+            DistributedLogConfiguration dlogConf = new DistributedLogConfiguration();
+            bkWatchService = new BookieWatchService(
+                dlogConf.getEnsembleSize(),
+                bkConf,
+                NullStatsLogger.INSTANCE);
+        }
 
         // Create the curator provider service
         CuratorProviderService curatorProviderService = new CuratorProviderService(
-            bookieService.serverConf(), dlConf, rootStatsLogger.scope("curator"));
+            bkServerConf, dlConf, rootStatsLogger.scope("curator"));
 
         // Create the distributedlog namespace service
         DLNamespaceProviderService dlNamespaceProvider = new DLNamespaceProviderService(
-            bookieService.serverConf(),
+            bkServerConf,
             dlConf,
             rootStatsLogger.scope("dlog"));
 
-        // Create a registration service provider
-        RegistrationServiceProvider regService = new RegistrationServiceProvider(
-            bookieService.serverConf(),
-            dlConf,
-            rootStatsLogger.scope("registration").scope("provider"));
-
-        // Create a cluster controller service
-        ClusterControllerService clusterControllerService = new ClusterControllerService(
-            storageConf,
-            () -> new ClusterControllerImpl(
-                new ZkClusterMetadataStore(
-                    curatorProviderService.get(),
-                    ZKMetadataDriverBase.resolveZkServers(bookieService.serverConf()),
-                    ZK_METADATA_ROOT_PATH),
-                regService.get(),
-                new DefaultStorageContainerController(),
-                new ZkClusterControllerLeaderSelector(curatorProviderService.get(), ZK_METADATA_ROOT_PATH),
-                storageConf),
-            rootStatsLogger.scope("cluster_controller"));
-
         // Create range (stream) store
         RangeStoreBuilder rangeStoreBuilder = RangeStoreBuilder.newBuilder()
             .withStatsLogger(rootStatsLogger.scope("storage"))
@@ -241,7 +261,7 @@ public class StorageServer {
                     storageConf,
                     new ZkClusterMetadataStore(
                         curatorProviderService.get(),
-                        ZKMetadataDriverBase.resolveZkServers(bookieService.serverConf()),
+                        ZKMetadataDriverBase.resolveZkServers(bkServerConf),
                         ZK_METADATA_ROOT_PATH),
                     registry,
                     rootStatsLogger.scope("sc").scope("manager")))
@@ -267,26 +287,44 @@ public class StorageServer {
         GrpcService grpcService = new GrpcService(
             serverConf, serverSpec, rpcStatsLogger);
 
+        // Create a registration service provider
+        RegistrationServiceProvider regService = new RegistrationServiceProvider(
+            bkServerConf,
+            dlConf,
+            rootStatsLogger.scope("registration").scope("provider"));
+
         // Create a registration state service only when service is ready.
         RegistrationStateService regStateService = new RegistrationStateService(
             myEndpoint,
-            bookieService.serverConf(),
+            bkServerConf,
             bkConf,
             regService,
             rootStatsLogger.scope("registration"));
 
+        // Create a cluster controller service
+        ClusterControllerService clusterControllerService = new ClusterControllerService(
+            storageConf,
+            () -> new ClusterControllerImpl(
+                new ZkClusterMetadataStore(
+                    curatorProviderService.get(),
+                    ZKMetadataDriverBase.resolveZkServers(bkServerConf),
+                    ZK_METADATA_ROOT_PATH),
+                regService.get(),
+                new DefaultStorageContainerController(),
+                new ZkClusterControllerLeaderSelector(curatorProviderService.get(), ZK_METADATA_ROOT_PATH),
+                storageConf),
+            rootStatsLogger.scope("cluster_controller"));
+
         // Create all the service stack
-        return LifecycleComponentStack.newBuilder()
-            .withName("storage-server")
-            .addComponent(statsProviderService)     // stats provider
-            .addComponent(bookieService)            // bookie server
+        return serverBuilder
+            .addComponent(bkWatchService)           // service that watches bookies
             .addComponent(curatorProviderService)   // service that provides curator client
             .addComponent(dlNamespaceProvider)      // service that provides dl namespace
-            .addComponent(regService)               // service that provides registration client
-            .addComponent(clusterControllerService) // service that run cluster controller service
             .addComponent(storageService)           // range (stream) store
             .addComponent(grpcService)              // range (stream) server (gRPC)
+            .addComponent(regService)               // service that provides registration client
             .addComponent(regStateService)          // service that manages server state
+            .addComponent(clusterControllerService) // service that run cluster controller service
             .build();
     }
 
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
new file mode 100644
index 0000000..5e6b0b5
--- /dev/null
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.server;
+
+import java.net.UnknownHostException;
+import org.apache.bookkeeper.common.component.LifecycleComponent;
+import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
+import org.apache.bookkeeper.server.conf.BookieConfiguration;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
+import org.apache.commons.configuration.ConfigurationException;
+
+/**
+ * This is a {@link ServerLifecycleComponent} to allow run stream storage component as part of bookie server.
+ */
+public class StreamStorageLifecycleComponent extends ServerLifecycleComponent {
+
+    private final LifecycleComponent streamStorage;
+
+    public StreamStorageLifecycleComponent(BookieConfiguration conf, StatsLogger statsLogger)
+            throws UnknownHostException, ConfigurationException {
+        super("stream-storage", conf, statsLogger);
+
+        StorageServerConfiguration ssConf = StorageServerConfiguration.of(conf.getUnderlyingConf());
+
+        this.streamStorage = StorageServer.buildStorageServer(
+            conf.getUnderlyingConf(),
+            ssConf.getGrpcPort(),
+            1024, /* indicator */
+            false,
+            statsLogger.scope("stream"));
+    }
+
+    @Override
+    protected void doStart() {
+        this.streamStorage.start();
+    }
+
+    @Override
+    protected void doStop() {
+        this.streamStorage.stop();
+    }
+
+    @Override
+    protected void doClose() {
+        this.streamStorage.close();
+    }
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java
index b6803f2..e3c24c5 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java
@@ -21,13 +21,11 @@ import org.apache.commons.configuration.CompositeConfiguration;
  */
 public class BookieConfiguration extends ComponentConfiguration {
 
-    private static final String COMPONENT_PREFIX = "bookie" + DELIMITER;
-
     public static BookieConfiguration of(CompositeConfiguration conf) {
         return new BookieConfiguration(conf);
     }
 
     protected BookieConfiguration(CompositeConfiguration conf) {
-        super(conf, COMPONENT_PREFIX);
+        super(conf, "");
     }
 }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
index a0ef963..bc300c9 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
@@ -24,6 +24,8 @@ public class StorageServerConfiguration extends ComponentConfiguration {
 
     private static final String COMPONENT_PREFIX = "storageserver" + DELIMITER;
 
+    private static final String GRPC_PORT = "grpc.port";
+
     public static StorageServerConfiguration of(CompositeConfiguration conf) {
         return new StorageServerConfiguration(conf);
     }
@@ -31,4 +33,13 @@ public class StorageServerConfiguration extends ComponentConfiguration {
     private StorageServerConfiguration(CompositeConfiguration conf) {
         super(conf, COMPONENT_PREFIX);
     }
+
+    /**
+     * Returns the grpc port that serves requests coming into the stream storage server.
+     *
+     * @return grpc port
+     */
+    public int getGrpcPort() {
+        return getInt(GRPC_PORT, 4181);
+    }
 }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/BookieWatchService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/BookieWatchService.java
new file mode 100644
index 0000000..cfac0da
--- /dev/null
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/BookieWatchService.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.server.service;
+
+import com.google.common.base.Stopwatch;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
+
+/**
+ * A service that watches bookies and wait for minimum number of bookies to be alive.
+ */
+@Slf4j
+public class BookieWatchService
+    extends AbstractLifecycleComponent<BookieConfiguration> {
+
+    private final int minNumBookies;
+
+    public BookieWatchService(int minNumBookies,
+                              BookieConfiguration conf,
+                              StatsLogger statsLogger) {
+        super("bookie-watcher", conf, statsLogger);
+        this.minNumBookies = minNumBookies;
+    }
+
+    @Override
+    protected void doStart() {
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.loadConf(conf.getUnderlyingConf());
+
+        @Cleanup("shutdown") ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+
+        try {
+            MetadataDrivers.runFunctionWithMetadataClientDriver(clientConf, clientDriver -> {
+                try {
+                    waitingForNumBookies(clientDriver.getRegistrationClient(), minNumBookies);
+                } catch (Exception e) {
+                    log.error("Encountered exceptions on waiting {} bookies to be alive", minNumBookies);
+                    throw new RuntimeException("Encountered exceptions on waiting "
+                        + minNumBookies + " bookies to be alive", e);
+                }
+                return (Void) null;
+            }, executorService);
+        } catch (MetadataException | ExecutionException  e) {
+            throw new RuntimeException("Failed to start bookie watch service", e);
+        }
+    }
+
+    private static void waitingForNumBookies(RegistrationClient client, int minNumBookies) throws Exception {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        Set<BookieSocketAddress> bookies = FutureUtils.result(client.getWritableBookies()).getValue();
+        while (bookies.size() < minNumBookies) {
+            TimeUnit.SECONDS.sleep(1);
+            bookies = FutureUtils.result(client.getWritableBookies()).getValue();
+            log.info("Only {} bookies are live since {} seconds elapsed, wait for another 1 second",
+                bookies.size(), stopwatch.elapsed(TimeUnit.SECONDS));
+        }
+    }
+
+    @Override
+    protected void doStop() {}
+
+    @Override
+    protected void doClose() {}
+
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
index 7d404f0..9bd6ab4 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.stream.server.service;
 
 import java.io.IOException;
 import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
@@ -33,6 +34,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 /**
  * A service to provide a curator client.
  */
+@Slf4j
 public class CuratorProviderService
     extends AbstractLifecycleComponent<DLConfiguration>
     implements Supplier<CuratorFramework> {
@@ -57,6 +59,7 @@ public class CuratorProviderService
     @Override
     protected void doStart() {
         curatorClient.start();
+        log.info("Provided curator clients to zookeeper {}.", zkServers);
     }
 
     @Override
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
index e0359ce..4550df5 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
@@ -69,6 +69,7 @@ public class DLNamespaceProviderService
     }
 
     private final ServerConfiguration bkServerConf;
+    @Getter
     private final DistributedLogConfiguration dlConf;
     private final DynamicDistributedLogConfiguration dlDynConf;
     @Getter
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
index 8db59fd..f1cda91 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
@@ -105,6 +105,8 @@ public class RegistrationServiceProvider
         }
     }
 
+
+
     @Override
     protected void doStop() {
     }
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
new file mode 100644
index 0000000..61e5fa3
--- /dev/null
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.api.cluster;
+
+import java.net.URI;
+
+/**
+ * Initializing cluster metadata.
+ */
+public interface ClusterInitializer {
+
+    /**
+     * Retrieves whether the initializer thinks that it can initialize the metadata service
+     * specified by the given {@code metadataServiceUri}. Typically the implementations will
+     * return <tt>true</tt> if they understand the subprotocol specified in the URI and
+     * <tt>false</tt> if they do not.
+     *
+     * @param metatadataServiceUri the metadata service uri
+     * @return <tt>true</tt> if the implementation understands the given URI; <tt>false</tt> otherwise.
+     */
+    boolean acceptsURI(URI metatadataServiceUri);
+
+    /**
+     * Create a new cluster under metadata service specified by {@code metadataServiceUri}.
+     *
+     * @param metadataServiceUri metadata service uri
+     * @param numStorageContainers number storage containers
+     */
+    void initializeCluster(URI metadataServiceUri, int numStorageContainers);
+
+}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
index 935552a..d46f633 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.stream.storage.api.cluster;
 
+import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
@@ -29,12 +30,18 @@ import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
  */
 public interface ClusterMetadataStore extends AutoCloseable {
 
+
+    default void initializeCluster(int numStorageContainers) {
+        initializeCluster(numStorageContainers, Optional.empty());
+    }
+
     /**
      * Initialize the cluster metadata with the provided <i>numStorageContainers</i>.
      *
      * @param numStorageContainers number of storage containers.
+     * @param segmentStorePath segment store path
      */
-    void initializeCluster(int numStorageContainers);
+    void initializeCluster(int numStorageContainers, Optional<String> segmentStorePath);
 
     /**
      * Get the current cluster assignment data.
diff --git a/stream/storage/impl/pom.xml b/stream/storage/impl/pom.xml
index ae4a5e8..b178fc7 100644
--- a/stream/storage/impl/pom.xml
+++ b/stream/storage/impl/pom.xml
@@ -26,11 +26,6 @@
   <artifactId>stream-storage-service-impl</artifactId>
   <name>Apache BookKeeper :: Stream Storage :: Storage :: Impl</name>
 
-  <properties>
-    <!-- dependencies -->
-    <helix-core.version>0.6.7</helix-core.version>
-  </properties>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
@@ -52,11 +47,6 @@
       <artifactId>curator-recipes</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.helix</groupId>
-      <artifactId>helix-core</artifactId>
-      <version>${helix-core.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.distributedlog</groupId>
       <artifactId>distributedlog-core</artifactId>
       <version>${project.parent.version}</version>
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
index da74867..4313ce1 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
@@ -20,11 +20,10 @@ package org.apache.bookkeeper.stream.storage.impl.cluster;
 
 import com.google.common.collect.Maps;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
-import lombok.AccessLevel;
 import lombok.Data;
-import lombok.Getter;
 import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
 import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
 import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
@@ -55,7 +54,8 @@ public class InMemClusterMetadataStore implements ClusterMetadataStore {
     }
 
     @Override
-    public synchronized void initializeCluster(int numStorageContainers) {
+    public synchronized void initializeCluster(int numStorageContainers,
+                                               Optional<String> segmentStorePath) {
         this.metadata = ClusterMetadata.newBuilder()
             .setNumStorageContainers(numStorageContainers)
             .build();
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
new file mode 100644
index 0000000..832f15d
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH;
+
+import com.google.common.base.Strings;
+import java.net.URI;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterInitializer;
+import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * ZooKeeper Based Cluster Initializer.
+ */
+@Slf4j
+public class ZkClusterInitializer implements ClusterInitializer  {
+
+    private final String zkExternalConnectString;
+
+    public ZkClusterInitializer(String zkServers) {
+        this.zkExternalConnectString = zkServers;
+    }
+
+    @Override
+    public boolean acceptsURI(URI metadataServiceUri) {
+        return metadataServiceUri.getScheme().toLowerCase().startsWith("zk");
+    }
+
+    @Override
+    public void initializeCluster(URI metadataServiceUri, int numStorageContainers) {
+        String zkInternalConnectString = ZKMetadataDriverBase.getZKServersFromServiceUri(metadataServiceUri);
+        // 1) `zkExternalConnectString` are the public endpoints, where the tool can interact with.
+        //    It allows the tools running outside of the cluster. It is useful for being used in dockerized environment.
+        // 2) `zkInternalConnectString` are the internal endpoints, where the services can interact with.
+        //    It is used by dlog to bind a namespace.
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(
+            zkExternalConnectString,
+            new ExponentialBackoffRetry(100, Integer.MAX_VALUE, 10000)
+        )) {
+            client.start();
+
+            ZkClusterMetadataStore store =
+                new ZkClusterMetadataStore(client, zkInternalConnectString, ZK_METADATA_ROOT_PATH);
+
+            ClusterMetadata metadata;
+            try {
+                metadata = store.getClusterMetadata();
+                log.info("Loaded cluster metadata : \n{}", metadata);
+            } catch (StorageRuntimeException sre) {
+                if (sre.getCause() instanceof KeeperException.NoNodeException) {
+                    log.info("Initializing the stream cluster with {} storage containers with segment store path {}.",
+                        numStorageContainers);
+
+                    String ledgersPath = metadataServiceUri.getPath();
+                    Optional<String> segmentStorePath;
+                    if (Strings.isNullOrEmpty(ledgersPath) || "/" == ledgersPath) {
+                        segmentStorePath = Optional.empty();
+                    } else {
+                        segmentStorePath = Optional.of(ledgersPath);
+                    }
+
+                    store.initializeCluster(numStorageContainers, segmentStorePath);
+                    log.info("Successfully initialized the stream cluster : \n{}", store.getClusterMetadata());
+                } else {
+                    throw sre;
+                }
+            }
+        }
+
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
index bcf70c3..59af968 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
@@ -29,6 +29,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
@@ -92,7 +93,7 @@ public class ZkClusterMetadataStore implements ClusterMetadataStore {
     }
 
     @Override
-    public void initializeCluster(int numStorageContainers) {
+    public void initializeCluster(int numStorageContainers, Optional<String> segmentStorePath) {
         ClusterMetadata metadata = ClusterMetadata.newBuilder()
             .setNumStorageContainers(numStorageContainers)
             .build();
@@ -101,7 +102,7 @@ public class ZkClusterMetadataStore implements ClusterMetadataStore {
         try {
             // we are using dlog for the storage backend, so we need to initialize the dlog namespace
             BKDLConfig dlogConfig = new BKDLConfig(
-                zkServers, getSegmentsRootPath(zkRootPath));
+                zkServers, segmentStorePath.orElse(getSegmentsRootPath(zkRootPath)));
             DLMetadata dlogMetadata = DLMetadata.create(dlogConfig);
 
             client.transaction()
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
index 4d2828b..73d6e17 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.when;
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -82,7 +83,7 @@ public class ClusterControllerLeaderImplTest {
         ClusterMetadataStore originalStore = metadataStore;
         this.metadataStore = new ClusterMetadataStore() {
             @Override
-            public void initializeCluster(int numStorageContainers) {
+            public void initializeCluster(int numStorageContainers, Optional<String> segmentStorePath) {
                 originalStore.initializeCluster(numStorageContainers);
             }
 
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java
index 874ec2a..d70b4f2 100644
--- a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java
@@ -20,11 +20,14 @@ package org.apache.bookkeeper.tests.containers;
 
 import static java.time.temporal.ChronoUnit.SECONDS;
 
+import com.google.common.base.Strings;
 import java.net.URI;
 import java.time.Duration;
 import java.util.Objects;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.tests.DockerUtils;
 import org.apache.bookkeeper.tests.containers.wait.HttpWaitStrategy;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
 
 /**
  * Test Container for Bookies.
@@ -40,13 +43,16 @@ public class BookieContainer<SELF extends BookieContainer<SELF>> extends ChaosCo
 
     private final String hostname;
     private final String metadataServiceUri;
+    private final String extraServerComponents;
 
     public BookieContainer(String clusterName,
                            String hostname,
-                           String metadataServiceUri) {
+                           String metadataServiceUri,
+                           String extraServerComponents) {
         super(clusterName, IMAGE_NAME);
         this.hostname = hostname;
         this.metadataServiceUri = metadataServiceUri;
+        this.extraServerComponents = extraServerComponents;
     }
 
     @Override
@@ -54,6 +60,14 @@ public class BookieContainer<SELF extends BookieContainer<SELF>> extends ChaosCo
         return clusterName + "-" + hostname;
     }
 
+    public String getExternalGrpcEndpointStr() {
+        return getContainerIpAddress() + ":" + getMappedPort(BOOKIE_GRPC_PORT);
+    }
+
+    public String getInternalGrpcEndpointStr() {
+        return DockerUtils.getContainerIP(dockerClient, containerId) + ":" + BOOKIE_GRPC_PORT;
+    }
+
     @Override
     protected void configure() {
         addExposedPorts(
@@ -65,20 +79,28 @@ public class BookieContainer<SELF extends BookieContainer<SELF>> extends ChaosCo
         addEnv("BK_httpServerPort", "" + BOOKIE_HTTP_PORT);
         addEnv("BK_metadataServiceUri", metadataServiceUri);
         addEnv("BK_useHostNameAsBookieID", "true");
+        addEnv("BK_extraServerComponents", extraServerComponents);
         if (metadataServiceUri.toLowerCase().startsWith("zk")) {
             URI uri = URI.create(metadataServiceUri);
             addEnv("BK_zkServers", uri.getAuthority());
             addEnv("BK_zkLedgersRootPath", uri.getPath());
         }
+        // grpc port
+        addEnv("BK_storageserver.grpc.port", "" + BOOKIE_GRPC_PORT);
     }
 
     @Override
     public void start() {
-        this.waitStrategy = new HttpWaitStrategy()
-            .forPath("/heartbeat")
-            .forStatusCode(200)
-            .forPort(BOOKIE_HTTP_PORT)
-            .withStartupTimeout(Duration.of(60, SECONDS));
+        if (Strings.isNullOrEmpty(extraServerComponents)) {
+            this.waitStrategy = new HttpWaitStrategy()
+                .forPath("/heartbeat")
+                .forStatusCode(200)
+                .forPort(BOOKIE_HTTP_PORT)
+                .withStartupTimeout(Duration.of(60, SECONDS));
+        } else {
+            this.waitStrategy = new HostPortWaitStrategy()
+                .withStartupTimeout(Duration.of(300, SECONDS));
+        }
         this.withCreateContainerCmdModifier(createContainerCmd -> {
             createContainerCmd.withHostName(hostname);
             createContainerCmd.withName(getContainerName());
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
index 36d5d86..ebc0c87 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
@@ -33,6 +33,7 @@ import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.tests.integration.topologies.BKCluster;
+import org.apache.bookkeeper.tests.integration.topologies.BKClusterSpec;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -50,15 +51,31 @@ public abstract class BookKeeperClusterTestBase {
 
     @BeforeClass
     public static void setupCluster() throws Exception {
-        bkCluster = new BKCluster(RandomStringUtils.randomAlphabetic(8), 0);
+        BKClusterSpec spec = BKClusterSpec.builder()
+            .clusterName(RandomStringUtils.randomAlphabetic(8))
+            .numBookies(0)
+            .build();
+
+        setupCluster(spec);
+    }
+
+    protected static void setupCluster(BKClusterSpec spec) throws Exception {
+        log.info("Setting up cluster {} with {} bookies : extraServerComponents = {}",
+            spec.clusterName(), spec.numBookies(), spec.extraServerComponents());
+
+        bkCluster = BKCluster.forSpec(spec);
         bkCluster.start();
 
+        log.info("Cluster {} is setup", spec.clusterName());
+
         metadataServiceUri = URI.create(bkCluster.getExternalServiceUri());
         ClientConfiguration conf = new ClientConfiguration()
             .setMetadataServiceUri(metadataServiceUri.toString());
         executor = Executors.newSingleThreadScheduledExecutor();
         metadataClientDriver = MetadataDrivers.getClientDriver(metadataServiceUri);
         metadataClientDriver.initialize(conf, executor, NullStatsLogger.INSTANCE, Optional.empty());
+
+        log.info("Initialized metadata client driver : {}", metadataServiceUri);
     }
 
     @AfterClass
@@ -74,7 +91,7 @@ public abstract class BookKeeperClusterTestBase {
         }
     }
 
-    private boolean findIfBookieRegistered(String bookieName) throws Exception {
+    private static boolean findIfBookieRegistered(String bookieName) throws Exception {
         Set<BookieSocketAddress> bookies =
             FutureUtils.result(metadataClientDriver.getRegistrationClient().getWritableBookies()).getValue();
         Optional<BookieSocketAddress> registered =
@@ -82,7 +99,7 @@ public abstract class BookKeeperClusterTestBase {
         return registered.isPresent();
     }
 
-    protected void waitUntilBookieUnregistered(String bookieName) throws Exception {
+    protected static void waitUntilBookieUnregistered(String bookieName) throws Exception {
         Stopwatch sw = Stopwatch.createStarted();
         while (findIfBookieRegistered(bookieName)) {
             TimeUnit.MILLISECONDS.sleep(1000);
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/LocationClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
similarity index 83%
rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/LocationClientTest.java
rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
index bf31321..42b428e 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/LocationClientTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
 
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
 import static org.junit.Assert.assertEquals;
@@ -33,25 +33,27 @@ import org.apache.bookkeeper.common.util.Revisioned;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 /**
  * Integration test for location test.
  */
 @Slf4j
-public class LocationClientTest extends StorageServerTestBase {
+public class LocationClientTest extends StreamClusterTestBase {
 
     private OrderedScheduler scheduler;
     private LocationClient client;
 
-    @Override
-    protected void doSetup() throws Exception {
+    @Before
+    public void setup() {
         scheduler = OrderedScheduler.newSchedulerBuilder()
             .name("location-client-test")
             .numThreads(1)
             .build();
         StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
+            .addEndpoints(getExsternalStreamEndpoints().toArray(new Endpoint[getNumBookies()]))
             .usePlaintext(true)
             .build();
         client = new LocationClientImpl(
@@ -59,8 +61,8 @@ public class LocationClientTest extends StorageServerTestBase {
             scheduler);
     }
 
-    @Override
-    protected void doTeardown() throws Exception {
+    @After
+    public void teardown() {
         if (null != client) {
             client.close();
         }
@@ -80,13 +82,15 @@ public class LocationClientTest extends StorageServerTestBase {
         assertEquals(StatusCode.SUCCESS, oneResponse.getStatusCode());
 
         Endpoint endpoint = oneResponse.getEndpoint().getRwEndpoint();
-        log.info("Current cluster endpoints = {}", cluster.getRpcEndpoints());
+        log.info("Current cluster endpoints = {}", getInternalStreamEndpoints());
         log.info("Response : rw endpoint = {}", endpoint);
-        assertTrue(cluster.getRpcEndpoints().contains(endpoint));
+        assertTrue(getInternalStreamEndpoints().contains(endpoint));
 
         assertEquals(1, oneResponse.getEndpoint().getRoEndpointCount());
         endpoint = oneResponse.getEndpoint().getRoEndpoint(0);
         log.info("Response : ro endpoint = {}", endpoint);
-        assertTrue(cluster.getRpcEndpoints().contains(endpoint));
+        assertTrue(getInternalStreamEndpoints().contains(endpoint));
     }
+
+
 }
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
new file mode 100644
index 0000000..b86cc72
--- /dev/null
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.integration.stream;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.utils.NetUtils;
+import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.apache.bookkeeper.tests.integration.cluster.BookKeeperClusterTestBase;
+import org.apache.bookkeeper.tests.integration.topologies.BKClusterSpec;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Similar as {@link org.apache.bookkeeper.tests.integration.cluster.BookKeeperClusterTestBase},
+ * but enabled stream storage for testing stream storage related features.
+ */
+@Slf4j
+public abstract class StreamClusterTestBase extends BookKeeperClusterTestBase {
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        BKClusterSpec spec = BKClusterSpec.builder()
+            .clusterName(RandomStringUtils.randomAlphabetic(8))
+            .numBookies(3)
+            .extraServerComponents("org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent")
+            .build();
+        BookKeeperClusterTestBase.setupCluster(spec);
+    }
+
+    @AfterClass
+    public static void teardownCluster() {
+        BookKeeperClusterTestBase.teardownCluster();
+    }
+
+    protected static int getNumBookies() {
+        return bkCluster.getBookieContainers().size();
+    }
+
+    protected static List<Endpoint> getExsternalStreamEndpoints() {
+        return bkCluster.getBookieContainers().values().stream()
+            .map(container ->
+                NetUtils.parseEndpoint(container.getExternalGrpcEndpointStr()))
+            .collect(Collectors.toList());
+    }
+
+    protected static List<Endpoint> getInternalStreamEndpoints() {
+        return bkCluster.getBookieContainers().values().stream()
+            .map(container ->
+                NetUtils.parseEndpoint(container.getInternalGrpcEndpointStr()))
+            .collect(Collectors.toList());
+    }
+
+
+}
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
index c1e9e84..80ae906 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
@@ -18,9 +18,11 @@
 
 package org.apache.bookkeeper.tests.integration.topologies;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -29,6 +31,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
 import org.apache.bookkeeper.tests.containers.BookieContainer;
 import org.apache.bookkeeper.tests.containers.MetadataStoreContainer;
 import org.apache.bookkeeper.tests.containers.ZKContainer;
@@ -40,21 +44,37 @@ import org.testcontainers.containers.Network;
 @Slf4j
 public class BKCluster {
 
+    /**
+     * BookKeeper Cluster Spec.
+     *
+     * @param spec bookkeeper cluster spec.
+     * @return the built bookkeeper cluster
+     */
+    public static BKCluster forSpec(BKClusterSpec spec) {
+        return new BKCluster(spec);
+    }
+
+    private final BKClusterSpec spec;
     @Getter
     private final String clusterName;
     private final Network network;
     private final MetadataStoreContainer metadataContainer;
     private final Map<String, BookieContainer> bookieContainers;
     private final int numBookies;
+    private final String extraServerComponents;
+    private volatile boolean enableContainerLog;
 
-    public BKCluster(String clusterName, int numBookies) {
-        this.clusterName = clusterName;
+    private BKCluster(BKClusterSpec spec) {
+        this.spec = spec;
+        this.clusterName = spec.clusterName();
         this.network = Network.newNetwork();
         this.metadataContainer = (MetadataStoreContainer) new ZKContainer(clusterName)
             .withNetwork(network)
             .withNetworkAliases(ZKContainer.HOST_NAME);
         this.bookieContainers = Maps.newTreeMap();
-        this.numBookies = numBookies;
+        this.numBookies = spec.numBookies();
+        this.extraServerComponents = spec.extraServerComponents();
+        this.enableContainerLog = spec.enableContainerLog();
     }
 
     public String getExternalServiceUri() {
@@ -67,10 +87,28 @@ public class BKCluster {
 
     public void start() throws Exception {
         // start the metadata store
+        if (enableContainerLog) {
+            this.metadataContainer.tailContainerLog();
+        }
         this.metadataContainer.start();
+        log.info("Successfully started metadata store container.");
 
         // init a new cluster
         initNewCluster(metadataContainer.getExternalServiceUri());
+        log.info("Successfully initialized metadata service uri : {}",
+            metadataContainer.getExternalServiceUri());
+
+        if (!Strings.isNullOrEmpty(extraServerComponents)) {
+            int numStorageContainers = numBookies > 0 ? 2 * numBookies : 8;
+            // initialize the stream storage.
+            new ZkClusterInitializer(
+                ZKMetadataDriverBase.getZKServersFromServiceUri(URI.create(metadataContainer.getExternalServiceUri()))
+            ).initializeCluster(
+                URI.create(metadataContainer.getInternalServiceUri()),
+                numStorageContainers);
+            log.info("Successfully initialized stream storage metadata with {} storage containers",
+                numStorageContainers);
+        }
 
         // create bookies
         createBookies("bookie", numBookies);
@@ -123,20 +161,36 @@ public class BKCluster {
         return container;
     }
 
-    public synchronized BookieContainer createBookie(String bookieName) {
-        BookieContainer container = getBookie(bookieName);
-        if (null == container) {
-            container = (BookieContainer) new BookieContainer(clusterName, bookieName, ZKContainer.SERVICE_URI)
-                .withNetwork(network)
-                .withNetworkAliases(bookieName);
+    public BookieContainer createBookie(String bookieName) {
+        boolean shouldStart = false;
+        BookieContainer container;
+        synchronized (this) {
+            container = getBookie(bookieName);
+            if (null == container) {
+                shouldStart = true;
+                log.info("Creating bookie {}", bookieName);
+                container = (BookieContainer) new BookieContainer(
+                    clusterName, bookieName, ZKContainer.SERVICE_URI, extraServerComponents
+                ).withNetwork(network).withNetworkAliases(bookieName);
+                if (enableContainerLog) {
+                    container.tailContainerLog();
+                }
+
+                log.info("Created bookie {}", bookieName);
+                bookieContainers.put(bookieName, container);
+            }
+        }
+
+        if (shouldStart) {
+            log.info("Starting bookie {}", bookieName);
             container.start();
-            bookieContainers.put(bookieName, container);
         }
         return container;
     }
 
-    public synchronized Map<String, BookieContainer> createBookies(String bookieNamePrefix, int numBookies)
+    public Map<String, BookieContainer> createBookies(String bookieNamePrefix, int numBookies)
             throws Exception {
+        log.info("Creating {} bookies with bookie name prefix '{}'", numBookies, bookieNamePrefix);
         List<CompletableFuture<Void>> startFutures = Lists.newArrayListWithExpectedSize(numBookies);
         Map<String, BookieContainer> containers = Maps.newHashMap();
         for (int i = 0; i < numBookies; i++) {
@@ -144,6 +198,7 @@ public class BKCluster {
             startFutures.add(
                 CompletableFuture.runAsync(() -> {
                     String bookieName = String.format("%s-%03d", bookieNamePrefix, idx);
+                    log.info("Starting bookie {} at cluster {}", bookieName, clusterName);
                     BookieContainer container = createBookie(bookieName);
                     synchronized (containers) {
                         containers.put(bookieName, container);
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKClusterSpec.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKClusterSpec.java
new file mode 100644
index 0000000..e919d1a
--- /dev/null
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKClusterSpec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.integration.topologies;
+
+import lombok.Builder;
+import lombok.Builder.Default;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+/**
+ * Spec to build {@link BKCluster}.
+ */
+@Builder
+@Accessors(fluent = true)
+@Getter
+@Setter
+public class BKClusterSpec {
+
+    /**
+     * Returns the cluster name.
+     *
+     * @return the cluster name.
+     */
+    String clusterName;
+
+    /**
+     * Returns the number of bookies.
+     *
+     * @return the number of bookies;
+     */
+    @Default
+    int numBookies = 0;
+
+    /**
+     * Returns the extra server components.
+     *
+     * @return the extra server components.
+     */
+    @Default
+    String extraServerComponents = "";
+
+    /**
+     * Returns the flag whether to enable/disable container log.
+     *
+     * @return the flag whether to enable/disable container log.
+     */
+    @Default
+    boolean enableContainerLog = false;
+
+}

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.