You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 21:01:25 UTC

[bookkeeper] 04/10: [TABLE SERVICE] Move integration tests under `stream/tests/integration` to `tests/integration/cluster`

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 86e23c26a51c6ef0b49f6164b3cb22bb44e08564
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue May 22 19:11:16 2018 -0700

    [TABLE SERVICE] Move integration tests under `stream/tests/integration` to `tests/integration/cluster`
    
    Descriptions of the changes in this PR:
    
    The original integration tests were written against a non-dockerized standalone stream cluster. This change moves them
    to the dockerized integration test framework, so all the integration tests now exercise the table service running as part of bookies.
    
    This change is based on #1422.
    a371ff2 is the change in this PR to be reviewed.
    
    Master Issue: #1205
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1423 from sijie/move_more_stream_it_tests
---
 .../bookkeeper/clients/StorageClientImpl.java      |   1 -
 .../clients/config/StorageClientSettings.java      |  11 ++
 .../clients/impl/channel/StorageServerChannel.java |  19 +++-
 .../impl/channel/StorageServerChannelManager.java  |   2 +-
 .../internal/StorageServerClientManagerImpl.java   |   2 +-
 .../clients/resolver/EndpointResolver.java         |  48 +++++++++
 stream/pom.xml                                     |   1 -
 .../util/StorageContainerPlacementPolicy.java      |   8 ++
 .../bookkeeper/stream/cluster/StreamCluster.java   |   3 +-
 .../bookkeeper/stream/server/StorageServer.java    |  25 +++--
 .../server/StreamStorageLifecycleComponent.java    |   1 -
 .../api/sc/StorageContainerManagerFactory.java     |   4 +-
 .../stream/storage/RangeStoreBuilder.java          |  22 +++-
 .../stream/storage/impl/RangeStoreImpl.java        |   8 +-
 .../stream/storage/impl/TestRangeStoreImpl.java    |   2 +-
 stream/tests/integration/pom.xml                   | 110 --------------------
 .../tests/integration/StorageClientTest.java       | 111 ---------------------
 .../tests/integration/StorageServerTestBase.java   |  80 ---------------
 .../src/test/resources/log4j.properties            |  55 ----------
 stream/tests/pom.xml                               |  32 ------
 .../integration/stream/LocationClientTest.java     |   6 +-
 .../stream}/StorageAdminClientTest.java            |  37 ++-----
 .../integration/stream/StreamClusterTestBase.java  |  34 +++++++
 .../integration/stream}/TableClientSimpleTest.java |  44 ++------
 .../tests/integration/stream}/TableClientTest.java |  44 ++------
 .../tests/integration/topologies/BKCluster.java    |  19 +++-
 26 files changed, 208 insertions(+), 521 deletions(-)

diff --git a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
index aff322c..37c8223 100644
--- a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
+++ b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
@@ -116,7 +116,6 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable implements StorageCli
             serverManager.close();
             closeFuture.complete(null);
             SharedResourceManager.shared().release(resources.scheduler(), scheduler);
-            scheduler.shutdown();
         });
     }
 
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
index 20b5821..87768fa 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
@@ -24,6 +24,7 @@ import io.grpc.ManagedChannelBuilder;
 import io.grpc.NameResolver;
 import java.util.List;
 import java.util.Optional;
+import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.ClientConstants;
 import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
