You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 21:01:21 UTC

[bookkeeper] branch branch-4.7 updated (5be2e90 -> 3c5d096)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a change to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git.


    from 5be2e90  Provide a util method to run functions with metadata client driver
     new 8ab854f  Build stream modules for integration tests
     new d869f7c  Allow bookie to start if failed to load extra server components
     new b25fc5f  [TABLE SERVICE] start table service as an extra component of bookie
     new 86e23c2  [TABLE SERVICE] Move integration tests under `stream/tests/integration` to `tests/integration/cluster`
     new 609bf4e  Publish bookkeeper dev image to docker hub
     new 8e8e22f  [TABLE SERVICE] Move grpc services from server module to storage module
     new 335082b  Fix bookkeeper post commit CI
     new edeeb10  Use maven-exec-plugin to get the maven project version for CI jobs
     new c9b32d7  [TABLE SERVICE] client interceptor and storage container grpc proxy
     new 3c5d096  Fix checkstyle warnings

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitignore                                         |   9 +
 .test-infra/jenkins/common_job_properties.groovy   |   1 +
 ...seed.groovy => jenkins_testing_job_seed.groovy} |   2 +-
 .../jenkins/job_bookkeeper_codecoverage.groovy     |   2 +-
 ...ob_bookkeeper_precommit_integrationtests.groovy |   4 +-
 .../job_bookkeeper_release_nightly_snapshot.groovy |  23 +-
 .../install-all-tarballs.sh => bin/standalone      |  20 +-
 bin/standalone.docker-compose                      | 177 +++++++++++
 bookkeeper-dist/all/pom.xml                        |   7 +
 bookkeeper-dist/server/pom.xml                     |   7 +
 bookkeeper-dist/src/assemble/bin-all.xml           |   3 +
 bookkeeper-dist/src/assemble/bin-server.xml        |   4 +
 .../src/main/resources/LICENSE-all.bin.txt         |  47 ++-
 .../src/main/resources/LICENSE-server.bin.txt      |  44 ++-
 .../LICENSE                                        |  14 +-
 .../{protobuf-3.4.0 => protobuf-3.0.0}/LICENSE     |   0
 .../{protobuf-3.4.0 => protobuf-3.3.1}/LICENSE     |   0
 .../java/org/apache/bookkeeper/server/Main.java    |  20 +-
 conf/bk_server.conf                                |  28 +-
 dev/{docker/Dockerfile => common.sh}               |  17 +-
 dev/publish-docker-images.sh                       |  70 +++++
 pom.xml                                            |  11 +-
 .../bookkeeper/clients/StorageClientImpl.java      |   1 -
 .../clients/config/StorageClientSettings.java      |  11 +
 .../clients/impl/channel/StorageServerChannel.java |  49 ++-
 .../impl/channel/StorageServerChannelManager.java  |   2 +-
 .../impl/container/StorageContainerChannel.java    |   6 +-
 .../StorageContainerClientInterceptor.java         |  60 ++++
 .../internal/StorageServerClientManagerImpl.java   |   2 +-
 .../clients/resolver/EndpointResolver.java}        |  33 +-
 stream/common/pom.xml                              |   6 +
 .../netty/IdentityBinaryMarshaller.java}           |  36 +--
 .../netty/IdentityInputStreamMarshaller.java}      |  37 +--
 .../netty/LongBinaryMarshaller.java}               |  37 +--
 .../common/grpc/netty}/package-info.java           |   4 +-
 .../bookkeeper/common/grpc}/package-info.java      |   4 +-
 .../common/grpc/proxy/ChannelFinder.java}          |  25 +-
 .../bookkeeper/common/grpc/proxy/ProxyCall.java    | 138 +++++++++
 .../common/grpc/proxy/ProxyHandlerRegistry.java    | 135 ++++++++
 .../common/grpc/proxy/ProxyServerCallHandler.java  |  56 ++++
 .../common/grpc/proxy}/package-info.java           |   4 +-
 .../grpc/netty/IdentityBinaryMarshallerTest.java   |  52 ++++
 .../netty/IdentityInputStreamMarshallerTest.java}  |  27 +-
 .../grpc/netty/LongBinaryMarshallerTest.java}      |  22 +-
 .../grpc/proxy/DirectPingPongServiceTest.java}     |  24 +-
 .../common/grpc/proxy/PingPongServiceTestBase.java | 345 +++++++++++++++++++++
 .../grpc/proxy/ProxyPingPongServiceTest.java}      |  14 +-
 stream/pom.xml                                     |   2 +-
 stream/proto/pom.xml                               |   6 +
 .../util/StorageContainerPlacementPolicy.java      |   8 +
 .../bookkeeper/stream/cluster/StreamCluster.java   |  39 +--
 .../bookkeeper/stream/server/StorageServer.java    | 135 +++++---
 .../server/StreamStorageLifecycleComponent.java    |  63 ++++
 .../stream/server/conf/BookieConfiguration.java    |   4 +-
 .../server/conf/StorageServerConfiguration.java    |  11 +
 .../bookkeeper/stream/server/grpc/GrpcServer.java  |   3 +
 .../stream/server/service/BookieWatchService.java  |  95 ++++++
 .../server/service/CuratorProviderService.java     |   3 +
 .../server/service/DLNamespaceProviderService.java |   1 +
 .../service/RegistrationServiceProvider.java       |   2 +
 .../storage/api/cluster/ClusterInitializer.java    |  47 +++
 .../storage/api/cluster/ClusterMetadataStore.java  |   9 +-
 .../api/sc/StorageContainerManagerFactory.java     |   4 +-
 stream/storage/impl/pom.xml                        |  10 -
 .../stream/storage/RangeStoreBuilder.java          |  22 +-
 .../stream/storage/impl/RangeStoreImpl.java        |   8 +-
 .../impl/cluster/InMemClusterMetadataStore.java    |   6 +-
 .../storage/impl/cluster/ZkClusterInitializer.java |  95 ++++++
 .../impl/cluster/ZkClusterMetadataStore.java       |   5 +-
 .../storage/impl}/grpc/GrpcMetaRangeService.java   |  22 +-
 .../storage/impl}/grpc/GrpcRootRangeService.java   |  23 +-
 .../storage/impl}/grpc/GrpcTableService.java       |  10 +-
 .../impl/grpc}/handler/ResponseHandler.java        |   2 +-
 .../handler/StorageContainerResponseHandler.java   |   2 +-
 .../storage/impl/grpc}/handler/package-info.java   |   2 +-
 .../stream/storage/impl/grpc}/package-info.java    |   4 +-
 .../stream/storage/impl/TestRangeStoreImpl.java    |   2 +-
 .../cluster/ClusterControllerLeaderImplTest.java   |   3 +-
 .../impl}/grpc/TestGrpcMetaRangeService.java       |   5 +-
 .../impl}/grpc/TestGrpcRootRangeService.java       |  20 +-
 .../storage/impl}/grpc/TestGrpcTableService.java   |   3 +-
 .../storage/impl/grpc}/TestResponseObserver.java   |   4 +-
 .../TestStorageContainerResponseHandler.java       |   4 +-
 stream/{proto => tests-common}/pom.xml             |  20 +-
 .../bookkeeper/tests/rpc/PingPongService.java      | 119 +++++++
 .../apache/bookkeeper/tests/rpc}/package-info.java |   4 +-
 .../main/proto/proto2_coder_test_messages.proto    |   0
 .../src/main/proto/rpc.proto}                      |  32 +-
 stream/tests/integration/pom.xml                   | 110 -------
 .../tests/integration/StorageClientTest.java       | 111 -------
 .../tests/integration/StorageServerTestBase.java   |  80 -----
 .../src/test/resources/log4j.properties            |  55 ----
 stream/tests/pom.xml                               |  32 --
 .../tests/containers/BookieContainer.java          |  34 +-
 .../cluster/BookKeeperClusterTestBase.java         |  23 +-
 .../integration/stream}/LocationClientTest.java    |  30 +-
 .../stream}/StorageAdminClientTest.java            |  37 +--
 .../integration/stream/StreamClusterTestBase.java  | 107 +++++++
 .../integration/stream}/TableClientSimpleTest.java |  44 +--
 .../tests/integration/stream}/TableClientTest.java |  44 +--
 .../tests/integration/topologies/BKCluster.java    |  92 +++++-
 .../integration/topologies/BKClusterSpec.java      |  39 ++-
 102 files changed, 2440 insertions(+), 897 deletions(-)
 rename .test-infra/jenkins/{job_testing_seed.groovy => jenkins_testing_job_seed.groovy} (97%)
 copy tests/docker-images/all-released-versions-image/scripts/install-all-tarballs.sh => bin/standalone (77%)
 create mode 100755 bin/standalone.docker-compose
 copy bookkeeper-dist/src/main/resources/deps/{protobuf-3.4.0 => google-auth-library-credentials-0.4.0}/LICENSE (71%)
 copy bookkeeper-dist/src/main/resources/deps/{protobuf-3.4.0 => protobuf-3.0.0}/LICENSE (100%)
 copy bookkeeper-dist/src/main/resources/deps/{protobuf-3.4.0 => protobuf-3.3.1}/LICENSE (100%)
 copy dev/{docker/Dockerfile => common.sh} (67%)
 create mode 100755 dev/publish-docker-images.sh
 create mode 100644 stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java
 copy stream/{statelib/src/main/java/org/apache/bookkeeper/statelib/api/kv/KVStoreReadView.java => clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java} (52%)
 copy stream/common/src/main/java/org/apache/bookkeeper/common/{kv/KVImpl.java => grpc/netty/IdentityBinaryMarshaller.java} (62%)
 copy stream/common/src/main/java/org/apache/bookkeeper/common/{kv/KVImpl.java => grpc/netty/IdentityInputStreamMarshaller.java} (59%)
 copy stream/common/src/main/java/org/apache/bookkeeper/common/{kv/KVImpl.java => grpc/netty/LongBinaryMarshaller.java} (58%)
 copy stream/{distributedlog/common/src/main/java/org/apache/distributedlog/common/rate => common/src/main/java/org/apache/bookkeeper/common/grpc/netty}/package-info.java (91%)
 copy stream/{statelib/src/main/java/org/apache/bookkeeper/statelib/api => common/src/main/java/org/apache/bookkeeper/common/grpc}/package-info.java (92%)
 copy stream/{cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/SubCommand.java => common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ChannelFinder.java} (63%)
 create mode 100644 stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyCall.java
 create mode 100644 stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyHandlerRegistry.java
 create mode 100644 stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java
 copy {bookkeeper-server/src/main/java/org/apache/bookkeeper/server/conf => stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy}/package-info.java (90%)
 create mode 100644 stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshallerTest.java
 copy stream/{statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/TestMVCCUtils.java => common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshallerTest.java} (58%)
 copy stream/{statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/TestMVCCUtils.java => common/src/test/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshallerTest.java} (61%)
 copy stream/{api/src/main/java/org/apache/bookkeeper/api/exceptions/ApiException.java => common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/DirectPingPongServiceTest.java} (66%)
 create mode 100644 stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/PingPongServiceTestBase.java
 copy stream/{api/src/main/java/org/apache/bookkeeper/api/kv/op/CompareTarget.java => common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/ProxyPingPongServiceTest.java} (77%)
 create mode 100644 stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
 create mode 100644 stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/BookieWatchService.java
 create mode 100644 stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
 create mode 100644 stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
 rename stream/{server/src/main/java/org/apache/bookkeeper/stream/server => storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl}/grpc/GrpcMetaRangeService.java (60%)
 rename stream/{server/src/main/java/org/apache/bookkeeper/stream/server => storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl}/grpc/GrpcRootRangeService.java (86%)
 rename stream/{server/src/main/java/org/apache/bookkeeper/stream/server => storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl}/grpc/GrpcTableService.java (88%)
 rename stream/{server/src/main/java/org/apache/bookkeeper/stream/server => storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc}/handler/ResponseHandler.java (96%)
 rename stream/{server/src/main/java/org/apache/bookkeeper/stream/server => storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc}/handler/StorageContainerResponseHandler.java (96%)
 rename stream/{server/src/main/java/org/apache/bookkeeper/stream/server => storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc}/handler/package-info.java (93%)
 copy {bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service => stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc}/package-info.java (87%)
 rename stream/{server/src/test/java/org/apache/bookkeeper/stream/server => storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl}/grpc/TestGrpcMetaRangeService.java (96%)
 rename stream/{server/src/test/java/org/apache/bookkeeper/stream/server => storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl}/grpc/TestGrpcRootRangeService.java (96%)
 rename stream/{server/src/test/java/org/apache/bookkeeper/stream/server => storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl}/grpc/TestGrpcTableService.java (99%)
 rename stream/{server/src/test/java/org/apache/bookkeeper/stream/server => storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc}/TestResponseObserver.java (96%)
 rename stream/{server/src/test/java/org/apache/bookkeeper/stream/server => storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc}/handler/TestStorageContainerResponseHandler.java (97%)
 copy stream/{proto => tests-common}/pom.xml (83%)
 create mode 100644 stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/PingPongService.java
 copy {bookkeeper-server/src/main/java/org/apache/bookkeeper/metastore => stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc}/package-info.java (91%)
 rename stream/{proto => tests-common}/src/main/proto/proto2_coder_test_messages.proto (100%)
 copy stream/{proto/src/main/proto/cluster.proto => tests-common/src/main/proto/rpc.proto} (59%)
 delete mode 100644 stream/tests/integration/pom.xml
 delete mode 100644 stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
 delete mode 100644 stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
 delete mode 100644 stream/tests/integration/src/test/resources/log4j.properties
 delete mode 100644 stream/tests/pom.xml
 rename {stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration => tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream}/LocationClientTest.java (79%)
 rename {stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration => tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream}/StorageAdminClientTest.java (86%)
 create mode 100644 tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
 rename {stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration => tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream}/TableClientSimpleTest.java (85%)
 rename {stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration => tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream}/TableClientTest.java (89%)
 copy stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageResourcesSpec.java => tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKClusterSpec.java (59%)

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.

[bookkeeper] 08/10: Use maven-exec-plugin to get the maven project version for CI jobs

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit edeeb102f4acb08fa39ae6868c43ca3572bbacce
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu May 24 15:48:59 2018 -0700

    Use maven-exec-plugin to get the maven project version for CI jobs
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    The current approach to get project version isn't reliable on CI environment. It might have garbage output (e.g. exceptions).
    
    *Solution*
    
    Changed to use `exec-maven-plugin` to echo `${project.vesion}`. It provides a more reliable approach to get version on CI environments.
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>
    
    This closes #1437 from sijie/fix_getversion_problem
---
 dev/common.sh | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/dev/common.sh b/dev/common.sh
index d172d07..fb4be74 100644
--- a/dev/common.sh
+++ b/dev/common.sh
@@ -21,7 +21,11 @@
 #
 
 function get_bk_version() {
-    bk_version=`mvn org.apache.maven.plugins:maven-help-plugin:2.1.1:evaluate -Dexpression=project.version | grep -Ev '(^\[|Download\w+:)' 2> /dev/null`
+    bk_version=$(mvn -q \
+    -Dexec.executable="echo" \
+    -Dexec.args='${project.version}' \
+    --non-recursive \
+    org.codehaus.mojo:exec-maven-plugin:1.3.1:exec)
     echo ${bk_version}
 }
 

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.

[bookkeeper] 09/10: [TABLE SERVICE] client interceptor and storage container grpc proxy

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit c9b32d7d3cc352dd6fdd998e91ba15f206b7a568
Author: Sijie Guo <si...@apache.org>
AuthorDate: Fri May 25 01:02:06 2018 -0700

    [TABLE SERVICE] client interceptor and storage container grpc proxy
    
    Descriptions of the changes in this PR:
    
    This is a subsequent change after apache/bookkeeper#1428.
    
    *Motivation*
    
    Current almost every grpc requests are wrapped into `StorageContainerRequest` and their responses
    are wrapped into `StorageContainerResponse`. It makes things a bit complicated on adding new grpc
    services.
    
    *Changes*
    
    To simplify things, this PR introduces two functionalities for simplifying dispatching container requests/responses.
    
    1) *StorageContainerClientInterceptor*: A grpc `ClientInterceptor` that stamps container information (currently is `scId`) into the requests' metadata before sending the requests to the wire.
    
    2) A simple grpc reverse proxy to dispatch grpc requests to the channels provided by a `ChannelFinder`.
    
    *Tests*
    
    1. Existing unit tests covered client interceptor changes.
    2. Introduced a `stream-storage-tests-common` module to include common classes that would be used for testing.
    3. Introduced a `PingPongService` for testing reverse proxy : unary/client-streaming/server-streaming/bidi-streaming.
    
    Master Issue: #1205
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1430 from sijie/interceptor_container_requests
---
 .../clients/impl/channel/StorageServerChannel.java |  30 +-
 .../impl/container/StorageContainerChannel.java    |   6 +-
 .../StorageContainerClientInterceptor.java         |  60 ++++
 stream/common/pom.xml                              |   6 +
 .../grpc/netty/IdentityBinaryMarshaller.java       |  45 +++
 .../grpc/netty/IdentityInputStreamMarshaller.java  |  46 +++
 .../common/grpc/netty/LongBinaryMarshaller.java    |  46 +++
 .../bookkeeper/common/grpc/netty/package-info.java |  22 ++
 .../bookkeeper/common/grpc/package-info.java       |  22 ++
 .../common/grpc/proxy/ChannelFinder.java           |  40 +++
 .../bookkeeper/common/grpc/proxy/ProxyCall.java    | 138 +++++++++
 .../common/grpc/proxy/ProxyHandlerRegistry.java    | 135 ++++++++
 .../common/grpc/proxy/ProxyServerCallHandler.java  |  56 ++++
 .../bookkeeper/common/grpc/proxy/package-info.java |  22 ++
 .../grpc/netty/IdentityBinaryMarshallerTest.java   |  52 ++++
 .../netty/IdentityInputStreamMarshallerTest.java   |  44 +++
 .../grpc/netty/LongBinaryMarshallerTest.java       |  41 +++
 .../grpc/proxy/DirectPingPongServiceTest.java      |  31 ++
 .../common/grpc/proxy/PingPongServiceTestBase.java | 345 +++++++++++++++++++++
 .../grpc/proxy/ProxyPingPongServiceTest.java       |  29 ++
 stream/pom.xml                                     |   1 +
 stream/proto/pom.xml                               |   6 +
 .../TestStorageContainerResponseHandler.java       |   2 +-
 stream/{proto => tests-common}/pom.xml             |  20 +-
 .../bookkeeper/tests/rpc/PingPongService.java      | 119 +++++++
 .../main/proto/proto2_coder_test_messages.proto    |   0
 stream/tests-common/src/main/proto/rpc.proto       |  48 +++
 27 files changed, 1396 insertions(+), 16 deletions(-)

diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index 34dc957..7e1e022 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -19,12 +19,15 @@
 package org.apache.bookkeeper.clients.impl.channel;
 
 import com.google.common.annotations.VisibleForTesting;
+import io.grpc.Channel;
+import io.grpc.ClientInterceptors;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import java.util.Optional;
 import java.util.function.Function;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerClientInterceptor;
 import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
@@ -53,7 +56,7 @@ public class StorageServerChannel implements AutoCloseable {
     }
 
     private final Optional<String> token;
-    private final ManagedChannel channel;
+    private final Channel channel;
 
     @GuardedBy("this")
     private RootRangeServiceFutureStub rootRangeService;
@@ -87,6 +90,11 @@ public class StorageServerChannel implements AutoCloseable {
     @VisibleForTesting
     public StorageServerChannel(ManagedChannel channel,
                                 Optional<String> token) {
+        this((Channel) channel, token);
+    }
+
+    protected StorageServerChannel(Channel channel,
+                                   Optional<String> token) {
         this.token = token;
         this.channel = channel;
     }
@@ -127,8 +135,26 @@ public class StorageServerChannel implements AutoCloseable {
         return kvService;
     }
 
+    /**
+     * Create an intercepted server channel that add additional storage container metadata.
+     *
+     * @param scId storage container id
+     * @return an intercepted server channel.
+     */
+    public StorageServerChannel intercept(long scId) {
+        Channel interceptedChannel = ClientInterceptors.intercept(
+            this.channel,
+            new StorageContainerClientInterceptor(scId));
+
+        return new StorageServerChannel(
+            interceptedChannel,
+            this.token);
+    }
+
     @Override
     public void close() {
-        channel.shutdown();
+        if (channel instanceof ManagedChannel) {
+            ((ManagedChannel) channel).shutdown();
+        }
     }
 }
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
index 8635e0f..a20d7c9 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
@@ -190,9 +190,13 @@ public class StorageContainerChannel {
             }
             return;
         }
+
+        // intercept the storage server channel with additional sc metadata
+        StorageServerChannel interceptedChannel = serverChannel.intercept(scId);
+
         // update the future
         synchronized (this) {
-            rsChannelFuture.complete(serverChannel);
+            rsChannelFuture.complete(interceptedChannel);
         }
     }
 
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java
new file mode 100644
index 0000000..284431e
--- /dev/null
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerClientInterceptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.container;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ClientInterceptors.CheckedForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
+
+/**
+ * A client interceptor that intercepting outgoing calls to storage containers.
+ */
+public class StorageContainerClientInterceptor implements ClientInterceptor {
+
+    private static final String SC_ID_KEY = "SC_ID";
+
+    private final long scId;
+    private final Metadata.Key<Long> scIdKey;
+
+    public StorageContainerClientInterceptor(long scId) {
+        this.scId = scId;
+        this.scIdKey = Metadata.Key.of(
+            SC_ID_KEY,
+            LongBinaryMarshaller.of());
+    }
+
+    @Override
+    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
+                                                               CallOptions callOptions,
+                                                               Channel next) {
+        return new CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
+            @Override
+            protected void checkedStart(Listener<RespT> responseListener,
+                                        Metadata headers) throws Exception {
+                headers.put(scIdKey, scId);
+                delegate().start(responseListener, headers);
+            }
+        };
+    }
+}
diff --git a/stream/common/pom.xml b/stream/common/pom.xml
index 43fdc36..50e4edd 100644
--- a/stream/common/pom.xml
+++ b/stream/common/pom.xml
@@ -40,5 +40,11 @@
       <groupId>io.grpc</groupId>
       <artifactId>grpc-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper.tests</groupId>
