You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/05/23 02:11:27 UTC

[GitHub] sijie closed pull request #1423: [table service] Move integration tests under `stream/tests/integration` to `tests/integration/cluster`

sijie closed pull request #1423: [table service]  Move integration tests under `stream/tests/integration` to `tests/integration/cluster`
URL: https://github.com/apache/bookkeeper/pull/1423
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
index aff322c88..37c822347 100644
--- a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
+++ b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
@@ -116,7 +116,6 @@ protected void closeAsyncOnce(CompletableFuture<Void> closeFuture) {
             serverManager.close();
             closeFuture.complete(null);
             SharedResourceManager.shared().release(resources.scheduler(), scheduler);
-            scheduler.shutdown();
         });
     }
 
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
index 20b582110..87768faaa 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
@@ -24,6 +24,7 @@
 import io.grpc.NameResolver;
 import java.util.List;
 import java.util.Optional;
+import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.ClientConstants;
 import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
@@ -56,6 +57,15 @@
      */
     List<Endpoint> endpoints();
 
+    /**
+     * Return the endpoint resolver for resolving individual endpoints.
+     *
+     * <p>The default resolver is an identity resolver.
+     *
+     * @return the endpoint resolver for resolving endpoints.
+     */
+    EndpointResolver endpointResolver();
+
     /**
      * Returns the builder to create the managed channel.
      *
@@ -99,6 +109,7 @@
             numWorkerThreads(Runtime.getRuntime().availableProcessors());
             usePlaintext(true);
             backoffPolicy(ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+            endpointResolver(EndpointResolver.identity());
         }
 
         @Override
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index e8e72db7f..34dc95790 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -24,6 +24,8 @@
 import java.util.Optional;
 import java.util.function.Function;
 import javax.annotation.concurrent.GuardedBy;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc;
@@ -42,8 +44,12 @@
  */
 public class StorageServerChannel implements AutoCloseable {
 
-    public static Function<Endpoint, StorageServerChannel> factory(boolean usePlaintext) {
-        return (endpoint) -> new StorageServerChannel(endpoint, Optional.empty(), usePlaintext);
+    public static Function<Endpoint, StorageServerChannel> factory(StorageClientSettings settings) {
+        return (endpoint) -> new StorageServerChannel(
+            endpoint,
+            Optional.empty(),
+            settings.usePlaintext(),
+            settings.endpointResolver());
     }
 
     private final Optional<String> token;
@@ -63,14 +69,17 @@
      *
      * @param endpoint range server endpoint.
      * @param token    token used to access range server
+     * @param usePlainText whether to use the plain text protocol or not
      */
     public StorageServerChannel(Endpoint endpoint,
                                 Optional<String> token,
-                                boolean usePlainText) {
+                                boolean usePlainText,
+                                EndpointResolver endpointResolver) {
         this.token = token;
+        Endpoint resolvedEndpoint = endpointResolver.resolve(endpoint);
         this.channel = ManagedChannelBuilder.forAddress(
-            endpoint.getHostname(),
-            endpoint.getPort())
+            resolvedEndpoint.getHostname(),
+            resolvedEndpoint.getPort())
             .usePlaintext(usePlainText)
             .build();
     }
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java
index 67b3d0fb6..308d25db2 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java
@@ -40,7 +40,7 @@
     private final Function<Endpoint, StorageServerChannel> channelFactory;
 
     public StorageServerChannelManager(StorageClientSettings settings) {
-        this(StorageServerChannel.factory(settings.usePlaintext()));
+        this(StorageServerChannel.factory(settings));
     }
 
     @VisibleForTesting
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
index d219b6aec..3ae3b6bb1 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
@@ -65,7 +65,7 @@ public StorageServerClientManagerImpl(StorageClientSettings settings,
         this(
             settings,
             schedulerResource,
-            StorageServerChannel.factory(settings.usePlaintext()));
+            StorageServerChannel.factory(settings));
     }
 
     public StorageServerClientManagerImpl(StorageClientSettings settings,
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java
new file mode 100644
index 000000000..e5002b10d
--- /dev/null
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.resolver;
+
+import org.apache.bookkeeper.stream.proto.common.Endpoint;
+
+/**
+ * Resolve an endpoint to another endpoint.
+ *
+ * <p>The resolver can be used for resolving the right IP address for an advertised endpoint. It is typically useful
+ * in dockerized integration tests, where the test clients usually run outside of the docker network.
+ */
+public interface EndpointResolver {
+
+    /**
+     * Returns a resolver that always returns its input endpoint.
+     *
+     * @return a function that always returns its input endpoint
+     */
+    static EndpointResolver identity() {
+        return endpoint -> endpoint;
+    }
+
+    /**
+     * Resolve <tt>endpoint</tt> to another endpoint.
+     *
+     * @param endpoint endpoint to resolve
+     * @return the resolved endpoint.
+     */
+    Endpoint resolve(Endpoint endpoint);
+
+}
diff --git a/stream/pom.xml b/stream/pom.xml
index 208e6d0a1..f110d5201 100644
--- a/stream/pom.xml
+++ b/stream/pom.xml
@@ -38,7 +38,6 @@
     <module>storage</module>
     <module>server</module>
     <module>cli</module>
-    <module>tests</module>
   </modules>
 
   <build>
diff --git a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java
index 8c886ea7f..340a73197 100644
--- a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java
+++ b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java
@@ -31,8 +31,16 @@
 /**
  * Placement policy to place ranges to group.
  */
+@FunctionalInterface
 public interface StorageContainerPlacementPolicy {
 
+    @FunctionalInterface
+    interface Factory {
+
+        StorageContainerPlacementPolicy newPlacementPolicy();
+
+    }
+
     long placeStreamRange(long streamId, long rangeId);
 
 }
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index 11bc8af7d..ff0cda249 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -187,8 +187,7 @@ private LifecycleComponent startServer() throws Exception {
                     bookiePort, grpcPort, bkDir, rangesStoreDir, spec.serveReadOnlyTable);
                 server = StorageServer.buildStorageServer(
                     serverConf,
-                    grpcPort,
-                    spec.numServers() * 2);
+                    grpcPort);
                 server.start();
                 log.info("Started storage server at (bookie port = {}, grpc port = {})",
                     bookiePort, grpcPort);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index e0b87ef99..94c858787 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -55,6 +55,7 @@
 import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector;
 import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
 import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactoryImpl;
 import org.apache.commons.configuration.CompositeConfiguration;