@@ -57,6 +58,15 @@ public interface StorageClientSettings {
     List<Endpoint> endpoints();
 
     /**
+     * Return the endpoint resolver for resolving individual endpoints.
+     *
+     * <p>The default resolver is an identity resolver.
+     *
+     * @return the endpoint resolver for resolving endpoints.
+     */
+    EndpointResolver endpointResolver();
+
+    /**
      * Returns the builder to create the managed channel.
      *
      * @return
@@ -99,6 +109,7 @@ public interface StorageClientSettings {
             numWorkerThreads(Runtime.getRuntime().availableProcessors());
             usePlaintext(true);
             backoffPolicy(ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+            endpointResolver(EndpointResolver.identity());
         }
 
         @Override
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index e8e72db..34dc957 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -24,6 +24,8 @@ import io.grpc.ManagedChannelBuilder;
 import java.util.Optional;
 import java.util.function.Function;
 import javax.annotation.concurrent.GuardedBy;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc;
@@ -42,8 +44,12 @@ import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceF
  */
 public class StorageServerChannel implements AutoCloseable {
 
-    public static Function<Endpoint, StorageServerChannel> factory(boolean usePlaintext) {
-        return (endpoint) -> new StorageServerChannel(endpoint, Optional.empty(), usePlaintext);
+    public static Function<Endpoint, StorageServerChannel> factory(StorageClientSettings settings) {
+        return (endpoint) -> new StorageServerChannel(
+            endpoint,
+            Optional.empty(),
+            settings.usePlaintext(),
+            settings.endpointResolver());
     }
 
     private final Optional<String> token;
@@ -63,14 +69,17 @@ public class StorageServerChannel implements AutoCloseable {
      *
      * @param endpoint range server endpoint.
      * @param token    token used to access range server
+     * @param usePlainText whether to use the plain text protocol or not
      */
     public StorageServerChannel(Endpoint endpoint,
                                 Optional<String> token,
-                                boolean usePlainText) {
+                                boolean usePlainText,
+                                EndpointResolver endpointResolver) {
         this.token = token;
+        Endpoint resolvedEndpoint = endpointResolver.resolve(endpoint);
         this.channel = ManagedChannelBuilder.forAddress(
-            endpoint.getHostname(),
-            endpoint.getPort())
+            resolvedEndpoint.getHostname(),
+            resolvedEndpoint.getPort())
             .usePlaintext(usePlainText)
             .build();
     }
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java
index 67b3d0f..308d25d 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java
@@ -40,7 +40,7 @@ public class StorageServerChannelManager implements AutoCloseable {
     private final Function<Endpoint, StorageServerChannel> channelFactory;
 
     public StorageServerChannelManager(StorageClientSettings settings) {
-        this(StorageServerChannel.factory(settings.usePlaintext()));
+        this(StorageServerChannel.factory(settings));
     }
 
     @VisibleForTesting
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
index d219b6a..3ae3b6b 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
@@ -65,7 +65,7 @@ public class StorageServerClientManagerImpl
         this(
             settings,
             schedulerResource,
-            StorageServerChannel.factory(settings.usePlaintext()));
+            StorageServerChannel.factory(settings));
     }
 
     public StorageServerClientManagerImpl(StorageClientSettings settings,
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java
new file mode 100644
index 0000000..e5002b1
--- /dev/null
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.resolver;
+
+import org.apache.bookkeeper.stream.proto.common.Endpoint;
+
+/**
+ * Resolve an endpoint to another endpoint.
+ *
+ * <p>The resolver can be used for resolving the right ip address for an advertised endpoint. It is typically useful
+ * in dockerized integration tests, where the test clients are typically outside of the docker network.
+ */
+public interface EndpointResolver {
+
+    /**
+     * Returns a resolver that always returns its input endpoint.
+     *
+     * @return a function that always returns its input endpoint
+     */
+    static EndpointResolver identity() {
+        return endpoint -> endpoint;
+    }
+
+    /**
+     * Resolve <tt>endpoint</tt> to another endpoint.
+     *
+     * @param endpoint endpoint to resolve
+     * @return the resolved endpoint.
+     */
+    Endpoint resolve(Endpoint endpoint);
+
+}
diff --git a/stream/pom.xml b/stream/pom.xml
index 09afddd..f2580ec 100644
--- a/stream/pom.xml
+++ b/stream/pom.xml
@@ -37,7 +37,6 @@
     <module>storage</module>
     <module>server</module>
     <module>cli</module>
-    <module>tests</module>
   </modules>
 
   <build>
diff --git a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java
index 8c886ea..340a731 100644
--- a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java
+++ b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java
@@ -31,8 +31,16 @@ package org.apache.bookkeeper.stream.protocol.util;
 /**
  * Placement policy to place ranges to group.
  */
+@FunctionalInterface
 public interface StorageContainerPlacementPolicy {
 
+    @FunctionalInterface
+    interface Factory {
+
+        StorageContainerPlacementPolicy newPlacementPolicy();
+
+    }
+
     long placeStreamRange(long streamId, long rangeId);
 
 }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index 11bc8af..ff0cda2 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -187,8 +187,7 @@ public class StreamCluster
                     bookiePort, grpcPort, bkDir, rangesStoreDir, spec.serveReadOnlyTable);
                 server = StorageServer.buildStorageServer(
                     serverConf,
-                    grpcPort,
-                    spec.numServers() * 2);
+                    grpcPort);
                 server.start();
                 log.info("Started storage server at (bookie port = {}, grpc port = {})",
                     bookiePort, grpcPort);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index e0b87ef..94c8587 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -55,6 +55,7 @@ import org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl;
 import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector;
 import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
 import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactoryImpl;
 import org.apache.commons.configuration.CompositeConfiguration;
@@ -143,8 +144,7 @@ public class StorageServer {
         try {
             storageServer = buildStorageServer(
                 conf,
-                grpcPort,
-                1024);
+                grpcPort);
         } catch (ConfigurationException e) {
             log.error("Invalid storage configuration", e);
             return ExitCode.INVALID_CONF.code();
@@ -168,15 +168,13 @@ public class StorageServer {
     }
 
     public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
-                                                        int grpcPort,
-                                                        int numStorageContainers)
+                                                        int grpcPort)
             throws UnknownHostException, ConfigurationException {
-        return buildStorageServer(conf, grpcPort, numStorageContainers, true, NullStatsLogger.INSTANCE);
+        return buildStorageServer(conf, grpcPort, true, NullStatsLogger.INSTANCE);
     }
 
     public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
                                                         int grpcPort,
