You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/05/22 18:43:44 UTC

[GitHub] sijie closed pull request #1422: [table service] start table service as an extra component of bookie

sijie closed pull request #1422: [table service] start table service as an extra component of bookie
URL: https://github.com/apache/bookkeeper/pull/1422
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-dist/all/pom.xml b/bookkeeper-dist/all/pom.xml
index ef25654c9..eb69948e6 100644
--- a/bookkeeper-dist/all/pom.xml
+++ b/bookkeeper-dist/all/pom.xml
@@ -92,6 +92,13 @@
       <version>${project.version}</version>
     </dependency>
 
+    <!-- stream.storage -->
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-server</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <!-- bookkeeper benchmark -->
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
diff --git a/bookkeeper-dist/server/pom.xml b/bookkeeper-dist/server/pom.xml
index 5ae8fa1f4..7b00dbeaf 100644
--- a/bookkeeper-dist/server/pom.xml
+++ b/bookkeeper-dist/server/pom.xml
@@ -76,6 +76,13 @@
       <version>${project.version}</version>
     </dependency>
 
+    <!-- stream.storage -->
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-server</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <!-- slf4j binding -->
     <dependency>
       <groupId>org.slf4j</groupId>
diff --git a/bookkeeper-dist/src/assemble/bin-all.xml b/bookkeeper-dist/src/assemble/bin-all.xml
index fd45d673e..4a7c4dc32 100644
--- a/bookkeeper-dist/src/assemble/bin-all.xml
+++ b/bookkeeper-dist/src/assemble/bin-all.xml
@@ -54,11 +54,14 @@
       <directory>../src/main/resources/deps</directory>
       <outputDirectory>/deps</outputDirectory>
       <includes>