+      <artifactId>stream-storage-tests-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshaller.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshaller.java
new file mode 100644
index 0000000..8b1c939
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshaller.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import io.grpc.Metadata.BinaryMarshaller;
+
+/**
+ * Marshaller for byte array.
+ */
+public class IdentityBinaryMarshaller implements BinaryMarshaller<byte[]> {
+
+    public static IdentityBinaryMarshaller of() {
+        return INSTANCE;
+    }
+
+    private static final IdentityBinaryMarshaller INSTANCE = new IdentityBinaryMarshaller();
+
+    private IdentityBinaryMarshaller() {}
+
+    @Override
+    public byte[] toBytes(byte[] value) {
+        return value;
+    }
+
+    @Override
+    public byte[] parseBytes(byte[] serialized) {
+        return serialized;
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshaller.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshaller.java
new file mode 100644
index 0000000..fd54b5e
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshaller.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import io.grpc.MethodDescriptor;
+import java.io.InputStream;
+
+/**
+ * An identity marshaller.
+ */
+public class IdentityInputStreamMarshaller implements MethodDescriptor.Marshaller<InputStream>  {
+
+    public static IdentityInputStreamMarshaller of() {
+        return INSTANCE;
+    }
+
+    private static final IdentityInputStreamMarshaller INSTANCE = new IdentityInputStreamMarshaller();
+
+    private IdentityInputStreamMarshaller() {}
+
+    @Override
+    public InputStream stream(InputStream value) {
+        return value;
+    }
+
+    @Override
+    public InputStream parse(InputStream stream) {
+        return stream;
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshaller.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshaller.java
new file mode 100644
index 0000000..92194f1
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshaller.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import io.grpc.Metadata.BinaryMarshaller;
+import org.apache.bookkeeper.common.util.Bytes;
+
+/**
+ * Marshaller for long numbers.
+ */
+public class LongBinaryMarshaller implements BinaryMarshaller<Long> {
+
+    public static LongBinaryMarshaller of() {
+        return INSTANCE;
+    }
+
+    private static final LongBinaryMarshaller INSTANCE = new LongBinaryMarshaller();
+
+    private LongBinaryMarshaller() {}
+
+    @Override
+    public byte[] toBytes(Long value) {
+        return Bytes.toBytes(value);
+    }
+
+    @Override
+    public Long parseBytes(byte[] serialized) {
+        return Bytes.toLong(serialized, 0);
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/package-info.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/package-info.java
new file mode 100644
index 0000000..3e69f68
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/netty/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Common Grpc Netty Utils.
+ */
+package org.apache.bookkeeper.common.grpc.netty;
\ No newline at end of file
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/package-info.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/package-info.java
new file mode 100644
index 0000000..d943bdf
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Common Grpc Utils.
+ */
+package org.apache.bookkeeper.common.grpc;
\ No newline at end of file
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ChannelFinder.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ChannelFinder.java
new file mode 100644
index 0000000..698c380
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ChannelFinder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import io.grpc.Channel;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+
+/**
+ * Find a grpc {@link io.grpc.Channel} to route the requests.
+ */
+public interface ChannelFinder {
+
+    /**
+     * Find a channel to route the server call.
+     *
+     * @param serverCall server call
+     * @param headers request metadata
+     * @return channel to route the server call.
+     */
+    Channel findChannel(ServerCall<?, ?> serverCall,
+                        Metadata headers);
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyCall.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyCall.java
new file mode 100644
index 0000000..35f6d2d
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyCall.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.Status;
+import lombok.Getter;
+
+/**
+ * Proxy grpc calls.
+ */
+@Getter
+class ProxyCall<ReqT, RespT> {
+
+    private final RequestProxy serverCallListener;
+    private final ResponseProxy clientCallListener;
+
+    ProxyCall(ServerCall<ReqT, RespT> serverCall,
+              ClientCall<ReqT, RespT> clientCall) {
+        this.serverCallListener = new RequestProxy(clientCall);
+        this.clientCallListener = new ResponseProxy(serverCall);
+    }
+
+    /**
+     * Request proxy to delegate client call.
+     */
+    private class RequestProxy extends ServerCall.Listener<ReqT> {
+
+        private final ClientCall<ReqT, ?> clientCall;
+        private boolean needToRequest;
+
+        public RequestProxy(ClientCall<ReqT, ?> clientCall) {
+            this.clientCall = clientCall;
+        }
+
+        @Override
+        public void onMessage(ReqT message) {
+            clientCall.sendMessage(message);
+            synchronized (this) {
+                if (clientCall.isReady()) {
+                    clientCallListener.serverCall.request(1);
+                } else {
+                    needToRequest = true;
+                }
+            }
+        }
+
+        @Override
+        public void onHalfClose() {
+            clientCall.halfClose();
+        }
+
+        @Override
+        public void onCancel() {
+            clientCall.cancel("Server cancelled", null);
+        }
+
+        @Override
+        public void onReady() {
+            clientCallListener.onServerReady();
+        }
+
+        synchronized void onClientReady() {
+            if (needToRequest) {
+                clientCallListener.serverCall.request(1);
+                needToRequest = false;
+            }
+        }
+
+    }
+
+    /**
+     * Response proxy to delegate server call.
+     */
+    private class ResponseProxy extends ClientCall.Listener<RespT> {
+
+        private final ServerCall<?, RespT> serverCall;
+        private boolean needToRequest;
+
+        public ResponseProxy(ServerCall<?, RespT> serverCall) {
+            this.serverCall = serverCall;
+        }
+
+        @Override
+        public void onHeaders(Metadata headers) {
+            serverCall.sendHeaders(headers);
+        }
+
+        @Override
+        public void onMessage(RespT message) {
+            serverCall.sendMessage(message);
+            synchronized (this) {
+                if (serverCall.isReady()) {
+                    serverCallListener.clientCall.request(1);
+                } else {
+                    needToRequest = true;
+                }
+            }
+        }
+
+        @Override
+        public void onClose(Status status, Metadata trailers) {
+            serverCall.close(status, trailers);
+        }
+
+        @Override
+        public void onReady() {
+            serverCallListener.onClientReady();
+        }
+
+        synchronized void onServerReady() {
+            if (needToRequest) {
+                serverCallListener.clientCall.request(1);
+                needToRequest = false;
+            }
+        }
+
+    }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyHandlerRegistry.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyHandlerRegistry.java
new file mode 100644
index 0000000..94cebb1
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyHandlerRegistry.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.grpc.CallOptions;
+import io.grpc.HandlerRegistry;
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerMethodDefinition;
+import io.grpc.ServerServiceDefinition;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.bookkeeper.common.grpc.netty.IdentityInputStreamMarshaller;
+
+/**
+ * Registry for proxying grpc services.
+ */
+public class ProxyHandlerRegistry extends HandlerRegistry {
+
+    private final Map<String, ServerMethodDefinition<?, ?>> methods;
+
+    private ProxyHandlerRegistry(Map<String, ServerMethodDefinition<?, ?>> methods) {
+        this.methods = methods;
+    }
+
+    @Nullable
+    @Override
+    public ServerMethodDefinition<?, ?> lookupMethod(String methodName,
+                                                     @Nullable String authority) {
+        return methods.get(methodName);
+    }
+
+    private static ServerMethodDefinition<?, ?> createProxyServerMethodDefinition(
+            MethodDescriptor<?, ?> methodDesc,
+            ServerCallHandler<InputStream, InputStream> handler) {
+        MethodDescriptor<InputStream, InputStream> methodDescriptor = MethodDescriptor.newBuilder(
+            IdentityInputStreamMarshaller.of(), IdentityInputStreamMarshaller.of())
+            .setFullMethodName(methodDesc.getFullMethodName())
+            .setType(methodDesc.getType())
+            .setIdempotent(methodDesc.isIdempotent())
+            .setSafe(methodDesc.isSafe())
+            .build();
+        return ServerMethodDefinition.create(methodDescriptor, handler);
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder to build the handler registry.
+     */
+    public static class Builder {
+
+        // store per-service first, to make sure services are added/replaced atomically.
+        private final HashMap<String, ServerServiceDefinition> services =
+            new LinkedHashMap<>();
+        private ChannelFinder finder;
+
+        /**
+         * Add the service to this grpc handler registry.
+         *
+         * @param service grpc service definition
+         * @return registry builder
+         */
+        public Builder addService(ServerServiceDefinition service) {
+            services.put(
+                service.getServiceDescriptor().getName(),
+                service);
+            return this;
+        }
+
+        /**
+         * Registered a channel finder for proxying server calls.
+         *
+         * @param finder channel finder
+         * @return registry builder
+         */
+        public Builder setChannelFinder(ChannelFinder finder) {
+            this.finder = finder;
+            return this;
+        }
+
+        /**
+         * Build the proxy handler registry.
+         *
+         * @return registry builder
+         */
+        public ProxyHandlerRegistry build() {
+            checkNotNull(finder, "No channel finder defined");
+
+            ProxyServerCallHandler<InputStream, InputStream> proxyHandler =
+                new ProxyServerCallHandler<>(finder, CallOptions.DEFAULT);
+
+            Map<String, ServerMethodDefinition<?, ?>> methods = new HashMap<>();
+            for (ServerServiceDefinition service : services.values()) {
+                for (ServerMethodDefinition<?, ?> method : service.getMethods()) {
+                    String methodName = method.getMethodDescriptor().getFullMethodName();
+                    methods.put(
+                        methodName,
+                        createProxyServerMethodDefinition(
+                            method.getMethodDescriptor(),
+                            proxyHandler)
+                    );
+                }
+            }
+            return new ProxyHandlerRegistry(
+                Collections.unmodifiableMap(methods));
+        }
+
+    }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java
new file mode 100644
index 0000000..a691497
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+
+/**
+ * Abstract server call handler
+ */
+class ProxyServerCallHandler<ReqT, RespT> implements ServerCallHandler<ReqT, RespT> {
+
+    private final ChannelFinder finder;
+    private final CallOptions callOptions;
+
+    ProxyServerCallHandler(ChannelFinder finder,
+                           CallOptions callOptions) {
+        this.finder = finder;
+        this.callOptions = callOptions;
+    }
+
+    @Override
+    public Listener<ReqT> startCall(ServerCall<ReqT, RespT> serverCall, Metadata headers) {
+        Channel channel = finder.findChannel(serverCall, headers);
+        ClientCall<ReqT, RespT> clientCall = channel.newCall(
+            serverCall.getMethodDescriptor(), callOptions);
+        ProxyCall<ReqT, RespT> proxyCall = new ProxyCall<>(
+            serverCall,
+            clientCall);
+        clientCall.start(proxyCall.getClientCallListener(), headers);
+        serverCall.request(1);
+        clientCall.request(1);
+        return proxyCall.getServerCallListener();
+    }
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/package-info.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/package-info.java
new file mode 100644
index 0000000..7e023f5
--- /dev/null
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Grpc Utils for proxying grpc requests.
+ */
+package org.apache.bookkeeper.common.grpc.proxy;
\ No newline at end of file
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshallerTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshallerTest.java
new file mode 100644
index 0000000..730cc6f
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityBinaryMarshallerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertSame;
+
+import java.util.concurrent.ThreadLocalRandom;
+import org.junit.Test;
+
+/**
+ * Unit test {@link IdentityBinaryMarshaller}.
+ */
+public class IdentityBinaryMarshallerTest {
+
+    @Test
+    public void testParseAndToBytes() {
+        byte[] data = new byte[32];
+        ThreadLocalRandom.current().nextBytes(data);
+        byte[] dataCopy = new byte[data.length];
+        System.arraycopy(data, 0, dataCopy, 0, data.length);
+
+        byte[] serializedData = IdentityBinaryMarshaller.of().toBytes(data);
+        // identity binary marshaller should return same object
+        assertSame(data, serializedData);
+        // identity binary marshaller should return same content
+        assertArrayEquals(dataCopy, serializedData);
+
+        byte[] deserializedData = IdentityBinaryMarshaller.of().parseBytes(data);
+        // identity binary marshaller should return same object
+        assertSame(data, deserializedData);
+        // identity binary marshaller should return same content
+        assertArrayEquals(dataCopy, deserializedData);
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshallerTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshallerTest.java
new file mode 100644
index 0000000..8cafc2f
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/IdentityInputStreamMarshallerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+
+import java.io.InputStream;
+import org.junit.Test;
+
+/**
+ * Unit test {@link IdentityInputStreamMarshaller}.
+ */
+public class IdentityInputStreamMarshallerTest {
+
+    @Test
+    public void testStream() {
+        InputStream mockIs = mock(InputStream.class);
+        assertSame(mockIs, IdentityInputStreamMarshaller.of().stream(mockIs));
+    }
+
+    @Test
+    public void testParse() {
+        InputStream mockIs = mock(InputStream.class);
+        assertSame(mockIs, IdentityInputStreamMarshaller.of().parse(mockIs));
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshallerTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshallerTest.java
new file mode 100644
index 0000000..e4dccb5
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/netty/LongBinaryMarshallerTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.netty;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.bookkeeper.common.util.Bytes;
+import org.junit.Test;
+
+/**
+ * Unit test {@link LongBinaryMarshaller}.
+ */
+public class LongBinaryMarshallerTest {
+
+    @Test
+    public void testParseAndToBytes() {
+        long value = System.currentTimeMillis();
+        byte[] valueBytes = LongBinaryMarshaller.of().toBytes(value);
+        assertArrayEquals(Bytes.toBytes(value), valueBytes);
+        long parsedValue = LongBinaryMarshaller.of().parseBytes(valueBytes);
+        assertEquals(value, parsedValue);
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/DirectPingPongServiceTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/DirectPingPongServiceTest.java
new file mode 100644
index 0000000..4611efe
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/DirectPingPongServiceTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+/**
+ * Test PingPongService by directly accessing the grpc service.
+ *
+ * <p>This is to ensure the tests in {@link PingPongServiceTestBase} are correct to be used for testing
+ * reverse proxy in {@link ProxyPingPongServiceTest}.
+ */
+public class DirectPingPongServiceTest extends PingPongServiceTestBase {
+    public DirectPingPongServiceTest() {
+        super(false);
+    }
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/PingPongServiceTestBase.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/PingPongServiceTestBase.java
new file mode 100644
index 0000000..ce3dcfd
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/PingPongServiceTestBase.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.ServerServiceDefinition;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.util.MutableHandlerRegistry;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+import lombok.Data;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.ExceptionalFunction;
+import org.apache.bookkeeper.tests.rpc.PingPongService;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceBlockingStub;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceStub;
+import org.bookkeeper.tests.proto.rpc.PingRequest;
+import org.bookkeeper.tests.proto.rpc.PongResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test grpc reverse proxy using {@link org.apache.bookkeeper.tests.rpc.PingPongService}.
+ */
+public abstract class PingPongServiceTestBase {
+
+    private static final int NUM_PONGS_PER_PING = 10;
+    private static final String SERVICE_NAME = "pingpong";
+
+    private final boolean useReverseProxy;
+
+    protected Server realServer;
+    protected Server proxyServer;
+    protected PingPongService service;
+    protected ManagedChannel proxyChannel;
+    protected ManagedChannel clientChannel;
+    protected PingPongServiceStub client;
+
+    PingPongServiceTestBase(boolean useReverseProxy) {
+        this.useReverseProxy = useReverseProxy;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        service = new PingPongService(NUM_PONGS_PER_PING);
+        ServerServiceDefinition pingPongServiceDef = service.bindService();
+
+        String serverName;
+        if (useReverseProxy) {
+            serverName = "proxy-" + SERVICE_NAME;
+        } else {
+            serverName = SERVICE_NAME;
+        }
+        // build a real server
+        MutableHandlerRegistry realRegistry = new MutableHandlerRegistry();
+        realServer = InProcessServerBuilder
+            .forName(serverName)
+            .fallbackHandlerRegistry(realRegistry)
+            .directExecutor()
+            .build()
+            .start();
+        realRegistry.addService(pingPongServiceDef);
+
+        if (useReverseProxy) {
+            proxyChannel = InProcessChannelBuilder.forName(serverName)
+                .usePlaintext(false)
+                .build();
+
+            ProxyHandlerRegistry registry = ProxyHandlerRegistry.newBuilder()
+                .addService(pingPongServiceDef)
+                .setChannelFinder((serverCall, header) -> proxyChannel)
+                .build();
+            proxyServer = InProcessServerBuilder
+                .forName(SERVICE_NAME)
+                .fallbackHandlerRegistry(registry)
+                .directExecutor()
+                .build()
+                .start();
+        } else {
+            proxyServer = realServer;
+        }
+
+        clientChannel = InProcessChannelBuilder.forName(SERVICE_NAME)
+            .usePlaintext(false)
+            .build();
+
+        client = PingPongServiceGrpc.newStub(clientChannel);
+
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (null != clientChannel) {
+            clientChannel.shutdown();
+        }
+
+        if (null != proxyServer) {
+            proxyServer.shutdown();
+        }
+
+        if (null != proxyChannel) {
+            proxyChannel.shutdown();
+        }
+
+        if (null != realServer && proxyServer != realServer) {
+            realServer.shutdown();
+        }
+    }
+
+
+    @Test
+    public void testUnary() {
+        PingPongServiceBlockingStub clientBlocking = PingPongServiceGrpc.newBlockingStub(clientChannel);
+
+        long sequence = ThreadLocalRandom.current().nextLong();
+        PingRequest request = PingRequest.newBuilder()
+            .setSequence(sequence)
+            .build();
+        PongResponse response = clientBlocking.pingPong(request);
+        assertEquals(sequence, response.getLastSequence());
+        assertEquals(1, response.getNumPingReceived());
+        assertEquals(0, response.getSlotId());
+    }
+
+    @Test
+    public void testServerStreaming() {
+        PingPongServiceBlockingStub clientBlocking = PingPongServiceGrpc.newBlockingStub(clientChannel);
+
+        long sequence = ThreadLocalRandom.current().nextLong(100000);
+        PingRequest request = PingRequest.newBuilder()
+            .setSequence(sequence)
+            .build();
+        Iterator<PongResponse> respIter = clientBlocking.lotsOfPongs(request);
+        int count = 0;
+        while (respIter.hasNext()) {
+            PongResponse resp = respIter.next();
+            assertEquals(sequence, resp.getLastSequence());
+            assertEquals(1, resp.getNumPingReceived());
+            assertEquals(count, resp.getSlotId());
+            ++count;
+        }
+    }
+
+    @Test
+    public void testClientStreaming() throws Exception {
+        final int numPings = 100;
+        final long sequence = ThreadLocalRandom.current().nextLong(100000);
+        final CompletableFuture<Void> respFuture = new CompletableFuture<>();
+        final LinkedBlockingQueue<PongResponse> respQueue = new LinkedBlockingQueue<>();
+        StreamObserver<PingRequest> pinger = client.lotsOfPings(new StreamObserver<PongResponse>() {
+            @Override
+            public void onNext(PongResponse resp) {
+                respQueue.offer(resp);
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                respFuture.completeExceptionally(t);
+            }
+
+            @Override
+            public void onCompleted() {
+                FutureUtils.complete(respFuture, null);
+            }
+        });
+
+        for (int i = 0; i < numPings; i++) {
+            PingRequest request = PingRequest.newBuilder()
+                .setSequence(sequence + i)
+                .build();
+            pinger.onNext(request);
+        }
+        pinger.onCompleted();
+
+        // wait for response to be received.
+        result(respFuture);
+
+        assertEquals(1, respQueue.size());
+
+        PongResponse resp = respQueue.take();
+        assertEquals(sequence + numPings - 1, resp.getLastSequence());
+        assertEquals(numPings, resp.getNumPingReceived());
+        assertEquals(0, resp.getSlotId());
+    }
+
+    @Test
+    public void testBidiStreaming() throws Exception {
+        final int numPings = 100;
+
+        final CompletableFuture<Void> respFuture = new CompletableFuture<>();
+        final LinkedBlockingQueue<PongResponse> respQueue = new LinkedBlockingQueue<>();
+        StreamObserver<PingRequest> pinger = client.bidiPingPong(new StreamObserver<PongResponse>() {
+            @Override
+            public void onNext(PongResponse resp) {
+                respQueue.offer(resp);
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                respFuture.completeExceptionally(t);
+            }
+
+            @Override
+            public void onCompleted() {
+                FutureUtils.complete(respFuture, null);
+            }
+        });
+
+        final LinkedBlockingQueue<PingRequest> reqQueue = new LinkedBlockingQueue<>();
+        for (int i = 0; i < numPings; i++) {
+            final long sequence = ThreadLocalRandom.current().nextLong(100000);
+            PingRequest request = PingRequest.newBuilder()
+                .setSequence(sequence)
+                .build();
+            reqQueue.put(request);
+            pinger.onNext(request);
+        }
+        pinger.onCompleted();
+
+        // wait for response to be received
+        result(respFuture);
+
+        assertEquals(numPings, respQueue.size());
+
+        int count = 0;
+        for (PingRequest request : reqQueue) {
+            PongResponse response = respQueue.take();
+
+            assertEquals(request.getSequence(), response.getLastSequence());
+            assertEquals(++count, response.getNumPingReceived());
+            assertEquals(0, response.getSlotId());
+        }
+        assertNull(respQueue.poll());
+        assertEquals(numPings, count);
+    }
+
+    @Data
+    static class Runner implements Runnable {
+
+        private final CountDownLatch startLatch;
+        private final CountDownLatch doneLatch;
+        private final AtomicReference<Exception> exceptionHolder;
+        private final ExceptionalFunction<Void, Void> func;
+
+        @Override
+        public void run() {
+            try {
+                startLatch.await();
+            } catch (InterruptedException e) {
+            }
+            int numIters = ThreadLocalRandom.current().nextInt(10, 100);
+            IntStream.of(numIters).forEach(idx -> {
+                if (null != exceptionHolder.get()) {
+                    // break if exception occurs
+                    return;
+                }
+                try {
+                    func.apply(null);
+                } catch (Exception e) {
+                    exceptionHolder.set(e);
+                    doneLatch.countDown();
+                }
+            });
+            if (null == exceptionHolder.get()) {
+                doneLatch.countDown();
+            }
+        }
+    }
+
+    @Test
+    public void testMixed() throws Exception {
+        int numTypes = 4;
+
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch doneLatch = new CountDownLatch(numTypes);
+        final AtomicReference<Exception> exception = new AtomicReference<>();
+
+        ExecutorService executor = Executors.newFixedThreadPool(numTypes);
+        // start unmary test
+        executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+            testUnary();
+            return null;
+        }));
+
+        // start client streaming tests
+        executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+            testClientStreaming();
+            return null;
+        }));
+
+        // start server streaming tests
+        executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+            testServerStreaming();
+            return null;
+        }));
+
+        // start bidi streaming tests
+        executor.submit(new Runner(startLatch, doneLatch, exception, ignored -> {
+            testBidiStreaming();
+            return null;
+        }));
+
+        // start the tests
+        startLatch.countDown();
+
+        // wait for tests to complete
+        doneLatch.await();
+
+        // make sure all succeed
+        assertNull("Exception found : " + exception.get(), exception.get());
+    }
+
+}
diff --git a/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/ProxyPingPongServiceTest.java b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/ProxyPingPongServiceTest.java
new file mode 100644
index 0000000..a099717
--- /dev/null
+++ b/stream/common/src/test/java/org/apache/bookkeeper/common/grpc/proxy/ProxyPingPongServiceTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.grpc.proxy;
+
+/**
+ * Test reverse grpc proxy using ping pong service.
+ */
+public class ProxyPingPongServiceTest extends PingPongServiceTestBase {
+
+    public ProxyPingPongServiceTest() {
+        super(true);
+    }
+}
diff --git a/stream/pom.xml b/stream/pom.xml
index f2580ec..340c6f4 100644
--- a/stream/pom.xml
+++ b/stream/pom.xml
@@ -30,6 +30,7 @@
 
   <modules>
     <module>common</module>
+    <module>tests-common</module>
     <module>statelib</module>
     <module>api</module>
     <module>proto</module>
diff --git a/stream/proto/pom.xml b/stream/proto/pom.xml
index c943c77..644c312 100644
--- a/stream/proto/pom.xml
+++ b/stream/proto/pom.xml
@@ -41,6 +41,12 @@
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper.tests</groupId>
+      <artifactId>stream-storage-tests-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
index cf9b917..3eb440a 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/stream/proto/pom.xml b/stream/tests-common/pom.xml
similarity index 83%
copy from stream/proto/pom.xml
copy to stream/tests-common/pom.xml
index c943c77..0f6cc10 100644
--- a/stream/proto/pom.xml
+++ b/stream/tests-common/pom.xml
@@ -15,27 +15,23 @@
    See the License for the specific language governing permissions and
    limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.bookkeeper</groupId>
     <artifactId>stream-storage-parent</artifactId>
-    <version>4.7.1-SNAPSHOT</version>
+    <version>4.8.0-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
-  <groupId>org.apache.bookkeeper</groupId>
-  <artifactId>stream-storage-proto</artifactId>
-  <name>Apache BookKeeper :: Stream Storage :: Proto</name>
+  <groupId>org.apache.bookkeeper.tests</groupId>
+  <artifactId>stream-storage-tests-common</artifactId>
+  <name>Apache BookKeeper :: Stream Storage :: Common Classes for Tests</name>
 
   <dependencies>
     <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>stream-storage-common</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-all</artifactId>
     </dependency>
     <dependency>
       <groupId>com.google.protobuf</groupId>
diff --git a/stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/PingPongService.java b/stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/PingPongService.java
new file mode 100644
index 0000000..4a94776
--- /dev/null
+++ b/stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/PingPongService.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.rpc;
+
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import org.bookkeeper.tests.proto.rpc.PingPongServiceGrpc.PingPongServiceImplBase;
+import org.bookkeeper.tests.proto.rpc.PingRequest;
+import org.bookkeeper.tests.proto.rpc.PongResponse;
+
+/**
+ * An implementation of the ping pong service used for testing.
+ */
+@Slf4j
+public class PingPongService extends PingPongServiceImplBase {
+
+    private final int streamPongSize;
+
+    public PingPongService(int streamPongSize) {
+        this.streamPongSize = streamPongSize;
+    }
+
+    @Override
+    public void pingPong(PingRequest request, StreamObserver<PongResponse> responseObserver) {
+        responseObserver.onNext(PongResponse.newBuilder()
+            .setLastSequence(request.getSequence())
+            .setNumPingReceived(1)
+            .setSlotId(0)
+            .build());
+        responseObserver.onCompleted();
+    }
+
+    @Override
+    public StreamObserver<PingRequest> lotsOfPings(StreamObserver<PongResponse> responseObserver) {
+        return new StreamObserver<PingRequest>() {
+
+            int pingCount = 0;
+            long lastSequence = -1L;
+
+            @Override
+            public void onNext(PingRequest value) {
+                pingCount++;
+                lastSequence = value.getSequence();
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                log.error("Failed on receiving stream of pings", t);
+            }
+
+            @Override
+            public void onCompleted() {
+                responseObserver.onNext(PongResponse.newBuilder()
+                    .setNumPingReceived(pingCount)
+                    .setLastSequence(lastSequence)
+                    .setSlotId(0)
+                    .build());
+                responseObserver.onCompleted();
+            }
+        };
+    }
+
+    @Override
+    public void lotsOfPongs(PingRequest request, StreamObserver<PongResponse> responseObserver) {
+        long sequence = request.getSequence();
+        for (int i = 0; i < streamPongSize; i++) {
+            responseObserver.onNext(PongResponse.newBuilder()
+                .setLastSequence(sequence)
+                .setNumPingReceived(1)
+                .setSlotId(i)
+                .build());
+        }
+        responseObserver.onCompleted();
+    }
+
+    @Override
+    public StreamObserver<PingRequest> bidiPingPong(StreamObserver<PongResponse> responseObserver) {
+        return new StreamObserver<PingRequest>() {
+
+            int pingCount = 0;
+
+            @Override
+            public void onNext(PingRequest ping) {
+                pingCount++;
+                responseObserver.onNext(PongResponse.newBuilder()
+                    .setLastSequence(ping.getSequence())
+                    .setNumPingReceived(pingCount)
+                    .setSlotId(0)
+                    .build());
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                responseObserver.onError(t);
+            }
+
+            @Override
+            public void onCompleted() {
+                responseObserver.onCompleted();
+            }
+        };
+    }
+}
diff --git a/stream/proto/src/main/proto/proto2_coder_test_messages.proto b/stream/tests-common/src/main/proto/proto2_coder_test_messages.proto
similarity index 100%
rename from stream/proto/src/main/proto/proto2_coder_test_messages.proto
rename to stream/tests-common/src/main/proto/proto2_coder_test_messages.proto
diff --git a/stream/tests-common/src/main/proto/rpc.proto b/stream/tests-common/src/main/proto/rpc.proto
new file mode 100644
index 0000000..8bda369
--- /dev/null
+++ b/stream/tests-common/src/main/proto/rpc.proto
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+package bookkeeper.tests.proto.rpc;
+
+option java_multiple_files = true;
+option java_package = "org.bookkeeper.tests.proto.rpc";
+
+message PingRequest {
+    int64 sequence = 1;
+}
+
+message PongResponse {
+    int64 last_sequence = 1;
+    int32 num_ping_received = 2;
+    // the slot id in this stream of pong responses.
+    int32 slot_id = 3;
+}
+
+service PingPongService {
+
+    rpc PingPong(PingRequest) returns (PongResponse) {}
+
+    rpc LotsOfPings(stream PingRequest) returns (PongResponse) {}
+
+    rpc LotsOfPongs(PingRequest) returns (stream PongResponse) {}
+
+    rpc BidiPingPong(stream PingRequest) returns (stream PongResponse) {}
+
+}
+
+

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.

[bookkeeper] 01/10: Build stream modules for integration tests

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 8ab854f283bb0aa6f04de806b80faf7b1162808a
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue May 22 00:27:46 2018 -0700

    Build stream modules for integration tests
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    apache/bookkeeper#1422 adds table service as part of integration tests. so we need to build stream modules
    in order to run integration tests for apache/bookkeeper#1422
    
    *Solution*
    
    Add "-Dstream" in the maven commands for `precommit-integrationtests` CI job.
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1424 from sijie/compile_stream_module
---
 .test-infra/jenkins/job_bookkeeper_precommit_integrationtests.groovy | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.test-infra/jenkins/job_bookkeeper_precommit_integrationtests.groovy b/.test-infra/jenkins/job_bookkeeper_precommit_integrationtests.groovy
index b7b0344..094edb1 100644
--- a/.test-infra/jenkins/job_bookkeeper_precommit_integrationtests.groovy
+++ b/.test-infra/jenkins/job_bookkeeper_precommit_integrationtests.groovy
@@ -49,7 +49,7 @@ freeStyleJob('bookkeeper_precommit_integrationtests') {
             // Set Maven parameters.
             common_job_properties.setMavenConfig(delegate)
 
-            goals('-B clean install -Pdocker')
+            goals('-B clean install -Dstream -Pdocker')
             properties(skipTests: true, interactiveMode: false)
         }
 
@@ -57,7 +57,7 @@ freeStyleJob('bookkeeper_precommit_integrationtests') {
             // Set Maven parameters.
             common_job_properties.setMavenConfig(delegate)
             rootPOM('tests/pom.xml')
-            goals('-B test -DintegrationTests')
+            goals('-B test -Dstream -DintegrationTests')
         }
 
         shell('kill $(cat docker-log.pid) || true')

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.

[bookkeeper] 07/10: Fix bookkeeper post commit CI

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 335082bf3c538fc7aa5eae4e67119092973858c5
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed May 23 15:42:02 2018 -0700

    Fix bookkeeper post commit CI
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    apache/bookkeeper#1422 includes stream storage integration tests in tests/integration.
    so we need to include `-Dstream` on building the tests.
    
    *Solution*
    
    Update the bookkeeper post commit CI jobs to include `-Dstream`
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>
    
    This closes #1429 from sijie/fix_post_commit_ci
---
 .test-infra/jenkins/job_bookkeeper_codecoverage.groovy | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.test-infra/jenkins/job_bookkeeper_codecoverage.groovy b/.test-infra/jenkins/job_bookkeeper_codecoverage.groovy
index 08a08b9..6127812 100644
--- a/.test-infra/jenkins/job_bookkeeper_codecoverage.groovy
+++ b/.test-infra/jenkins/job_bookkeeper_codecoverage.groovy
@@ -36,5 +36,5 @@ mavenJob('bookkeeper_codecoverage') {
   common_job_properties.setMavenConfig(delegate)
 
   // Maven build project.
-  goals('clean verify jacoco:report coveralls:report -Pcode-coverage -DrepoToken=$COVERALLS_REPO_TOKEN -Dmaven.test.failure.ignore=true')
+  goals('clean verify jacoco:report coveralls:report -Pcode-coverage -DrepoToken=$COVERALLS_REPO_TOKEN -Dmaven.test.failure.ignore=true -Dstream')
 }

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.

[bookkeeper] 05/10: Publish bookkeeper dev image to docker hub

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 609bf4ed7ebd1f043696d600a567f640cac3057f
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed May 23 10:56:04 2018 -0700

    Publish bookkeeper dev image to docker hub
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    We built a `bookkeeper-current` image that reflects latest master for integration tests. However currently we don't publish this docker image to any docker registry. So it is inconvenient to test latest master in dockerized environments.
    
    *Solution*
    
    - Add a dev script to publish `bookkeeper-current` image to configured docker registry specified at `DOCKER_REGISTRY`
    - Update the nightly snapshot job to publish generated `bookkeeper-current` image to docker hub `apachebookkeeper`
    - Add `bin/standalone` script to use `docker-compose` to launch a 3-nodes bookkeeper cluster locally. The script generates docker-compose.yml and handles mounting local volumes and port forwarding. so developers can use that for quick development.
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
    
    This closes #1427 from sijie/push_current_docker_image
---
 .gitignore                                         |   9 ++
 .test-infra/jenkins/common_job_properties.groovy   |   1 +
 ...seed.groovy => jenkins_testing_job_seed.groovy} |   2 +-
 .../job_bookkeeper_release_nightly_snapshot.groovy |  23 ++-
 bin/standalone                                     |  31 ++++
 bin/standalone.docker-compose                      | 177 +++++++++++++++++++++
 dev/common.sh                                      |  30 ++++
 dev/publish-docker-images.sh                       |  70 ++++++++
 pom.xml                                            |   3 +
 9 files changed, 340 insertions(+), 6 deletions(-)

diff --git a/.gitignore b/.gitignore
index 15ad27f..4865c98 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,3 +20,12 @@ lib/
 log/
 target/
 dependency-reduced-pom.xml
+
+# Logs
+logs/
+
+# Vagrant
+**/.vagrant
+
+# Data directory
+data/
diff --git a/.test-infra/jenkins/common_job_properties.groovy b/.test-infra/jenkins/common_job_properties.groovy
index 7ff5fe1..2b51f42 100644
--- a/.test-infra/jenkins/common_job_properties.groovy
+++ b/.test-infra/jenkins/common_job_properties.groovy
@@ -110,6 +110,7 @@ class common_job_properties {
 
       credentialsBinding {
         string("COVERALLS_REPO_TOKEN", "bookkeeper-coveralls-token")
+        usernamePassword('DOCKER_USER', 'DOCKER_PASSWORD', 'bookkeeper_dockerhub')
       }
     }
   }
diff --git a/.test-infra/jenkins/job_testing_seed.groovy b/.test-infra/jenkins/jenkins_testing_job_seed.groovy
similarity index 97%
rename from .test-infra/jenkins/job_testing_seed.groovy
rename to .test-infra/jenkins/jenkins_testing_job_seed.groovy
index 5688ba7..a4cb50a 100644
--- a/.test-infra/jenkins/job_testing_seed.groovy
+++ b/.test-infra/jenkins/jenkins_testing_job_seed.groovy
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-job('bookkeeper-jenkins-testing/seed') {
+job('bookkeeper-jenkins-testing-seed') {
   description('Seed job, which allows DSL jobs to be tested before being pushed for review')
 
   // Source code management.
diff --git a/.test-infra/jenkins/job_bookkeeper_release_nightly_snapshot.groovy b/.test-infra/jenkins/job_bookkeeper_release_nightly_snapshot.groovy
index 02d2a60..5938e06 100644
--- a/.test-infra/jenkins/job_bookkeeper_release_nightly_snapshot.groovy
+++ b/.test-infra/jenkins/job_bookkeeper_release_nightly_snapshot.groovy
@@ -19,7 +19,7 @@
 import common_job_properties
 
 // This job deploys a snapshot of latest master to artifactory nightly
-mavenJob('bookkeeper_release_nightly_snapshot') {
+freeStyleJob('bookkeeper_release_nightly_snapshot') {
   description('runs a `mvn clean deploy` of the nightly snapshot for bookkeeper.')
 
   // Set common parameters.
@@ -37,9 +37,22 @@ mavenJob('bookkeeper_release_nightly_snapshot') {
       'Release Snapshot',
       '/release-snapshot')
 
-  // Set maven parameters.
-  common_job_properties.setMavenConfig(delegate)
+  steps {
+    maven {
+      // Set maven parameters.
+      common_job_properties.setMavenConfig(delegate)
 
-  // Maven build project.
-  goals('clean apache-rat:check package spotbugs:check -Dmaven.test.failure.ignore=true deploy -Ddistributedlog -Dstream -DstreamTests')
+      // Maven build project.
+      goals('clean apache-rat:check package spotbugs:check -Dmaven.test.failure.ignore=true deploy -Ddistributedlog -Dstream -DstreamTests -Pdocker')
+    }
+
+    // publish the docker images
+    shell '''
+export MAVEN_HOME=/home/jenkins/tools/maven/latest
+export PATH=$JAVA_HOME/bin:$MAVEN_HOME/bin:$PATH
+export MAVEN_OPTS=-Xmx2048m
+
+./dev/publish-docker-images.sh
+    '''.stripIndent().trim()
+  }
 }
diff --git a/bin/standalone b/bin/standalone
new file mode 100755
index 0000000..d4bc0f5
--- /dev/null
+++ b/bin/standalone
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+#
+# vim:et:ft=sh:sts=2:sw=2
+#
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+BINDIR=${BK_BINDIR:-"`dirname "$0"`"}
+
+DOCKER_COMPOSE=$(which docker-compose)
+if [ $? != 0 ]; then
+  echo "Error: docker-compose is not found in ${PATH}." 1>&2
+  exit 1
+else
+  ${BINDIR}/standalone.docker-compose $@
+fi
diff --git a/bin/standalone.docker-compose b/bin/standalone.docker-compose
new file mode 100755
index 0000000..25e9fcb
--- /dev/null
+++ b/bin/standalone.docker-compose
@@ -0,0 +1,177 @@
+#!/usr/bin/env bash
+#
+# vim:et:ft=sh:sts=2:sw=2
+#
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+BINDIR=${BK_BINDIR:-"`dirname "$0"`"}
+
+source ${BINDIR}/common.sh
+
+DOCKER_COMPOSE=docker-compose
+
+DATA_ROOT_DIR=${BK_DATA_DIR:-"${BK_HOME}/data"}
+mkdir -p ${DATA_ROOT_DIR}
+
+function gen_metadata_service_section() {
+  local cluster=$1
+  local image=$2
+  cat <<EOF
+  metadata-service:
+    image: ${image}
+    hostname: metadata-service
+    command: ["zookeeper"]
+    environment:
+      - ZK_dataDir=/data/zookeeper/data
+      - ZK_dataLogDir=/data/zookeeper/txlog
+      - ZK_standaloneEnabled=true
+    ports:
+      - "9990:9990"
+      - "2181:2181"
+    volumes:
+      - "${DATA_ROOT_DIR}/${cluster}/zookeeper/data:/data/zookeeper/data"
+      - "${DATA_ROOT_DIR}/${cluster}/zookeeper/txlog:/data/zookeeper/txlog"
+EOF
+}
+
+function gen_bookie_section() {
+  local cluster=$1
+  local bookie_name=$2
+  local bookie_port=$3
+  local image=$4
+  cat <<EOF
+  ${bookie_name}:
+    image: ${image}
+    depends_on:
+      - metadata-service
+    environment:
+      - BK_zkServers=metadata-service
+      - BK_metadataServiceUri=zk://metadata-service/ledgers
+      - BK_zkLedgersRootPath=/ledgers
+      - BK_journalDirectory=/data/bookkeeper/journal
+      - BK_ledgerDirectories=/data/bookkeeper/ledgers
+      - BK_indexDirectories=/data/bookkeeper/ledgers
+      - BK_advertisedAddress=localhost
+      - BK_bookiePort=${bookie_port}
+    ports:
+      - "${bookie_port}:${bookie_port}"
+    volumes:
+      - "${DATA_ROOT_DIR}/${cluster}/${bookie_name}/journal:/data/bookkeeper/journal"
+      - "${DATA_ROOT_DIR}/${cluster}/${bookie_name}/ledgers:/data/bookkeeper/ledgers"
+EOF
+}
+
+function generate_docker_compose_file() {
+  local cluster=$1
+  local num_bookies=$2
+  local image=$3
+  local docker_compose_file="${DATA_ROOT_DIR}/${cluster}/docker-compose.yml"
+
+  local metadata_service_section=$(gen_metadata_service_section ${cluster} ${image})
+    
+  echo "version: '3'"                 >  ${docker_compose_file}
+  echo ""                             >> ${docker_compose_file}
+  echo "services:"                    >> ${docker_compose_file}
+  echo ""                             >> ${docker_compose_file}
+  echo "${metadata_service_section}"  >> ${docker_compose_file}
+  echo ""                             >> ${docker_compose_file}
+  local BI=0
+  while [ ${BI} -lt $((num_bookies)) ]; do
+    local bookie_port=$((3181 + BI))
+    local bookie_section=$(gen_bookie_section ${cluster} "bookie-${BI}" ${bookie_port} ${image})
+    echo "${bookie_section}"        >> ${docker_compose_file}
+    let BI=BI+1
+  done
+}
+
+function show_help() {
+  cat <<EOF
+Usage: standalone [-c <cluster_name>] [-h] <action:[up|down]>
+EOF
+}
+
+# main entrypoint
+
+CLUSTER_NAME="bk-standalone-dc"
+OPTIND=1
+IMAGE=${IMAGE:-"apachebookkeeper/bookkeeper-current"}
+NUM_BOOKIES=${NUM_BOOKIES:-"3"}
+
+while getopts "h:c:" opt; do
+  case "${opt}" in
+  c )
+    CLUSTER_NAME=${OPTARG}
+    echo "use cluster = '${CLUSTER_NAME}'."
+    ;;
+  h|\? )
+    show_help
+    exit 1
+    ;;
+  esac
+done
+
+shift $((OPTIND-1))
+
+[ "${1:-}" = "--" ] && shift
+
+if [ $# -le 0 ]; then
+  show_help
+  exit 1
+fi
+
+ACTION=$1
+DOCKER_COMPOSE_OPTS=""
+case "${ACTION}" in
+  up)
+    DOCKER_COMPOSE_OPTS="--detach"
+    ;;
+  down)
+    ;;
+  *)
+    echo "Unknown action : ${ACTION}"
+    show_help
+    exit 1
+    ;;
+esac
+
+CLUSTER_DATA_ROOT="${DATA_ROOT_DIR}/${CLUSTER_NAME}"
+mkdir -p ${CLUSTER_DATA_ROOT}
+
+# generate docker compose file 
+DOCKER_COMPOSE_FILE="${CLUSTER_DATA_ROOT}/docker-compose.yml"
+
+if [ ! -f ${DOCKER_COMPOSE_FILE} ]; then
+  generate_docker_compose_file ${CLUSTER_NAME} ${NUM_BOOKIES} ${IMAGE}
+fi
+
+cd ${CLUSTER_DATA_ROOT}
+
+
+
+${DOCKER_COMPOSE} $@ ${DOCKER_COMPOSE_OPTS}
+
+if [ $? == 0 -a "${ACTION}" == "up" ]; then
+    echo ""
+    echo "Standalone cluster '${CLUSTER_NAME}' is up running."
+    echo "Use following uris to connect to standalone cluster:"
+    echo ""
+    echo "  - metadata service uri = 'zk://localhost/ledgers'"
+    echo "  - dlog uri             = 'distributedlog://localhost/distributedlog'"
+fi
+exit $?
diff --git a/dev/common.sh b/dev/common.sh
new file mode 100644
index 0000000..d172d07
--- /dev/null
+++ b/dev/common.sh
@@ -0,0 +1,30 @@
+#!/usr/bin/env bash
+#
+# vim:et:ft=sh:sts=2:sw=2
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+function get_bk_version() {
+    bk_version=`mvn org.apache.maven.plugins:maven-help-plugin:2.1.1:evaluate -Dexpression=project.version | grep -Ev '(^\[|Download\w+:)' 2> /dev/null`
+    echo ${bk_version}
+}
+
+export BK_DEV_DIR=`dirname "$0"`
+export BK_HOME=`cd ${BK_DEV_DIR}/..;pwd`
+export BK_VERSION=`get_bk_version`
diff --git a/dev/publish-docker-images.sh b/dev/publish-docker-images.sh
new file mode 100755
index 0000000..66d3e80
--- /dev/null
+++ b/dev/publish-docker-images.sh
@@ -0,0 +1,70 @@
+#!/usr/bin/env bash
+#
+# vim:et:ft=sh:sts=2:sw=2
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+###############################################################################
+# Script to publish docker images to docker hub. This script is run at Jenkins
+# after building the current images.
+#
+# The script is sourced from Apache Pulsar (incubating).
+# https://github.com/apache/incubator-pulsar/blob/master/docker/publish.sh
+#
+# usage: ./dev/publish-docker-images.sh
+
+source `dirname "$0"`/common.sh
+
+if [ -z "${DOCKER_USER}" ]; then
+    echo "Docker user in variable \$DOCKER_USER was not set. Skipping image publishing"
+    exit 1
+fi
+
+if [ -z "${DOCKER_PASSWORD}" ]; then
+    echo "Docker password in variable \$DOCKER_PASSWORD was not set. Skipping image publishing"
+    exit 1
+fi
+
+DOCKER_ORG="${DOCKER_ORG:-apachebookkeeper}"
+
+docker login ${DOCKER_REGISTRY} -u="${DOCKER_USER}" -p="${DOCKER_PASSWORD}"
+if [ $? -ne 0 ]; then
+    echo "Failed to loging to Docker Hub"
+    exit 1
+fi
+
+echo "BookKeeper Version: ${BK_VERSION}"
+
+if [[ -z ${DOCKER_REGISTRY} ]]; then
+    docker_registry_org=${DOCKER_ORG}
+else
+    docker_registry_org=${DOCKER_REGISTRY}/${DOCKER_ORG}
+fi
+echo "Starting to push images to ${docker_registry_org} as user ${DOCKER_USER}..."
+
+set -x
+
+# Fail if any of the subsequent commands fail
+set -e
+
+# Publish the current image
+docker push ${docker_registry_org}/bookkeeper-current:latest
+docker push ${docker_registry_org}/bookkeeper-current:$BK_VERSION
+
+echo "Finished pushing images to ${docker_registry_org}"
diff --git a/pom.xml b/pom.xml
index 8a4ef12..715a2c2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -864,6 +864,9 @@
 
             <!-- logs -->
             <exclude>**/*.log</exclude>
+
+            <!-- data -->
+            <exclude>data/**</exclude>
           </excludes>
           <consoleOutput>true</consoleOutput>
         </configuration>

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.

[bookkeeper] 10/10: Fix checkstyle warnings

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 3c5d0960fbec42690932a07aa3df62f13d374f9d
Author: Sijie Guo <si...@apache.org>
AuthorDate: Fri May 25 10:49:57 2018 -0700

    Fix checkstyle warnings
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    There were two concurrent PRs merged at the same time.
    
    apache/bookkeeper#1435 (checkstyle changes) was merged before apache/bookkeeper#1430. so checkstyle warnings
    introduced by apache/bookkeeper#1430 were not caught by any CI jobs.
    
    *Solution*
    
    Address the checkstyle warnings introduced by apache/bookkeeper#1430
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>
    
    This closes #1444 from sijie/fix_checkstyle_issue
---
 .../common/grpc/proxy/ProxyServerCallHandler.java  |  2 +-
 .../apache/bookkeeper/tests/rpc/package-info.java  | 23 ++++++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java
index a691497..2cc6652 100644
--- a/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/grpc/proxy/ProxyServerCallHandler.java
@@ -27,7 +27,7 @@ import io.grpc.ServerCall.Listener;
 import io.grpc.ServerCallHandler;
 
 /**
- * Abstract server call handler
+ * Abstract server call handler.
  */
 class ProxyServerCallHandler<ReqT, RespT> implements ServerCallHandler<ReqT, RespT> {
 
diff --git a/stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/package-info.java b/stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/package-info.java
new file mode 100644
index 0000000..96cf1c1
--- /dev/null
+++ b/stream/tests-common/src/main/java/org/apache/bookkeeper/tests/rpc/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Common rpc classes used for testing.
+ */
+package org.apache.bookkeeper.tests.rpc;
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.

[bookkeeper] 04/10: [TABLE SERVICE] Move integration tests under `stream/tests/integration` to `tests/integration/cluster`

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 86e23c26a51c6ef0b49f6164b3cb22bb44e08564
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue May 22 19:11:16 2018 -0700

    [TABLE SERVICE] Move integration tests under `stream/tests/integration` to `tests/integration/cluster`
    
    Descriptions of the changes in this PR:
    
    The original integration tests were written based a non-dockerized standalone stream cluster. Moved them to use
    the dockerized integration test framework. So all the integration tests are actually testing the table service run as part of bookies.
    
    This change is based on #1422 .
    a371ff2 is the change in this PR to be reviewed.
    
    Master Issue: #1205
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1423 from sijie/move_more_stream_it_tests
---
 .../bookkeeper/clients/StorageClientImpl.java      |   1 -
 .../clients/config/StorageClientSettings.java      |  11 ++
 .../clients/impl/channel/StorageServerChannel.java |  19 +++-
 .../impl/channel/StorageServerChannelManager.java  |   2 +-
 .../internal/StorageServerClientManagerImpl.java   |   2 +-
 .../clients/resolver/EndpointResolver.java         |  48 +++++++++
 stream/pom.xml                                     |   1 -
 .../util/StorageContainerPlacementPolicy.java      |   8 ++
 .../bookkeeper/stream/cluster/StreamCluster.java   |   3 +-
 .../bookkeeper/stream/server/StorageServer.java    |  25 +++--
 .../server/StreamStorageLifecycleComponent.java    |   1 -
 .../api/sc/StorageContainerManagerFactory.java     |   4 +-
 .../stream/storage/RangeStoreBuilder.java          |  22 +++-
 .../stream/storage/impl/RangeStoreImpl.java        |   8 +-
 .../stream/storage/impl/TestRangeStoreImpl.java    |   2 +-
 stream/tests/integration/pom.xml                   | 110 --------------------
 .../tests/integration/StorageClientTest.java       | 111 ---------------------
 .../tests/integration/StorageServerTestBase.java   |  80 ---------------
 .../src/test/resources/log4j.properties            |  55 ----------
 stream/tests/pom.xml                               |  32 ------
 .../integration/stream/LocationClientTest.java     |   6 +-
 .../stream}/StorageAdminClientTest.java            |  37 ++-----
 .../integration/stream/StreamClusterTestBase.java  |  34 +++++++
 .../integration/stream}/TableClientSimpleTest.java |  44 ++------
 .../tests/integration/stream}/TableClientTest.java |  44 ++------
 .../tests/integration/topologies/BKCluster.java    |  19 +++-
 26 files changed, 208 insertions(+), 521 deletions(-)

diff --git a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
index aff322c..37c8223 100644
--- a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
+++ b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
@@ -116,7 +116,6 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable implements StorageCli
             serverManager.close();
             closeFuture.complete(null);
             SharedResourceManager.shared().release(resources.scheduler(), scheduler);
-            scheduler.shutdown();
         });
     }
 
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
index 20b5821..87768fa 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
@@ -24,6 +24,7 @@ import io.grpc.ManagedChannelBuilder;
 import io.grpc.NameResolver;
 import java.util.List;
 import java.util.Optional;
+import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.ClientConstants;
 import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
@@ -57,6 +58,15 @@ public interface StorageClientSettings {
     List<Endpoint> endpoints();
 
     /**
+     * Return the endpoint resolver for resolving individual endpoints.
+     *
+     * <p>The default resolver is an identity resolver.
+     *
+     * @return the endpoint resolver for resolving endpoints.
+     */
+    EndpointResolver endpointResolver();
+
+    /**
      * Returns the builder to create the managed channel.
      *
      * @return
@@ -99,6 +109,7 @@ public interface StorageClientSettings {
             numWorkerThreads(Runtime.getRuntime().availableProcessors());
             usePlaintext(true);
             backoffPolicy(ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+            endpointResolver(EndpointResolver.identity());
         }
 
         @Override
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index e8e72db..34dc957 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -24,6 +24,8 @@ import io.grpc.ManagedChannelBuilder;
 import java.util.Optional;
 import java.util.function.Function;
 import javax.annotation.concurrent.GuardedBy;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc;
@@ -42,8 +44,12 @@ import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceF
  */
 public class StorageServerChannel implements AutoCloseable {
 
-    public static Function<Endpoint, StorageServerChannel> factory(boolean usePlaintext) {
-        return (endpoint) -> new StorageServerChannel(endpoint, Optional.empty(), usePlaintext);
+    public static Function<Endpoint, StorageServerChannel> factory(StorageClientSettings settings) {
+        return (endpoint) -> new StorageServerChannel(
+            endpoint,
+            Optional.empty(),
+            settings.usePlaintext(),
+            settings.endpointResolver());
     }
 
     private final Optional<String> token;
@@ -63,14 +69,17 @@ public class StorageServerChannel implements AutoCloseable {
      *
      * @param endpoint range server endpoint.
      * @param token    token used to access range server
+     * @param usePlainText whether to plain text protocol or not
      */
     public StorageServerChannel(Endpoint endpoint,
                                 Optional<String> token,
-                                boolean usePlainText) {
+                                boolean usePlainText,
+                                EndpointResolver endpointResolver) {
         this.token = token;
+        Endpoint resolvedEndpoint = endpointResolver.resolve(endpoint);
         this.channel = ManagedChannelBuilder.forAddress(
-            endpoint.getHostname(),
-            endpoint.getPort())
+            resolvedEndpoint.getHostname(),
+            resolvedEndpoint.getPort())
             .usePlaintext(usePlainText)
             .build();
     }
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java
index 67b3d0f..308d25d 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java
@@ -40,7 +40,7 @@ public class StorageServerChannelManager implements AutoCloseable {
     private final Function<Endpoint, StorageServerChannel> channelFactory;
 
     public StorageServerChannelManager(StorageClientSettings settings) {
-        this(StorageServerChannel.factory(settings.usePlaintext()));
+        this(StorageServerChannel.factory(settings));
     }
 
     @VisibleForTesting
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
index d219b6a..3ae3b6b 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
@@ -65,7 +65,7 @@ public class StorageServerClientManagerImpl
         this(
             settings,
             schedulerResource,
-            StorageServerChannel.factory(settings.usePlaintext()));
+            StorageServerChannel.factory(settings));
     }
 
     public StorageServerClientManagerImpl(StorageClientSettings settings,
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java
new file mode 100644
index 0000000..e5002b1
--- /dev/null
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.resolver;
+
+import org.apache.bookkeeper.stream.proto.common.Endpoint;
+
+/**
+ * Resolve an endpoint to another endpoint.
+ *
+ * <p>The resolver can be used for resolving the right ip address for an advertised endpoint. It is typically useful
+ * in dockerized integration tests, where the test clients are typically outside of the docker network.
+ */
+public interface EndpointResolver {
+
+    /**
+     * Returns a resolver that always returns its input endpoint.
+     *
+     * @return a function that always returns its input endpoint
+     */
+    static EndpointResolver identity() {
+        return endpoint -> endpoint;
+    }
+
+    /**
+     * Resolve <tt>endpoint</tt> to another endpoint.
+     *
+     * @param endpoint endpoint to resolve
+     * @return the resolved endpoint.
+     */
+    Endpoint resolve(Endpoint endpoint);
+
+}
diff --git a/stream/pom.xml b/stream/pom.xml
index 09afddd..f2580ec 100644
--- a/stream/pom.xml
+++ b/stream/pom.xml
@@ -37,7 +37,6 @@
     <module>storage</module>
     <module>server</module>
     <module>cli</module>
-    <module>tests</module>
   </modules>
 
   <build>
diff --git a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java
index 8c886ea..340a731 100644
--- a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java
+++ b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java
@@ -31,8 +31,16 @@ package org.apache.bookkeeper.stream.protocol.util;
 /**
  * Placement policy to place ranges to group.
  */
+@FunctionalInterface
 public interface StorageContainerPlacementPolicy {
 
+    @FunctionalInterface
+    interface Factory {
+
+        StorageContainerPlacementPolicy newPlacementPolicy();
+
+    }
+
     long placeStreamRange(long streamId, long rangeId);
 
 }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index 11bc8af..ff0cda2 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -187,8 +187,7 @@ public class StreamCluster
                     bookiePort, grpcPort, bkDir, rangesStoreDir, spec.serveReadOnlyTable);
                 server = StorageServer.buildStorageServer(
                     serverConf,
-                    grpcPort,
-                    spec.numServers() * 2);
+                    grpcPort);
                 server.start();
                 log.info("Started storage server at (bookie port = {}, grpc port = {})",
                     bookiePort, grpcPort);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index e0b87ef..94c8587 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -55,6 +55,7 @@ import org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl;
 import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector;
 import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
 import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactoryImpl;
 import org.apache.commons.configuration.CompositeConfiguration;
@@ -143,8 +144,7 @@ public class StorageServer {
         try {
             storageServer = buildStorageServer(
                 conf,
-                grpcPort,
-                1024);
+                grpcPort);
         } catch (ConfigurationException e) {
             log.error("Invalid storage configuration", e);
             return ExitCode.INVALID_CONF.code();
@@ -168,15 +168,13 @@ public class StorageServer {
     }
 
     public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
-                                                        int grpcPort,
-                                                        int numStorageContainers)
+                                                        int grpcPort)
             throws UnknownHostException, ConfigurationException {
-        return buildStorageServer(conf, grpcPort, numStorageContainers, true, NullStatsLogger.INSTANCE);
+        return buildStorageServer(conf, grpcPort, true, NullStatsLogger.INSTANCE);
     }
 
     public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
                                                         int grpcPort,
-                                                        int numStorageContainers,
                                                         boolean startBookieAndStartProvider,
                                                         StatsLogger externalStatsLogger)
         throws ConfigurationException, UnknownHostException {
@@ -250,12 +248,21 @@ public class StorageServer {
             .withStorageConfiguration(storageConf)
             // the storage resources shared across multiple components
             .withStorageResources(storageResources)
-            // the number of storage containers
-            .withNumStorageContainers(numStorageContainers)
+            // the placement policy
+            .withStorageContainerPlacementPolicyFactory(() -> {
+                long numStorageContainers;
+                try (ZkClusterMetadataStore store = new ZkClusterMetadataStore(
+                    curatorProviderService.get(),
+                    ZKMetadataDriverBase.resolveZkServers(bkServerConf),
+                    ZK_METADATA_ROOT_PATH)) {
+                    numStorageContainers = store.getClusterMetadata().getNumStorageContainers();
+                }
+                return StorageContainerPlacementPolicyImpl.of((int) numStorageContainers);
+            })
             // the default log backend uri
             .withDefaultBackendUri(dlNamespaceProvider.getDlogUri())
             // with zk-based storage container manager
-            .withStorageContainerManagerFactory((ignored, storeConf, registry) ->
+            .withStorageContainerManagerFactory((storeConf, registry) ->
                 new ZkStorageContainerManager(
                     myEndpoint,
                     storageConf,
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
index 5e6b0b5..6cf73b0 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
@@ -42,7 +42,6 @@ public class StreamStorageLifecycleComponent extends ServerLifecycleComponent {
         this.streamStorage = StorageServer.buildStorageServer(
             conf.getUnderlyingConf(),
             ssConf.getGrpcPort(),
-            1024, /* indicator */
             false,
             statsLogger.scope("stream"));
     }
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java
index daa130b..b89a465 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java
@@ -24,13 +24,11 @@ public interface StorageContainerManagerFactory {
     /**
      * Create a storage container manager to manage lifecycles of {@link StorageContainer}.
      *
-     * @param numStorageContainers num of storage containers.
      * @param conf                 storage configuration
      * @param registry             storage container registry
      * @return storage container manager.
      */
-    StorageContainerManager create(int numStorageContainers,
-                                   StorageConfiguration conf,
+    StorageContainerManager create(StorageConfiguration conf,
                                    StorageContainerRegistry registry);
 
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
index 1bd5510..c03b374 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
@@ -19,10 +19,12 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import java.net.URI;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
 import org.apache.bookkeeper.stream.storage.api.RangeStore;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
 
 /**
@@ -38,8 +40,9 @@ public final class RangeStoreBuilder {
     private StorageConfiguration storeConf = null;
     private StorageResources storeResources = null;
     private StorageContainerManagerFactory scmFactory = null;
+    private StorageContainerPlacementPolicy.Factory placementPolicyFactory = () ->
+        StorageContainerPlacementPolicyImpl.of(1024);
     private MVCCStoreFactory mvccStoreFactory = null;
-    private int numStorageContainers = 1024;
     private URI defaultBackendUri = null;
 
     private RangeStoreBuilder() {
@@ -52,7 +55,19 @@ public final class RangeStoreBuilder {
      * @return range store builder
      */
     public RangeStoreBuilder withNumStorageContainers(int numStorageContainers) {
-        this.numStorageContainers = numStorageContainers;
+        this.placementPolicyFactory = () -> StorageContainerPlacementPolicyImpl.of(numStorageContainers);
+        return this;
+    }
+
+    /**
+     * Build the range store with the provided <tt>placementPolicyFactory</tt>.
+     *
+     * @param placementPolicyFactory placement policy factor to create placement policies.
+     * @return range store builder.
+     */
+    public RangeStoreBuilder withStorageContainerPlacementPolicyFactory(
+        StorageContainerPlacementPolicy.Factory placementPolicyFactory) {
+        this.placementPolicyFactory = placementPolicyFactory;
         return this;
     }
 
@@ -130,6 +145,7 @@ public final class RangeStoreBuilder {
         checkNotNull(storeConf, "StorageConfiguration is not provided");
         checkNotNull(mvccStoreFactory, "MVCCStoreFactory is not provided");
         checkNotNull(defaultBackendUri, "Default backend uri is not provided");
+        checkNotNull(placementPolicyFactory, "Storage Container Placement Policy Factory is not provided");
 
         return new RangeStoreImpl(
             storeConf,
@@ -137,7 +153,7 @@ public final class RangeStoreBuilder {
             scmFactory,
             mvccStoreFactory,
             defaultBackendUri,
-            numStorageContainers,
+            placementPolicyFactory,
             statsLogger);
     }
 
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
index 42fe40b..f175a55 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
@@ -48,7 +48,6 @@ import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactor
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory;
-import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
 
@@ -71,15 +70,14 @@ public class RangeStoreImpl
                           StorageContainerManagerFactory factory,
                           MVCCStoreFactory mvccStoreFactory,
                           URI defaultBackendUri,
-                          int numStorageContainers,
+                          StorageContainerPlacementPolicy.Factory placementPolicyFactory,
                           StatsLogger statsLogger) {
         super("range-service", conf, statsLogger);
         this.schedulerResource = schedulerResource;
         this.scheduler = SharedResourceManager.shared().get(schedulerResource);
         this.scmFactory = factory;
-        StorageContainerPlacementPolicy placementPolicy =
-            StorageContainerPlacementPolicyImpl.of(numStorageContainers);
         this.storeFactory = mvccStoreFactory;
+        StorageContainerPlacementPolicy placementPolicy = placementPolicyFactory.newPlacementPolicy();
         this.scRegistry = new StorageContainerRegistryImpl(
             new DefaultStorageContainerFactory(
                 conf,
@@ -88,7 +86,7 @@ public class RangeStoreImpl
                 storeFactory,
                 defaultBackendUri),
             scheduler);
-        this.scManager = scmFactory.create(numStorageContainers, conf, scRegistry);
+        this.scManager = scmFactory.create(conf, scRegistry);
     }
 
     @Override
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
index 1befe82..ac4bb78 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
@@ -211,7 +211,7 @@ public class TestRangeStoreImpl {
         rangeStore = (RangeStoreImpl) RangeStoreBuilder.newBuilder()
             .withStorageConfiguration(storageConf)
             .withStorageResources(storageResources)
-            .withStorageContainerManagerFactory((numScs, storeConf, rgRegistry)
+            .withStorageContainerManagerFactory((storeConf, rgRegistry)
                 -> new LocalStorageContainerManager(endpoint, storeConf, rgRegistry, 2))
             .withRangeStoreFactory(storeFactory)
             .withDefaultBackendUri(URI.create("distributedlog://127.0.0.1/stream/storage"))
diff --git a/stream/tests/integration/pom.xml b/stream/tests/integration/pom.xml
deleted file mode 100644
index 2eca696..0000000
--- a/stream/tests/integration/pom.xml
+++ /dev/null
@@ -1,110 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.bookkeeper.tests</groupId>
-    <artifactId>stream-storage-tests-parent</artifactId>
-    <version>4.7.1-SNAPSHOT</version>
-  </parent>
-  <artifactId>stream-storage-integration-test</artifactId>
-  <name>Apache BookKeeper :: Stream Storage :: Tests :: Integration</name>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>stream-storage-java-client</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>stream-storage-server</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.distributedlog</groupId>
-      <artifactId>distributedlog-core</artifactId>
-      <version>${project.parent.version}</version>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-        <version>${maven-jar-plugin.version}</version>
-        <executions>
-          <execution>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <version>${maven-surefire-plugin.version}</version>
-        <configuration>
-          <!-- only run tests when -DstreamIntegrationTests is specified //-->
-          <skipTests>true</skipTests>
-          <redirectTestOutputToFile>true</redirectTestOutputToFile>
-          <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=2G</argLine>
-          <forkMode>always</forkMode>
-          <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
-  <profiles>
-    <profile>
-      <id>streamIntegrationTests</id>
-      <activation>
-        <property>
-          <name>streamIntegrationTests</name>
-        </property>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-surefire-plugin</artifactId>
-            <configuration>
-              <skipTests>false</skipTests>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-deploy-plugin</artifactId>
-            <version>${maven-deploy-plugin.version}</version>
-            <configuration>
-              <skip>true</skip>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
-</project>
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
deleted file mode 100644
index ebe99f0..0000000
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.tests.integration;
-
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_RETENTION_POLICY;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_SEGMENT_ROLLING_POLICY;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_SPLIT_POLICY;
-import static org.junit.Assert.assertEquals;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.api.StorageClient;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
-import org.apache.bookkeeper.clients.admin.StorageAdminClient;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
-import org.apache.bookkeeper.stream.proto.RangeKeyType;
-import org.apache.bookkeeper.stream.proto.StreamConfiguration;
-import org.apache.bookkeeper.stream.proto.StreamProperties;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Integration test for stream client test.
- */
-@Slf4j
-public class StorageClientTest extends StorageServerTestBase {
-
-    @Rule
-    public final TestName testName = new TestName();
-
-    private String nsName;
-    private String streamName;
-    private StorageAdminClient adminClient;
-    private StorageClient client;
-    private final StreamConfiguration streamConf = StreamConfiguration.newBuilder()
-        .setKeyType(RangeKeyType.HASH)
-        .setInitialNumRanges(4)
-        .setMinNumRanges(4)
-        .setRetentionPolicy(DEFAULT_RETENTION_POLICY)
-        .setRollingPolicy(DEFAULT_SEGMENT_ROLLING_POLICY)
-        .setSplitPolicy(DEFAULT_SPLIT_POLICY)
-        .build();
-    private final NamespaceConfiguration colConf = NamespaceConfiguration.newBuilder()
-        .setDefaultStreamConf(streamConf)
-        .build();
-
-    @Override
-    protected void doSetup() throws Exception {
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
-        nsName = "test_namespace";
-        FutureUtils.result(
-            adminClient.createNamespace(nsName, colConf));
-        client = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .withNamespace(nsName)
-            .build();
-        streamName = "test_stream";
-        createStream(streamName);
-    }
-
-    @Override
-    protected void doTeardown() throws Exception {
-        if (null != client) {
-            client.closeAsync();
-        }
-        if (null != adminClient) {
-            adminClient.closeAsync();
-        }
-    }
-
-    private void createStream(String streamName) throws Exception {
-        FutureUtils.result(
-            adminClient.createStream(
-                nsName,
-                streamName,
-                streamConf));
-    }
-
-    @Test
-    public void testAdmin() throws Exception {
-        StreamProperties properties =
-            FutureUtils.result(adminClient.getStream(nsName, streamName));
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build()
-            , properties.getStreamConf());
-    }
-
-}
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
deleted file mode 100644
index 21dc004..0000000
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.tests.integration;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.stream.cluster.StreamCluster;
-import org.apache.bookkeeper.stream.cluster.StreamClusterSpec;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-
-/**
- * Test Base for Range Server related tests.
- */
-@Slf4j
-public abstract class StorageServerTestBase {
-
-    static {
-        // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words
-        // are disabled by default due to security reasons
-        System.setProperty("zookeeper.4lw.commands.whitelist", "*");
-    }
-
-    @Rule
-    public final TemporaryFolder testDir = new TemporaryFolder();
-
-    protected StreamClusterSpec spec;
-    protected StreamCluster cluster;
-
-    protected StorageServerTestBase() {
-        this(StreamClusterSpec.builder()
-            .baseConf(new CompositeConfiguration())
-            .numServers(3)
-            .build());
-    }
-
-    protected StorageServerTestBase(StreamClusterSpec spec) {
-        this.spec = spec;
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        spec = spec.storageRootDir(testDir.newFolder("tests"));
-        this.cluster = StreamCluster.build(spec);
-        this.cluster.start();
-        doSetup();
-    }
-
-    protected abstract void doSetup() throws Exception;
-
-    @After
-    public void tearDown() throws Exception {
-        doTeardown();
-        if (null != this.cluster) {
-            this.cluster.stop();
-        }
-    }
-
-    protected abstract void doTeardown() throws Exception;
-
-}
diff --git a/stream/tests/integration/src/test/resources/log4j.properties b/stream/tests/integration/src/test/resources/log4j.properties
deleted file mode 100644
index 614fe75..0000000
--- a/stream/tests/integration/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,55 +0,0 @@
-#/**
-# * Copyright 2007 The Apache Software Foundation
-# *
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-#
-# DisributedLog Logging Configuration
-#
-
-# Example with rolling log file
-log4j.rootLogger=INFO, CONSOLE
-
-#disable zookeeper logging
-log4j.logger.org.apache.zookeeper=OFF
-#Set the bookkeeper level to warning
-log4j.logger.org.apache.bookkeeper=INFO
-#disable helix logging
-log4j.logger.org.apache.helix=OFF
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.Threshold=DEBUG
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
-
-# Add ROLLINGFILE to rootLogger to get log file output
-#    Log DEBUG level and above messages to a log file
-#log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
-#log4j.appender.ROLLINGFILE.Threshold=INFO
-#log4j.appender.ROLLINGFILE.File=stream.log
-#log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
-#log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd-HH-mm
-#log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
-
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.Threshold=TRACE
-log4j.appender.R.File=target/error.log
-log4j.appender.R.MaxFileSize=200MB
-log4j.appender.R.MaxBackupIndex=7
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
diff --git a/stream/tests/pom.xml b/stream/tests/pom.xml
deleted file mode 100644
index 08c5332..0000000
--- a/stream/tests/pom.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <packaging>pom</packaging>
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.bookkeeper</groupId>
-    <artifactId>stream-storage-parent</artifactId>
-    <version>4.7.1-SNAPSHOT</version>
-  </parent>
-  <groupId>org.apache.bookkeeper.tests</groupId>
-  <artifactId>stream-storage-tests-parent</artifactId>
-  <name>Apache BookKeeper :: Stream Storage :: Tests</name>
-  <modules>
-    <module>integration</module>
-  </modules>
-</project>
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
index 42b428e..92e39c3 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
@@ -52,12 +52,8 @@ public class LocationClientTest extends StreamClusterTestBase {
             .name("location-client-test")
             .numThreads(1)
             .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(getExsternalStreamEndpoints().toArray(new Endpoint[getNumBookies()]))
-            .usePlaintext(true)
-            .build();
         client = new LocationClientImpl(
-            settings,
+            newStorageClientSettings(),
             scheduler);
     }
 
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StorageAdminClientTest.java
similarity index 86%
rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java
rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StorageAdminClientTest.java
index 03edfb8..569cf7c 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StorageAdminClientTest.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
 
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 import static org.junit.Assert.assertEquals;
@@ -39,6 +39,8 @@ import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -46,37 +48,23 @@ import org.junit.rules.TestName;
 /**
  * Integration test for stream admin client test.
  */
-public class StorageAdminClientTest extends StorageServerTestBase {
+public class StorageAdminClientTest extends StreamClusterTestBase {
 
     @Rule
     public final TestName testName = new TestName();
 
-    private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
 
-    @Override
-    protected void doSetup() throws Exception {
-        scheduler = OrderedScheduler.newSchedulerBuilder()
-            .name("admin-client-test")
-            .numThreads(1)
-            .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
+    @Before
+    public void setup() {
+        adminClient = createStorageAdminClient(newStorageClientSettings());
     }
 
-    @Override
-    protected void doTeardown() throws Exception {
+    @After
+    public void teardown() {
         if (null != adminClient) {
             adminClient.close();
         }
-        if (null != scheduler) {
-            scheduler.shutdown();
-        }
     }
 
     @Test
@@ -152,11 +140,6 @@ public class StorageAdminClientTest extends StorageServerTestBase {
             .build();
         StreamProperties streamProps = FutureUtils.result(adminClient.createStream(nsName, streamName, streamConf));
         assertEquals(streamName, streamProps.getStreamName());
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build(),
-            streamProps.getStreamConf());
 
         // create a duplicated stream
         try {
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
index b86cc72..fe7c914 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
@@ -21,6 +21,10 @@ package org.apache.bookkeeper.tests.integration.stream;
 import java.util.List;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.clients.StorageClientBuilder;
+import org.apache.bookkeeper.clients.admin.StorageAdminClient;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.utils.NetUtils;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.tests.integration.cluster.BookKeeperClusterTestBase;
@@ -69,5 +73,35 @@ public abstract class StreamClusterTestBase extends BookKeeperClusterTestBase {
             .collect(Collectors.toList());
     }
 
+    //
+    // Test Util Methods
+    //
+
+    protected static StorageClientSettings newStorageClientSettings() {
+        return StorageClientSettings.newBuilder()
+            .addEndpoints(getExsternalStreamEndpoints().toArray(new Endpoint[getNumBookies()]))
+            .endpointResolver(endpoint -> {
+                String internalEndpointStr = NetUtils.endpointToString(endpoint);
+                String externalEndpointStr =
+                    bkCluster.resolveExternalGrpcEndpointStr(internalEndpointStr);
+                log.info("Resolve endpoint {} to {}", internalEndpointStr, externalEndpointStr);
+                return NetUtils.parseEndpoint(externalEndpointStr);
+            })
+            .usePlaintext(true)
+            .build();
+    }
+
+    protected static StorageAdminClient createStorageAdminClient(StorageClientSettings settings) {
+        return StorageClientBuilder.newBuilder()
+            .withSettings(settings)
+            .buildAdmin();
+    }
+
+    protected static StorageClient createStorageClient(StorageClientSettings settings, String namespace) {
+        return StorageClientBuilder.newBuilder()
+            .withSettings(settings)
+            .withNamespace(namespace)
+            .build();
+    }
 
 }
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
similarity index 85%
rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java
rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
index 7add6d7..d1ff091 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
@@ -38,15 +38,14 @@ import org.apache.bookkeeper.api.kv.PTable;
 import org.apache.bookkeeper.api.kv.exceptions.KvApiException;
 import org.apache.bookkeeper.api.kv.result.Code;
 import org.apache.bookkeeper.api.kv.result.KeyValue;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -55,47 +54,31 @@ import org.junit.rules.TestName;
  * Integration test for table service.
  */
 @Slf4j
-public class TableClientSimpleTest extends StorageServerTestBase {
+public class TableClientSimpleTest extends StreamClusterTestBase {
 
     @Rule
     public final TestName testName = new TestName();
 
     private final String namespace = "test_namespace";
-    private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
     private StorageClient storageClient;
 
-    @Override
-    protected void doSetup() throws Exception {
-        scheduler = OrderedScheduler.newSchedulerBuilder()
-            .name("table-client-test")
-            .numThreads(1)
-            .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
+    @Before
+    public void setup() {
+        StorageClientSettings settings = newStorageClientSettings();
         String namespace = "test_namespace";
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
-        storageClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .withNamespace(namespace)
-            .build();
+        adminClient = createStorageAdminClient(settings);
+        storageClient = createStorageClient(settings, namespace);
     }
 
-    @Override
-    protected void doTeardown() throws Exception {
+    @After
+    public void teardown() {
         if (null != adminClient) {
             adminClient.close();
         }
         if (null != storageClient) {
             storageClient.close();
         }
-        if (null != scheduler) {
-            scheduler.shutdown();
-        }
     }
 
     private static ByteBuf getLKey(int i) {
@@ -123,11 +106,6 @@ public class TableClientSimpleTest extends StorageServerTestBase {
         StreamProperties streamProps = result(
             adminClient.createStream(namespace, streamName, streamConf));
         assertEquals(streamName, streamProps.getStreamName());
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build(),
-            streamProps.getStreamConf());
 
         // Open the table
         PTable<ByteBuf, ByteBuf> table = result(storageClient.openPTable(streamName));
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
similarity index 89%
rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java
rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
index 7db265b..0dff7fa 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
@@ -47,16 +47,15 @@ import org.apache.bookkeeper.api.kv.result.PutResult;
 import org.apache.bookkeeper.api.kv.result.RangeResult;
 import org.apache.bookkeeper.api.kv.result.Result;
 import org.apache.bookkeeper.api.kv.result.TxnResult;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -65,48 +64,32 @@ import org.junit.rules.TestName;
  * Integration test for table service.
  */
 @Slf4j
-public class TableClientTest extends StorageServerTestBase {
+public class TableClientTest extends StreamClusterTestBase {
 
     @Rule
     public final TestName testName = new TestName();
 
     private final String namespace = "test_namespace";
-    private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
     private StorageClient storageClient;
     private final OptionFactory<ByteBuf> optionFactory = new OptionFactoryImpl<>();
 
-    @Override
-    protected void doSetup() throws Exception {
-        scheduler = OrderedScheduler.newSchedulerBuilder()
-            .name("table-client-test")
-            .numThreads(1)
-            .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
+    @Before
+    public void setup() {
+        StorageClientSettings settings = newStorageClientSettings();
         String namespace = "test_namespace";
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
-        storageClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .withNamespace(namespace)
-            .build();
+        adminClient = createStorageAdminClient(settings);
+        storageClient = createStorageClient(settings, namespace);
     }
 
-    @Override
-    protected void doTeardown() throws Exception {
+    @After
+    public void teardown() {
         if (null != adminClient) {
             adminClient.close();
         }
         if (null != storageClient) {
             storageClient.close();
         }
-        if (null != scheduler) {
-            scheduler.shutdown();
-        }
     }
 
     private static ByteBuf getLKey(int i) {
@@ -134,11 +117,6 @@ public class TableClientTest extends StorageServerTestBase {
         StreamProperties streamProps = FutureUtils.result(
             adminClient.createStream(namespace, streamName, streamConf));
         assertEquals(streamName, streamProps.getStreamName());
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build(),
-            streamProps.getStreamConf());
 
         // Open the table
         PTable<ByteBuf, ByteBuf> table = FutureUtils.result(storageClient.openPTable(streamName));
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
index 80ae906..9a7cd86 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
@@ -18,6 +18,8 @@
 
 package org.apache.bookkeeper.tests.integration.topologies;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -60,6 +62,7 @@ public class BKCluster {
     private final Network network;
     private final MetadataStoreContainer metadataContainer;
     private final Map<String, BookieContainer> bookieContainers;
+    private final Map<String, String> internalEndpointsToExternalEndpoints;
     private final int numBookies;
     private final String extraServerComponents;
     private volatile boolean enableContainerLog;
@@ -72,6 +75,7 @@ public class BKCluster {
             .withNetwork(network)
             .withNetworkAliases(ZKContainer.HOST_NAME);
         this.bookieContainers = Maps.newTreeMap();
+        this.internalEndpointsToExternalEndpoints = Maps.newConcurrentMap();
         this.numBookies = spec.numBookies();
         this.extraServerComponents = spec.extraServerComponents();
         this.enableContainerLog = spec.enableContainerLog();
@@ -114,6 +118,13 @@ public class BKCluster {
         createBookies("bookie", numBookies);
     }
 
+    public String resolveExternalGrpcEndpointStr(String internalGrpcEndpointStr) {
+        String externalGrpcEndpointStr = internalEndpointsToExternalEndpoints.get(internalGrpcEndpointStr);
+        checkNotNull(externalGrpcEndpointStr,
+            "No internal grpc endpoint is found : " + internalGrpcEndpointStr);
+        return externalGrpcEndpointStr;
+    }
+
     public void stop() {
         synchronized (this) {
             bookieContainers.values().forEach(BookieContainer::stop);
@@ -155,6 +166,7 @@ public class BKCluster {
         synchronized (this) {
             container = bookieContainers.remove(bookieName);
             if (null != container) {
+                internalEndpointsToExternalEndpoints.remove(container.getInternalGrpcEndpointStr());
                 container.stop();
             }
         }
@@ -175,8 +187,6 @@ public class BKCluster {
                 if (enableContainerLog) {
                     container.tailContainerLog();
                 }
-
-                log.info("Created bookie {}", bookieName);
                 bookieContainers.put(bookieName, container);
             }
         }
@@ -184,6 +194,11 @@ public class BKCluster {
         if (shouldStart) {
             log.info("Starting bookie {}", bookieName);
             container.start();
+            log.info("Started bookie {} : internal endpoint = {}, external endpoint = {}",
+                bookieName, container.getInternalGrpcEndpointStr(), container.getExternalGrpcEndpointStr());
+            internalEndpointsToExternalEndpoints.put(
+                container.getInternalGrpcEndpointStr(),
+                container.getExternalGrpcEndpointStr());
         }
         return container;
     }

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.

[bookkeeper] 06/10: [TABLE SERVICE] Move grpc services from server module to storage module

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 8e8e22ff6922a012d4959b25638f045406701506
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed May 23 10:56:58 2018 -0700

    [TABLE SERVICE] Move grpc services from server module to storage module
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    Current almost every grpc requests are wrapped into `StorageContainerRequest` and their responses
    are wrapped into `StorageContainerResponse`. It makes things a bit complicated on adding new grpc
    services.
    
    To simplify things, we can use grpc ClientInterceptor to stamp container information (e.g. scId)
    into the request metadata and write a grpc service registry to take the container information from
    request metadata and dispatch requests to containers.
    
    In order to achieve it, we need to move the grpc services to storage container.
    
    *Solution*
    
    This PR moves grpc services from server module to storage module, so we can simplify the wire protocols.
    
    Master Issue: #1205
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
    
    This closes #1428 from sijie/move_grpc_service_to_storage
---
 .../bookkeeper/stream/server/grpc/GrpcServer.java  |  3 +++
 .../storage/impl}/grpc/GrpcMetaRangeService.java   | 22 ++++++++++++---------
 .../storage/impl}/grpc/GrpcRootRangeService.java   | 23 ++++++++++++----------
 .../storage/impl}/grpc/GrpcTableService.java       | 10 +++++-----
 .../impl/grpc}/handler/ResponseHandler.java        |  2 +-
 .../handler/StorageContainerResponseHandler.java   |  2 +-
 .../storage/impl/grpc}/handler/package-info.java   |  2 +-
 .../stream/storage/impl/grpc}/package-info.java    |  6 +++---
 .../impl}/grpc/TestGrpcMetaRangeService.java       |  5 ++---
 .../impl}/grpc/TestGrpcRootRangeService.java       | 20 ++++++++++++++++++-
 .../storage/impl}/grpc/TestGrpcTableService.java   |  3 +--
 .../storage/impl/grpc}/TestResponseObserver.java   |  4 ++--
 .../TestStorageContainerResponseHandler.java       |  2 +-
 13 files changed, 65 insertions(+), 39 deletions(-)

diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
index 8ecb58a..ce95ccf 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcServer.java
@@ -27,6 +27,9 @@ import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
 import org.apache.bookkeeper.stream.server.exceptions.StorageServerRuntimeException;
 import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcMetaRangeService;
+import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcRootRangeService;
+import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcTableService;
 
 /**
  * KeyRange Server.
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcMetaRangeService.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
similarity index 60%
rename from stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcMetaRangeService.java
rename to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
index bc42f00..1e4f681 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcMetaRangeService.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
@@ -1,7 +1,11 @@
 /*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -11,26 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.bookkeeper.stream.server.grpc;
+package org.apache.bookkeeper.stream.storage.impl.grpc;
 
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceImplBase;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.server.handler.StorageContainerResponseHandler;
+import org.apache.bookkeeper.stream.storage.impl.grpc.handler.StorageContainerResponseHandler;
 import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 
 /**
  * The gRPC protocol based range service.
  */
 @Slf4j
-class GrpcMetaRangeService extends MetaRangeServiceImplBase {
+public class GrpcMetaRangeService extends MetaRangeServiceImplBase {
 
-    private final RangeStore rangeStore;
+    private final RangeStoreService rangeStore;
 
-    GrpcMetaRangeService(RangeStore service) {
+    public GrpcMetaRangeService(RangeStore service) {
         this.rangeStore = service;
         log.info("Created MetaRange service");
     }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcRootRangeService.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcRootRangeService.java
similarity index 86%
rename from stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcRootRangeService.java
rename to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcRootRangeService.java
index 15b9910..763a221 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcRootRangeService.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcRootRangeService.java
@@ -1,7 +1,11 @@
 /*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -11,8 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.bookkeeper.stream.server.grpc;
+package org.apache.bookkeeper.stream.storage.impl.grpc;
 
 import io.grpc.stub.StreamObserver;
 import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
@@ -29,17 +32,17 @@ import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
 import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceImplBase;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.server.handler.ResponseHandler;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.impl.grpc.handler.ResponseHandler;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 
 /**
  * Grpc based root range service.
  */
-class GrpcRootRangeService extends RootRangeServiceImplBase {
+public class GrpcRootRangeService extends RootRangeServiceImplBase {
 
-    private final RangeStore rs;
+    private final RangeStoreService rs;
 
-    GrpcRootRangeService(RangeStore rs) {
+    public GrpcRootRangeService(RangeStoreService rs) {
         this.rs = rs;
     }
 
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcTableService.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcTableService.java
similarity index 88%
rename from stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcTableService.java
rename to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcTableService.java
index eac6daf..e86759d 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/grpc/GrpcTableService.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcTableService.java
@@ -15,15 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.server.grpc;
+package org.apache.bookkeeper.stream.storage.impl.grpc;
 
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
 import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceImplBase;
-import org.apache.bookkeeper.stream.server.handler.StorageContainerResponseHandler;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.storage.impl.grpc.handler.StorageContainerResponseHandler;
+import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 
 /**
  * The gRPC protocol based k/v service.
@@ -31,9 +31,9 @@ import org.apache.bookkeeper.stream.storage.api.RangeStore;
 @Slf4j
 public class GrpcTableService extends TableServiceImplBase {
 
-    private final RangeStore rangeStore;
+    private final RangeStoreService rangeStore;
 
-    GrpcTableService(RangeStore store) {
+    public GrpcTableService(RangeStoreService store) {
         this.rangeStore = store;
         log.info("Created Table service");
     }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/handler/ResponseHandler.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/ResponseHandler.java
similarity index 96%
rename from stream/server/src/main/java/org/apache/bookkeeper/stream/server/handler/ResponseHandler.java
rename to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/ResponseHandler.java
index 526076d..4707b98 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/handler/ResponseHandler.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/ResponseHandler.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.server.handler;
+package org.apache.bookkeeper.stream.storage.impl.grpc.handler;
 
 import io.grpc.StatusException;
 import io.grpc.StatusRuntimeException;
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/handler/StorageContainerResponseHandler.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/StorageContainerResponseHandler.java
similarity index 96%
rename from stream/server/src/main/java/org/apache/bookkeeper/stream/server/handler/StorageContainerResponseHandler.java
rename to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/StorageContainerResponseHandler.java
index 5c45715..42c21a4 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/handler/StorageContainerResponseHandler.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/StorageContainerResponseHandler.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.server.handler;
+package org.apache.bookkeeper.stream.storage.impl.grpc.handler;
 
 import io.grpc.stub.StreamObserver;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/handler/package-info.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/package-info.java
similarity index 93%
copy from stream/server/src/main/java/org/apache/bookkeeper/stream/server/handler/package-info.java
copy to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/package-info.java
index a9e9a9a..bb30bd1 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/handler/package-info.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/package-info.java
@@ -19,4 +19,4 @@
 /**
  * Handler for processing requests and responses.
  */
-package org.apache.bookkeeper.stream.server.handler;
+package org.apache.bookkeeper.stream.storage.impl.grpc.handler;
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/handler/package-info.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/package-info.java
similarity index 82%
rename from stream/server/src/main/java/org/apache/bookkeeper/stream/server/handler/package-info.java
rename to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/package-info.java
index a9e9a9a..2a64f22 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/handler/package-info.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/package-info.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,6 +17,6 @@
  */
 
 /**
- * Handler for processing requests and responses.
+ * Grpc services for serving requests to storage containers.
  */
-package org.apache.bookkeeper.stream.server.handler;
+package org.apache.bookkeeper.stream.storage.impl.grpc;
\ No newline at end of file
diff --git a/stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcMetaRangeService.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
similarity index 96%
rename from stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcMetaRangeService.java
rename to stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
index 12e0a70..787453f 100644
--- a/stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcMetaRangeService.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.server.grpc;
+package org.apache.bookkeeper.stream.storage.impl.grpc;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -32,7 +32,6 @@ import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.server.TestResponseObserver;
 import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
 import org.junit.Test;
 
diff --git a/stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcRootRangeService.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcRootRangeService.java
similarity index 96%
rename from stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcRootRangeService.java
rename to stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcRootRangeService.java
index 0d51786..7970793 100644
--- a/stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcRootRangeService.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcRootRangeService.java
@@ -1,4 +1,22 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -12,7 +30,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.server.grpc;
+package org.apache.bookkeeper.stream.storage.impl.grpc;
 
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createCreateNamespaceRequest;
diff --git a/stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcTableService.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
similarity index 99%
rename from stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcTableService.java
rename to stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
index 56833eb..ed97dc1 100644
--- a/stream/server/src/test/java/org/apache/bookkeeper/stream/server/grpc/TestGrpcTableService.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.server.grpc;
+package org.apache.bookkeeper.stream.storage.impl.grpc;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -38,7 +38,6 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
 import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.server.TestResponseObserver;
 import org.apache.bookkeeper.stream.storage.api.RangeStore;
 import org.junit.Test;
 
diff --git a/stream/server/src/test/java/org/apache/bookkeeper/stream/server/TestResponseObserver.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestResponseObserver.java
similarity index 96%
rename from stream/server/src/test/java/org/apache/bookkeeper/stream/server/TestResponseObserver.java
rename to stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestResponseObserver.java
index d1f65a3..7fae694 100644
--- a/stream/server/src/test/java/org/apache/bookkeeper/stream/server/TestResponseObserver.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestResponseObserver.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.server;
+package org.apache.bookkeeper.stream.storage.impl.grpc;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
diff --git a/stream/server/src/test/java/org/apache/bookkeeper/stream/server/handler/TestStorageContainerResponseHandler.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
similarity index 98%
rename from stream/server/src/test/java/org/apache/bookkeeper/stream/server/handler/TestStorageContainerResponseHandler.java
rename to stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
index 696da07..cf9b917 100644
--- a/stream/server/src/test/java/org/apache/bookkeeper/stream/server/handler/TestStorageContainerResponseHandler.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.server.handler;
+package org.apache.bookkeeper.stream.storage.impl.grpc.handler;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.

[bookkeeper] 03/10: [TABLE SERVICE] start table service as an extra component of bookie

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit b25fc5f3414622b4179e8ea24013b8146d7cf9f4
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue May 22 11:43:31 2018 -0700

    [TABLE SERVICE] start table service as an extra component of bookie
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    table service was built over bookkeeper ledgers. it is an extension to bookies' functionalities, so it is much convenient to start the service as one additional component with bookie, just like how we start `AutoRecovery` along with bookie.
    
    *Solution*
    
    - include `stream/server` module as part of bookkeeper server/all binary distributions
    - abstract `StorageServer` to allow controlling whether to start bookie or not
    - provide a ServerLifecycleComponent of `StorageServer`, so it can be used as an extra component of bookie
    
    *Tests*
    
    - improve the integration tests to start table service as part of bookie containers
    - move `LocationClientTest` to container based integration tests
    
    Master Issue: #1205
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
    
    This closes #1422 from sijie/start_table_service
---
 bookkeeper-dist/all/pom.xml                        |   7 ++
 bookkeeper-dist/server/pom.xml                     |   7 ++
 bookkeeper-dist/src/assemble/bin-all.xml           |   3 +
 bookkeeper-dist/src/assemble/bin-server.xml        |   4 +
 .../src/main/resources/LICENSE-all.bin.txt         |  47 +++++++-
 .../src/main/resources/LICENSE-server.bin.txt      |  44 +++++++-
 .../google-auth-library-credentials-0.4.0/LICENSE  |  28 +++++
 .../src/main/resources/deps/protobuf-3.0.0/LICENSE |  32 ++++++
 .../src/main/resources/deps/protobuf-3.3.1/LICENSE |  32 ++++++
 conf/bk_server.conf                                |  28 ++++-
 pom.xml                                            |   8 +-
 .../bookkeeper/stream/cluster/StreamCluster.java   |  38 ++-----
 .../bookkeeper/stream/server/StorageServer.java    | 118 ++++++++++++++-------
 .../server/StreamStorageLifecycleComponent.java    |  64 +++++++++++
 .../stream/server/conf/BookieConfiguration.java    |   4 +-
 .../server/conf/StorageServerConfiguration.java    |  11 ++
 .../stream/server/service/BookieWatchService.java  |  95 +++++++++++++++++
 .../server/service/CuratorProviderService.java     |   3 +
 .../server/service/DLNamespaceProviderService.java |   1 +
 .../service/RegistrationServiceProvider.java       |   2 +
 .../storage/api/cluster/ClusterInitializer.java    |  47 ++++++++
 .../storage/api/cluster/ClusterMetadataStore.java  |   9 +-
 stream/storage/impl/pom.xml                        |  10 --
 .../impl/cluster/InMemClusterMetadataStore.java    |   6 +-
 .../storage/impl/cluster/ZkClusterInitializer.java |  95 +++++++++++++++++
 .../impl/cluster/ZkClusterMetadataStore.java       |   5 +-
 .../cluster/ClusterControllerLeaderImplTest.java   |   3 +-
 .../tests/containers/BookieContainer.java          |  34 ++++--
 .../cluster/BookKeeperClusterTestBase.java         |  23 +++-
 .../integration/stream}/LocationClientTest.java    |  26 +++--
 .../integration/stream/StreamClusterTestBase.java  |  73 +++++++++++++
 .../tests/integration/topologies/BKCluster.java    |  77 ++++++++++++--
 .../integration/topologies/BKClusterSpec.java      |  67 ++++++++++++
 33 files changed, 924 insertions(+), 127 deletions(-)

diff --git a/bookkeeper-dist/all/pom.xml b/bookkeeper-dist/all/pom.xml
index 253b356..d1f5b25 100644
--- a/bookkeeper-dist/all/pom.xml
+++ b/bookkeeper-dist/all/pom.xml
@@ -92,6 +92,13 @@
       <version>${project.version}</version>
     </dependency>
 
+    <!-- stream.storage -->
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-server</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <!-- bookkeeper benchmark -->
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
diff --git a/bookkeeper-dist/server/pom.xml b/bookkeeper-dist/server/pom.xml
index 75e70e9..abcd0ef 100644
--- a/bookkeeper-dist/server/pom.xml
+++ b/bookkeeper-dist/server/pom.xml
@@ -76,6 +76,13 @@
       <version>${project.version}</version>
     </dependency>
 
+    <!-- stream.storage -->
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-server</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <!-- slf4j binding -->
     <dependency>
       <groupId>org.slf4j</groupId>
diff --git a/bookkeeper-dist/src/assemble/bin-all.xml b/bookkeeper-dist/src/assemble/bin-all.xml
index fd45d67..4a7c4dc 100644
--- a/bookkeeper-dist/src/assemble/bin-all.xml
+++ b/bookkeeper-dist/src/assemble/bin-all.xml
@@ -54,11 +54,14 @@
       <directory>../src/main/resources/deps</directory>
       <outputDirectory>/deps</outputDirectory>
       <includes>
+        <include>google-auth-library-credentials-0.4.0/LICENSE</include>
         <include>javax.servlet-api-3.1.0/CDDL+GPL-1.1</include>
         <include>jsr-305/LICENSE</include>
         <include>netty-3.10.1.Final/*</include>
         <include>netty-4.1.12.Final/*</include>
         <include>paranamer-2.8/LICENSE.txt</include>
+        <include>protobuf-3.0.0/LICENSE</include>
+        <include>protobuf-3.3.1/LICENSE</include>
         <include>protobuf-3.4.0/LICENSE</include>
         <include>scala-library-2.11.7/LICENSE.md</include>
         <include>scala-parser-combinators_2.11-1.0.4/LICENSE.md</include>
diff --git a/bookkeeper-dist/src/assemble/bin-server.xml b/bookkeeper-dist/src/assemble/bin-server.xml
index 7bbd3b2..ce5a646 100644
--- a/bookkeeper-dist/src/assemble/bin-server.xml
+++ b/bookkeeper-dist/src/assemble/bin-server.xml
@@ -49,8 +49,11 @@
       <directory>../src/main/resources/deps</directory>
       <outputDirectory>/deps</outputDirectory>
       <includes>
+        <include>google-auth-library-credentials-0.4.0/LICENSE</include>
         <include>javax.servlet-api-3.1.0/CDDL+GPL-1.1</include>
         <include>netty-4.1.12.Final/*</include>
+        <include>protobuf-3.0.0/LICENSE</include>
+        <include>protobuf-3.3.1/LICENSE</include>
         <include>protobuf-3.4.0/LICENSE</include>
         <include>slf4j-1.7.25/LICENSE.txt</include>
       </includes>
@@ -85,6 +88,7 @@
       <!-- Include 'groupId' in the dependencies Jar names to better identify the provenance of the jar -->
       <outputFileNameMapping>${artifact.groupId}-${artifact.artifactId}-${artifact.version}${dashClassifier?}.${artifact.extension}</outputFileNameMapping>
       <excludes>
+        <exclude>com.google.code.findbugs:jsr305</exclude>
         <!-- All these dependencies are already included in netty-all -->
         <exclude>io.netty:netty-buffer</exclude>
         <exclude>io.netty:netty-codec</exclude>
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
index 7f523db..289916d 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
@@ -276,6 +276,25 @@ Apache Software License, Version 2.
 - lib/net.jpountz.lz4-lz4-1.3.0.jar [38]
 - lib/org.codehaus.jackson-jackson-core-asl-1.9.11.jar [39]
 - lib/org.codehaus.jackson-jackson-mapper-asl-1.9.11.jar [40]
+- lib/com.google.api.grpc-proto-google-common-protos-0.1.9.jar [41]
+- lib/com.google.code.gson-gson-2.7.jar [42]
+- lib/com.google.instrumentation-instrumentation-api-0.4.3.jar [43]
+- lib/com.squareup.okhttp-okhttp-2.5.0.jar [44]
+- lib/com.squareup.okio-okio-1.6.0.jar [45]
+- lib/io.grpc-grpc-all-1.5.0.jar [46]
+- lib/io.grpc-grpc-auth-1.5.0.jar [46]
+- lib/io.grpc-grpc-context-1.5.0.jar [46]
+- lib/io.grpc-grpc-core-1.5.0.jar [46]
+- lib/io.grpc-grpc-netty-1.5.0.jar [46]
+- lib/io.grpc-grpc-okhttp-1.5.0.jar [46]
+- lib/io.grpc-grpc-protobuf-1.5.0.jar [46]
+- lib/io.grpc-grpc-protobuf-lite-1.5.0.jar [46]
+- lib/io.grpc-grpc-protobuf-nano-1.5.0.jar [46]
+- lib/io.grpc-grpc-stub-1.5.0.jar [46]
+- lib/org.apache.curator-curator-client-4.0.1.jar [47]
+- lib/org.apache.curator-curator-framework-4.0.1.jar [47]
+- lib/org.apache.curator-curator-recipes-4.0.1.jar [47]
+- lib/org.inferred-freebuilder-1.14.9.jar [48]
 
 [1] Source available at https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
 [2] Source available at https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -316,6 +335,15 @@ Apache Software License, Version 2.
 [38] Source available at https://github.com/lz4/lz4-java/tree/1.3.0
 [39] Source available at https://github.com/codehaus/jackson/tree/1.9
 [40] Source available at https://github.com/codehaus/jackson/tree/1.9
+[41] Source available at https://github.com/googleapis/googleapis
+[42] Source available at https://github.com/google/gson/tree/gson-parent-2.7
+[43] Source available at https://github.com/census-instrumentation/opencensus-java/tree/v0.4.3
+[44] Source available at https://github.com/square/okhttp/tree/parent-2.5.0
+[45] Source available at https://github.com/square/okio/tree/okio-parent-1.6.0
+[46] Source available at https://github.com/grpc/grpc-java/tree/v1.5.0
+[47] Source available at https://github.com/apache/curator/tree/apache-curator-4.0.1
+[48] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
+
 
 ------------------------------------------------------------------------------------
 lib/io.netty-netty-3.10.1.Final.jar contains the extensions to Java Collections Framework which has
@@ -459,10 +487,20 @@ Bundled as lib/com.google.code.findbugs-jsr305-3.0.2.jar
 Source available at https://storage.googleapis.com/google-code-archive-source/v2/code.google.com/jsr-305/source-archive.zip
 ------------------------------------------------------------------------------------
 This product bundles Google Protocal Buffers, which is available under a "3-clause BSD"
-license. For details, see deps/protobuf-3.4.0/LICENSE.
+license.
 
 Bundled as lib/com.google.protobuf-protobuf-java-3.4.0.jar
 Source available at https://github.com/google/protobuf/tree/v3.4.0
+For details, see deps/protobuf-3.4.0/LICENSE.
+
+Bundled as lib/com.google.protobuf.nano-protobuf-javanano-3.0.0-alpha-5.jar
+Source available at https://github.com/google/protobuf/tree/3.0.0-pre
+For details, see deps/protobuf-3.0.0/LICENSE.
+
+Bundled as com.google.protobuf-protobuf-java-util-3.3.1.jar
+Source available at https://github.com/google/protobuf/tree/v3.3.1
+For details, see deps/protobuf-3.3.1/LICENSE.
+
 ------------------------------------------------------------------------------------
 This product bundles Paranamer, which is available under a "3-clause BSD" license.
 For details, see deps/paranamer-2.8/LICENSE.txt.
@@ -501,4 +539,11 @@ Bundled as
   - lib/org.slf4j-slf4j-api-1.7.25.jar
   - lib/org.slf4j-slf4j-log4j12-1.7.25.jar
 Source available at https://github.com/qos-ch/slf4j/tree/v_1.7.25
+------------------------------------------------------------------------------------
+This product bundles the Google Auth Library, which is available under a "3-clause BSD"
+license. For details, see deps/google-auth-library-credentials-0.4.0/LICENSE
+
+Bundled as
+  - lib/com.google.auth-google-auth-library-credentials-0.4.0.jar
+Source available at https://github.com/google/google-auth-library-java/tree/0.4.0
 
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
index 1f437ab..168f8fd 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
@@ -241,6 +241,25 @@ Apache Software License, Version 2.
 - lib/net.jpountz.lz4-lz4-1.3.0.jar [25]
 - lib/org.codehaus.jackson-jackson-core-asl-1.9.11.jar [26]
 - lib/org.codehaus.jackson-jackson-mapper-asl-1.9.11.jar [27]
+- lib/com.google.api.grpc-proto-google-common-protos-0.1.9.jar [28]
+- lib/com.google.code.gson-gson-2.7.jar [29]
+- lib/com.google.instrumentation-instrumentation-api-0.4.3.jar [30]
+- lib/com.squareup.okhttp-okhttp-2.5.0.jar [31]
+- lib/com.squareup.okio-okio-1.6.0.jar [32]
+- lib/io.grpc-grpc-all-1.5.0.jar [33]
+- lib/io.grpc-grpc-auth-1.5.0.jar [33]
+- lib/io.grpc-grpc-context-1.5.0.jar [33]
+- lib/io.grpc-grpc-core-1.5.0.jar [33]
+- lib/io.grpc-grpc-netty-1.5.0.jar [33]
+- lib/io.grpc-grpc-okhttp-1.5.0.jar [33]
+- lib/io.grpc-grpc-protobuf-1.5.0.jar [33]
+- lib/io.grpc-grpc-protobuf-lite-1.5.0.jar [33]
+- lib/io.grpc-grpc-protobuf-nano-1.5.0.jar [33]
+- lib/io.grpc-grpc-stub-1.5.0.jar [33]
+- lib/org.apache.curator-curator-client-4.0.1.jar [34]
+- lib/org.apache.curator-curator-framework-4.0.1.jar [34]
+- lib/org.apache.curator-curator-recipes-4.0.1.jar [34]
+- lib/org.inferred-freebuilder-1.14.9.jar [35]
 
 [1] Source available at https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
 [2] Source available at https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -269,6 +288,14 @@ Apache Software License, Version 2.
 [25] Source available at https://github.com/lz4/lz4-java/tree/1.3.0
 [26] Source available at https://github.com/codehaus/jackson/tree/1.9
 [27] Source available at https://github.com/codehaus/jackson/tree/1.9
+[28] Source available at https://github.com/googleapis/googleapis
+[29] Source available at https://github.com/google/gson/tree/gson-parent-2.7
+[30] Source available at https://github.com/census-instrumentation/opencensus-java/tree/v0.4.3
+[31] Source available at https://github.com/square/okhttp/tree/parent-2.5.0
+[32] Source available at https://github.com/square/okio/tree/okio-parent-1.6.0
+[33] Source available at https://github.com/grpc/grpc-java/tree/v1.5.0
+[34] Source available at https://github.com/apache/curator/tree/apache-curator-4.0.1
+[35] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
 
 ------------------------------------------------------------------------------------
 lib/io.netty-netty-all-4.1.12.Final.jar bundles some 3rd party dependencies
@@ -372,10 +399,19 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 ------------------------------------------------------------------------------------
 This product bundles Google Protocal Buffers, which is available under a "3-clause BSD"
-license. For details, see deps/protobuf-3.4.0/LICENSE.
+license.
 
 Bundled as lib/com.google.protobuf-protobuf-java-3.4.0.jar
 Source available at https://github.com/google/protobuf/tree/v3.4.0
+For details, see deps/protobuf-3.4.0/LICENSE.
+
+Bundled as lib/com.google.protobuf.nano-protobuf-javanano-3.0.0-alpha-5.jar
+Source available at https://github.com/google/protobuf/tree/3.0.0-pre
+For details, see deps/protobuf-3.0.0/LICENSE.
+
+Bundled as com.google.protobuf-protobuf-java-util-3.3.1.jar
+Source available at https://github.com/google/protobuf/tree/v3.3.1
+For details, see deps/protobuf-3.3.1/LICENSE.
 ------------------------------------------------------------------------------------
 This product bundles the JCP Standard Java Servlet API, which is available under a
 CDDL 1.1 license. For details, see deps/javax.servlet-api-3.1.0/CDDL+GPL-1.1.
@@ -390,4 +426,10 @@ Bundled as
   - lib/org.slf4j-slf4j-api-1.7.25.jar
   - lib/org.slf4j-slf4j-log4j12-1.7.25.jar
 Source available at https://github.com/qos-ch/slf4j/tree/v_1.7.25
+------------------------------------------------------------------------------------
+This product bundles the Google Auth Library, which is available under a "3-clause BSD"
+license. For details, see deps/google-auth-library-credentials-0.4.0/LICENSE
 
+Bundled as
+  - lib/com.google.auth-google-auth-library-credentials-0.4.0.jar
+Source available at https://github.com/google/google-auth-library-java/tree/0.4.0
diff --git a/bookkeeper-dist/src/main/resources/deps/google-auth-library-credentials-0.4.0/LICENSE b/bookkeeper-dist/src/main/resources/deps/google-auth-library-credentials-0.4.0/LICENSE
new file mode 100644
index 0000000..12edf23
--- /dev/null
+++ b/bookkeeper-dist/src/main/resources/deps/google-auth-library-credentials-0.4.0/LICENSE
@@ -0,0 +1,28 @@
+Copyright 2014, Google Inc. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+   * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+   * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+
+   * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/bookkeeper-dist/src/main/resources/deps/protobuf-3.0.0/LICENSE b/bookkeeper-dist/src/main/resources/deps/protobuf-3.0.0/LICENSE
new file mode 100644
index 0000000..2dcab42
--- /dev/null
+++ b/bookkeeper-dist/src/main/resources/deps/protobuf-3.0.0/LICENSE
@@ -0,0 +1,32 @@
+Copyright 2014, Google Inc.  All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+    * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+Code generated by the Protocol Buffer compiler is owned by the owner
+of the input file used when generating it.  This code is not
+standalone and requires a support library to be linked with it.  This
+support library is itself covered by the above license.
diff --git a/bookkeeper-dist/src/main/resources/deps/protobuf-3.3.1/LICENSE b/bookkeeper-dist/src/main/resources/deps/protobuf-3.3.1/LICENSE
new file mode 100644
index 0000000..2dcab42
--- /dev/null
+++ b/bookkeeper-dist/src/main/resources/deps/protobuf-3.3.1/LICENSE
@@ -0,0 +1,32 @@
+Copyright 2014, Google Inc.  All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+    * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+Code generated by the Protocol Buffer compiler is owned by the owner
+of the input file used when generating it.  This code is not
+standalone and requires a support library to be linked with it.  This
+support library is itself covered by the above license.
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 2080b68..6e3c1b4 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -92,7 +92,10 @@ bookiePort=3181
 # Configure a list of server components to enable and load on a bookie server.
 # This provides the plugin run extra services along with a bookie server.
 #
-# extraServerComponents=
+# NOTE: if bookie fails to load any of extra components configured below, bookie will continue
+#       function by ignoring the components configured below.
+# extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
+extraServerComponents=
 
 #############################################################################
 ## Thread settings
@@ -878,3 +881,26 @@ zkEnableSecurity=false
 
 # The time to backoff when replication worker encounters exceptions on replicating a ledger, in milliseconds.
 # rwRereplicateBackoffMs=5000
+
+
+##################################################################
+##################################################################
+# Settings below are used by stream/table service
+##################################################################
+##################################################################
+
+### Grpc Server ###
+
+# the grpc server port to listen on. default is 4181
+storageserver.grpc.port=4181
+
+### Storage ###
+
+# local storage directories for storing table ranges data (e.g. rocksdb sst files)
+storage.range.store.dirs=data/bookkeeper/ranges
+
+# whether the storage server capable of serving readonly tables. default is false.
+storage.serve.readonly.tables=false
+
+# the cluster controller schedule interval, in milliseconds. default is 30 seconds.
+storage.cluster.controller.schedule.interval.ms=30000
diff --git a/pom.xml b/pom.xml
index 3c35d8d..8a4ef12 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
     <curator.version>4.0.1</curator.version>
     <dropwizard.version>3.1.0</dropwizard.version>
     <finagle.version>6.44.0</finagle.version>
-    <freebuilder.version>1.12.3</freebuilder.version>
+    <freebuilder.version>1.14.9</freebuilder.version>
     <google.code.version>3.0.2</google.code.version>
     <google.errorprone.version>2.1.2</google.errorprone.version>
     <grpc.version>1.5.0</grpc.version>
@@ -362,6 +362,12 @@
         <groupId>io.grpc</groupId>
         <artifactId>grpc-all</artifactId>
         <version>${grpc.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>com.google.errorprone</groupId>
+            <artifactId>error_prone_annotations</artifactId>
+          </exclusion>
+        </exclusions>
       </dependency>
 
       <!-- rocksdb dependencies -->
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index 63e2679..11bc8af 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.net.BindException;
 import java.net.URI;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -43,20 +42,15 @@ import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
-import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.StorageServer;
 import org.apache.bookkeeper.stream.storage.StorageConstants;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
-import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.LocalDLMEmulator;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * A Cluster that runs a few storage nodes.
@@ -141,28 +135,9 @@ public class StreamCluster
     }
 
     private void initializeCluster() throws Exception {
-        try (CuratorFramework client = CuratorFrameworkFactory.newClient(
-            zkEnsemble,
-            new ExponentialBackoffRetry(100, Integer.MAX_VALUE, 10000)
-        )) {
-            client.start();
-
-            ZkClusterMetadataStore store = new ZkClusterMetadataStore(client, zkEnsemble, ZK_METADATA_ROOT_PATH);
-
-            ClusterMetadata metadata;
-            try {
-                metadata = store.getClusterMetadata();
-                log.info("Loaded cluster metadata : \n{}", metadata);
-            } catch (StorageRuntimeException sre) {
-                if (sre.getCause() instanceof KeeperException.NoNodeException) {
-                    log.info("Initializing the stream cluster.");
-                    store.initializeCluster(spec.numServers() * 2);
-                    log.info("Successfully initialized the stream cluster : \n{}", store.getClusterMetadata());
-                } else {
-                    throw sre;
-                }
-            }
-        }
+        new ZkClusterInitializer(zkEnsemble).initializeCluster(
+            URI.create("zk://" + zkEnsemble),
+            spec.numServers() * 2);
 
         // format the bookkeeper cluster
         MetadataDrivers.runFunctionWithMetadataBookieDriver(newBookieConfiguration(zkEnsemble), driver -> {
@@ -210,11 +185,10 @@ public class StreamCluster
                 log.info("Attempting to start storage server at (bookie port = {}, grpc port = {})"
                         + " : bkDir = {}, rangesStoreDir = {}, serveReadOnlyTables = {}",
                     bookiePort, grpcPort, bkDir, rangesStoreDir, spec.serveReadOnlyTable);
-                server = StorageServer.startStorageServer(
+                server = StorageServer.buildStorageServer(
                     serverConf,
                     grpcPort,
-                    spec.numServers() * 2,
-                    Optional.empty());
+                    spec.numServers() * 2);
                 server.start();
                 log.info("Started storage server at (bookie port = {}, grpc port = {})",
                     bookiePort, grpcPort);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index d5ca397..e0b87ef 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -13,6 +13,7 @@
  */
 package org.apache.bookkeeper.stream.server;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH;
 
 import com.beust.jcommander.JCommander;
@@ -21,15 +22,16 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.UnknownHostException;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.component.ComponentStarter;
 import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponentStack;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog.DLCheckpointStore;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
@@ -37,6 +39,7 @@ import org.apache.bookkeeper.stream.server.conf.DLConfiguration;
 import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
 import org.apache.bookkeeper.stream.server.grpc.GrpcServerSpec;
 import org.apache.bookkeeper.stream.server.service.BookieService;
+import org.apache.bookkeeper.stream.server.service.BookieWatchService;
 import org.apache.bookkeeper.stream.server.service.ClusterControllerService;
 import org.apache.bookkeeper.stream.server.service.CuratorProviderService;
 import org.apache.bookkeeper.stream.server.service.DLNamespaceProviderService;
@@ -58,6 +61,7 @@ import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.distributedlog.DistributedLogConfiguration;
 
 /**
  * A storage server is a server that run storage service and serving rpc requests.
@@ -71,7 +75,7 @@ public class StorageServer {
         private String serverConfigFile;
 
         @Parameter(names = {"-p", "--port"}, description = "Port to listen on for gPRC server")
-        private int port = 3182;
+        private int port = 4181;
 
         @Parameter(names = {"-h", "--help"}, description = "Show this help message")
         private boolean help = false;
@@ -137,11 +141,10 @@ public class StorageServer {
 
         LifecycleComponent storageServer;
         try {
-            storageServer = startStorageServer(
+            storageServer = buildStorageServer(
                 conf,
                 grpcPort,
-                1024,
-                Optional.empty());
+                1024);
         } catch (ConfigurationException e) {
             log.error("Invalid storage configuration", e);
             return ExitCode.INVALID_CONF.code();
@@ -164,11 +167,23 @@ public class StorageServer {
         return ExitCode.OK.code();
     }
 
-    public static LifecycleComponent startStorageServer(CompositeConfiguration conf,
+    public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
+                                                        int grpcPort,
+                                                        int numStorageContainers)
+            throws UnknownHostException, ConfigurationException {
+        return buildStorageServer(conf, grpcPort, numStorageContainers, true, NullStatsLogger.INSTANCE);
+    }
+
+    public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
                                                         int grpcPort,
                                                         int numStorageContainers,
-                                                        Optional<String> instanceName)
+                                                        boolean startBookieAndStartProvider,
+                                                        StatsLogger externalStatsLogger)
         throws ConfigurationException, UnknownHostException {
+
+        LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder()
+            .withName("storage-server");
+
         BookieConfiguration bkConf = BookieConfiguration.of(conf);
         bkConf.validate();
 
@@ -188,42 +203,47 @@ public class StorageServer {
         StorageResources storageResources = StorageResources.create();
 
         // Create the stats provider
-        StatsProviderService statsProviderService = new StatsProviderService(bkConf);
-        StatsLogger rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");
+        StatsLogger rootStatsLogger;
+        if (startBookieAndStartProvider) {
+            StatsProviderService statsProviderService = new StatsProviderService(bkConf);
+            rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");
+            serverBuilder.addComponent(statsProviderService);
+        } else {
+            rootStatsLogger = checkNotNull(externalStatsLogger,
+                "External stats logger is not provided while not starting stats provider");
+        }
 
         // Create the bookie service
-        BookieService bookieService = new BookieService(bkConf, rootStatsLogger);
+        ServerConfiguration bkServerConf;
+        if (startBookieAndStartProvider) {
+            BookieService bookieService = new BookieService(bkConf, rootStatsLogger);
+            serverBuilder.addComponent(bookieService);
+            bkServerConf = bookieService.serverConf();
+        } else {
+            bkServerConf = new ServerConfiguration();
+            bkServerConf.loadConf(bkConf.getUnderlyingConf());
+        }
+
+        // Create the bookie watch service
+        BookieWatchService bkWatchService;
+        {
+            DistributedLogConfiguration dlogConf = new DistributedLogConfiguration();
+            bkWatchService = new BookieWatchService(
+                dlogConf.getEnsembleSize(),
+                bkConf,
+                NullStatsLogger.INSTANCE);
+        }
 
         // Create the curator provider service
         CuratorProviderService curatorProviderService = new CuratorProviderService(
-            bookieService.serverConf(), dlConf, rootStatsLogger.scope("curator"));
+            bkServerConf, dlConf, rootStatsLogger.scope("curator"));
 
         // Create the distributedlog namespace service
         DLNamespaceProviderService dlNamespaceProvider = new DLNamespaceProviderService(
-            bookieService.serverConf(),
+            bkServerConf,
             dlConf,
             rootStatsLogger.scope("dlog"));
 
-        // Create a registration service provider
-        RegistrationServiceProvider regService = new RegistrationServiceProvider(
-            bookieService.serverConf(),
-            dlConf,
-            rootStatsLogger.scope("registration").scope("provider"));
-
-        // Create a cluster controller service
-        ClusterControllerService clusterControllerService = new ClusterControllerService(
-            storageConf,
-            () -> new ClusterControllerImpl(
-                new ZkClusterMetadataStore(
-                    curatorProviderService.get(),
-                    ZKMetadataDriverBase.resolveZkServers(bookieService.serverConf()),
-                    ZK_METADATA_ROOT_PATH),
-                regService.get(),
-                new DefaultStorageContainerController(),
-                new ZkClusterControllerLeaderSelector(curatorProviderService.get(), ZK_METADATA_ROOT_PATH),
-                storageConf),
-            rootStatsLogger.scope("cluster_controller"));
-
         // Create range (stream) store
         RangeStoreBuilder rangeStoreBuilder = RangeStoreBuilder.newBuilder()
             .withStatsLogger(rootStatsLogger.scope("storage"))
@@ -241,7 +261,7 @@ public class StorageServer {
                     storageConf,
                     new ZkClusterMetadataStore(
                         curatorProviderService.get(),
-                        ZKMetadataDriverBase.resolveZkServers(bookieService.serverConf()),
+                        ZKMetadataDriverBase.resolveZkServers(bkServerConf),
                         ZK_METADATA_ROOT_PATH),
                     registry,
                     rootStatsLogger.scope("sc").scope("manager")))
@@ -267,26 +287,44 @@ public class StorageServer {
         GrpcService grpcService = new GrpcService(
             serverConf, serverSpec, rpcStatsLogger);
 
+        // Create a registration service provider
+        RegistrationServiceProvider regService = new RegistrationServiceProvider(
+            bkServerConf,
+            dlConf,
+            rootStatsLogger.scope("registration").scope("provider"));
+
         // Create a registration state service only when service is ready.
         RegistrationStateService regStateService = new RegistrationStateService(
             myEndpoint,
-            bookieService.serverConf(),
+            bkServerConf,
             bkConf,
             regService,
             rootStatsLogger.scope("registration"));
 
+        // Create a cluster controller service
+        ClusterControllerService clusterControllerService = new ClusterControllerService(
+            storageConf,
+            () -> new ClusterControllerImpl(
+                new ZkClusterMetadataStore(
+                    curatorProviderService.get(),
+                    ZKMetadataDriverBase.resolveZkServers(bkServerConf),
+                    ZK_METADATA_ROOT_PATH),
+                regService.get(),
+                new DefaultStorageContainerController(),
+                new ZkClusterControllerLeaderSelector(curatorProviderService.get(), ZK_METADATA_ROOT_PATH),
+                storageConf),
+            rootStatsLogger.scope("cluster_controller"));
+
         // Create all the service stack
-        return LifecycleComponentStack.newBuilder()
-            .withName("storage-server")
-            .addComponent(statsProviderService)     // stats provider
-            .addComponent(bookieService)            // bookie server
+        return serverBuilder
+            .addComponent(bkWatchService)           // service that watches bookies
             .addComponent(curatorProviderService)   // service that provides curator client
             .addComponent(dlNamespaceProvider)      // service that provides dl namespace
-            .addComponent(regService)               // service that provides registration client
-            .addComponent(clusterControllerService) // service that run cluster controller service
             .addComponent(storageService)           // range (stream) store
             .addComponent(grpcService)              // range (stream) server (gRPC)
+            .addComponent(regService)               // service that provides registration client
             .addComponent(regStateService)          // service that manages server state
+            .addComponent(clusterControllerService) // service that run cluster controller service
             .build();
     }
 
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
new file mode 100644
index 0000000..5e6b0b5
--- /dev/null
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.server;
+
+import java.net.UnknownHostException;
+import org.apache.bookkeeper.common.component.LifecycleComponent;
+import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
+import org.apache.bookkeeper.server.conf.BookieConfiguration;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
+import org.apache.commons.configuration.ConfigurationException;
+
+/**
+ * This is a {@link ServerLifecycleComponent} to allow run stream storage component as part of bookie server.
+ */
+public class StreamStorageLifecycleComponent extends ServerLifecycleComponent {
+
+    private final LifecycleComponent streamStorage;
+
+    public StreamStorageLifecycleComponent(BookieConfiguration conf, StatsLogger statsLogger)
+            throws UnknownHostException, ConfigurationException {
+        super("stream-storage", conf, statsLogger);
+
+        StorageServerConfiguration ssConf = StorageServerConfiguration.of(conf.getUnderlyingConf());
+
+        this.streamStorage = StorageServer.buildStorageServer(
+            conf.getUnderlyingConf(),
+            ssConf.getGrpcPort(),
+            1024, /* indicator */
+            false,
+            statsLogger.scope("stream"));
+    }
+
+    @Override
+    protected void doStart() {
+        this.streamStorage.start();
+    }
+
+    @Override
+    protected void doStop() {
+        this.streamStorage.stop();
+    }
+
+    @Override
+    protected void doClose() {
+        this.streamStorage.close();
+    }
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java
index b6803f2..e3c24c5 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/BookieConfiguration.java
@@ -21,13 +21,11 @@ import org.apache.commons.configuration.CompositeConfiguration;
  */
 public class BookieConfiguration extends ComponentConfiguration {
 
-    private static final String COMPONENT_PREFIX = "bookie" + DELIMITER;
-
     public static BookieConfiguration of(CompositeConfiguration conf) {
         return new BookieConfiguration(conf);
     }
 
     protected BookieConfiguration(CompositeConfiguration conf) {
-        super(conf, COMPONENT_PREFIX);
+        super(conf, "");
     }
 }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
index a0ef963..bc300c9 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
@@ -24,6 +24,8 @@ public class StorageServerConfiguration extends ComponentConfiguration {
 
     private static final String COMPONENT_PREFIX = "storageserver" + DELIMITER;
 
+    private static final String GRPC_PORT = "grpc.port";
+
     public static StorageServerConfiguration of(CompositeConfiguration conf) {
         return new StorageServerConfiguration(conf);
     }
@@ -31,4 +33,13 @@ public class StorageServerConfiguration extends ComponentConfiguration {
     private StorageServerConfiguration(CompositeConfiguration conf) {
         super(conf, COMPONENT_PREFIX);
     }
+
+    /**
+     * Returns the grpc port that serves requests coming into the stream storage server.
+     *
+     * @return grpc port
+     */
+    public int getGrpcPort() {
+        return getInt(GRPC_PORT, 4181);
+    }
 }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/BookieWatchService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/BookieWatchService.java
new file mode 100644
index 0000000..cfac0da
--- /dev/null
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/BookieWatchService.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.server.service;
+
+import com.google.common.base.Stopwatch;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
+
+/**
+ * A service that watches bookies and wait for minimum number of bookies to be alive.
+ */
+@Slf4j
+public class BookieWatchService
+    extends AbstractLifecycleComponent<BookieConfiguration> {
+
+    private final int minNumBookies;
+
+    public BookieWatchService(int minNumBookies,
+                              BookieConfiguration conf,
+                              StatsLogger statsLogger) {
+        super("bookie-watcher", conf, statsLogger);
+        this.minNumBookies = minNumBookies;
+    }
+
+    @Override
+    protected void doStart() {
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.loadConf(conf.getUnderlyingConf());
+
+        @Cleanup("shutdown") ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+
+        try {
+            MetadataDrivers.runFunctionWithMetadataClientDriver(clientConf, clientDriver -> {
+                try {
+                    waitingForNumBookies(clientDriver.getRegistrationClient(), minNumBookies);
+                } catch (Exception e) {
+                    log.error("Encountered exceptions on waiting {} bookies to be alive", minNumBookies);
+                    throw new RuntimeException("Encountered exceptions on waiting "
+                        + minNumBookies + " bookies to be alive", e);
+                }
+                return (Void) null;
+            }, executorService);
+        } catch (MetadataException | ExecutionException  e) {
+            throw new RuntimeException("Failed to start bookie watch service", e);
+        }
+    }
+
+    private static void waitingForNumBookies(RegistrationClient client, int minNumBookies) throws Exception {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        Set<BookieSocketAddress> bookies = FutureUtils.result(client.getWritableBookies()).getValue();
+        while (bookies.size() < minNumBookies) {
+            TimeUnit.SECONDS.sleep(1);
+            bookies = FutureUtils.result(client.getWritableBookies()).getValue();
+            log.info("Only {} bookies are live since {} seconds elapsed, wait for another 1 second",
+                bookies.size(), stopwatch.elapsed(TimeUnit.SECONDS));
+        }
+    }
+
+    @Override
+    protected void doStop() {}
+
+    @Override
+    protected void doClose() {}
+
+}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
index 7d404f0..9bd6ab4 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/CuratorProviderService.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.stream.server.service;
 
 import java.io.IOException;
 import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
@@ -33,6 +34,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 /**
  * A service to provide a curator client.
  */
+@Slf4j
 public class CuratorProviderService
     extends AbstractLifecycleComponent<DLConfiguration>
     implements Supplier<CuratorFramework> {
@@ -57,6 +59,7 @@ public class CuratorProviderService
     @Override
     protected void doStart() {
         curatorClient.start();
+        log.info("Provided curator clients to zookeeper {}.", zkServers);
     }
 
     @Override
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
index e0359ce..4550df5 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
@@ -69,6 +69,7 @@ public class DLNamespaceProviderService
     }
 
     private final ServerConfiguration bkServerConf;
+    @Getter
     private final DistributedLogConfiguration dlConf;
     private final DynamicDistributedLogConfiguration dlDynConf;
     @Getter
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
index 8db59fd..f1cda91 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/RegistrationServiceProvider.java
@@ -105,6 +105,8 @@ public class RegistrationServiceProvider
         }
     }
 
+
+
     @Override
     protected void doStop() {
     }
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
new file mode 100644
index 0000000..61e5fa3
--- /dev/null
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.api.cluster;
+
+import java.net.URI;
+
+/**
+ * Initializing cluster metadata.
+ */
+public interface ClusterInitializer {
+
+    /**
+     * Retrieves whether the initializer thinks that it can initialize the metadata service
+     * specified by the given {@code metadataServiceUri}. Typically the implementations will
+     * return <tt>true</tt> if they understand the subprotocol specified in the URI and
+     * <tt>false</tt> if they do not.
+     *
+     * @param metatadataServiceUri the metadata service uri
+     * @return <tt>true</tt> if the implementation understands the given URI; <tt>false</tt> otherwise.
+     */
+    boolean acceptsURI(URI metatadataServiceUri);
+
+    /**
+     * Create a new cluster under metadata service specified by {@code metadataServiceUri}.
+     *
+     * @param metadataServiceUri metadata service uri
+     * @param numStorageContainers number storage containers
+     */
+    void initializeCluster(URI metadataServiceUri, int numStorageContainers);
+
+}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
index 935552a..d46f633 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.stream.storage.api.cluster;
 
+import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
@@ -29,12 +30,18 @@ import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
  */
 public interface ClusterMetadataStore extends AutoCloseable {
 
+
+    default void initializeCluster(int numStorageContainers) {
+        initializeCluster(numStorageContainers, Optional.empty());
+    }
+
     /**
      * Initialize the cluster metadata with the provided <i>numStorageContainers</i>.
      *
      * @param numStorageContainers number of storage containers.
+     * @param segmentStorePath segment store path
      */
-    void initializeCluster(int numStorageContainers);
+    void initializeCluster(int numStorageContainers, Optional<String> segmentStorePath);
 
     /**
      * Get the current cluster assignment data.
diff --git a/stream/storage/impl/pom.xml b/stream/storage/impl/pom.xml
index ae4a5e8..b178fc7 100644
--- a/stream/storage/impl/pom.xml
+++ b/stream/storage/impl/pom.xml
@@ -26,11 +26,6 @@
   <artifactId>stream-storage-service-impl</artifactId>
   <name>Apache BookKeeper :: Stream Storage :: Storage :: Impl</name>
 
-  <properties>
-    <!-- dependencies -->
-    <helix-core.version>0.6.7</helix-core.version>
-  </properties>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
@@ -52,11 +47,6 @@
       <artifactId>curator-recipes</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.helix</groupId>
-      <artifactId>helix-core</artifactId>
-      <version>${helix-core.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.distributedlog</groupId>
       <artifactId>distributedlog-core</artifactId>
       <version>${project.parent.version}</version>
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
index da74867..4313ce1 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
@@ -20,11 +20,10 @@ package org.apache.bookkeeper.stream.storage.impl.cluster;
 
 import com.google.common.collect.Maps;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
-import lombok.AccessLevel;
 import lombok.Data;
-import lombok.Getter;
 import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
 import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
 import org.apache.bookkeeper.stream.storage.api.cluster.ClusterMetadataStore;
@@ -55,7 +54,8 @@ public class InMemClusterMetadataStore implements ClusterMetadataStore {
     }
 
     @Override
-    public synchronized void initializeCluster(int numStorageContainers) {
+    public synchronized void initializeCluster(int numStorageContainers,
+                                               Optional<String> segmentStorePath) {
         this.metadata = ClusterMetadata.newBuilder()
             .setNumStorageContainers(numStorageContainers)
             .build();
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
new file mode 100644
index 0000000..832f15d
--- /dev/null
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.cluster;
+
+import static org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH;
+
+import com.google.common.base.Strings;
+import java.net.URI;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
+import org.apache.bookkeeper.stream.storage.api.cluster.ClusterInitializer;
+import org.apache.bookkeeper.stream.storage.exceptions.StorageRuntimeException;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * ZooKeeper Based Cluster Initializer.
+ */
+@Slf4j
+public class ZkClusterInitializer implements ClusterInitializer  {
+
+    private final String zkExternalConnectString;
+
+    public ZkClusterInitializer(String zkServers) {
+        this.zkExternalConnectString = zkServers;
+    }
+
+    @Override
+    public boolean acceptsURI(URI metadataServiceUri) {
+        return metadataServiceUri.getScheme().toLowerCase().startsWith("zk");
+    }
+
+    @Override
+    public void initializeCluster(URI metadataServiceUri, int numStorageContainers) {
+        String zkInternalConnectString = ZKMetadataDriverBase.getZKServersFromServiceUri(metadataServiceUri);
+        // 1) `zkExternalConnectString` are the public endpoints, where the tool can interact with.
+        //    It allows the tools running outside of the cluster. It is useful for being used in dockerized environment.
+        // 2) `zkInternalConnectString` are the internal endpoints, where the services can interact with.
+        //    It is used by dlog to bind a namespace.
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(
+            zkExternalConnectString,
+            new ExponentialBackoffRetry(100, Integer.MAX_VALUE, 10000)
+        )) {
+            client.start();
+
+            ZkClusterMetadataStore store =
+                new ZkClusterMetadataStore(client, zkInternalConnectString, ZK_METADATA_ROOT_PATH);
+
+            ClusterMetadata metadata;
+            try {
+                metadata = store.getClusterMetadata();
+                log.info("Loaded cluster metadata : \n{}", metadata);
+            } catch (StorageRuntimeException sre) {
+                if (sre.getCause() instanceof KeeperException.NoNodeException) {
+                    log.info("Initializing the stream cluster with {} storage containers with segment store path {}.",
+                        numStorageContainers);
+
+                    String ledgersPath = metadataServiceUri.getPath();
+                    Optional<String> segmentStorePath;
+                    if (Strings.isNullOrEmpty(ledgersPath) || "/" == ledgersPath) {
+                        segmentStorePath = Optional.empty();
+                    } else {
+                        segmentStorePath = Optional.of(ledgersPath);
+                    }
+
+                    store.initializeCluster(numStorageContainers, segmentStorePath);
+                    log.info("Successfully initialized the stream cluster : \n{}", store.getClusterMetadata());
+                } else {
+                    throw sre;
+                }
+            }
+        }
+
+    }
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
index bcf70c3..59af968 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
@@ -29,6 +29,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
@@ -92,7 +93,7 @@ public class ZkClusterMetadataStore implements ClusterMetadataStore {
     }
 
     @Override
-    public void initializeCluster(int numStorageContainers) {
+    public void initializeCluster(int numStorageContainers, Optional<String> segmentStorePath) {
         ClusterMetadata metadata = ClusterMetadata.newBuilder()
             .setNumStorageContainers(numStorageContainers)
             .build();
@@ -101,7 +102,7 @@ public class ZkClusterMetadataStore implements ClusterMetadataStore {
         try {
             // we are using dlog for the storage backend, so we need to initialize the dlog namespace
             BKDLConfig dlogConfig = new BKDLConfig(
-                zkServers, getSegmentsRootPath(zkRootPath));
+                zkServers, segmentStorePath.orElse(getSegmentsRootPath(zkRootPath)));
             DLMetadata dlogMetadata = DLMetadata.create(dlogConfig);
 
             client.transaction()
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
index 4d2828b..73d6e17 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.when;
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -82,7 +83,7 @@ public class ClusterControllerLeaderImplTest {
         ClusterMetadataStore originalStore = metadataStore;
         this.metadataStore = new ClusterMetadataStore() {
             @Override
-            public void initializeCluster(int numStorageContainers) {
+            public void initializeCluster(int numStorageContainers, Optional<String> segmentStorePath) {
                 originalStore.initializeCluster(numStorageContainers);
             }
 
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java
index 874ec2a..d70b4f2 100644
--- a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java
@@ -20,11 +20,14 @@ package org.apache.bookkeeper.tests.containers;
 
 import static java.time.temporal.ChronoUnit.SECONDS;
 
+import com.google.common.base.Strings;
 import java.net.URI;
 import java.time.Duration;
 import java.util.Objects;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.tests.DockerUtils;
 import org.apache.bookkeeper.tests.containers.wait.HttpWaitStrategy;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
 
 /**
  * Test Container for Bookies.
@@ -40,13 +43,16 @@ public class BookieContainer<SELF extends BookieContainer<SELF>> extends ChaosCo
 
     private final String hostname;
     private final String metadataServiceUri;
+    private final String extraServerComponents;
 
     public BookieContainer(String clusterName,
                            String hostname,
-                           String metadataServiceUri) {
+                           String metadataServiceUri,
+                           String extraServerComponents) {
         super(clusterName, IMAGE_NAME);
         this.hostname = hostname;
         this.metadataServiceUri = metadataServiceUri;
+        this.extraServerComponents = extraServerComponents;
     }
 
     @Override
@@ -54,6 +60,14 @@ public class BookieContainer<SELF extends BookieContainer<SELF>> extends ChaosCo
         return clusterName + "-" + hostname;
     }
 
+    public String getExternalGrpcEndpointStr() {
+        return getContainerIpAddress() + ":" + getMappedPort(BOOKIE_GRPC_PORT);
+    }
+
+    public String getInternalGrpcEndpointStr() {
+        return DockerUtils.getContainerIP(dockerClient, containerId) + ":" + BOOKIE_GRPC_PORT;
+    }
+
     @Override
     protected void configure() {
         addExposedPorts(
@@ -65,20 +79,28 @@ public class BookieContainer<SELF extends BookieContainer<SELF>> extends ChaosCo
         addEnv("BK_httpServerPort", "" + BOOKIE_HTTP_PORT);
         addEnv("BK_metadataServiceUri", metadataServiceUri);
         addEnv("BK_useHostNameAsBookieID", "true");
+        addEnv("BK_extraServerComponents", extraServerComponents);
         if (metadataServiceUri.toLowerCase().startsWith("zk")) {
             URI uri = URI.create(metadataServiceUri);
             addEnv("BK_zkServers", uri.getAuthority());
             addEnv("BK_zkLedgersRootPath", uri.getPath());
         }
+        // grpc port
+        addEnv("BK_storageserver.grpc.port", "" + BOOKIE_GRPC_PORT);
     }
 
     @Override
     public void start() {
-        this.waitStrategy = new HttpWaitStrategy()
-            .forPath("/heartbeat")
-            .forStatusCode(200)
-            .forPort(BOOKIE_HTTP_PORT)
-            .withStartupTimeout(Duration.of(60, SECONDS));
+        if (Strings.isNullOrEmpty(extraServerComponents)) {
+            this.waitStrategy = new HttpWaitStrategy()
+                .forPath("/heartbeat")
+                .forStatusCode(200)
+                .forPort(BOOKIE_HTTP_PORT)
+                .withStartupTimeout(Duration.of(60, SECONDS));
+        } else {
+            this.waitStrategy = new HostPortWaitStrategy()
+                .withStartupTimeout(Duration.of(300, SECONDS));
+        }
         this.withCreateContainerCmdModifier(createContainerCmd -> {
             createContainerCmd.withHostName(hostname);
             createContainerCmd.withName(getContainerName());
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
index 36d5d86..ebc0c87 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
@@ -33,6 +33,7 @@ import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.tests.integration.topologies.BKCluster;
+import org.apache.bookkeeper.tests.integration.topologies.BKClusterSpec;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -50,15 +51,31 @@ public abstract class BookKeeperClusterTestBase {
 
     @BeforeClass
     public static void setupCluster() throws Exception {
-        bkCluster = new BKCluster(RandomStringUtils.randomAlphabetic(8), 0);
+        BKClusterSpec spec = BKClusterSpec.builder()
+            .clusterName(RandomStringUtils.randomAlphabetic(8))
+            .numBookies(0)
+            .build();
+
+        setupCluster(spec);
+    }
+
+    protected static void setupCluster(BKClusterSpec spec) throws Exception {
+        log.info("Setting up cluster {} with {} bookies : extraServerComponents = {}",
+            spec.clusterName(), spec.numBookies(), spec.extraServerComponents());
+
+        bkCluster = BKCluster.forSpec(spec);
         bkCluster.start();
 
+        log.info("Cluster {} is setup", spec.clusterName());
+
         metadataServiceUri = URI.create(bkCluster.getExternalServiceUri());
         ClientConfiguration conf = new ClientConfiguration()
             .setMetadataServiceUri(metadataServiceUri.toString());
         executor = Executors.newSingleThreadScheduledExecutor();
         metadataClientDriver = MetadataDrivers.getClientDriver(metadataServiceUri);
         metadataClientDriver.initialize(conf, executor, NullStatsLogger.INSTANCE, Optional.empty());
+
+        log.info("Initialized metadata client driver : {}", metadataServiceUri);
     }
 
     @AfterClass
@@ -74,7 +91,7 @@ public abstract class BookKeeperClusterTestBase {
         }
     }
 
-    private boolean findIfBookieRegistered(String bookieName) throws Exception {
+    private static boolean findIfBookieRegistered(String bookieName) throws Exception {
         Set<BookieSocketAddress> bookies =
             FutureUtils.result(metadataClientDriver.getRegistrationClient().getWritableBookies()).getValue();
         Optional<BookieSocketAddress> registered =
@@ -82,7 +99,7 @@ public abstract class BookKeeperClusterTestBase {
         return registered.isPresent();
     }
 
-    protected void waitUntilBookieUnregistered(String bookieName) throws Exception {
+    protected static void waitUntilBookieUnregistered(String bookieName) throws Exception {
         Stopwatch sw = Stopwatch.createStarted();
         while (findIfBookieRegistered(bookieName)) {
             TimeUnit.MILLISECONDS.sleep(1000);
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/LocationClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
similarity index 83%
rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/LocationClientTest.java
rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
index bf31321..42b428e 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/LocationClientTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
 
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
 import static org.junit.Assert.assertEquals;
@@ -33,25 +33,27 @@ import org.apache.bookkeeper.common.util.Revisioned;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 /**
  * Integration test for location test.
  */
 @Slf4j
-public class LocationClientTest extends StorageServerTestBase {
+public class LocationClientTest extends StreamClusterTestBase {
 
     private OrderedScheduler scheduler;
     private LocationClient client;
 
-    @Override
-    protected void doSetup() throws Exception {
+    @Before
+    public void setup() {
         scheduler = OrderedScheduler.newSchedulerBuilder()
             .name("location-client-test")
             .numThreads(1)
             .build();
         StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
+            .addEndpoints(getExsternalStreamEndpoints().toArray(new Endpoint[getNumBookies()]))
             .usePlaintext(true)
             .build();
         client = new LocationClientImpl(
@@ -59,8 +61,8 @@ public class LocationClientTest extends StorageServerTestBase {
             scheduler);
     }
 
-    @Override
-    protected void doTeardown() throws Exception {
+    @After
+    public void teardown() {
         if (null != client) {
             client.close();
         }
@@ -80,13 +82,15 @@ public class LocationClientTest extends StorageServerTestBase {
         assertEquals(StatusCode.SUCCESS, oneResponse.getStatusCode());
 
         Endpoint endpoint = oneResponse.getEndpoint().getRwEndpoint();
-        log.info("Current cluster endpoints = {}", cluster.getRpcEndpoints());
+        log.info("Current cluster endpoints = {}", getInternalStreamEndpoints());
         log.info("Response : rw endpoint = {}", endpoint);
-        assertTrue(cluster.getRpcEndpoints().contains(endpoint));
+        assertTrue(getInternalStreamEndpoints().contains(endpoint));
 
         assertEquals(1, oneResponse.getEndpoint().getRoEndpointCount());
         endpoint = oneResponse.getEndpoint().getRoEndpoint(0);
         log.info("Response : ro endpoint = {}", endpoint);
-        assertTrue(cluster.getRpcEndpoints().contains(endpoint));
+        assertTrue(getInternalStreamEndpoints().contains(endpoint));
     }
+
+
 }
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
new file mode 100644
index 0000000..b86cc72
--- /dev/null
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.integration.stream;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.utils.NetUtils;
+import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.apache.bookkeeper.tests.integration.cluster.BookKeeperClusterTestBase;
+import org.apache.bookkeeper.tests.integration.topologies.BKClusterSpec;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Similar as {@link org.apache.bookkeeper.tests.integration.cluster.BookKeeperClusterTestBase},
+ * but enabled stream storage for testing stream storage related features.
+ */
+@Slf4j
+public abstract class StreamClusterTestBase extends BookKeeperClusterTestBase {
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        BKClusterSpec spec = BKClusterSpec.builder()
+            .clusterName(RandomStringUtils.randomAlphabetic(8))
+            .numBookies(3)
+            .extraServerComponents("org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent")
+            .build();
+        BookKeeperClusterTestBase.setupCluster(spec);
+    }
+
+    @AfterClass
+    public static void teardownCluster() {
+        BookKeeperClusterTestBase.teardownCluster();
+    }
+
+    protected static int getNumBookies() {
+        return bkCluster.getBookieContainers().size();
+    }
+
+    protected static List<Endpoint> getExsternalStreamEndpoints() {
+        return bkCluster.getBookieContainers().values().stream()
+            .map(container ->
+                NetUtils.parseEndpoint(container.getExternalGrpcEndpointStr()))
+            .collect(Collectors.toList());
+    }
+
+    protected static List<Endpoint> getInternalStreamEndpoints() {
+        return bkCluster.getBookieContainers().values().stream()
+            .map(container ->
+                NetUtils.parseEndpoint(container.getInternalGrpcEndpointStr()))
+            .collect(Collectors.toList());
+    }
+
+
+}
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
index c1e9e84..80ae906 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
@@ -18,9 +18,11 @@
 
 package org.apache.bookkeeper.tests.integration.topologies;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -29,6 +31,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
 import org.apache.bookkeeper.tests.containers.BookieContainer;
 import org.apache.bookkeeper.tests.containers.MetadataStoreContainer;
 import org.apache.bookkeeper.tests.containers.ZKContainer;
@@ -40,21 +44,37 @@ import org.testcontainers.containers.Network;
 @Slf4j
 public class BKCluster {
 
+    /**
+     * BookKeeper Cluster Spec.
+     *
+     * @param spec bookkeeper cluster spec.
+     * @return the built bookkeeper cluster
+     */
+    public static BKCluster forSpec(BKClusterSpec spec) {
+        return new BKCluster(spec);
+    }
+
+    private final BKClusterSpec spec;
     @Getter
     private final String clusterName;
     private final Network network;
     private final MetadataStoreContainer metadataContainer;
     private final Map<String, BookieContainer> bookieContainers;
     private final int numBookies;
+    private final String extraServerComponents;
+    private volatile boolean enableContainerLog;
 
-    public BKCluster(String clusterName, int numBookies) {
-        this.clusterName = clusterName;
+    private BKCluster(BKClusterSpec spec) {
+        this.spec = spec;
+        this.clusterName = spec.clusterName();
         this.network = Network.newNetwork();
         this.metadataContainer = (MetadataStoreContainer) new ZKContainer(clusterName)
             .withNetwork(network)
             .withNetworkAliases(ZKContainer.HOST_NAME);
         this.bookieContainers = Maps.newTreeMap();
-        this.numBookies = numBookies;
+        this.numBookies = spec.numBookies();
+        this.extraServerComponents = spec.extraServerComponents();
+        this.enableContainerLog = spec.enableContainerLog();
     }
 
     public String getExternalServiceUri() {
@@ -67,10 +87,28 @@ public class BKCluster {
 
     public void start() throws Exception {
         // start the metadata store
+        if (enableContainerLog) {
+            this.metadataContainer.tailContainerLog();
+        }
         this.metadataContainer.start();
+        log.info("Successfully started metadata store container.");
 
         // init a new cluster
         initNewCluster(metadataContainer.getExternalServiceUri());
+        log.info("Successfully initialized metadata service uri : {}",
+            metadataContainer.getExternalServiceUri());
+
+        if (!Strings.isNullOrEmpty(extraServerComponents)) {
+            int numStorageContainers = numBookies > 0 ? 2 * numBookies : 8;
+            // initialize the stream storage.
+            new ZkClusterInitializer(
+                ZKMetadataDriverBase.getZKServersFromServiceUri(URI.create(metadataContainer.getExternalServiceUri()))
+            ).initializeCluster(
+                URI.create(metadataContainer.getInternalServiceUri()),
+                numStorageContainers);
+            log.info("Successfully initialized stream storage metadata with {} storage containers",
+                numStorageContainers);
+        }
 
         // create bookies
         createBookies("bookie", numBookies);
@@ -123,20 +161,36 @@ public class BKCluster {
         return container;
     }
 
-    public synchronized BookieContainer createBookie(String bookieName) {
-        BookieContainer container = getBookie(bookieName);
-        if (null == container) {
-            container = (BookieContainer) new BookieContainer(clusterName, bookieName, ZKContainer.SERVICE_URI)
-                .withNetwork(network)
-                .withNetworkAliases(bookieName);
+    public BookieContainer createBookie(String bookieName) {
+        boolean shouldStart = false;
+        BookieContainer container;
+        synchronized (this) {
+            container = getBookie(bookieName);
+            if (null == container) {
+                shouldStart = true;
+                log.info("Creating bookie {}", bookieName);
+                container = (BookieContainer) new BookieContainer(
+                    clusterName, bookieName, ZKContainer.SERVICE_URI, extraServerComponents
+                ).withNetwork(network).withNetworkAliases(bookieName);
+                if (enableContainerLog) {
+                    container.tailContainerLog();
+                }
+
+                log.info("Created bookie {}", bookieName);
+                bookieContainers.put(bookieName, container);
+            }
+        }
+
+        if (shouldStart) {
+            log.info("Starting bookie {}", bookieName);
             container.start();
-            bookieContainers.put(bookieName, container);
         }
         return container;
     }
 
-    public synchronized Map<String, BookieContainer> createBookies(String bookieNamePrefix, int numBookies)
+    public Map<String, BookieContainer> createBookies(String bookieNamePrefix, int numBookies)
             throws Exception {
+        log.info("Creating {} bookies with bookie name prefix '{}'", numBookies, bookieNamePrefix);
         List<CompletableFuture<Void>> startFutures = Lists.newArrayListWithExpectedSize(numBookies);
         Map<String, BookieContainer> containers = Maps.newHashMap();
         for (int i = 0; i < numBookies; i++) {
@@ -144,6 +198,7 @@ public class BKCluster {
             startFutures.add(
                 CompletableFuture.runAsync(() -> {
                     String bookieName = String.format("%s-%03d", bookieNamePrefix, idx);
+                    log.info("Starting bookie {} at cluster {}", bookieName, clusterName);
                     BookieContainer container = createBookie(bookieName);
                     synchronized (containers) {
                         containers.put(bookieName, container);
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKClusterSpec.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKClusterSpec.java
new file mode 100644
index 0000000..e919d1a
--- /dev/null
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKClusterSpec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.integration.topologies;
+
+import lombok.Builder;
+import lombok.Builder.Default;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+/**
+ * Spec to build {@link BKCluster}.
+ */
+@Builder
+@Accessors(fluent = true)
+@Getter
+@Setter
+public class BKClusterSpec {
+
+    /**
+     * Returns the cluster name.
+     *
+     * @return the cluster name.
+     */
+    String clusterName;
+
+    /**
+     * Returns the number of bookies.
+     *
+     * @return the number of bookies;
+     */
+    @Default
+    int numBookies = 0;
+
+    /**
+     * Returns the extra server components.
+     *
+     * @return the extra server components.
+     */
+    @Default
+    String extraServerComponents = "";
+
+    /**
+     * Returns the flag whether to enable/disable container log.
+     *
+     * @return the flag whether to enable/disable container log.
+     */
+    @Default
+    boolean enableContainerLog = false;
+
+}

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.

[bookkeeper] 02/10: Allow bookie to start if failed to load extra server components

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit d869f7ca5b8189382deca1a80f00cde608bcbc1c
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue May 22 01:43:46 2018 -0700

    Allow bookie to start if failed to load extra server components
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    `extraServerComponents` provides the flexibility to allow bookie to start with extra server components.
    It acts as a plugin mechanism to extend the functionality of bookies. However it is okay to allow bookie
    start up when failed to load extra server components.
    
    This is a change for allowing starting table service as extra components in bookies.
    
    *Solution*
    
    Catch the exception and log it when failed to load extra server components.
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
    
    This closes #1420 from sijie/allow_start_bookie
---
 .../main/java/org/apache/bookkeeper/server/Main.java | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
index 4815fee..1ef56de 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
@@ -48,6 +48,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang3.StringUtils;
 
 /**
  * A bookie server is a server that run bookie and serving rpc requests.
@@ -327,13 +328,18 @@ public class Main {
         // 5. build extra services
         String[] extraComponents = conf.getServerConf().getExtraServerComponents();
         if (null != extraComponents) {
-            List<ServerLifecycleComponent> components = loadServerComponents(
-                extraComponents,
-                conf,
-                rootStatsLogger);
-            for (ServerLifecycleComponent component : components) {
-                serverBuilder.addComponent(component);
-                log.info("Load lifecycle component : {}", component.getClass().getName());
+            try {
+                List<ServerLifecycleComponent> components = loadServerComponents(
+                    extraComponents,
+                    conf,
+                    rootStatsLogger);
+                for (ServerLifecycleComponent component : components) {
+                    serverBuilder.addComponent(component);
+                    log.info("Load lifecycle component : {}", component.getClass().getName());
+                }
+            } catch (Exception e) {
+                log.info("Failed to load extra components '{}' - {}. Continuing without those components.",
+                    StringUtils.join(extraComponents), e.getMessage());
             }
         }
 

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.