-                                                        int numStorageContainers,
                                                         boolean startBookieAndStartProvider,
                                                         StatsLogger externalStatsLogger)
         throws ConfigurationException, UnknownHostException {
@@ -250,12 +248,21 @@ public class StorageServer {
             .withStorageConfiguration(storageConf)
             // the storage resources shared across multiple components
             .withStorageResources(storageResources)
-            // the number of storage containers
-            .withNumStorageContainers(numStorageContainers)
+            // the placement policy
+            .withStorageContainerPlacementPolicyFactory(() -> {
+                long numStorageContainers;
+                try (ZkClusterMetadataStore store = new ZkClusterMetadataStore(
+                    curatorProviderService.get(),
+                    ZKMetadataDriverBase.resolveZkServers(bkServerConf),
+                    ZK_METADATA_ROOT_PATH)) {
+                    numStorageContainers = store.getClusterMetadata().getNumStorageContainers();
+                }
+                return StorageContainerPlacementPolicyImpl.of((int) numStorageContainers);
+            })
             // the default log backend uri
             .withDefaultBackendUri(dlNamespaceProvider.getDlogUri())
             // with zk-based storage container manager
-            .withStorageContainerManagerFactory((ignored, storeConf, registry) ->
+            .withStorageContainerManagerFactory((storeConf, registry) ->
                 new ZkStorageContainerManager(
                     myEndpoint,
                     storageConf,
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
index 5e6b0b5..6cf73b0 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
@@ -42,7 +42,6 @@ public class StreamStorageLifecycleComponent extends ServerLifecycleComponent {
         this.streamStorage = StorageServer.buildStorageServer(
             conf.getUnderlyingConf(),
             ssConf.getGrpcPort(),
-            1024, /* indicator */
             false,
             statsLogger.scope("stream"));
     }
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java
index daa130b..b89a465 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java
@@ -24,13 +24,11 @@ public interface StorageContainerManagerFactory {
     /**
      * Create a storage container manager to manage lifecycles of {@link StorageContainer}.
      *
-     * @param numStorageContainers num of storage containers.
      * @param conf                 storage configuration
      * @param registry             storage container registry
      * @return storage container manager.
      */
-    StorageContainerManager create(int numStorageContainers,
-                                   StorageConfiguration conf,
+    StorageContainerManager create(StorageConfiguration conf,
                                    StorageContainerRegistry registry);
 
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
index 1bd5510..c03b374 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
@@ -19,10 +19,12 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import java.net.URI;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
 import org.apache.bookkeeper.stream.storage.api.RangeStore;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
 
 /**
@@ -38,8 +40,9 @@ public final class RangeStoreBuilder {
     private StorageConfiguration storeConf = null;
     private StorageResources storeResources = null;
     private StorageContainerManagerFactory scmFactory = null;
+    private StorageContainerPlacementPolicy.Factory placementPolicyFactory = () ->
+        StorageContainerPlacementPolicyImpl.of(1024);
     private MVCCStoreFactory mvccStoreFactory = null;
-    private int numStorageContainers = 1024;
     private URI defaultBackendUri = null;
 
     private RangeStoreBuilder() {
@@ -52,7 +55,19 @@ public final class RangeStoreBuilder {
      * @return range store builder
      */
     public RangeStoreBuilder withNumStorageContainers(int numStorageContainers) {
-        this.numStorageContainers = numStorageContainers;
+        this.placementPolicyFactory = () -> StorageContainerPlacementPolicyImpl.of(numStorageContainers);
+        return this;
+    }
+
+    /**
+     * Build the range store with the provided <tt>placementPolicyFactory</tt>.
+     *
+     * @param placementPolicyFactory placement policy factory used to create placement policies.
+     * @return range store builder.
+     */
+    public RangeStoreBuilder withStorageContainerPlacementPolicyFactory(
+        StorageContainerPlacementPolicy.Factory placementPolicyFactory) {
+        this.placementPolicyFactory = placementPolicyFactory;
         return this;
     }
 
@@ -130,6 +145,7 @@ public final class RangeStoreBuilder {
         checkNotNull(storeConf, "StorageConfiguration is not provided");
         checkNotNull(mvccStoreFactory, "MVCCStoreFactory is not provided");
         checkNotNull(defaultBackendUri, "Default backend uri is not provided");
+        checkNotNull(placementPolicyFactory, "Storage Container Placement Policy Factory is not provided");
 
         return new RangeStoreImpl(
             storeConf,
@@ -137,7 +153,7 @@ public final class RangeStoreBuilder {
             scmFactory,
             mvccStoreFactory,
             defaultBackendUri,
-            numStorageContainers,
+            placementPolicyFactory,
             statsLogger);
     }
 
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
index 42fe40b..f175a55 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
@@ -48,7 +48,6 @@ import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactor
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory;
-import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
 
@@ -71,15 +70,14 @@ public class RangeStoreImpl
                           StorageContainerManagerFactory factory,
                           MVCCStoreFactory mvccStoreFactory,
                           URI defaultBackendUri,
-                          int numStorageContainers,
+                          StorageContainerPlacementPolicy.Factory placementPolicyFactory,
                           StatsLogger statsLogger) {
         super("range-service", conf, statsLogger);
         this.schedulerResource = schedulerResource;
         this.scheduler = SharedResourceManager.shared().get(schedulerResource);
         this.scmFactory = factory;
-        StorageContainerPlacementPolicy placementPolicy =
-            StorageContainerPlacementPolicyImpl.of(numStorageContainers);
         this.storeFactory = mvccStoreFactory;
+        StorageContainerPlacementPolicy placementPolicy = placementPolicyFactory.newPlacementPolicy();
         this.scRegistry = new StorageContainerRegistryImpl(
             new DefaultStorageContainerFactory(
                 conf,
@@ -88,7 +86,7 @@ public class RangeStoreImpl
                 storeFactory,
                 defaultBackendUri),
             scheduler);
-        this.scManager = scmFactory.create(numStorageContainers, conf, scRegistry);
+        this.scManager = scmFactory.create(conf, scRegistry);
     }
 
     @Override
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
index 1befe82..ac4bb78 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
@@ -211,7 +211,7 @@ public class TestRangeStoreImpl {
         rangeStore = (RangeStoreImpl) RangeStoreBuilder.newBuilder()
             .withStorageConfiguration(storageConf)
             .withStorageResources(storageResources)
-            .withStorageContainerManagerFactory((numScs, storeConf, rgRegistry)
+            .withStorageContainerManagerFactory((storeConf, rgRegistry)
                 -> new LocalStorageContainerManager(endpoint, storeConf, rgRegistry, 2))
             .withRangeStoreFactory(storeFactory)
             .withDefaultBackendUri(URI.create("distributedlog://127.0.0.1/stream/storage"))
diff --git a/stream/tests/integration/pom.xml b/stream/tests/integration/pom.xml
deleted file mode 100644
index 2eca696..0000000
--- a/stream/tests/integration/pom.xml
+++ /dev/null
@@ -1,110 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.bookkeeper.tests</groupId>
-    <artifactId>stream-storage-tests-parent</artifactId>
-    <version>4.7.1-SNAPSHOT</version>
-  </parent>
-  <artifactId>stream-storage-integration-test</artifactId>
-  <name>Apache BookKeeper :: Stream Storage :: Tests :: Integration</name>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>stream-storage-java-client</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>stream-storage-server</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.distributedlog</groupId>
-      <artifactId>distributedlog-core</artifactId>
-      <version>${project.parent.version}</version>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-        <version>${maven-jar-plugin.version}</version>
-        <executions>
-          <execution>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <version>${maven-surefire-plugin.version}</version>
-        <configuration>
-          <!-- only run tests when -DstreamIntegrationTests is specified //-->
-          <skipTests>true</skipTests>
-          <redirectTestOutputToFile>true</redirectTestOutputToFile>
-          <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=2G</argLine>
-          <forkMode>always</forkMode>
-          <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
-  <profiles>
-    <profile>
-      <id>streamIntegrationTests</id>
-      <activation>
-        <property>
-          <name>streamIntegrationTests</name>
-        </property>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-surefire-plugin</artifactId>
-            <configuration>
-              <skipTests>false</skipTests>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-deploy-plugin</artifactId>
-            <version>${maven-deploy-plugin.version}</version>
-            <configuration>
-              <skip>true</skip>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
-</project>
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
deleted file mode 100644
index ebe99f0..0000000
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.tests.integration;
-
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_RETENTION_POLICY;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_SEGMENT_ROLLING_POLICY;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_SPLIT_POLICY;
-import static org.junit.Assert.assertEquals;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.api.StorageClient;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
-import org.apache.bookkeeper.clients.admin.StorageAdminClient;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
-import org.apache.bookkeeper.stream.proto.RangeKeyType;
-import org.apache.bookkeeper.stream.proto.StreamConfiguration;
-import org.apache.bookkeeper.stream.proto.StreamProperties;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Integration test for stream client test.
- */
-@Slf4j
-public class StorageClientTest extends StorageServerTestBase {
-
-    @Rule
-    public final TestName testName = new TestName();
-
-    private String nsName;
-    private String streamName;
-    private StorageAdminClient adminClient;
-    private StorageClient client;
-    private final StreamConfiguration streamConf = StreamConfiguration.newBuilder()
-        .setKeyType(RangeKeyType.HASH)
-        .setInitialNumRanges(4)
-        .setMinNumRanges(4)
-        .setRetentionPolicy(DEFAULT_RETENTION_POLICY)
-        .setRollingPolicy(DEFAULT_SEGMENT_ROLLING_POLICY)
-        .setSplitPolicy(DEFAULT_SPLIT_POLICY)
-        .build();
-    private final NamespaceConfiguration colConf = NamespaceConfiguration.newBuilder()
-        .setDefaultStreamConf(streamConf)
-        .build();
-
-    @Override
-    protected void doSetup() throws Exception {
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
-        nsName = "test_namespace";
-        FutureUtils.result(
-            adminClient.createNamespace(nsName, colConf));
-        client = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .withNamespace(nsName)
-            .build();
-        streamName = "test_stream";
-        createStream(streamName);
-    }
-
-    @Override
-    protected void doTeardown() throws Exception {
-        if (null != client) {
-            client.closeAsync();
-        }
-        if (null != adminClient) {
-            adminClient.closeAsync();
-        }
-    }
-
-    private void createStream(String streamName) throws Exception {
-        FutureUtils.result(
-            adminClient.createStream(
-                nsName,
-                streamName,
-                streamConf));
-    }
-
-    @Test
-    public void testAdmin() throws Exception {
-        StreamProperties properties =
-            FutureUtils.result(adminClient.getStream(nsName, streamName));
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build()
-            , properties.getStreamConf());
-    }
-
-}
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
deleted file mode 100644
index 21dc004..0000000
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.tests.integration;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.stream.cluster.StreamCluster;
-import org.apache.bookkeeper.stream.cluster.StreamClusterSpec;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-
-/**
- * Test Base for Range Server related tests.
- */
-@Slf4j
-public abstract class StorageServerTestBase {
-
-    static {
-        // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words
-        // are disabled by default due to security reasons
-        System.setProperty("zookeeper.4lw.commands.whitelist", "*");
-    }
-
-    @Rule
-    public final TemporaryFolder testDir = new TemporaryFolder();
-
-    protected StreamClusterSpec spec;
-    protected StreamCluster cluster;
-
-    protected StorageServerTestBase() {
-        this(StreamClusterSpec.builder()
-            .baseConf(new CompositeConfiguration())
-            .numServers(3)
-            .build());
-    }
-
-    protected StorageServerTestBase(StreamClusterSpec spec) {
-        this.spec = spec;
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        spec = spec.storageRootDir(testDir.newFolder("tests"));
-        this.cluster = StreamCluster.build(spec);
-        this.cluster.start();
-        doSetup();
-    }
-
-    protected abstract void doSetup() throws Exception;
-
-    @After
-    public void tearDown() throws Exception {
-        doTeardown();
-        if (null != this.cluster) {
-            this.cluster.stop();
-        }
-    }
-
-    protected abstract void doTeardown() throws Exception;
-
-}
diff --git a/stream/tests/integration/src/test/resources/log4j.properties b/stream/tests/integration/src/test/resources/log4j.properties
deleted file mode 100644
index 614fe75..0000000
--- a/stream/tests/integration/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,55 +0,0 @@
-#/**
-# * Copyright 2007 The Apache Software Foundation
-# *
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-#
-# DisributedLog Logging Configuration
-#
-
-# Example with rolling log file
-log4j.rootLogger=INFO, CONSOLE
-
-#disable zookeeper logging
-log4j.logger.org.apache.zookeeper=OFF
-#Set the bookkeeper level to warning
-log4j.logger.org.apache.bookkeeper=INFO
-#disable helix logging
-log4j.logger.org.apache.helix=OFF
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.Threshold=DEBUG
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
-
-# Add ROLLINGFILE to rootLogger to get log file output
-#    Log DEBUG level and above messages to a log file
-#log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
-#log4j.appender.ROLLINGFILE.Threshold=INFO
-#log4j.appender.ROLLINGFILE.File=stream.log
-#log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
-#log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd-HH-mm
-#log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
-
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.Threshold=TRACE
-log4j.appender.R.File=target/error.log
-log4j.appender.R.MaxFileSize=200MB
-log4j.appender.R.MaxBackupIndex=7
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
diff --git a/stream/tests/pom.xml b/stream/tests/pom.xml
deleted file mode 100644
index 08c5332..0000000
--- a/stream/tests/pom.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <packaging>pom</packaging>
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.bookkeeper</groupId>
-    <artifactId>stream-storage-parent</artifactId>
-    <version>4.7.1-SNAPSHOT</version>
-  </parent>
-  <groupId>org.apache.bookkeeper.tests</groupId>
-  <artifactId>stream-storage-tests-parent</artifactId>
-  <name>Apache BookKeeper :: Stream Storage :: Tests</name>
-  <modules>
-    <module>integration</module>
-  </modules>
-</project>
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
index 42b428e..92e39c3 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
@@ -52,12 +52,8 @@ public class LocationClientTest extends StreamClusterTestBase {
             .name("location-client-test")
             .numThreads(1)
             .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(getExsternalStreamEndpoints().toArray(new Endpoint[getNumBookies()]))
-            .usePlaintext(true)
-            .build();
         client = new LocationClientImpl(
-            settings,
+            newStorageClientSettings(),
             scheduler);
     }
 
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StorageAdminClientTest.java
similarity index 86%
rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java
rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StorageAdminClientTest.java
index 03edfb8..569cf7c 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StorageAdminClientTest.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
 
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 import static org.junit.Assert.assertEquals;
@@ -39,6 +39,8 @@ import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -46,37 +48,23 @@ import org.junit.rules.TestName;
 /**
  * Integration test for stream admin client test.
  */
-public class StorageAdminClientTest extends StorageServerTestBase {
+public class StorageAdminClientTest extends StreamClusterTestBase {
 
     @Rule
     public final TestName testName = new TestName();
 
-    private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
 
-    @Override
-    protected void doSetup() throws Exception {
-        scheduler = OrderedScheduler.newSchedulerBuilder()
-            .name("admin-client-test")
-            .numThreads(1)
-            .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
+    @Before
+    public void setup() {
+        adminClient = createStorageAdminClient(newStorageClientSettings());
     }
 
-    @Override
-    protected void doTeardown() throws Exception {
+    @After
+    public void teardown() {
         if (null != adminClient) {
             adminClient.close();
         }
-        if (null != scheduler) {
-            scheduler.shutdown();
-        }
     }
 
     @Test
@@ -152,11 +140,6 @@ public class StorageAdminClientTest extends StorageServerTestBase {
             .build();
         StreamProperties streamProps = FutureUtils.result(adminClient.createStream(nsName, streamName, streamConf));
         assertEquals(streamName, streamProps.getStreamName());
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build(),
-            streamProps.getStreamConf());
 
         // create a duplicated stream
         try {
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
index b86cc72..fe7c914 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
@@ -21,6 +21,10 @@ package org.apache.bookkeeper.tests.integration.stream;
 import java.util.List;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.clients.StorageClientBuilder;
+import org.apache.bookkeeper.clients.admin.StorageAdminClient;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.utils.NetUtils;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.tests.integration.cluster.BookKeeperClusterTestBase;
@@ -69,5 +73,35 @@ public abstract class StreamClusterTestBase extends BookKeeperClusterTestBase {
             .collect(Collectors.toList());
     }
 
+    //
+    // Test Util Methods
+    //
+
+    protected static StorageClientSettings newStorageClientSettings() {
+        return StorageClientSettings.newBuilder()
+            .addEndpoints(getExsternalStreamEndpoints().toArray(new Endpoint[getNumBookies()]))
+            .endpointResolver(endpoint -> {
+                String internalEndpointStr = NetUtils.endpointToString(endpoint);
+                String externalEndpointStr =
+                    bkCluster.resolveExternalGrpcEndpointStr(internalEndpointStr);
+                log.info("Resolve endpoint {} to {}", internalEndpointStr, externalEndpointStr);
+                return NetUtils.parseEndpoint(externalEndpointStr);
+            })
+            .usePlaintext(true)
+            .build();
+    }
+
+    protected static StorageAdminClient createStorageAdminClient(StorageClientSettings settings) {
+        return StorageClientBuilder.newBuilder()
+            .withSettings(settings)
+            .buildAdmin();
+    }
+
+    protected static StorageClient createStorageClient(StorageClientSettings settings, String namespace) {
+        return StorageClientBuilder.newBuilder()
+            .withSettings(settings)
+            .withNamespace(namespace)
+            .build();
+    }
 
 }
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
similarity index 85%
rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java
rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
index 7add6d7..d1ff091 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
@@ -38,15 +38,14 @@ import org.apache.bookkeeper.api.kv.PTable;
 import org.apache.bookkeeper.api.kv.exceptions.KvApiException;
 import org.apache.bookkeeper.api.kv.result.Code;
 import org.apache.bookkeeper.api.kv.result.KeyValue;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -55,47 +54,31 @@ import org.junit.rules.TestName;
  * Integration test for table service.
  */
 @Slf4j
-public class TableClientSimpleTest extends StorageServerTestBase {
+public class TableClientSimpleTest extends StreamClusterTestBase {
 
     @Rule
     public final TestName testName = new TestName();
 
     private final String namespace = "test_namespace";
-    private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
     private StorageClient storageClient;
 
-    @Override
-    protected void doSetup() throws Exception {
-        scheduler = OrderedScheduler.newSchedulerBuilder()
-            .name("table-client-test")
-            .numThreads(1)
-            .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
+    @Before
+    public void setup() {
+        StorageClientSettings settings = newStorageClientSettings();
         String namespace = "test_namespace";
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
-        storageClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .withNamespace(namespace)
-            .build();
+        adminClient = createStorageAdminClient(settings);
+        storageClient = createStorageClient(settings, namespace);
     }
 
-    @Override
-    protected void doTeardown() throws Exception {
+    @After
+    public void teardown() {
         if (null != adminClient) {
             adminClient.close();
         }
         if (null != storageClient) {
             storageClient.close();
         }
-        if (null != scheduler) {
-            scheduler.shutdown();
-        }
     }
 
     private static ByteBuf getLKey(int i) {
@@ -123,11 +106,6 @@ public class TableClientSimpleTest extends StorageServerTestBase {
         StreamProperties streamProps = result(
             adminClient.createStream(namespace, streamName, streamConf));
         assertEquals(streamName, streamProps.getStreamName());
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build(),
-            streamProps.getStreamConf());
 
         // Open the table
         PTable<ByteBuf, ByteBuf> table = result(storageClient.openPTable(streamName));
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
similarity index 89%
rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java
rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
index 7db265b..0dff7fa 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
@@ -47,16 +47,15 @@ import org.apache.bookkeeper.api.kv.result.PutResult;
 import org.apache.bookkeeper.api.kv.result.RangeResult;
 import org.apache.bookkeeper.api.kv.result.Result;
 import org.apache.bookkeeper.api.kv.result.TxnResult;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -65,48 +64,32 @@ import org.junit.rules.TestName;
  * Integration test for table service.
  */
 @Slf4j
-public class TableClientTest extends StorageServerTestBase {
+public class TableClientTest extends StreamClusterTestBase {
 
     @Rule
     public final TestName testName = new TestName();
 
     private final String namespace = "test_namespace";
-    private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
     private StorageClient storageClient;
     private final OptionFactory<ByteBuf> optionFactory = new OptionFactoryImpl<>();
 
-    @Override
-    protected void doSetup() throws Exception {
-        scheduler = OrderedScheduler.newSchedulerBuilder()
-            .name("table-client-test")
-            .numThreads(1)
-            .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
+    @Before
+    public void setup() {
+        StorageClientSettings settings = newStorageClientSettings();
         String namespace = "test_namespace";
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
-        storageClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .withNamespace(namespace)
-            .build();
+        adminClient = createStorageAdminClient(settings);
+        storageClient = createStorageClient(settings, namespace);
     }
 
-    @Override
-    protected void doTeardown() throws Exception {
+    @After
+    public void teardown() {
         if (null != adminClient) {
             adminClient.close();
         }
         if (null != storageClient) {
             storageClient.close();
         }
-        if (null != scheduler) {
-            scheduler.shutdown();
-        }
     }
 
     private static ByteBuf getLKey(int i) {
@@ -134,11 +117,6 @@ public class TableClientTest extends StorageServerTestBase {
         StreamProperties streamProps = FutureUtils.result(
             adminClient.createStream(namespace, streamName, streamConf));
         assertEquals(streamName, streamProps.getStreamName());
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build(),
-            streamProps.getStreamConf());
 
         // Open the table
         PTable<ByteBuf, ByteBuf> table = FutureUtils.result(storageClient.openPTable(streamName));
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
index 80ae906..9a7cd86 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
@@ -18,6 +18,8 @@
 
 package org.apache.bookkeeper.tests.integration.topologies;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -60,6 +62,7 @@ public class BKCluster {
     private final Network network;
     private final MetadataStoreContainer metadataContainer;
     private final Map<String, BookieContainer> bookieContainers;
+    private final Map<String, String> internalEndpointsToExternalEndpoints;
     private final int numBookies;
     private final String extraServerComponents;
     private volatile boolean enableContainerLog;
@@ -72,6 +75,7 @@ public class BKCluster {
             .withNetwork(network)
             .withNetworkAliases(ZKContainer.HOST_NAME);
         this.bookieContainers = Maps.newTreeMap();
+        this.internalEndpointsToExternalEndpoints = Maps.newConcurrentMap();
         this.numBookies = spec.numBookies();
         this.extraServerComponents = spec.extraServerComponents();
         this.enableContainerLog = spec.enableContainerLog();
@@ -114,6 +118,13 @@ public class BKCluster {
         createBookies("bookie", numBookies);
     }
 
+    public String resolveExternalGrpcEndpointStr(String internalGrpcEndpointStr) {
+        String externalGrpcEndpointStr = internalEndpointsToExternalEndpoints.get(internalGrpcEndpointStr);
+        checkNotNull(externalGrpcEndpointStr,
+            "No internal grpc endpoint is found : " + internalGrpcEndpointStr);
+        return externalGrpcEndpointStr;
+    }
+
     public void stop() {
         synchronized (this) {
             bookieContainers.values().forEach(BookieContainer::stop);
@@ -155,6 +166,7 @@ public class BKCluster {
         synchronized (this) {
             container = bookieContainers.remove(bookieName);
             if (null != container) {
+                internalEndpointsToExternalEndpoints.remove(container.getInternalGrpcEndpointStr());
                 container.stop();
             }
         }
@@ -175,8 +187,6 @@ public class BKCluster {
                 if (enableContainerLog) {
                     container.tailContainerLog();
                 }
-
-                log.info("Created bookie {}", bookieName);
                 bookieContainers.put(bookieName, container);
             }
         }
@@ -184,6 +194,11 @@ public class BKCluster {
         if (shouldStart) {
             log.info("Starting bookie {}", bookieName);
             container.start();
+            log.info("Started bookie {} : internal endpoint = {}, external endpoint = {}",
+                bookieName, container.getInternalGrpcEndpointStr(), container.getExternalGrpcEndpointStr());
+            internalEndpointsToExternalEndpoints.put(
+                container.getInternalGrpcEndpointStr(),
+                container.getExternalGrpcEndpointStr());
         }
         return container;
     }

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.