@@ -143,8 +144,7 @@ static int doMain(String[] args) {
         try {
             storageServer = buildStorageServer(
                 conf,
-                grpcPort,
-                1024);
+                grpcPort);
         } catch (ConfigurationException e) {
             log.error("Invalid storage configuration", e);
             return ExitCode.INVALID_CONF.code();
@@ -168,15 +168,13 @@ static int doMain(String[] args) {
     }
 
     public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
-                                                        int grpcPort,
-                                                        int numStorageContainers)
+                                                        int grpcPort)
             throws UnknownHostException, ConfigurationException {
-        return buildStorageServer(conf, grpcPort, numStorageContainers, true, NullStatsLogger.INSTANCE);
+        return buildStorageServer(conf, grpcPort, true, NullStatsLogger.INSTANCE);
     }
 
     public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
                                                         int grpcPort,
-                                                        int numStorageContainers,
                                                         boolean startBookieAndStartProvider,
                                                         StatsLogger externalStatsLogger)
         throws ConfigurationException, UnknownHostException {
@@ -250,12 +248,21 @@ public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
             .withStorageConfiguration(storageConf)
             // the storage resources shared across multiple components
             .withStorageResources(storageResources)
-            // the number of storage containers
-            .withNumStorageContainers(numStorageContainers)
+            // the placement policy
+            .withStorageContainerPlacementPolicyFactory(() -> {
+                long numStorageContainers;
+                try (ZkClusterMetadataStore store = new ZkClusterMetadataStore(
+                    curatorProviderService.get(),
+                    ZKMetadataDriverBase.resolveZkServers(bkServerConf),
+                    ZK_METADATA_ROOT_PATH)) {
+                    numStorageContainers = store.getClusterMetadata().getNumStorageContainers();
+                }
+                return StorageContainerPlacementPolicyImpl.of((int) numStorageContainers);
+            })
             // the default log backend uri
             .withDefaultBackendUri(dlNamespaceProvider.getDlogUri())
             // with zk-based storage container manager
-            .withStorageContainerManagerFactory((ignored, storeConf, registry) ->
+            .withStorageContainerManagerFactory((storeConf, registry) ->
                 new ZkStorageContainerManager(
                     myEndpoint,
                     storageConf,
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
index 5e6b0b51b..6cf73b07d 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
@@ -42,7 +42,6 @@ public StreamStorageLifecycleComponent(BookieConfiguration conf, StatsLogger sta
         this.streamStorage = StorageServer.buildStorageServer(
             conf.getUnderlyingConf(),
             ssConf.getGrpcPort(),
-            1024, /* indicator */
             false,
             statsLogger.scope("stream"));
     }
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java
index daa130b75..b89a4657d 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java
@@ -24,13 +24,11 @@
     /**
      * Create a storage container manager to manage lifecycles of {@link StorageContainer}.
      *
-     * @param numStorageContainers num of storage containers.
      * @param conf                 storage configuration
      * @param registry             storage container registry
      * @return storage container manager.
      */
-    StorageContainerManager create(int numStorageContainers,
-                                   StorageConfiguration conf,
+    StorageContainerManager create(StorageConfiguration conf,
                                    StorageContainerRegistry registry);
 
 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
index 1bd551056..c03b374ed 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
@@ -19,10 +19,12 @@
 import java.net.URI;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
 import org.apache.bookkeeper.stream.storage.api.RangeStore;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
+import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
 
 /**
@@ -38,8 +40,9 @@ public static RangeStoreBuilder newBuilder() {
     private StorageConfiguration storeConf = null;
     private StorageResources storeResources = null;
     private StorageContainerManagerFactory scmFactory = null;
+    private StorageContainerPlacementPolicy.Factory placementPolicyFactory = () ->
+        StorageContainerPlacementPolicyImpl.of(1024);
     private MVCCStoreFactory mvccStoreFactory = null;
-    private int numStorageContainers = 1024;
     private URI defaultBackendUri = null;
 
     private RangeStoreBuilder() {
@@ -52,7 +55,19 @@ private RangeStoreBuilder() {
      * @return range store builder
      */
     public RangeStoreBuilder withNumStorageContainers(int numStorageContainers) {
-        this.numStorageContainers = numStorageContainers;
+        this.placementPolicyFactory = () -> StorageContainerPlacementPolicyImpl.of(numStorageContainers);
+        return this;
+    }
+
+    /**
+     * Build the range store with the provided <tt>placementPolicyFactory</tt>.
+     *
+     * @param placementPolicyFactory placement policy factory to create placement policies.
+     * @return range store builder.
+     */
+    public RangeStoreBuilder withStorageContainerPlacementPolicyFactory(
+        StorageContainerPlacementPolicy.Factory placementPolicyFactory) {
+        this.placementPolicyFactory = placementPolicyFactory;
         return this;
     }
 
@@ -130,6 +145,7 @@ public RangeStore build() {
         checkNotNull(storeConf, "StorageConfiguration is not provided");
         checkNotNull(mvccStoreFactory, "MVCCStoreFactory is not provided");
         checkNotNull(defaultBackendUri, "Default backend uri is not provided");
+        checkNotNull(placementPolicyFactory, "Storage Container Placement Policy Factory is not provided");
 
         return new RangeStoreImpl(
             storeConf,
@@ -137,7 +153,7 @@ public RangeStore build() {
             scmFactory,
             mvccStoreFactory,
             defaultBackendUri,
-            numStorageContainers,
+            placementPolicyFactory,
             statsLogger);
     }
 
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
index 42fe40bf0..f175a55da 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
@@ -48,7 +48,6 @@
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory;
-import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
 
@@ -71,15 +70,14 @@ public RangeStoreImpl(StorageConfiguration conf,
                           StorageContainerManagerFactory factory,
                           MVCCStoreFactory mvccStoreFactory,
                           URI defaultBackendUri,
-                          int numStorageContainers,
+                          StorageContainerPlacementPolicy.Factory placementPolicyFactory,
                           StatsLogger statsLogger) {
         super("range-service", conf, statsLogger);
         this.schedulerResource = schedulerResource;
         this.scheduler = SharedResourceManager.shared().get(schedulerResource);
         this.scmFactory = factory;
-        StorageContainerPlacementPolicy placementPolicy =
-            StorageContainerPlacementPolicyImpl.of(numStorageContainers);
         this.storeFactory = mvccStoreFactory;
+        StorageContainerPlacementPolicy placementPolicy = placementPolicyFactory.newPlacementPolicy();
         this.scRegistry = new StorageContainerRegistryImpl(
             new DefaultStorageContainerFactory(
                 conf,
@@ -88,7 +86,7 @@ public RangeStoreImpl(StorageConfiguration conf,
                 storeFactory,
                 defaultBackendUri),
             scheduler);
-        this.scManager = scmFactory.create(numStorageContainers, conf, scRegistry);
+        this.scManager = scmFactory.create(conf, scRegistry);
     }
 
     @Override
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
index 1befe8272..ac4bb78ac 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
@@ -211,7 +211,7 @@ public void setUp() throws Exception {
         rangeStore = (RangeStoreImpl) RangeStoreBuilder.newBuilder()
             .withStorageConfiguration(storageConf)
             .withStorageResources(storageResources)
-            .withStorageContainerManagerFactory((numScs, storeConf, rgRegistry)
+            .withStorageContainerManagerFactory((storeConf, rgRegistry)
                 -> new LocalStorageContainerManager(endpoint, storeConf, rgRegistry, 2))
             .withRangeStoreFactory(storeFactory)
             .withDefaultBackendUri(URI.create("distributedlog://127.0.0.1/stream/storage"))
diff --git a/stream/tests/integration/pom.xml b/stream/tests/integration/pom.xml
deleted file mode 100644
index 77e697aac..000000000
--- a/stream/tests/integration/pom.xml
+++ /dev/null
@@ -1,111 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.bookkeeper.tests</groupId>
-    <artifactId>stream-storage-tests-parent</artifactId>
-    <version>4.8.0-SNAPSHOT</version>
-  </parent>
-  <artifactId>stream-storage-integration-test</artifactId>
-  <name>Apache BookKeeper :: Stream Storage :: Tests :: Integration</name>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>stream-storage-java-client</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>stream-storage-server</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.distributedlog</groupId>
-      <artifactId>distributedlog-core</artifactId>
-      <version>${project.parent.version}</version>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-        <version>${maven-jar-plugin.version}</version>
-        <executions>
-          <execution>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <version>${maven-surefire-plugin.version}</version>
-        <configuration>
-          <!-- only run tests when -DstreamIntegrationTests is specified //-->
-          <skipTests>true</skipTests>
-          <redirectTestOutputToFile>true</redirectTestOutputToFile>
-          <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=2G</argLine>
-          <forkMode>always</forkMode>
-          <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
-  <profiles>
-    <profile>
-      <id>streamIntegrationTests</id>
-      <activation>
-        <property>
-          <name>streamIntegrationTests</name>
-        </property>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-surefire-plugin</artifactId>
-            <configuration>
-              <skipTests>false</skipTests>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-deploy-plugin</artifactId>
-            <version>${maven-deploy-plugin.version}</version>
-            <configuration>
-              <skip>true</skip>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
-</project>
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
deleted file mode 100644
index ebe99f04e..000000000
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.tests.integration;
-
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_RETENTION_POLICY;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_SEGMENT_ROLLING_POLICY;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_SPLIT_POLICY;
-import static org.junit.Assert.assertEquals;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.api.StorageClient;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
-import org.apache.bookkeeper.clients.admin.StorageAdminClient;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
-import org.apache.bookkeeper.stream.proto.RangeKeyType;
-import org.apache.bookkeeper.stream.proto.StreamConfiguration;
-import org.apache.bookkeeper.stream.proto.StreamProperties;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Integration test for stream client test.
- */
-@Slf4j
-public class StorageClientTest extends StorageServerTestBase {
-
-    @Rule
-    public final TestName testName = new TestName();
-
-    private String nsName;
-    private String streamName;
-    private StorageAdminClient adminClient;
-    private StorageClient client;
-    private final StreamConfiguration streamConf = StreamConfiguration.newBuilder()
-        .setKeyType(RangeKeyType.HASH)
-        .setInitialNumRanges(4)
-        .setMinNumRanges(4)
-        .setRetentionPolicy(DEFAULT_RETENTION_POLICY)
-        .setRollingPolicy(DEFAULT_SEGMENT_ROLLING_POLICY)
-        .setSplitPolicy(DEFAULT_SPLIT_POLICY)
-        .build();
-    private final NamespaceConfiguration colConf = NamespaceConfiguration.newBuilder()
-        .setDefaultStreamConf(streamConf)
-        .build();
-
-    @Override
-    protected void doSetup() throws Exception {
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
-        nsName = "test_namespace";
-        FutureUtils.result(
-            adminClient.createNamespace(nsName, colConf));
-        client = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .withNamespace(nsName)
-            .build();
-        streamName = "test_stream";
-        createStream(streamName);
-    }
-
-    @Override
-    protected void doTeardown() throws Exception {
-        if (null != client) {
-            client.closeAsync();
-        }
-        if (null != adminClient) {
-            adminClient.closeAsync();
-        }
-    }
-
-    private void createStream(String streamName) throws Exception {
-        FutureUtils.result(
-            adminClient.createStream(
-                nsName,
-                streamName,
-                streamConf));
-    }
-
-    @Test
-    public void testAdmin() throws Exception {
-        StreamProperties properties =
-            FutureUtils.result(adminClient.getStream(nsName, streamName));
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build()
-            , properties.getStreamConf());
-    }
-
-}
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
deleted file mode 100644
index 21dc004fc..000000000
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.tests.integration;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.stream.cluster.StreamCluster;
-import org.apache.bookkeeper.stream.cluster.StreamClusterSpec;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-
-/**
- * Test Base for Range Server related tests.
- */
-@Slf4j
-public abstract class StorageServerTestBase {
-
-    static {
-        // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words
-        // are disabled by default due to security reasons
-        System.setProperty("zookeeper.4lw.commands.whitelist", "*");
-    }
-
-    @Rule
-    public final TemporaryFolder testDir = new TemporaryFolder();
-
-    protected StreamClusterSpec spec;
-    protected StreamCluster cluster;
-
-    protected StorageServerTestBase() {
-        this(StreamClusterSpec.builder()
-            .baseConf(new CompositeConfiguration())
-            .numServers(3)
-            .build());
-    }
-
-    protected StorageServerTestBase(StreamClusterSpec spec) {
-        this.spec = spec;
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        spec = spec.storageRootDir(testDir.newFolder("tests"));
-        this.cluster = StreamCluster.build(spec);
-        this.cluster.start();
-        doSetup();
-    }
-
-    protected abstract void doSetup() throws Exception;
-
-    @After
-    public void tearDown() throws Exception {
-        doTeardown();
-        if (null != this.cluster) {
-            this.cluster.stop();
-        }
-    }
-
-    protected abstract void doTeardown() throws Exception;
-
-}
diff --git a/stream/tests/integration/src/test/resources/log4j.properties b/stream/tests/integration/src/test/resources/log4j.properties
deleted file mode 100644
index 614fe7558..000000000
--- a/stream/tests/integration/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,55 +0,0 @@
-#/**
-# * Copyright 2007 The Apache Software Foundation
-# *
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-#
-# DisributedLog Logging Configuration
-#
-
-# Example with rolling log file
-log4j.rootLogger=INFO, CONSOLE
-
-#disable zookeeper logging
-log4j.logger.org.apache.zookeeper=OFF
-#Set the bookkeeper level to warning
-log4j.logger.org.apache.bookkeeper=INFO
-#disable helix logging
-log4j.logger.org.apache.helix=OFF
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.Threshold=DEBUG
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
-
-# Add ROLLINGFILE to rootLogger to get log file output
-#    Log DEBUG level and above messages to a log file
-#log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
-#log4j.appender.ROLLINGFILE.Threshold=INFO
-#log4j.appender.ROLLINGFILE.File=stream.log
-#log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
-#log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd-HH-mm
-#log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
-
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.Threshold=TRACE
-log4j.appender.R.File=target/error.log
-log4j.appender.R.MaxFileSize=200MB
-log4j.appender.R.MaxBackupIndex=7
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
diff --git a/stream/tests/pom.xml b/stream/tests/pom.xml
deleted file mode 100644
index 0f6029b4b..000000000
--- a/stream/tests/pom.xml
+++ /dev/null
@@ -1,33 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <packaging>pom</packaging>
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.bookkeeper</groupId>
-    <artifactId>stream-storage-parent</artifactId>
-    <version>4.8.0-SNAPSHOT</version>
-  </parent>
-  <groupId>org.apache.bookkeeper.tests</groupId>
-  <artifactId>stream-storage-tests-parent</artifactId>
-  <name>Apache BookKeeper :: Stream Storage :: Tests</name>
-  <modules>
-    <module>integration</module>
-  </modules>
-</project>
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
index 42b428e37..92e39c3e9 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
@@ -52,12 +52,8 @@ public void setup() {
             .name("location-client-test")
             .numThreads(1)
             .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(getExsternalStreamEndpoints().toArray(new Endpoint[getNumBookies()]))
-            .usePlaintext(true)
-            .build();
         client = new LocationClientImpl(
-            settings,
+            newStorageClientSettings(),
             scheduler);
     }
 
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StorageAdminClientTest.java
similarity index 86%
rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java
rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StorageAdminClientTest.java
index 03edfb8df..569cf7c95 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StorageAdminClientTest.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
 
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 import static org.junit.Assert.assertEquals;
@@ -39,6 +39,8 @@
 import org.apache.bookkeeper.stream.proto.StreamProperties;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -46,37 +48,23 @@
 /**
  * Integration test for stream admin client test.
  */
-public class StorageAdminClientTest extends StorageServerTestBase {
+public class StorageAdminClientTest extends StreamClusterTestBase {
 
     @Rule
     public final TestName testName = new TestName();
 
-    private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
 
-    @Override
-    protected void doSetup() throws Exception {
-        scheduler = OrderedScheduler.newSchedulerBuilder()
-            .name("admin-client-test")
-            .numThreads(1)
-            .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
+    @Before
+    public void setup() {
+        adminClient = createStorageAdminClient(newStorageClientSettings());
     }
 
-    @Override
-    protected void doTeardown() throws Exception {
+    @After
+    public void teardown() {
         if (null != adminClient) {
             adminClient.close();
         }
-        if (null != scheduler) {
-            scheduler.shutdown();
-        }
     }
 
     @Test
@@ -152,11 +140,6 @@ public void testStreamAPI() throws Exception {
             .build();
         StreamProperties streamProps = FutureUtils.result(adminClient.createStream(nsName, streamName, streamConf));
         assertEquals(streamName, streamProps.getStreamName());
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build(),
-            streamProps.getStreamConf());
 
         // create a duplicated stream
         try {
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
index b86cc7260..fe7c91431 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
@@ -21,6 +21,10 @@
 import java.util.List;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.clients.StorageClientBuilder;
+import org.apache.bookkeeper.clients.admin.StorageAdminClient;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.utils.NetUtils;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.tests.integration.cluster.BookKeeperClusterTestBase;
@@ -69,5 +73,35 @@ protected static int getNumBookies() {
             .collect(Collectors.toList());
     }
 
+    //
+    // Test Util Methods
+    //
+
+    protected static StorageClientSettings newStorageClientSettings() {
+        return StorageClientSettings.newBuilder()
+            .addEndpoints(getExsternalStreamEndpoints().toArray(new Endpoint[getNumBookies()]))
+            .endpointResolver(endpoint -> {
+                String internalEndpointStr = NetUtils.endpointToString(endpoint);
+                String externalEndpointStr =
+                    bkCluster.resolveExternalGrpcEndpointStr(internalEndpointStr);
+                log.info("Resolve endpoint {} to {}", internalEndpointStr, externalEndpointStr);
+                return NetUtils.parseEndpoint(externalEndpointStr);
+            })
+            .usePlaintext(true)
+            .build();
+    }
+
+    protected static StorageAdminClient createStorageAdminClient(StorageClientSettings settings) {
+        return StorageClientBuilder.newBuilder()
+            .withSettings(settings)
+            .buildAdmin();
+    }
+
+    protected static StorageClient createStorageClient(StorageClientSettings settings, String namespace) {
+        return StorageClientBuilder.newBuilder()
+            .withSettings(settings)
+            .withNamespace(namespace)
+            .build();
+    }
 
 }
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
similarity index 85%
rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java
rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
index 7add6d72d..d1ff09154 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
@@ -38,15 +38,14 @@
 import org.apache.bookkeeper.api.kv.exceptions.KvApiException;
 import org.apache.bookkeeper.api.kv.result.Code;
 import org.apache.bookkeeper.api.kv.result.KeyValue;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -55,47 +54,31 @@
  * Integration test for table service.
  */
 @Slf4j
-public class TableClientSimpleTest extends StorageServerTestBase {
+public class TableClientSimpleTest extends StreamClusterTestBase {
 
     @Rule
     public final TestName testName = new TestName();
 
     private final String namespace = "test_namespace";
-    private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
     private StorageClient storageClient;
 
-    @Override
-    protected void doSetup() throws Exception {
-        scheduler = OrderedScheduler.newSchedulerBuilder()
-            .name("table-client-test")
-            .numThreads(1)
-            .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
+    @Before
+    public void setup() {
+        StorageClientSettings settings = newStorageClientSettings();
         String namespace = "test_namespace";
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
-        storageClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .withNamespace(namespace)
-            .build();
+        adminClient = createStorageAdminClient(settings);
+        storageClient = createStorageClient(settings, namespace);
     }
 
-    @Override
-    protected void doTeardown() throws Exception {
+    @After
+    public void teardown() {
         if (null != adminClient) {
             adminClient.close();
         }
         if (null != storageClient) {
             storageClient.close();
         }
-        if (null != scheduler) {
-            scheduler.shutdown();
-        }
     }
 
     private static ByteBuf getLKey(int i) {
@@ -123,11 +106,6 @@ public void testTableSimpleAPI() throws Exception {
         StreamProperties streamProps = result(
             adminClient.createStream(namespace, streamName, streamConf));
         assertEquals(streamName, streamProps.getStreamName());
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build(),
-            streamProps.getStreamConf());
 
         // Open the table
         PTable<ByteBuf, ByteBuf> table = result(storageClient.openPTable(streamName));
diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
similarity index 89%
rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java
rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
index 7db265b50..0dff7fa3a 100644
--- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
@@ -47,16 +47,15 @@
 import org.apache.bookkeeper.api.kv.result.RangeResult;
 import org.apache.bookkeeper.api.kv.result.Result;
 import org.apache.bookkeeper.api.kv.result.TxnResult;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -65,48 +64,32 @@
  * Integration test for table service.
  */
 @Slf4j
-public class TableClientTest extends StorageServerTestBase {
+public class TableClientTest extends StreamClusterTestBase {
 
     @Rule
     public final TestName testName = new TestName();
 
     private final String namespace = "test_namespace";
-    private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
     private StorageClient storageClient;
     private final OptionFactory<ByteBuf> optionFactory = new OptionFactoryImpl<>();
 
-    @Override
-    protected void doSetup() throws Exception {
-        scheduler = OrderedScheduler.newSchedulerBuilder()
-            .name("table-client-test")
-            .numThreads(1)
-            .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
+    @Before
+    public void setup() {
+        StorageClientSettings settings = newStorageClientSettings();
         String namespace = "test_namespace";
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
-        storageClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .withNamespace(namespace)
-            .build();
+        adminClient = createStorageAdminClient(settings);
+        storageClient = createStorageClient(settings, namespace);
     }
 
-    @Override
-    protected void doTeardown() throws Exception {
+    @After
+    public void teardown() {
         if (null != adminClient) {
             adminClient.close();
         }
         if (null != storageClient) {
             storageClient.close();
         }
-        if (null != scheduler) {
-            scheduler.shutdown();
-        }
     }
 
     private static ByteBuf getLKey(int i) {
@@ -134,11 +117,6 @@ public void testTableAPI() throws Exception {
         StreamProperties streamProps = FutureUtils.result(
             adminClient.createStream(namespace, streamName, streamConf));
         assertEquals(streamName, streamProps.getStreamName());
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                .setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build(),
-            streamProps.getStreamConf());
 
         // Open the table
         PTable<ByteBuf, ByteBuf> table = FutureUtils.result(storageClient.openPTable(streamName));
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
index 80ae90690..9a7cd8674 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
@@ -18,6 +18,8 @@
 
 package org.apache.bookkeeper.tests.integration.topologies;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -60,6 +62,7 @@ public static BKCluster forSpec(BKClusterSpec spec) {
     private final Network network;
     private final MetadataStoreContainer metadataContainer;
     private final Map<String, BookieContainer> bookieContainers;
+    private final Map<String, String> internalEndpointsToExternalEndpoints;
     private final int numBookies;
     private final String extraServerComponents;
     private volatile boolean enableContainerLog;
@@ -72,6 +75,7 @@ private BKCluster(BKClusterSpec spec) {
             .withNetwork(network)
             .withNetworkAliases(ZKContainer.HOST_NAME);
         this.bookieContainers = Maps.newTreeMap();
+        this.internalEndpointsToExternalEndpoints = Maps.newConcurrentMap();
         this.numBookies = spec.numBookies();
         this.extraServerComponents = spec.extraServerComponents();
         this.enableContainerLog = spec.enableContainerLog();
@@ -114,6 +118,13 @@ public void start() throws Exception {
         createBookies("bookie", numBookies);
     }
 
+    public String resolveExternalGrpcEndpointStr(String internalGrpcEndpointStr) {
+        String externalGrpcEndpointStr = internalEndpointsToExternalEndpoints.get(internalGrpcEndpointStr);
+        checkNotNull(externalGrpcEndpointStr,
+            "No internal grpc endpoint is found : " + internalGrpcEndpointStr);
+        return externalGrpcEndpointStr;
+    }
+
     public void stop() {
         synchronized (this) {
             bookieContainers.values().forEach(BookieContainer::stop);
@@ -155,6 +166,7 @@ public BookieContainer killBookie(String bookieName) {
         synchronized (this) {
             container = bookieContainers.remove(bookieName);
             if (null != container) {
+                internalEndpointsToExternalEndpoints.remove(container.getInternalGrpcEndpointStr());
                 container.stop();
             }
         }
@@ -175,8 +187,6 @@ public BookieContainer createBookie(String bookieName) {
                 if (enableContainerLog) {
                     container.tailContainerLog();
                 }
-
-                log.info("Created bookie {}", bookieName);
                 bookieContainers.put(bookieName, container);
             }
         }
@@ -184,6 +194,11 @@ public BookieContainer createBookie(String bookieName) {
         if (shouldStart) {
             log.info("Starting bookie {}", bookieName);
             container.start();
+            log.info("Started bookie {} : internal endpoint = {}, external endpoint = {}",
+                bookieName, container.getInternalGrpcEndpointStr(), container.getExternalGrpcEndpointStr());
+            internalEndpointsToExternalEndpoints.put(
+                container.getInternalGrpcEndpointStr(),
+                container.getExternalGrpcEndpointStr());
         }
         return container;
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services