+        <include>google-auth-library-credentials-0.4.0/LICENSE</include>
         <include>javax.servlet-api-3.1.0/CDDL+GPL-1.1</include>
         <include>jsr-305/LICENSE</include>
         <include>netty-3.10.1.Final/*</include>
         <include>netty-4.1.12.Final/*</include>
         <include>paranamer-2.8/LICENSE.txt</include>
+        <include>protobuf-3.0.0/LICENSE</include>
+        <include>protobuf-3.3.1/LICENSE</include>
         <include>protobuf-3.4.0/LICENSE</include>
         <include>scala-library-2.11.7/LICENSE.md</include>
         <include>scala-parser-combinators_2.11-1.0.4/LICENSE.md</include>
diff --git a/bookkeeper-dist/src/assemble/bin-server.xml b/bookkeeper-dist/src/assemble/bin-server.xml
index 7bbd3b2cb..ce5a64627 100644
--- a/bookkeeper-dist/src/assemble/bin-server.xml
+++ b/bookkeeper-dist/src/assemble/bin-server.xml
@@ -49,8 +49,11 @@
       <directory>../src/main/resources/deps</directory>
       <outputDirectory>/deps</outputDirectory>
       <includes>
+        <include>google-auth-library-credentials-0.4.0/LICENSE</include>
         <include>javax.servlet-api-3.1.0/CDDL+GPL-1.1</include>
         <include>netty-4.1.12.Final/*</include>
+        <include>protobuf-3.0.0/LICENSE</include>
+        <include>protobuf-3.3.1/LICENSE</include>
         <include>protobuf-3.4.0/LICENSE</include>
         <include>slf4j-1.7.25/LICENSE.txt</include>
       </includes>
@@ -85,6 +88,7 @@
       <!-- Include 'groupId' in the dependencies Jar names to better identify the provenance of the jar -->
       <outputFileNameMapping>${artifact.groupId}-${artifact.artifactId}-${artifact.version}${dashClassifier?}.${artifact.extension}</outputFileNameMapping>
       <excludes>
+        <exclude>com.google.code.findbugs:jsr305</exclude>
         <!-- All these dependencies are already included in netty-all -->
         <exclude>io.netty:netty-buffer</exclude>
         <exclude>io.netty:netty-codec</exclude>
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
index 7f523dbe2..289916d93 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
@@ -276,6 +276,25 @@ Apache Software License, Version 2.
 - lib/net.jpountz.lz4-lz4-1.3.0.jar [38]
 - lib/org.codehaus.jackson-jackson-core-asl-1.9.11.jar [39]
 - lib/org.codehaus.jackson-jackson-mapper-asl-1.9.11.jar [40]
+- lib/com.google.api.grpc-proto-google-common-protos-0.1.9.jar [41]
+- lib/com.google.code.gson-gson-2.7.jar [42]
+- lib/com.google.instrumentation-instrumentation-api-0.4.3.jar [43]
+- lib/com.squareup.okhttp-okhttp-2.5.0.jar [44]
+- lib/com.squareup.okio-okio-1.6.0.jar [45]
+- lib/io.grpc-grpc-all-1.5.0.jar [46]
+- lib/io.grpc-grpc-auth-1.5.0.jar [46]
+- lib/io.grpc-grpc-context-1.5.0.jar [46]
+- lib/io.grpc-grpc-core-1.5.0.jar [46]
+- lib/io.grpc-grpc-netty-1.5.0.jar [46]
+- lib/io.grpc-grpc-okhttp-1.5.0.jar [46]
+- lib/io.grpc-grpc-protobuf-1.5.0.jar [46]
+- lib/io.grpc-grpc-protobuf-lite-1.5.0.jar [46]
+- lib/io.grpc-grpc-protobuf-nano-1.5.0.jar [46]
+- lib/io.grpc-grpc-stub-1.5.0.jar [46]
+- lib/org.apache.curator-curator-client-4.0.1.jar [47]
+- lib/org.apache.curator-curator-framework-4.0.1.jar [47]
+- lib/org.apache.curator-curator-recipes-4.0.1.jar [47]
+- lib/org.inferred-freebuilder-1.14.9.jar [48]
 
 [1] Source available at https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
 [2] Source available at https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -316,6 +335,15 @@ Apache Software License, Version 2.
 [38] Source available at https://github.com/lz4/lz4-java/tree/1.3.0
 [39] Source available at https://github.com/codehaus/jackson/tree/1.9
 [40] Source available at https://github.com/codehaus/jackson/tree/1.9
+[41] Source available at https://github.com/googleapis/googleapis
+[42] Source available at https://github.com/google/gson/tree/gson-parent-2.7
+[43] Source available at https://github.com/census-instrumentation/opencensus-java/tree/v0.4.3
+[44] Source available at https://github.com/square/okhttp/tree/parent-2.5.0
+[45] Source available at https://github.com/square/okio/tree/okio-parent-1.6.0
+[46] Source available at https://github.com/grpc/grpc-java/tree/v1.5.0
+[47] Source available at https://github.com/apache/curator/tree/apache-curator-4.0.1
+[48] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
+
 
 ------------------------------------------------------------------------------------
 lib/io.netty-netty-3.10.1.Final.jar contains the extensions to Java Collections Framework which has
@@ -459,10 +487,20 @@ Bundled as lib/com.google.code.findbugs-jsr305-3.0.2.jar
 Source available at https://storage.googleapis.com/google-code-archive-source/v2/code.google.com/jsr-305/source-archive.zip
 ------------------------------------------------------------------------------------
 This product bundles Google Protocal Buffers, which is available under a "3-clause BSD"
-license. For details, see deps/protobuf-3.4.0/LICENSE.
+license.
 
 Bundled as lib/com.google.protobuf-protobuf-java-3.4.0.jar
 Source available at https://github.com/google/protobuf/tree/v3.4.0
+For details, see deps/protobuf-3.4.0/LICENSE.
+
+Bundled as lib/com.google.protobuf.nano-protobuf-javanano-3.0.0-alpha-5.jar
+Source available at https://github.com/google/protobuf/tree/3.0.0-pre
+For details, see deps/protobuf-3.0.0/LICENSE.
+
+Bundled as com.google.protobuf-protobuf-java-util-3.3.1.jar
+Source available at https://github.com/google/protobuf/tree/v3.3.1
+For details, see deps/protobuf-3.3.1/LICENSE.
+
 ------------------------------------------------------------------------------------
 This product bundles Paranamer, which is available under a "3-clause BSD" license.
 For details, see deps/paranamer-2.8/LICENSE.txt.
@@ -501,4 +539,11 @@ Bundled as
   - lib/org.slf4j-slf4j-api-1.7.25.jar
   - lib/org.slf4j-slf4j-log4j12-1.7.25.jar
 Source available at https://github.com/qos-ch/slf4j/tree/v_1.7.25
+------------------------------------------------------------------------------------
+This product bundles the Google Auth Library, which is available under a "3-clause BSD"
+license. For details, see deps/google-auth-library-credentials-0.4.0/LICENSE
+
+Bundled as
+  - lib/com.google.auth-google-auth-library-credentials-0.4.0.jar
+Source available at https://github.com/google/google-auth-library-java/tree/0.4.0
 
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
index 1f437abec..168f8fd26 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
@@ -241,6 +241,25 @@ Apache Software License, Version 2.
 - lib/net.jpountz.lz4-lz4-1.3.0.jar [25]
 - lib/org.codehaus.jackson-jackson-core-asl-1.9.11.jar [26]
 - lib/org.codehaus.jackson-jackson-mapper-asl-1.9.11.jar [27]
+- lib/com.google.api.grpc-proto-google-common-protos-0.1.9.jar [28]
+- lib/com.google.code.gson-gson-2.7.jar [29]
+- lib/com.google.instrumentation-instrumentation-api-0.4.3.jar [30]
+- lib/com.squareup.okhttp-okhttp-2.5.0.jar [31]
+- lib/com.squareup.okio-okio-1.6.0.jar [32]
+- lib/io.grpc-grpc-all-1.5.0.jar [33]
+- lib/io.grpc-grpc-auth-1.5.0.jar [33]
+- lib/io.grpc-grpc-context-1.5.0.jar [33]
+- lib/io.grpc-grpc-core-1.5.0.jar [33]
+- lib/io.grpc-grpc-netty-1.5.0.jar [33]
+- lib/io.grpc-grpc-okhttp-1.5.0.jar [33]
+- lib/io.grpc-grpc-protobuf-1.5.0.jar [33]
+- lib/io.grpc-grpc-protobuf-lite-1.5.0.jar [33]
+- lib/io.grpc-grpc-protobuf-nano-1.5.0.jar [33]
+- lib/io.grpc-grpc-stub-1.5.0.jar [33]
+- lib/org.apache.curator-curator-client-4.0.1.jar [34]
+- lib/org.apache.curator-curator-framework-4.0.1.jar [34]
+- lib/org.apache.curator-curator-recipes-4.0.1.jar [34]
+- lib/org.inferred-freebuilder-1.14.9.jar [35]
 
 [1] Source available at https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
 [2] Source available at https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -269,6 +288,14 @@ Apache Software License, Version 2.
 [25] Source available at https://github.com/lz4/lz4-java/tree/1.3.0
 [26] Source available at https://github.com/codehaus/jackson/tree/1.9
 [27] Source available at https://github.com/codehaus/jackson/tree/1.9
+[28] Source available at https://github.com/googleapis/googleapis
+[29] Source available at https://github.com/google/gson/tree/gson-parent-2.7
+[30] Source available at https://github.com/census-instrumentation/opencensus-java/tree/v0.4.3
+[31] Source available at https://github.com/square/okhttp/tree/parent-2.5.0
+[32] Source available at https://github.com/square/okio/tree/okio-parent-1.6.0
+[33] Source available at https://github.com/grpc/grpc-java/tree/v1.5.0
+[34] Source available at https://github.com/apache/curator/tree/apache-curator-4.0.1
+[35] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
 
 ------------------------------------------------------------------------------------
 lib/io.netty-netty-all-4.1.12.Final.jar bundles some 3rd party dependencies
@@ -372,10 +399,19 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 ------------------------------------------------------------------------------------
 This product bundles Google Protocal Buffers, which is available under a "3-clause BSD"
-license. For details, see deps/protobuf-3.4.0/LICENSE.
+license.
 
 Bundled as lib/com.google.protobuf-protobuf-java-3.4.0.jar
 Source available at https://github.com/google/protobuf/tree/v3.4.0
+For details, see deps/protobuf-3.4.0/LICENSE.
+
+Bundled as lib/com.google.protobuf.nano-protobuf-javanano-3.0.0-alpha-5.jar
+Source available at https://github.com/google/protobuf/tree/3.0.0-pre
+For details, see deps/protobuf-3.0.0/LICENSE.
+
+Bundled as com.google.protobuf-protobuf-java-util-3.3.1.jar
+Source available at https://github.com/google/protobuf/tree/v3.3.1
+For details, see deps/protobuf-3.3.1/LICENSE.
 ------------------------------------------------------------------------------------
 This product bundles the JCP Standard Java Servlet API, which is available under a
 CDDL 1.1 license. For details, see deps/javax.servlet-api-3.1.0/CDDL+GPL-1.1.
@@ -390,4 +426,10 @@ Bundled as
   - lib/org.slf4j-slf4j-api-1.7.25.jar
   - lib/org.slf4j-slf4j-log4j12-1.7.25.jar
 Source available at https://github.com/qos-ch/slf4j/tree/v_1.7.25
+------------------------------------------------------------------------------------
+This product bundles the Google Auth Library, which is available under a "3-clause BSD"
+license. For details, see deps/google-auth-library-credentials-0.4.0/LICENSE
 
+Bundled as
+  - lib/com.google.auth-google-auth-library-credentials-0.4.0.jar
+Source available at https://github.com/google/google-auth-library-java/tree/0.4.0
diff --git a/bookkeeper-dist/src/main/resources/deps/google-auth-library-credentials-0.4.0/LICENSE b/bookkeeper-dist/src/main/resources/deps/google-auth-library-credentials-0.4.0/LICENSE
new file mode 100644
index 000000000..12edf23c6
--- /dev/null
+++ b/bookkeeper-dist/src/main/resources/deps/google-auth-library-credentials-0.4.0/LICENSE
@@ -0,0 +1,28 @@
+Copyright 2014, Google Inc. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+   * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+   * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+
+   * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/bookkeeper-dist/src/main/resources/deps/protobuf-3.0.0/LICENSE b/bookkeeper-dist/src/main/resources/deps/protobuf-3.0.0/LICENSE
new file mode 100644
index 000000000..2dcab42da
--- /dev/null
+++ b/bookkeeper-dist/src/main/resources/deps/protobuf-3.0.0/LICENSE
@@ -0,0 +1,32 @@
+Copyright 2014, Google Inc.  All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+    * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+Code generated by the Protocol Buffer compiler is owned by the owner
+of the input file used when generating it.  This code is not
+standalone and requires a support library to be linked with it.  This
+support library is itself covered by the above license.
diff --git a/bookkeeper-dist/src/main/resources/deps/protobuf-3.3.1/LICENSE b/bookkeeper-dist/src/main/resources/deps/protobuf-3.3.1/LICENSE
new file mode 100644
index 000000000..2dcab42da
--- /dev/null
+++ b/bookkeeper-dist/src/main/resources/deps/protobuf-3.3.1/LICENSE
@@ -0,0 +1,32 @@
+Copyright 2014, Google Inc.  All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+    * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+Code generated by the Protocol Buffer compiler is owned by the owner
+of the input file used when generating it.  This code is not
+standalone and requires a support library to be linked with it.  This
+support library is itself covered by the above license.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
index 4815fee0c..1ef56debb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
@@ -48,6 +48,7 @@
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang3.StringUtils;
 
 /**
  * A bookie server is a server that run bookie and serving rpc requests.
@@ -327,13 +328,18 @@ static LifecycleComponentStack buildBookieServer(BookieConfiguration conf) throw
         // 5. build extra services
         String[] extraComponents = conf.getServerConf().getExtraServerComponents();
         if (null != extraComponents) {
-            List<ServerLifecycleComponent> components = loadServerComponents(
-                extraComponents,
-                conf,
-                rootStatsLogger);
-            for (ServerLifecycleComponent component : components) {
-                serverBuilder.addComponent(component);
-                log.info("Load lifecycle component : {}", component.getClass().getName());
+            try {
+                List<ServerLifecycleComponent> components = loadServerComponents(
+                    extraComponents,
+                    conf,
+                    rootStatsLogger);
+                for (ServerLifecycleComponent component : components) {
+                    serverBuilder.addComponent(component);
+                    log.info("Load lifecycle component : {}", component.getClass().getName());
+                }
+            } catch (Exception e) {
+                log.info("Failed to load extra components '{}' - {}. Continuing without those components.",
+                    StringUtils.join(extraComponents), e.getMessage());
             }
         }
 
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 1be3fe387..104a3d178 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -92,7 +92,10 @@ bookiePort=3181
 # Configure a list of server components to enable and load on a bookie server.
 # This provides the plugin run extra services along with a bookie server.
 #
-# extraServerComponents=
+# NOTE: if bookie fails to load any of extra components configured below, bookie will continue
+#       function by ignoring the components configured below.
+# extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
+extraServerComponents=
 
 #############################################################################
 ## Thread settings
@@ -888,3 +891,26 @@ zkEnableSecurity=false
 
 # The time to backoff when replication worker encounters exceptions on replicating a ledger, in milliseconds.
 # rwRereplicateBackoffMs=5000
+
+
+##################################################################
+##################################################################
+# Settings below are used by stream/table service
+##################################################################
+##################################################################
+
+### Grpc Server ###
+
+# the grpc server port to listen on. default is 4181
+storageserver.grpc.port=4181
+
+### Storage ###
+
+# local storage directories for storing table ranges data (e.g. rocksdb sst files)
+storage.range.store.dirs=data/bookkeeper/ranges
+
+# whether the storage server capable of serving readonly tables. default is false.
+storage.serve.readonly.tables=false
+
+# the cluster controller schedule interval, in milliseconds. default is 30 seconds.
+storage.cluster.controller.schedule.interval.ms=30000
diff --git a/pom.xml b/pom.xml
index fa252479a..a33fb7ffb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
     <curator.version>4.0.1</curator.version>
     <dropwizard.version>3.1.0</dropwizard.version>
     <finagle.version>6.44.0</finagle.version>
-    <freebuilder.version>1.12.3</freebuilder.version>
+    <freebuilder.version>1.14.9</freebuilder.version>
     <google.code.version>3.0.2</google.code.version>
     <google.errorprone.version>2.1.2</google.errorprone.version>
     <grpc.version>1.5.0</grpc.version>
@@ -362,6 +362,12 @@
         <groupId>io.grpc</groupId>
         <artifactId>grpc-all</artifactId>
         <version>${grpc.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>com.google.errorprone</groupId>
+            <artifactId>error_prone_annotations</artifactId>
+          </exclusion>
+        </exclusions>
       </dependency>
 
       <!-- rocksdb dependencies -->
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index 63e26791d..11bc8af7d 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -26,7 +26,6 @@
 import java.net.BindException;
 import java.net.URI;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -43,20 +42,15 @@
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
-import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.StorageServer;
 import org.apache.bookkeeper.stream.storage.StorageConstants;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
-import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.LocalDLMEmulator;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * A Cluster that runs a few storage nodes.
@@ -141,28 +135,9 @@ private void stopZooKeeper() {
     }
 
     private void initializeCluster() throws Exception {
-        try (CuratorFramework client = CuratorFrameworkFactory.newClient(
-            zkEnsemble,
-            new ExponentialBackoffRetry(100, Integer.MAX_VALUE, 10000)
-        )) {
-            client.start();
-
-            ZkClusterMetadataStore store = new ZkClusterMetadataStore(client, zkEnsemble, ZK_METADATA_ROOT_PATH);
-
-            ClusterMetadata metadata;
-            try {
-                metadata = store.getClusterMetadata();
-                log.info("Loaded cluster metadata : \n{}", metadata);
-            } catch (StorageRuntimeException sre) {
-                if (sre.getCause() instanceof KeeperException.NoNodeException) {
-                    log.info("Initializing the stream cluster.");
-                    store.initializeCluster(spec.numServers() * 2);
-                    log.info("Successfully initialized the stream cluster : \n{}", store.getClusterMetadata());
-                } else {
-                    throw sre;
-                }
-            }
-        }
+        new ZkClusterInitializer(zkEnsemble).initializeCluster(
+            URI.create("zk://" + zkEnsemble),
+            spec.numServers() * 2);
 
         // format the bookkeeper cluster
         MetadataDrivers.runFunctionWithMetadataBookieDriver(newBookieConfiguration(zkEnsemble), driver -> {
@@ -210,11 +185,10 @@ private LifecycleComponent startServer() throws Exception {
                 log.info("Attempting to start storage server at (bookie port = {}, grpc port = {})"
                         + " : bkDir = {}, rangesStoreDir = {}, serveReadOnlyTables = {}",
                     bookiePort, grpcPort, bkDir, rangesStoreDir, spec.serveReadOnlyTable);
-                server = StorageServer.startStorageServer(
+                server = StorageServer.buildStorageServer(
                     serverConf,
                     grpcPort,
-                    spec.numServers() * 2,
-                    Optional.empty());
+                    spec.numServers() * 2);
                 server.start();
                 log.info("Started storage server at (bookie port = {}, grpc port = {})",
                     bookiePort, grpcPort);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index d5ca39772..e0b87ef99 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -13,6 +13,7 @@
  */
 package org.apache.bookkeeper.stream.server;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH;
 
 import com.beust.jcommander.JCommander;
@@ -21,15 +22,16 @@
 import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.UnknownHostException;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.component.ComponentStarter;
 import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponentStack;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog.DLCheckpointStore;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
@@ -37,6 +39,7 @@
 import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
 import org.apache.bookkeeper.stream.server.grpc.GrpcServerSpec;
 import org.apache.bookkeeper.stream.server.service.BookieService;
+import org.apache.bookkeeper.stream.server.service.BookieWatchService;
 import org.apache.bookkeeper.stream.server.service.ClusterControllerService;
 import org.apache.bookkeeper.stream.server.service.CuratorProviderService;
 import org.apache.bookkeeper.stream.server.service.DLNamespaceProviderService;
@@ -58,6 +61,7 @@
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.distributedlog.DistributedLogConfiguration;
 
 /**
  * A storage server is a server that run storage service and serving rpc requests.
@@ -71,7 +75,7 @@
         private String serverConfigFile;
 
         @Parameter(names = {"-p", "--port"}, description = "Port to listen on for gPRC server")
-        private int port = 3182;
+        private int port = 4181;
 
         @Parameter(names = {"-h", "--help"}, description = "Show this help message")
         private boolean help = false;
@@ -137,11 +141,10 @@ static int doMain(String[] args) {
 
         LifecycleComponent storageServer;
         try {
-            storageServer = startStorageServer(
+            storageServer = buildStorageServer(
                 conf,
                 grpcPort,
-                1024,
-                Optional.empty());
+                1024);
         } catch (ConfigurationException e) {
             log.error("Invalid storage configuration", e);
             return ExitCode.INVALID_CONF.code();
@@ -164,11 +167,23 @@ static int doMain(String[] args) {
         return ExitCode.OK.code();
     }
 
-    public static LifecycleComponent startStorageServer(CompositeConfiguration conf,
+    public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
+                                                        int grpcPort,
+                                                        int numStorageContainers)
+            throws UnknownHostException, ConfigurationException {
+        return buildStorageServer(conf, grpcPort, numStorageContainers, true, NullStatsLogger.INSTANCE);
+    }
+
+    public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
                                                         int grpcPort,
                                                         int numStorageContainers,
-                                                        Optional<String> instanceName)
+                                                        boolean startBookieAndStartProvider,
+                                                        StatsLogger externalStatsLogger)
         throws ConfigurationException, UnknownHostException {
+
+        LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder()
+            .withName("storage-server");
+
         BookieConfiguration bkConf = BookieConfiguration.of(conf);
         bkConf.validate();
 
@@ -188,42 +203,47 @@ public static LifecycleComponent startStorageServer(CompositeConfiguration conf,
         StorageResources storageResources = StorageResources.create();
 
         // Create the stats provider
-        StatsProviderService statsProviderService = new StatsProviderService(bkConf);
-        StatsLogger rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");
+        StatsLogger rootStatsLogger;
+        if (startBookieAndStartProvider) {
+            StatsProviderService statsProviderService = new StatsProviderService(bkConf);
+            rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");
+            serverBuilder.addComponent(statsProviderService);
+        } else {
+            rootStatsLogger = checkNotNull(externalStatsLogger,
+                "External stats logger is not provided while not starting stats provider");
+        }
 
         // Create the bookie service
-        BookieService bookieService = new BookieService(bkConf, rootStatsLogger);
+        ServerConfiguration bkServerConf;
+        if (startBookieAndStartProvider) {
+            BookieService bookieService = new BookieService(bkConf, rootStatsLogger);
+            serverBuilder.addComponent(bookieService);
+            bkServerConf = bookieService.serverConf();
+        } else {
+            bkServerConf = new ServerConfiguration();
+            bkServerConf.loadConf(bkConf.getUnderlyingConf());
+        }
+
+        // Create the bookie watch service
+        BookieWatchService bkWatchService;
+        {
+            DistributedLogConfiguration dlogConf = new DistributedLogConfiguration();
+            bkWatchService = new BookieWatchService(
+                dlogConf.getEnsembleSize(),
+                bkConf,
+                NullStatsLogger.INSTANCE);
+        }
 
         // Create the curator provider service
         CuratorProviderService curatorProviderService = new CuratorProviderService(
-            bookieService.serverConf(), dlConf, rootStatsLogger.scope("curator"));
+            bkServerConf, dlConf, rootStatsLogger.scope("curator"));
 
         // Create the distributedlog namespace service
         DLNamespaceProviderService dlNamespaceProvider = new DLNamespaceProviderService(
-            bookieService.serverConf(),
+            bkServerConf,
             dlConf,
             rootStatsLogger.scope("dlog"));
 
-        // Create a registration service provider
-        RegistrationServiceProvider regService = new RegistrationServiceProvider(
-            bookieService.serverConf(),
-            dlConf,
-            rootStatsLogger.scope("registration").scope("provider"));
-
-        // Create a cluster controller service
-        ClusterControllerService clusterControllerService = new ClusterControllerService(
-            storageConf,
-            () -> new ClusterControllerImpl(
-                new ZkClusterMetadataStore(
-                    curatorProviderService.get(),
-                    ZKMetadataDriverBase.resolveZkServers(bookieService.serverConf()),
-                    ZK_METADATA_ROOT_PATH),
-                regService.get(),
-                new DefaultStorageContainerController(),
-                new ZkClusterControllerLeaderSelector(curatorProviderService.get(), ZK_METADATA_ROOT_PATH),
-                storageConf),
-            rootStatsLogger.scope("cluster_controller"));
-
         // Create range (stream) store
         RangeStoreBuilder rangeStoreBuilder = RangeStoreBuilder.newBuilder()
             .withStatsLogger(rootStatsLogger.scope("storage"))
@@ -241,7 +261,7 @@ public static LifecycleComponent startStorageServer(CompositeConfiguration conf,
                     storageConf,
                     new ZkClusterMetadataStore(
                         curatorProviderService.get(),
-                        ZKMetadataDriverBase.resolveZkServers(bookieService.serverConf()),
+                        ZKMetadataDriverBase.resolveZkServers(bkServerConf),
                         ZK_METADATA_ROOT_PATH),
                     registry,
                     rootStatsLogger.scope("sc").scope("manager")))
@@ -267,26 +287,44 @@ public static LifecycleComponent startStorageServer(CompositeConfiguration conf,
         GrpcService grpcService = new GrpcService(
             serverConf, serverSpec, rpcStatsLogger);
 
+        // Create a registration service provider
+        RegistrationServiceProvider regService = new RegistrationServiceProvider(
+            bkServerConf,
+            dlConf,
+            rootStatsLogger.scope("registration").scope("provider"));
+
         // Create a registration state service only when service is ready.
         RegistrationStateService regStateService = new RegistrationStateService(
             myEndpoint,
-            bookieService.serverConf(),
+            bkServerConf,
             bkConf,
             regService,
             rootStatsLogger.scope("registration"));
 
+        // Create a cluster controller service
+        ClusterControllerService clusterControllerService = new ClusterControllerService(
+            storageConf,
+            () -> new ClusterControllerImpl(
+                new ZkClusterMetadataStore(
+                    curatorProviderService.get(),
+                    ZKMetadataDriverBase.resolveZkServers(bkServerConf),
+                    ZK_METADATA_ROOT_PATH),
+                regService.get(),
+                new DefaultStorageContainerController(),
+                new ZkClusterControllerLeaderSelector(curatorProviderService.get(), ZK_METADATA_ROOT_PATH),
+                storageConf),
+            rootStatsLogger.scope("cluster_controller"));
+
         // Create all the service stack
-        return LifecycleComponentStack.newBuilder()
-            .withName("storage-server")
-            .addComponent(statsProviderService)     // stats provider
-            .addComponent(bookieService)            // bookie server
+        return serverBuilder
+            .addComponent(bkWatchService)           // service that watches bookies
             .addComponent(curatorProviderService)   // service that provides curator client
             .addComponent(dlNamespaceProvider)      // service that provides dl namespace
-            .addComponent(regService)               // service that provides registration client
-            .addComponent(clusterControllerService) // service that run cluster controller service
             .addComponent(storageService)           // range (stream) store
             .addComponent(grpcService)              // range (stream) server (gRPC)
+            .addComponent(regService)               // service that provides registration client
             .addComponent(regStateService)          // service that manages server state
+            .addComponent(clusterControllerService) // service that run cluster controller service
             .build();
     }
 
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
new file mode 100644
index 000000000..5e6b0b51b
--- /dev/null
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.server;
+
+import java.net.UnknownHostException;
+import org.apache.bookkeeper.common.component.LifecycleComponent;
+import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
+import org.apache.bookkeeper.server.conf.BookieConfiguration;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
+import org.apache.commons.configuration.ConfigurationException;
+
+/**
+ * This is a {@link ServerLifecycleComponent} to allow run stream storage component as part of bookie server.
+ */
+public class StreamStorageLifecycleComponent extends ServerLifecycleComponent {
+
+    private final LifecycleComponent streamStorage;
+
+    public StreamStorageLifecycleComponent(BookieConfiguration conf, StatsLogger statsLogger)
+            throws UnknownHostException, ConfigurationException {
+        super("stream-storage", conf, statsLogger);
+
+        StorageServerConfiguration ssConf = StorageServerConfiguration.of(conf.getUnderlyingConf());
+
+        this.streamStorage = StorageServer.buildStorageServer(
+            conf.getUnderlyingConf(),
+            ssConf.getGrpcPort(),
+            1024, /* indicator */
+            false,
+            statsLogger.scope("stream"));
+    }
+
+    @Override
+    protected void doStart() {
+        this.streamStorage.start();
+    }
+
+    @Override
+    protected void doStop() {
+        this.streamStorage.stop();
+    }
+
+    @Override
+    protected void doClose() {
+        this.streamStorage.close();
+    }
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java
index b6803f2c6..e3c24c594 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java
@@ -21,13 +21,11 @@
  */
 public class BookieConfiguration extends ComponentConfiguration {
 
-    private static final String COMPONENT_PREFIX = "bookie" + DELIMITER;
-
     public static BookieConfiguration of(CompositeConfiguration conf) {
         return new BookieConfiguration(conf);
     }
 
     protected BookieConfiguration(CompositeConfiguration conf) {
-        super(conf, COMPONENT_PREFIX);
+        super(conf, "");
     }
 }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
index a0ef963ed..bc300c9f8 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
@@ -24,6 +24,8 @@
 
     private static final String COMPONENT_PREFIX = "storageserver" + DELIMITER;
 
+    private static final String GRPC_PORT = "grpc.port";
+
     public static StorageServerConfiguration of(CompositeConfiguration conf) {
         return new StorageServerConfiguration(conf);
     }
@@ -31,4 +33,13 @@ public static StorageServerConfiguration of(CompositeConfiguration conf) {
     private StorageServerConfiguration(CompositeConfiguration conf) {
         super(conf, COMPONENT_PREFIX);
     }
+
+    /**
+     * Returns the grpc port that serves requests coming into the stream storage server.
+     *
+     * @return grpc port
+     */
+    public int getGrpcPort() {
+        return getInt(GRPC_PORT, 4181);
+    }
 }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/BookieWatchService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/BookieWatchService.java
new file mode 100644
index 000000000..cfac0dab2
--- /dev/null
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/BookieWatchService.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.server.service;
+
+import com.google.common.base.Stopwatch;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
+
+/**
+ * A service that watches bookies and wait for minimum number of bookies to be alive.
+ */
+@Slf4j
+public class BookieWatchService
+    extends AbstractLifecycleComponent<BookieConfiguration> {
+
+    private final int minNumBookies;
+
+    public BookieWatchService(int minNumBookies,
+                              BookieConfiguration conf,
+                              StatsLogger statsLogger) {
+        super("bookie-watcher", conf, statsLogger);
+        this.minNumBookies = minNumBookies;
+    }
+
+    @Override
+    protected void doStart() {
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.loadConf(conf.getUnderlyingConf());
+
+        @Cleanup("shutdown") ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+
+        try {
+            MetadataDrivers.runFunctionWithMetadataClientDriver(clientConf, clientDriver -> {
+                try {
+                    waitingForNumBookies(clientDriver.getRegistrationClient(), minNumBookies);
+                } catch (Exception e) {
+                    log.error("Encountered exceptions on waiting {} bookies to be alive", minNumBookies);
+                    throw new RuntimeException("Encountered exceptions on waiting "
+                        + minNumBookies + " bookies to be alive", e);
+                }
+                return (Void) null;
+            }, executorService);
+        } catch (MetadataException | ExecutionException  e) {
+            throw new RuntimeException("Failed to start bookie watch service", e);
+        }
+    }
+
+    private static void waitingForNumBookies(RegistrationClient client, int minNumBookies) throws Exception {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        Set<BookieSocketAddress> bookies = FutureUtils.result(client.getWritableBookies()).getValue();
+        while (bookies.size() < minNumBookies) {
+            TimeUnit.SECONDS.sleep(1);
+            bookies = FutureUtils.result(client.getWritableBookies()).getValue();
+            log.info("Only {} bookies are live since {} seconds elapsed, wait for another 1 second",
+                bookies.size(), stopwatch.elapsed(TimeUnit.SECONDS));
+        }
+    }
+
+    @Override
+    protected void doStop() {}
+
+    @Override
+    protected void doClose() {}
+
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
index 7d404f0d9..9bd6ab429 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
@@ -33,6 +34,7 @@
 /**
  * A service to provide a curator client.
  */
+@Slf4j
 public class CuratorProviderService
     extends AbstractLifecycleComponent<DLConfiguration>
     implements Supplier<CuratorFramework> {
@@ -57,6 +59,7 @@ public CuratorProviderService(ServerConfiguration bkServerConf,
     @Override
     protected void doStart() {
         curatorClient.start();
+        log.info("Provided curator clients to zookeeper {}.", zkServers);
     }
 
     @Override
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
index e0359ce8d..4550df55a 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
@@ -69,6 +69,7 @@ private static URI initializeNamespace(ServerConfiguration bkServerConf,
     }
 
     private final ServerConfiguration bkServerConf;
+    @Getter
     private final DistributedLogConfiguration dlConf;
     private final DynamicDistributedLogConfiguration dlDynConf;
     @Getter
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
index 8db59fdea..f1cda91e7 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
@@ -105,6 +105,8 @@ protected void doStart() {
         }
     }
 
+
+
     @Override
     protected void doStop() {
     }
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
new file mode 100644
index 000000000..61e5fa3b7
--- /dev/null
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.api.cluster;
+
+import java.net.URI;
+
+/**
+ * Initializing cluster metadata.
+ */
+public interface ClusterInitializer {
+
+    /**
+     * Retrieves whether the initializer thinks that it can initialize the metadata service
+     * specified by the given {@code metadataServiceUri}. Typically the implementations will
+     * return <tt>true</tt> if they understand the subprotocol specified in the URI and
+     * <tt>false</tt> if they do not.
+     *
+     * @param metatadataServiceUri the metadata service uri
+     * @return <tt>true</tt> if the implementation understands the given URI; <tt>false</tt> otherwise.
+     */
+    boolean acceptsURI(URI metatadataServiceUri);
+
+    /**
+     * Create a new cluster under metadata service specified by {@code metadataServiceUri}.
+     *
+     * @param metadataServiceUri metadata service uri
+     * @param numStorageContainers number storage containers
+     */
+    void initializeCluster(URI metadataServiceUri, int numStorageContainers);
+
+}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
index 935552a9d..d46f6338a 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.stream.storage.api.cluster;
 
+import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
@@ -29,12 +30,18 @@
  */
 public interface ClusterMetadataStore extends AutoCloseable {
 
+
+    default void initializeCluster(int numStorageContainers) {
+        initializeCluster(numStorageContainers, Optional.empty());
+    }
+
     /**
      * Initialize the cluster metadata with the provided <i>numStorageContainers</i>.
      *
      * @param numStorageContainers number of storage containers.
+     * @param segmentStorePath segment store path
      */
-    void initializeCluster(int numStorageContainers);
+    void initializeCluster(int numStorageContainers, Optional<String> segmentStorePath);
 
     /**
      * Get the current cluster assignment data.
diff --git a/stream/storage/impl/pom.xml b/stream/storage/impl/pom.xml
index a75403c38..03870c28c 100644
--- a/stream/storage/impl/pom.xml
+++ b/stream/storage/impl/pom.xml
@@ -27,11 +27,6 @@
   <artifactId>stream-storage-service-impl</artifactId>
   <name>Apache BookKeeper :: Stream Storage :: Storage :: Impl</name>
 
-  <properties>
-    <!-- dependencies -->
-    <helix-core.version>0.6.7</helix-core.version>
-  </properties>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
@@ -52,11 +47,6 @@
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-recipes</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.helix</groupId>
-      <artifactId>helix-core</artifactId>
-      <version>${helix-core.version}</version>
-    </dependency>
     <dependency>
       <groupId>org.apache.distributedlog</groupId>
       <artifactId>distributedlog-core</artifactId>
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
index da7486711..4313ce156 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
@@ -20,11 +20,10 @@
 
 import com.google.common.collect.Maps;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
-import lombok.AccessLevel;
 import lombok.Data;
-import lombok.Getter;
 import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
 import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
 import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
@@ -55,7 +54,8 @@ synchronized int getNumWatchers() {
     }
 
     @Override
-    public synchronized void initializeCluster(int numStorageContainers) {
+    public synchronized void initializeCluster(int numStorageContainers,
+                                               Optional<String> segmentStorePath) {
         this.metadata = ClusterMetadata.newBuilder()
             .setNumStorageContainers(numStorageContainers)
             .build();
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
new file mode 100644
index 000000000..832f15d67
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH;
+
+import com.google.common.base.Strings;
+import java.net.URI;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterInitializer;
+import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * ZooKeeper Based Cluster Initializer.
+ */
+@Slf4j
+public class ZkClusterInitializer implements ClusterInitializer  {
+
+    private final String zkExternalConnectString;
+
+    public ZkClusterInitializer(String zkServers) {
+        this.zkExternalConnectString = zkServers;
+    }
+
+    @Override
+    public boolean acceptsURI(URI metadataServiceUri) {
+        return metadataServiceUri.getScheme().toLowerCase().startsWith("zk");
+    }
+
+    @Override
+    public void initializeCluster(URI metadataServiceUri, int numStorageContainers) {
+        String zkInternalConnectString = ZKMetadataDriverBase.getZKServersFromServiceUri(metadataServiceUri);
+        // 1) `zkExternalConnectString` are the public endpoints, where the tool can interact with.
+        //    It allows the tools running outside of the cluster. It is useful for being used in dockerized environment.
+        // 2) `zkInternalConnectString` are the internal endpoints, where the services can interact with.
+        //    It is used by dlog to bind a namespace.
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(
+            zkExternalConnectString,
+            new ExponentialBackoffRetry(100, Integer.MAX_VALUE, 10000)
+        )) {
+            client.start();
+
+            ZkClusterMetadataStore store =
+                new ZkClusterMetadataStore(client, zkInternalConnectString, ZK_METADATA_ROOT_PATH);
+
+            ClusterMetadata metadata;
+            try {
+                metadata = store.getClusterMetadata();
+                log.info("Loaded cluster metadata : \n{}", metadata);
+            } catch (StorageRuntimeException sre) {
+                if (sre.getCause() instanceof KeeperException.NoNodeException) {
+                    log.info("Initializing the stream cluster with {} storage containers with segment store path {}.",
+                        numStorageContainers);
+
+                    String ledgersPath = metadataServiceUri.getPath();
+                    Optional<String> segmentStorePath;
+                    if (Strings.isNullOrEmpty(ledgersPath) || "/" == ledgersPath) {
+                        segmentStorePath = Optional.empty();
+                    } else {
+                        segmentStorePath = Optional.of(ledgersPath);
+                    }
+
+                    store.initializeCluster(numStorageContainers, segmentStorePath);
+                    log.info("Successfully initialized the stream cluster : \n{}", store.getClusterMetadata());
+                } else {
+                    throw sre;
+                }
+            }
+        }
+
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
index bcf70c3f3..59af968d9 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
@@ -29,6 +29,7 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
@@ -92,7 +93,7 @@ public void close() {
     }
 
     @Override
-    public void initializeCluster(int numStorageContainers) {
+    public void initializeCluster(int numStorageContainers, Optional<String> segmentStorePath) {
         ClusterMetadata metadata = ClusterMetadata.newBuilder()
             .setNumStorageContainers(numStorageContainers)
             .build();
@@ -101,7 +102,7 @@ public void initializeCluster(int numStorageContainers) {
         try {
             // we are using dlog for the storage backend, so we need to initialize the dlog namespace
             BKDLConfig dlogConfig = new BKDLConfig(
-                zkServers, getSegmentsRootPath(zkRootPath));
+                zkServers, segmentStorePath.orElse(getSegmentsRootPath(zkRootPath)));
             DLMetadata dlogMetadata = DLMetadata.create(dlogConfig);
 
             client.transaction()
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
index 4d2828bb9..73d6e17ff 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
@@ -30,6 +30,7 @@
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -82,7 +83,7 @@ public void setup() {
         ClusterMetadataStore originalStore = metadataStore;
         this.metadataStore = new ClusterMetadataStore() {
             @Override
-            public void initializeCluster(int numStorageContainers) {
+            public void initializeCluster(int numStorageContainers, Optional<String> segmentStorePath) {
                 originalStore.initializeCluster(numStorageContainers);
             }
 
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java
index 874ec2a74..d70b4f239 100644
--- a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java
@@ -20,11 +20,14 @@
 
 import static java.time.temporal.ChronoUnit.SECONDS;
 
+import com.google.common.base.Strings;
 import java.net.URI;
 import java.time.Duration;
 import java.util.Objects;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.tests.DockerUtils;
 import org.apache.bookkeeper.tests.containers.wait.HttpWaitStrategy;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
 
 /**
  * Test Container for Bookies.
@@ -40,13 +43,16 @@
 
     private final String hostname;
     private final String metadataServiceUri;
+    private final String extraServerComponents;
 
     public BookieContainer(String clusterName,
                            String hostname,
-                           String metadataServiceUri) {
+                           String metadataServiceUri,
+                           String extraServerComponents) {
         super(clusterName, IMAGE_NAME);
         this.hostname = hostname;
         this.metadataServiceUri = metadataServiceUri;
+        this.extraServerComponents = extraServerComponents;
     }
 
     @Override
@@ -54,6 +60,14 @@ public String getContainerName() {
         return clusterName + "-" + hostname;
     }
 
+    public String getExternalGrpcEndpointStr() {
+        return getContainerIpAddress() + ":" + getMappedPort(BOOKIE_GRPC_PORT);
+    }
+
+    public String getInternalGrpcEndpointStr() {
+        return DockerUtils.getContainerIP(dockerClient, containerId) + ":" + BOOKIE_GRPC_PORT;
+    }
+
     @Override
     protected void configure() {
         addExposedPorts(
@@ -65,20 +79,28 @@ protected void configure() {
         addEnv("BK_httpServerPort", "" + BOOKIE_HTTP_PORT);
         addEnv("BK_metadataServiceUri", metadataServiceUri);
         addEnv("BK_useHostNameAsBookieID", "true");
+        addEnv("BK_extraServerComponents", extraServerComponents);
         if (metadataServiceUri.toLowerCase().startsWith("zk")) {
             URI uri = URI.create(metadataServiceUri);
             addEnv("BK_zkServers", uri.getAuthority());
             addEnv("BK_zkLedgersRootPath", uri.getPath());
         }
+        // grpc port
+        addEnv("BK_storageserver.grpc.port", "" + BOOKIE_GRPC_PORT);
     }
 
     @Override
     public void start() {
-        this.waitStrategy = new HttpWaitStrategy()
-            .forPath("/heartbeat")
-            .forStatusCode(200)
-            .forPort(BOOKIE_HTTP_PORT)
-            .withStartupTimeout(Duration.of(60, SECONDS));
+        if (Strings.isNullOrEmpty(extraServerComponents)) {
+            this.waitStrategy = new HttpWaitStrategy()
+                .forPath("/heartbeat")
+                .forStatusCode(200)
+                .forPort(BOOKIE_HTTP_PORT)
+                .withStartupTimeout(Duration.of(60, SECONDS));
+        } else {
+            this.waitStrategy = new HostPortWaitStrategy()
+                .withStartupTimeout(Duration.of(300, SECONDS));
+        }
         this.withCreateContainerCmdModifier(createContainerCmd -> {
             createContainerCmd.withHostName(hostname);
             createContainerCmd.withName(getContainerName());
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
index 36d5d86ae..ebc0c8786 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
@@ -33,6 +33,7 @@
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.tests.integration.topologies.BKCluster;
+import org.apache.bookkeeper.tests.integration.topologies.BKClusterSpec;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -50,15 +51,31 @@
 
     @BeforeClass
     public static void setupCluster() throws Exception {
-        bkCluster = new BKCluster(RandomStringUtils.randomAlphabetic(8), 0);
+        BKClusterSpec spec = BKClusterSpec.builder()
+            .clusterName(RandomStringUtils.randomAlphabetic(8))
+            .numBookies(0)
+            .build();
+
+        setupCluster(spec);
+    }
+
+    protected static void setupCluster(BKClusterSpec spec) throws Exception {
+        log.info("Setting up cluster {} with {} bookies : extraServerComponents = {}",
+            spec.clusterName(), spec.numBookies(), spec.extraServerComponents());
+
+        bkCluster = BKCluster.forSpec(spec);
         bkCluster.start();
 
+        log.info("Cluster {} is setup", spec.clusterName());
+
         metadataServiceUri = URI.create(bkCluster.getExternalServiceUri());
         ClientConfiguration conf = new ClientConfiguration()
             .setMetadataServiceUri(metadataServiceUri.toString());
         executor = Executors.newSingleThreadScheduledExecutor();
         metadataClientDriver = MetadataDrivers.getClientDriver(metadataServiceUri);
         metadataClientDriver.initialize(conf, executor, NullStatsLogger.INSTANCE, Optional.empty());
+
+        log.info("Initialized metadata client driver : {}", metadataServiceUri);
     }
 
     @AfterClass
@@ -74,7 +91,7 @@ public static void teardownCluster() {
         }
     }
 
-    private boolean findIfBookieRegistered(String bookieName) throws Exception {
+    private static boolean findIfBookieRegistered(String bookieName) throws Exception {
         Set<BookieSocketAddress> bookies =
             FutureUtils.result(metadataClientDriver.getRegistrationClient().getWritableBookies()).getValue();
         Optional<BookieSocketAddress> registered =
@@ -82,7 +99,7 @@ private boolean findIfBookieRegistered(String bookieName) throws Exception {
         return registered.isPresent();
     }
 
-    protected void waitUntilBookieUnregistered(String bookieName) throws Exception {
+    protected static void waitUntilBookieUnregistered(String bookieName) throws Exception {
         Stopwatch sw = Stopwatch.createStarted();
         while (findIfBookieRegistered(bookieName)) {
             TimeUnit.MILLISECONDS.sleep(1000);
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/LocationClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
similarity index 83%
rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/LocationClientTest.java
rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
index bf3132108..42b428e37 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/LocationClientTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
 
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
 import static org.junit.Assert.assertEquals;
@@ -33,25 +33,27 @@
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 /**
  * Integration test for location test.
  */
 @Slf4j
-public class LocationClientTest extends StorageServerTestBase {
+public class LocationClientTest extends StreamClusterTestBase {
 
     private OrderedScheduler scheduler;
     private LocationClient client;
 
-    @Override
-    protected void doSetup() throws Exception {
+    @Before
+    public void setup() {
         scheduler = OrderedScheduler.newSchedulerBuilder()
             .name("location-client-test")
             .numThreads(1)
             .build();
         StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
+            .addEndpoints(getExsternalStreamEndpoints().toArray(new Endpoint[getNumBookies()]))
             .usePlaintext(true)
             .build();
         client = new LocationClientImpl(
@@ -59,8 +61,8 @@ protected void doSetup() throws Exception {
             scheduler);
     }
 
-    @Override
-    protected void doTeardown() throws Exception {
+    @After
+    public void teardown() {
         if (null != client) {
             client.close();
         }
@@ -80,13 +82,15 @@ public void testLocateStorageContainers() throws Exception {
         assertEquals(StatusCode.SUCCESS, oneResponse.getStatusCode());
 
         Endpoint endpoint = oneResponse.getEndpoint().getRwEndpoint();
-        log.info("Current cluster endpoints = {}", cluster.getRpcEndpoints());
+        log.info("Current cluster endpoints = {}", getInternalStreamEndpoints());
         log.info("Response : rw endpoint = {}", endpoint);
-        assertTrue(cluster.getRpcEndpoints().contains(endpoint));
+        assertTrue(getInternalStreamEndpoints().contains(endpoint));
 
         assertEquals(1, oneResponse.getEndpoint().getRoEndpointCount());
         endpoint = oneResponse.getEndpoint().getRoEndpoint(0);
         log.info("Response : ro endpoint = {}", endpoint);
-        assertTrue(cluster.getRpcEndpoints().contains(endpoint));
+        assertTrue(getInternalStreamEndpoints().contains(endpoint));
     }
+
+
 }
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
new file mode 100644
index 000000000..b86cc7260
--- /dev/null
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.integration.stream;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.utils.NetUtils;
+import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.apache.bookkeeper.tests.integration.cluster.BookKeeperClusterTestBase;
+import org.apache.bookkeeper.tests.integration.topologies.BKClusterSpec;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Similar as {@link org.apache.bookkeeper.tests.integration.cluster.BookKeeperClusterTestBase},
+ * but enabled stream storage for testing stream storage related features.
+ */
+@Slf4j
+public abstract class StreamClusterTestBase extends BookKeeperClusterTestBase {
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        BKClusterSpec spec = BKClusterSpec.builder()
+            .clusterName(RandomStringUtils.randomAlphabetic(8))
+            .numBookies(3)
+            .extraServerComponents("org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent")
+            .build();
+        BookKeeperClusterTestBase.setupCluster(spec);
+    }
+
+    @AfterClass
+    public static void teardownCluster() {
+        BookKeeperClusterTestBase.teardownCluster();
+    }
+
+    protected static int getNumBookies() {
+        return bkCluster.getBookieContainers().size();
+    }
+
+    protected static List<Endpoint> getExsternalStreamEndpoints() {
+        return bkCluster.getBookieContainers().values().stream()
+            .map(container ->
+                NetUtils.parseEndpoint(container.getExternalGrpcEndpointStr()))
+            .collect(Collectors.toList());
+    }
+
+    protected static List<Endpoint> getInternalStreamEndpoints() {
+        return bkCluster.getBookieContainers().values().stream()
+            .map(container ->
+                NetUtils.parseEndpoint(container.getInternalGrpcEndpointStr()))
+            .collect(Collectors.toList());
+    }
+
+
+}
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
index c1e9e84f5..80ae90690 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
@@ -18,9 +18,11 @@
 
 package org.apache.bookkeeper.tests.integration.topologies;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -29,6 +31,8 @@
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
 import org.apache.bookkeeper.tests.containers.BookieContainer;
 import org.apache.bookkeeper.tests.containers.MetadataStoreContainer;
 import org.apache.bookkeeper.tests.containers.ZKContainer;
@@ -40,21 +44,37 @@
 @Slf4j
 public class BKCluster {
 
+    /**
+     * BookKeeper Cluster Spec.
+     *
+     * @param spec bookkeeper cluster spec.
+     * @return the built bookkeeper cluster
+     */
+    public static BKCluster forSpec(BKClusterSpec spec) {
+        return new BKCluster(spec);
+    }
+
+    private final BKClusterSpec spec;
     @Getter
     private final String clusterName;
     private final Network network;
     private final MetadataStoreContainer metadataContainer;
     private final Map<String, BookieContainer> bookieContainers;
     private final int numBookies;
+    private final String extraServerComponents;
+    private volatile boolean enableContainerLog;
 
-    public BKCluster(String clusterName, int numBookies) {
-        this.clusterName = clusterName;
+    private BKCluster(BKClusterSpec spec) {
+        this.spec = spec;
+        this.clusterName = spec.clusterName();
         this.network = Network.newNetwork();
         this.metadataContainer = (MetadataStoreContainer) new ZKContainer(clusterName)
             .withNetwork(network)
             .withNetworkAliases(ZKContainer.HOST_NAME);
         this.bookieContainers = Maps.newTreeMap();
-        this.numBookies = numBookies;
+        this.numBookies = spec.numBookies();
+        this.extraServerComponents = spec.extraServerComponents();
+        this.enableContainerLog = spec.enableContainerLog();
     }
 
     public String getExternalServiceUri() {
@@ -67,10 +87,28 @@ public String getInternalServiceUri() {
 
     public void start() throws Exception {
         // start the metadata store
+        if (enableContainerLog) {
+            this.metadataContainer.tailContainerLog();
+        }
         this.metadataContainer.start();
+        log.info("Successfully started metadata store container.");
 
         // init a new cluster
         initNewCluster(metadataContainer.getExternalServiceUri());
+        log.info("Successfully initialized metadata service uri : {}",
+            metadataContainer.getExternalServiceUri());
+
+        if (!Strings.isNullOrEmpty(extraServerComponents)) {
+            int numStorageContainers = numBookies > 0 ? 2 * numBookies : 8;
+            // initialize the stream storage.
+            new ZkClusterInitializer(
+                ZKMetadataDriverBase.getZKServersFromServiceUri(URI.create(metadataContainer.getExternalServiceUri()))
+            ).initializeCluster(
+                URI.create(metadataContainer.getInternalServiceUri()),
+                numStorageContainers);
+            log.info("Successfully initialized stream storage metadata with {} storage containers",
+                numStorageContainers);
+        }
 
         // create bookies
         createBookies("bookie", numBookies);
@@ -123,20 +161,36 @@ public BookieContainer killBookie(String bookieName) {
         return container;
     }
 
-    public synchronized BookieContainer createBookie(String bookieName) {
-        BookieContainer container = getBookie(bookieName);
-        if (null == container) {
-            container = (BookieContainer) new BookieContainer(clusterName, bookieName, ZKContainer.SERVICE_URI)
-                .withNetwork(network)
-                .withNetworkAliases(bookieName);
+    public BookieContainer createBookie(String bookieName) {
+        boolean shouldStart = false;
+        BookieContainer container;
+        synchronized (this) {
+            container = getBookie(bookieName);
+            if (null == container) {
+                shouldStart = true;
+                log.info("Creating bookie {}", bookieName);
+                container = (BookieContainer) new BookieContainer(
+                    clusterName, bookieName, ZKContainer.SERVICE_URI, extraServerComponents
+                ).withNetwork(network).withNetworkAliases(bookieName);
+                if (enableContainerLog) {
+                    container.tailContainerLog();
+                }
+
+                log.info("Created bookie {}", bookieName);
+                bookieContainers.put(bookieName, container);
+            }
+        }
+
+        if (shouldStart) {
+            log.info("Starting bookie {}", bookieName);
             container.start();
-            bookieContainers.put(bookieName, container);
         }
         return container;
     }
 
-    public synchronized Map<String, BookieContainer> createBookies(String bookieNamePrefix, int numBookies)
+    public Map<String, BookieContainer> createBookies(String bookieNamePrefix, int numBookies)
             throws Exception {
+        log.info("Creating {} bookies with bookie name prefix '{}'", numBookies, bookieNamePrefix);
         List<CompletableFuture<Void>> startFutures = Lists.newArrayListWithExpectedSize(numBookies);
         Map<String, BookieContainer> containers = Maps.newHashMap();
         for (int i = 0; i < numBookies; i++) {
@@ -144,6 +198,7 @@ public synchronized BookieContainer createBookie(String bookieName) {
             startFutures.add(
                 CompletableFuture.runAsync(() -> {
                     String bookieName = String.format("%s-%03d", bookieNamePrefix, idx);
+                    log.info("Starting bookie {} at cluster {}", bookieName, clusterName);
                     BookieContainer container = createBookie(bookieName);
                     synchronized (containers) {
                         containers.put(bookieName, container);
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKClusterSpec.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKClusterSpec.java
new file mode 100644
index 000000000..e919d1af8
--- /dev/null
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKClusterSpec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.integration.topologies;
+
+import lombok.Builder;
+import lombok.Builder.Default;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+/**
+ * Spec to build {@link BKCluster}.
+ */
+@Builder
+@Accessors(fluent = true)
+@Getter
+@Setter
+public class BKClusterSpec {
+
+    /**
+     * Returns the cluster name.
+     *
+     * @return the cluster name.
+     */
+    String clusterName;
+
+    /**
+     * Returns the number of bookies.
+     *
+     * @return the number of bookies;
+     */
+    @Default
+    int numBookies = 0;
+
+    /**
+     * Returns the extra server components.
+     *
+     * @return the extra server components.
+     */
+    @Default
+    String extraServerComponents = "";
+
+    /**
+     * Returns the flag whether to enable/disable container log.
+     *
+     * @return the flag whether to enable/disable container log.
+     */
+    @Default
+    boolean enableContainerLog = false;
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services