You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 20:52:37 UTC

[bookkeeper] 04/06: Add a docker based `BookKeeperClusterTestBase` for failure related integration tests

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 272b8e78303b544436fde98a4a2350137fede521
Author: Sijie Guo <si...@apache.org>
AuthorDate: Fri May 18 12:49:31 2018 -0700

    Add a docker based `BookKeeperClusterTestBase` for failure related integration tests
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    Currently we don't have any failure-related testing for the table service. Since we are using Docker as the integration testing infrastructure,
    it is better to use containers for this failure testing, rather than going down the same path as we did before.
    
    *Solution*
    
    This change provides a basic test base for a BookKeeper cluster using Docker. `BookKeeperClusterTestBase` provides functionality
    for starting/stopping bookies similar to what we had in the unit tests.
    
    `bookkeeper/tests/containers` in `integration-tests-topologies` provides all the basic containers used for testing.
    `tests/integration/cluster` and `tests/integration/topologies` provide the test base for writing tests using a dockerized BookKeeper cluster.
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>
    
    This closes #1412 from sijie/docker_cluster_dev
---
 conf/bk_server.conf                                |   7 +-
 .../docker-images/current-version-image/Dockerfile |   2 +-
 tests/integration-tests-topologies/pom.xml         |   5 +
 .../tests/containers/BKStandaloneContainer.java    |  50 +----
 .../tests/containers/BookieContainer.java          | 116 +++++++++++
 ...tandaloneContainer.java => ChaosContainer.java} |  93 +++------
 .../tests/containers/MetadataStoreContainer.java   |  33 ++++
 .../bookkeeper/tests/containers/ZKContainer.java   |  62 ++++++
 .../tests/containers/wait/HttpWaitStrategy.java    | 215 +++++++++++++++++++++
 .../tests/containers/wait/ZKWaitStrategy.java      |  63 ++++++
 .../bookkeeper/tests/BookKeeperClusterUtils.java   |   5 +-
 tests/integration/cluster/pom.xml                  |  70 +++++++
 .../cluster/BookKeeperClusterTestBase.java         |  94 +++++++++
 .../integration/cluster/SimpleClusterTest.java     |  78 ++++++++
 .../tests/integration/topologies/BKCluster.java    | 156 +++++++++++++++
 .../cluster/src/test/resources/log4j.properties    |  42 ++++
 tests/integration/pom.xml                          |   1 +
 17 files changed, 981 insertions(+), 111 deletions(-)

diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 946e6af..2080b68 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -191,10 +191,13 @@ bookiePort=3181
 #############################################################################
 
 # The flag enables/disables starting the admin http server. Default value is 'false'.
-# httpServerEnabled=false
+httpServerEnabled=false
 
 # The http server port to listen on. Default value is 8080.
-# httpServerPort=8080
+httpServerPort=8080
+
+# The http server class
+httpServerClass=org.apache.bookkeeper.http.vertx.VertxHttpServer
 
 ############################################## Security ##############################################
 
diff --git a/tests/docker-images/current-version-image/Dockerfile b/tests/docker-images/current-version-image/Dockerfile
index 285ecf6..4006daf 100644
--- a/tests/docker-images/current-version-image/Dockerfile
+++ b/tests/docker-images/current-version-image/Dockerfile
@@ -45,6 +45,6 @@ WORKDIR /opt/bookkeeper
 COPY target/scripts/apply-config-from-env.py target/scripts/entrypoint.sh target/scripts/healthcheck.sh /opt/bookkeeper/
 
 ENTRYPOINT [ "/bin/bash", "/opt/bookkeeper/entrypoint.sh" ]
-CMD ["/opt/bookkeepr/bin/bookkeeper", "bookie"]
+CMD ["/opt/bookkeeper/bin/bookkeeper", "bookie"]
 
 HEALTHCHECK --interval=10s --timeout=60s CMD /bin/bash /opt/bookkeeper/healthcheck.sh
diff --git a/tests/integration-tests-topologies/pom.xml b/tests/integration-tests-topologies/pom.xml
index 000059f..b5353eb 100644
--- a/tests/integration-tests-topologies/pom.xml
+++ b/tests/integration-tests-topologies/pom.xml
@@ -39,6 +39,11 @@
       <artifactId>junit</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper.tests</groupId>
+      <artifactId>integration-tests-utils</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BKStandaloneContainer.java b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BKStandaloneContainer.java
index 6ced634..6efec4c 100644
--- a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BKStandaloneContainer.java
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BKStandaloneContainer.java
@@ -18,25 +18,18 @@
 
 package org.apache.bookkeeper.tests.containers;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.time.temporal.ChronoUnit.SECONDS;
 
-import com.github.dockerjava.api.command.LogContainerCmd;
-import com.github.dockerjava.api.model.Frame;
-import com.github.dockerjava.core.command.LogContainerResultCallback;
 import java.time.Duration;
 import java.util.Objects;
-import lombok.EqualsAndHashCode;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
-import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
 
 /**
  * Test Container for Bookies.
  */
 @Slf4j
-public class BKStandaloneContainer<SELF extends BKStandaloneContainer<SELF>> extends GenericContainer<SELF> {
+public class BKStandaloneContainer<SELF extends BKStandaloneContainer<SELF>> extends ChaosContainer<SELF> {
 
     private static final int ZK_PORT = 2181;
     private static final int BOOKIE_BASE_PORT = 3181;
@@ -44,52 +37,17 @@ public class BKStandaloneContainer<SELF extends BKStandaloneContainer<SELF>> ext
     private static final String IMAGE_NAME = "apachebookkeeper/bookkeeper-current:latest";
 
     private static final String STANDALONE_HOST_NAME = "standalone";
-    private static final String CONTAINER_NAME_BASE = "bk-standalone-test";
 
-    private final String containerName;
     private final int numBookies;
 
-    public BKStandaloneContainer(String containerName, int numBookies) {
-        super(IMAGE_NAME);
-        this.containerName = containerName;
+    public BKStandaloneContainer(String clusterName, int numBookies) {
+        super(clusterName, IMAGE_NAME);
         this.numBookies = numBookies;
     }
 
     @Override
     public String getContainerName() {
-        return CONTAINER_NAME_BASE + "-" + containerName + "-" + numBookies + "-bookies-" + System.currentTimeMillis();
-    }
-
-    public String getContainerLog() {
-        StringBuilder sb = new StringBuilder();
-
-        LogContainerCmd logContainerCmd = this.dockerClient.logContainerCmd(containerId);
-        logContainerCmd.withStdOut(true).withStdErr(true);
-        try {
-            logContainerCmd.exec(new LogContainerResultCallback() {
-                @Override
-                public void onNext(Frame item) {
-                    sb.append(new String(item.getPayload(), UTF_8));
-                }
-            }).awaitCompletion();
-        } catch (InterruptedException e) {
-
-        }
-        return sb.toString();
-    }
-
-    public ExecResult execCmd(String... cmd) throws Exception {
-        String cmdString = StringUtils.join(cmd, " ");
-
-        log.info("DOCKER.exec({}:{}): Executing ...", containerId, cmdString);
-
-        ExecResult result = execInContainer(cmd);
-
-        log.info("Docker.exec({}:{}): Done", containerId, cmdString);
-        log.info("Docker.exec({}:{}): Stdout -\n{}", containerId, cmdString, result.getStdout());
-        log.info("Docker.exec({}:{}): Stderr -\n{}", containerId, cmdString, result.getStderr());
-
-        return result;
+        return clusterName + "-standalone-" + numBookies + "-bookies";
     }
 
     @Override
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java
new file mode 100644
index 0000000..874ec2a
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BookieContainer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.containers;
+
+import static java.time.temporal.ChronoUnit.SECONDS;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.tests.containers.wait.HttpWaitStrategy;
+
+/**
+ * Test Container for Bookies.
+ */
+@Slf4j
+public class BookieContainer<SELF extends BookieContainer<SELF>> extends ChaosContainer<SELF> {
+
+    private static final int BOOKIE_PORT = 3181;
+    private static final int BOOKIE_GRPC_PORT = 4181; // stream storage grpc port
+    private static final int BOOKIE_HTTP_PORT = 8080;
+
+    private static final String IMAGE_NAME = "apachebookkeeper/bookkeeper-current:latest";
+
+    private final String hostname;
+    private final String metadataServiceUri;
+
+    public BookieContainer(String clusterName,
+                           String hostname,
+                           String metadataServiceUri) {
+        super(clusterName, IMAGE_NAME);
+        this.hostname = hostname;
+        this.metadataServiceUri = metadataServiceUri;
+    }
+
+    @Override
+    public String getContainerName() {
+        return clusterName + "-" + hostname;
+    }
+
+    @Override
+    protected void configure() {
+        addExposedPorts(
+            BOOKIE_PORT,
+            BOOKIE_GRPC_PORT,
+            BOOKIE_HTTP_PORT
+        );
+        addEnv("BK_httpServerEnabled", "true");
+        addEnv("BK_httpServerPort", "" + BOOKIE_HTTP_PORT);
+        addEnv("BK_metadataServiceUri", metadataServiceUri);
+        addEnv("BK_useHostNameAsBookieID", "true");
+        if (metadataServiceUri.toLowerCase().startsWith("zk")) {
+            URI uri = URI.create(metadataServiceUri);
+            addEnv("BK_zkServers", uri.getAuthority());
+            addEnv("BK_zkLedgersRootPath", uri.getPath());
+        }
+    }
+
+    @Override
+    public void start() {
+        this.waitStrategy = new HttpWaitStrategy()
+            .forPath("/heartbeat")
+            .forStatusCode(200)
+            .forPort(BOOKIE_HTTP_PORT)
+            .withStartupTimeout(Duration.of(60, SECONDS));
+        this.withCreateContainerCmdModifier(createContainerCmd -> {
+            createContainerCmd.withHostName(hostname);
+            createContainerCmd.withName(getContainerName());
+        });
+
+        super.start();
+        log.info("Started bookie {} at cluster {}", hostname, clusterName);
+    }
+
+    @Override
+    public void stop() {
+        super.stop();
+        log.info("Stopped bookie {} at cluster {}", hostname, clusterName);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof BookieContainer)) {
+            return false;
+        }
+
+        BookieContainer another = (BookieContainer) o;
+        return hostname.equals(another.hostname)
+            && metadataServiceUri.equals(another.metadataServiceUri)
+            && super.equals(another);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * super.hashCode() + Objects.hash(
+            hostname,
+            metadataServiceUri);
+    }
+
+}
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BKStandaloneContainer.java b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ChaosContainer.java
similarity index 52%
copy from tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BKStandaloneContainer.java
copy to tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ChaosContainer.java
index 6ced634..33530f1 100644
--- a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/BKStandaloneContainer.java
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ChaosContainer.java
@@ -19,45 +19,49 @@
 package org.apache.bookkeeper.tests.containers;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.time.temporal.ChronoUnit.SECONDS;
 
 import com.github.dockerjava.api.command.LogContainerCmd;
 import com.github.dockerjava.api.model.Frame;
 import com.github.dockerjava.core.command.LogContainerResultCallback;
-import java.time.Duration;
 import java.util.Objects;
-import lombok.EqualsAndHashCode;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
 
 /**
- * Test Container for Bookies.
+ * A base container that provides chaos capability.
  */
 @Slf4j
-public class BKStandaloneContainer<SELF extends BKStandaloneContainer<SELF>> extends GenericContainer<SELF> {
+public class ChaosContainer<SELF extends ChaosContainer<SELF>> extends GenericContainer<SELF> {
 
-    private static final int ZK_PORT = 2181;
-    private static final int BOOKIE_BASE_PORT = 3181;
+    protected final String clusterName;
 
-    private static final String IMAGE_NAME = "apachebookkeeper/bookkeeper-current:latest";
-
-    private static final String STANDALONE_HOST_NAME = "standalone";
-    private static final String CONTAINER_NAME_BASE = "bk-standalone-test";
-
-    private final String containerName;
-    private final int numBookies;
-
-    public BKStandaloneContainer(String containerName, int numBookies) {
-        super(IMAGE_NAME);
-        this.containerName = containerName;
-        this.numBookies = numBookies;
+    protected ChaosContainer(String clusterName, String image) {
+        super(image);
+        this.clusterName = clusterName;
     }
 
-    @Override
-    public String getContainerName() {
-        return CONTAINER_NAME_BASE + "-" + containerName + "-" + numBookies + "-bookies-" + System.currentTimeMillis();
+    public void tailContainerLog() {
+        CompletableFuture.runAsync(() -> {
+            while(null == containerId) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(100);
+                } catch (InterruptedException e) {
+                    return;
+                }
+            }
+
+            LogContainerCmd logContainerCmd = this.dockerClient.logContainerCmd(containerId);
+            logContainerCmd.withStdOut(true).withStdErr(true).withFollowStream(true);
+            logContainerCmd.exec(new LogContainerResultCallback() {
+                @Override
+                public void onNext(Frame item) {
+                    log.info(new String(item.getPayload(), UTF_8));
+                }
+            });
+        });
     }
 
     public String getContainerLog() {
@@ -93,53 +97,20 @@ public class BKStandaloneContainer<SELF extends BKStandaloneContainer<SELF>> ext
     }
 
     @Override
-    protected void configure() {
-        addExposedPorts(
-            ZK_PORT
-        );
-        for (int i = 0; i < numBookies; i++) {
-            addExposedPort(BOOKIE_BASE_PORT + i);
-        }
-        setCommand(
-            "/opt/bookkeeper/bin/bookkeeper",
-            "localbookie",
-            "" + numBookies
-        );
-        addEnv("JAVA_HOME", "/usr/lib/jvm/jre-1.8.0");
-    }
-
-    @Override
-    public void start() {
-        this.waitStrategy = new LogMessageWaitStrategy()
-            .withRegEx(".*ForceWrite Thread started.*\\s")
-            .withTimes(numBookies)
-            .withStartupTimeout(Duration.of(60, SECONDS));
-        this.withCreateContainerCmdModifier(createContainerCmd -> {
-            createContainerCmd.withHostName(STANDALONE_HOST_NAME);
-            createContainerCmd.withName(getContainerName());
-            createContainerCmd.withEntrypoint("/bin/bash");
-        });
-
-        super.start();
-        log.info("Start a standalone bookkeeper cluster at container {}", containerName);
-    }
-
-    @Override
     public boolean equals(Object o) {
-        if (!(o instanceof BKStandaloneContainer)) {
+        if (!(o instanceof ChaosContainer)) {
             return false;
         }
 
-        BKStandaloneContainer another = (BKStandaloneContainer) o;
-        return containerName.equals(another.containerName)
-            && numBookies == another.numBookies
+        ChaosContainer another = (ChaosContainer) o;
+        return clusterName.equals(another.clusterName)
             && super.equals(another);
     }
 
     @Override
     public int hashCode() {
         return 31 * super.hashCode() + Objects.hash(
-            containerName,
-            numBookies);
+            clusterName);
     }
+
 }
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/MetadataStoreContainer.java b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/MetadataStoreContainer.java
new file mode 100644
index 0000000..ec8d914
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/MetadataStoreContainer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.containers;
+
+/**
+ * An abstract class for metadata store container.
+ */
+public abstract class MetadataStoreContainer<SELF extends MetadataStoreContainer<SELF>> extends ChaosContainer<SELF> {
+    protected MetadataStoreContainer(String clusterName, String image) {
+        super(clusterName, image);
+    }
+
+    public abstract String getExternalServiceUri();
+
+    public abstract String getInternalServiceUri();
+
+}
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ZKContainer.java b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ZKContainer.java
new file mode 100644
index 0000000..ba1d384
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ZKContainer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.containers;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.tests.containers.wait.ZKWaitStrategy;
+
+@Slf4j
+public class ZKContainer<SELF extends ZKContainer<SELF>> extends MetadataStoreContainer<SELF> {
+
+    private static final int ZK_PORT = 2181;
+
+    private static final String IMAGE_NAME = "zookeeper:3.4.11";
+    public static final String HOST_NAME = "metadata-store";
+    public static final String SERVICE_URI = "zk://" + HOST_NAME + ":" + ZK_PORT + "/ledgers";
+
+    public ZKContainer(String clusterName) {
+        super(clusterName, IMAGE_NAME);
+    }
+
+    @Override
+    public String getExternalServiceUri() {
+        return "zk://" + getContainerIpAddress() + ":" + getMappedPort(ZK_PORT) + "/ledgers";
+    }
+
+    @Override
+    public String getInternalServiceUri() {
+        return SERVICE_URI;
+    }
+
+    @Override
+    protected void configure() {
+        addExposedPort(ZK_PORT);
+    }
+
+    @Override
+    public void start() {
+        this.waitStrategy = new ZKWaitStrategy(ZK_PORT);
+        this.withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd.withHostName(HOST_NAME));
+
+        super.start();
+        log.info("Start a zookeeper server at container {} : external service uri = {}, internal service uri = {}",
+            containerName, getExternalServiceUri(), getInternalServiceUri());
+    }
+
+}
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/wait/HttpWaitStrategy.java b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/wait/HttpWaitStrategy.java
new file mode 100644
index 0000000..e70140a
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/wait/HttpWaitStrategy.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.containers.wait;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess;
+
+import com.google.common.base.Strings;
+import com.google.common.io.BaseEncoding;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URL;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import org.rnorth.ducttape.TimeoutException;
+import org.testcontainers.containers.ContainerLaunchException;
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
+
+/**
+ * Wait strategy that polls an HTTP(S) endpoint until it returns the expected status code.
+ */
+public class HttpWaitStrategy extends AbstractWaitStrategy {
+    @java.lang.SuppressWarnings("all")
+    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(HttpWaitStrategy.class);
+    /**
+     * Authorization HTTP header.
+     */
+    private static final String HEADER_AUTHORIZATION = "Authorization";
+    /**
+     * Basic Authorization scheme prefix.
+     */
+    private static final String AUTH_BASIC = "Basic ";
+    private String path = "/";
+    private int statusCode = HttpURLConnection.HTTP_OK;
+    private boolean tlsEnabled;
+    private String username;
+    private String password;
+    private Predicate<String> responsePredicate;
+    private int port = 80;
+
+    /**
+     * Waits for the given status code.
+     *
+     * @param statusCode the expected status code
+     * @return this
+     */
+    public HttpWaitStrategy forStatusCode(int statusCode) {
+        this.statusCode = statusCode;
+        return this;
+    }
+
+    /**
+     * Waits for the given path.
+     *
+     * @param path the path to check
+     * @return this
+     */
+    public HttpWaitStrategy forPath(String path) {
+        this.path = path;
+        return this;
+    }
+
+    /**
+     * Wait for the given port.
+     *
+     * @param port the given port
+     * @return this
+     */
+    public HttpWaitStrategy forPort(int port) {
+        this.port = port;
+        return this;
+    }
+
+    /**
+     * Indicates that the status check should use HTTPS.
+     *
+     * @return this
+     */
+    public HttpWaitStrategy usingTls() {
+        this.tlsEnabled = true;
+        return this;
+    }
+
+    /**
+     * Authenticate with HTTP Basic Authorization credentials.
+     *
+     * @param username the username
+     * @param password the password
+     * @return this
+     */
+    public HttpWaitStrategy withBasicCredentials(String username, String password) {
+        this.username = username;
+        this.password = password;
+        return this;
+    }
+
+    /**
+     * Waits for the response to pass the given predicate
+     * @param responsePredicate The predicate to test the response against
+     * @return this
+     */
+    public HttpWaitStrategy forResponsePredicate(Predicate<String> responsePredicate) {
+        this.responsePredicate = responsePredicate;
+        return this;
+    }
+
+    @Override
+    protected void waitUntilReady() {
+        final String containerName = waitStrategyTarget.getContainerInfo().getName();
+        final int livenessCheckPort = waitStrategyTarget.getMappedPort(port);
+        final String uri = buildLivenessUri(livenessCheckPort).toString();
+        log.info("{}: Waiting for {} seconds for URL: {}", containerName, startupTimeout.getSeconds(), uri);
+        // try to connect to the URL
+        try {
+            retryUntilSuccess((int) startupTimeout.getSeconds(), TimeUnit.SECONDS, () -> {
+                getRateLimiter().doWhenReady(() -> {
+                    try {
+                        final HttpURLConnection connection = (HttpURLConnection) new URL(uri).openConnection();
+                        // authenticate
+                        if (!Strings.isNullOrEmpty(username)) {
+                            connection.setRequestProperty(HEADER_AUTHORIZATION, buildAuthString(username, password));
+                            connection.setUseCaches(false);
+                        }
+                        connection.setRequestMethod("GET");
+                        connection.connect();
+                        if (statusCode != connection.getResponseCode()) {
+                            throw new RuntimeException(String.format("HTTP response code was: %s",
+                                connection.getResponseCode()));
+                        }
+                        if (responsePredicate != null) {
+                            String responseBody = getResponseBody(connection);
+                            if (!responsePredicate.test(responseBody)) {
+                                throw new RuntimeException(String.format("Response: %s did not match predicate",
+                                    responseBody));
+                            }
+                        }
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+                return true;
+            });
+        } catch (TimeoutException e) {
+            throw new ContainerLaunchException(String.format(
+                "Timed out waiting for URL to be accessible (%s should return HTTP %s)", uri, statusCode));
+        }
+    }
+
+    /**
+     * Build the URI on which to check if the container is ready.
+     *
+     * @param livenessCheckPort the liveness port
+     * @return the liveness URI
+     */
+    private URI buildLivenessUri(int livenessCheckPort) {
+        final String scheme = (tlsEnabled ? "https" : "http") + "://";
+        final String host = waitStrategyTarget.getContainerIpAddress();
+        final String portSuffix;
+        if ((tlsEnabled && 443 == livenessCheckPort) || (!tlsEnabled && 80 == livenessCheckPort)) {
+            portSuffix = "";
+        } else {
+            portSuffix = ":" + String.valueOf(livenessCheckPort);
+        }
+        return URI.create(scheme + host + portSuffix + path);
+    }
+
+    /**
+     * @param username the username
+     * @param password the password
+     * @return a basic authentication string for the given credentials
+     */
+    private String buildAuthString(String username, String password) {
+        return AUTH_BASIC + BaseEncoding.base64().encode((username + ":" + password).getBytes(UTF_8));
+    }
+
+    private String getResponseBody(HttpURLConnection connection) throws IOException {
+        BufferedReader reader = null;
+        try {
+            if (200 <= connection.getResponseCode() && connection.getResponseCode() <= 299) {
+                reader = new BufferedReader(new InputStreamReader((connection.getInputStream()), UTF_8));
+            } else {
+                reader = new BufferedReader(new InputStreamReader((connection.getErrorStream()), UTF_8));
+            }
+            StringBuilder builder = new StringBuilder();
+            String line;
+            while ((line = reader.readLine()) != null) {
+                builder.append(line);
+            }
+            return builder.toString();
+        } finally {
+            if (null != reader) {
+                reader.close();
+            }
+        }
+    }
+}
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/wait/ZKWaitStrategy.java b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/wait/ZKWaitStrategy.java
new file mode 100644
index 0000000..bd3d916
--- /dev/null
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/wait/ZKWaitStrategy.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.containers.wait;
+
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.tests.BookKeeperClusterUtils;
+import org.rnorth.ducttape.TimeoutException;
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.ContainerLaunchException;
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
+
+/**
+ * Wait strategy that blocks until the zookeeper server of the target container passes the liveness probe
+ * implemented by {@link BookKeeperClusterUtils#zookeeperRunning(String, int)}.
+ */
+@Slf4j
+public class ZKWaitStrategy extends AbstractWaitStrategy {
+
+    private final int zkPort;
+
+    /**
+     * @param zkPort the zookeeper port of the container; it is translated to the externally mapped port
+     *               before probing
+     */
+    public ZKWaitStrategy(int zkPort) {
+        this.zkPort = zkPort;
+    }
+
+    /**
+     * Probes zookeeper repeatedly, throttled by the strategy's rate limiter, until the probe succeeds
+     * or the startup timeout elapses.
+     *
+     * @throws ContainerLaunchException if zookeeper is not ready within the startup timeout
+     */
+    @Override
+    protected void waitUntilReady() {
+        String hostname = waitStrategyTarget.getContainerIpAddress();
+        int externalPort = waitStrategyTarget.getMappedPort(zkPort);
+
+        try {
+            Unreliables.retryUntilTrue(
+                (int) startupTimeout.getSeconds(),
+                TimeUnit.SECONDS,
+                () -> getRateLimiter().getWhenReady(
+                    () -> {
+                        log.info("Check if zookeeper is running at {}:{}", hostname, externalPort);
+                        return BookKeeperClusterUtils.zookeeperRunning(
+                            hostname, externalPort
+                        );
+                    }));
+        } catch (TimeoutException te) {
+            // keep the timeout as the cause so that the launch failure can be diagnosed
+            throw new ContainerLaunchException(
+                "Timed out waiting for zookeeper to be ready", te);
+        }
+    }
+
+}
diff --git a/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/BookKeeperClusterUtils.java b/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/BookKeeperClusterUtils.java
index 3cd370f..7020fff 100644
--- a/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/BookKeeperClusterUtils.java
+++ b/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/BookKeeperClusterUtils.java
@@ -60,7 +60,10 @@ public class BookKeeperClusterUtils {
 
     public static boolean zookeeperRunning(DockerClient docker, String containerId) {
         String ip = DockerUtils.getContainerIP(docker, containerId);
-        try (Socket socket = new Socket(ip, 2181)) {
+        return zookeeperRunning(ip, 2181);
+    }
+    public static boolean zookeeperRunning(String ip, int port) {
+        try (Socket socket = new Socket(ip, port)) {
             socket.setSoTimeout(1000);
             socket.getOutputStream().write("ruok".getBytes(UTF_8));
             byte[] resp = new byte[4];
diff --git a/tests/integration/cluster/pom.xml b/tests/integration/cluster/pom.xml
new file mode 100644
index 0000000..422269a
--- /dev/null
+++ b/tests/integration/cluster/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://maven.apache.org/POM/4.0.0   http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.bookkeeper.tests.integration</groupId>
+    <artifactId>tests-parent</artifactId>
+    <version>4.8.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.bookkeeper.tests.integration</groupId>
+  <artifactId>cluster</artifactId>
+  <packaging>jar</packaging>
+  <name>Apache BookKeeper :: Tests :: Integration :: Cluster test</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-server</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-server</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.bookkeeper.tests</groupId>
+      <artifactId>integration-tests-topologies</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${maven-surefire-plugin.version}</version>
+        <configuration>
+          <!-- cluster tests should never flake, so failing tests are not rerun -->
+          <rerunFailingTestsCount>0</rerunFailingTestsCount>
+          <redirectTestOutputToFile>${redirectTestOutputToFile}</redirectTestOutputToFile>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
new file mode 100644
index 0000000..36d5d86
--- /dev/null
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.integration.cluster;
+
+import com.google.common.base.Stopwatch;
+import java.net.URI;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.tests.integration.topologies.BKCluster;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * A docker container based bookkeeper cluster test base, providing facilities similar to the ones in unit tests.
+ *
+ * <p>A cluster without any bookie is started once per test class, together with a metadata client driver
+ * connected to the cluster's external metadata service uri. Both are released after the last test of the class.
+ */
+@Slf4j
+public abstract class BookKeeperClusterTestBase {
+
+    protected static BKCluster bkCluster;
+    protected static URI metadataServiceUri;
+    protected static MetadataClientDriver metadataClientDriver;
+    protected static ScheduledExecutorService executor;
+
+    /**
+     * Starts a cluster with a random 8-letter name and zero bookies, then initializes the metadata client driver.
+     *
+     * @throws Exception if the cluster or the metadata client driver fails to start
+     */
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        bkCluster = new BKCluster(RandomStringUtils.randomAlphabetic(8), 0);
+        bkCluster.start();
+
+        metadataServiceUri = URI.create(bkCluster.getExternalServiceUri());
+        ClientConfiguration conf = new ClientConfiguration()
+            .setMetadataServiceUri(metadataServiceUri.toString());
+        executor = Executors.newSingleThreadScheduledExecutor();
+        metadataClientDriver = MetadataDrivers.getClientDriver(metadataServiceUri);
+        metadataClientDriver.initialize(conf, executor, NullStatsLogger.INSTANCE, Optional.empty());
+    }
+
+    /**
+     * Releases the metadata client driver, the executor and the cluster.
+     *
+     * <p>The executor and the cluster are released even if closing the metadata client driver fails,
+     * so that a failing close does not leave threads or containers behind.
+     */
+    @AfterClass
+    public static void teardownCluster() {
+        try {
+            if (null != metadataClientDriver) {
+                metadataClientDriver.close();
+            }
+        } finally {
+            if (null != executor) {
+                executor.shutdown();
+            }
+            if (null != bkCluster) {
+                bkCluster.stop();
+            }
+        }
+    }
+
+    /**
+     * Checks whether a writable bookie with the given host name is currently registered.
+     *
+     * @param bookieName the host name of the bookie
+     * @return true if any writable bookie reports {@code bookieName} as its host name
+     * @throws Exception if the writable bookies cannot be fetched
+     */
+    private boolean findIfBookieRegistered(String bookieName) throws Exception {
+        Set<BookieSocketAddress> bookies =
+            FutureUtils.result(metadataClientDriver.getRegistrationClient().getWritableBookies()).getValue();
+        return bookies.stream().anyMatch(addr -> addr.getHostName().equals(bookieName));
+    }
+
+    /**
+     * Blocks until the given bookie is no longer listed as a writable bookie, polling once per second.
+     *
+     * <p>There is no upper bound on the wait: the method only returns once the bookie is unregistered.
+     *
+     * @param bookieName the host name of the bookie
+     * @throws Exception if the writable bookies cannot be fetched or the wait is interrupted
+     */
+    protected void waitUntilBookieUnregistered(String bookieName) throws Exception {
+        Stopwatch sw = Stopwatch.createStarted();
+        while (findIfBookieRegistered(bookieName)) {
+            // log before sleeping so that the message describes the state that was just observed
+            log.info("Bookie {} is still registered in cluster {} after {} ms elapsed",
+                bookieName, bkCluster.getClusterName(), sw.elapsed(TimeUnit.MILLISECONDS));
+            TimeUnit.MILLISECONDS.sleep(1000);
+        }
+    }
+
+}
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/SimpleClusterTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/SimpleClusterTest.java
new file mode 100644
index 0000000..9d09c67
--- /dev/null
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/SimpleClusterTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.integration.cluster;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.tests.containers.BookieContainer;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+/**
+ * Exercises the basic bookkeeper cluster operations: listing bookies, starting a bookie and stopping it again.
+ *
+ * <p>The test methods build on each other and therefore run in ascending name order.
+ */
+@Slf4j
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class SimpleClusterTest extends BookKeeperClusterTestBase {
+
+    /** Name of the single bookie that is started and stopped by the tests. */
+    private static final String BOOKIE_NAME = "bookie-000";
+
+    /**
+     * Fetches the writable bookies currently known to the registration client.
+     */
+    private static Set<BookieSocketAddress> listWritableBookies() throws Exception {
+        return FutureUtils.result(metadataClientDriver.getRegistrationClient().getWritableBookies()).getValue();
+    }
+
+    @Test
+    public void test000_ClusterIsEmpty() throws Exception {
+        Set<BookieSocketAddress> writableBookies = listWritableBookies();
+        assertTrue(writableBookies.isEmpty());
+    }
+
+    @Test
+    public void test001_StartBookie() throws Exception {
+        BookieContainer started = bkCluster.createBookie(BOOKIE_NAME);
+        assertNotNull("Container should be started", started);
+        assertEquals(1, bkCluster.getBookieContainers().size());
+        assertSame(started, bkCluster.getBookie(BOOKIE_NAME));
+
+        Set<BookieSocketAddress> writableBookies = listWritableBookies();
+        assertEquals(1, writableBookies.size());
+    }
+
+    @Test
+    public void test002_StopBookie() throws Exception {
+        BookieContainer killed = bkCluster.killBookie(BOOKIE_NAME);
+        assertNotNull("Bookie '" + BOOKIE_NAME + "' doesn't exist", killed);
+        assertEquals(0, bkCluster.getBookieContainers().size());
+        assertNull(bkCluster.getBookie(BOOKIE_NAME));
+
+        // the registration only disappears some time after the container is gone
+        waitUntilBookieUnregistered(BOOKIE_NAME);
+
+        Set<BookieSocketAddress> writableBookies = listWritableBookies();
+        assertEquals(0, writableBookies.size());
+    }
+
+}
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
new file mode 100644
index 0000000..c1e9e84
--- /dev/null
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tests.integration.topologies;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.tests.containers.BookieContainer;
+import org.apache.bookkeeper.tests.containers.MetadataStoreContainer;
+import org.apache.bookkeeper.tests.containers.ZKContainer;
+import org.testcontainers.containers.Network;
+
+/**
+ * BookKeeper Cluster in containers.
+ *
+ * <p>A cluster consists of one metadata store container and any number of bookie containers, all attached to
+ * a docker network created for this cluster. Access to the bookie registry is guarded by the monitor of this
+ * instance.
+ */
+@Slf4j
+public class BKCluster {
+
+    @Getter
+    private final String clusterName;
+    private final Network network;
+    private final MetadataStoreContainer metadataContainer;
+    // bookie name -> container, guarded by "this"
+    private final Map<String, BookieContainer> bookieContainers;
+    private final int numBookies;
+
+    /**
+     * Creates (but does not start) a cluster backed by a zookeeper metadata store.
+     *
+     * @param clusterName the name of the cluster
+     * @param numBookies the number of bookies created by {@link #start()}
+     */
+    public BKCluster(String clusterName, int numBookies) {
+        this.clusterName = clusterName;
+        this.network = Network.newNetwork();
+        this.metadataContainer = (MetadataStoreContainer) new ZKContainer(clusterName)
+            .withNetwork(network)
+            .withNetworkAliases(ZKContainer.HOST_NAME);
+        this.bookieContainers = Maps.newTreeMap();
+        this.numBookies = numBookies;
+    }
+
+    /**
+     * @return the external service uri reported by the metadata store container
+     */
+    public String getExternalServiceUri() {
+        return metadataContainer.getExternalServiceUri();
+    }
+
+    /**
+     * @return the internal service uri reported by the metadata store container
+     */
+    public String getInternalServiceUri() {
+        return metadataContainer.getInternalServiceUri();
+    }
+
+    /**
+     * Starts the metadata store, initializes a new cluster in it and creates the configured number of bookies.
+     *
+     * @throws Exception if any of the steps fails
+     */
+    public void start() throws Exception {
+        // start the metadata store
+        this.metadataContainer.start();
+
+        // init a new cluster
+        initNewCluster(metadataContainer.getExternalServiceUri());
+
+        // create bookies
+        createBookies("bookie", numBookies);
+    }
+
+    /**
+     * Stops all registered bookie containers and the metadata store, then closes the network.
+     * A failure to close the network is only logged.
+     */
+    public void stop() {
+        synchronized (this) {
+            bookieContainers.values().forEach(BookieContainer::stop);
+        }
+
+        this.metadataContainer.stop();
+        try {
+            this.network.close();
+        } catch (Exception e) {
+            log.info("Failed to shutdown network for bookkeeper cluster {}", clusterName, e);
+        }
+    }
+
+    /**
+     * Initializes a new cluster in the metadata store reachable at the given uri.
+     *
+     * @param metadataServiceUri the metadata service uri used to reach the metadata store
+     * @throws Exception if the registration manager cannot be run or the initialization fails
+     */
+    protected void initNewCluster(String metadataServiceUri) throws Exception {
+        MetadataDrivers.runFunctionWithRegistrationManager(
+            new ServerConfiguration().setMetadataServiceUri(metadataServiceUri),
+            rm -> {
+                try {
+                    rm.initNewCluster();
+                } catch (Exception e) {
+                    throw new UncheckedExecutionException("Failed to init a new cluster", e);
+                }
+                return null;
+            }
+        );
+    }
+
+    /**
+     * @return the live (not copied) map of bookie name to container
+     */
+    public synchronized Map<String, BookieContainer> getBookieContainers() {
+        return bookieContainers;
+    }
+
+
+    /**
+     * @param bookieName the name of the bookie
+     * @return the container registered under the given name, or null if there is none
+     */
+    public synchronized BookieContainer getBookie(String bookieName) {
+        return bookieContainers.get(bookieName);
+    }
+
+    /**
+     * Unregisters the bookie with the given name and stops its container.
+     *
+     * @param bookieName the name of the bookie
+     * @return the stopped container, or null if no bookie is registered under the given name
+     */
+    public BookieContainer killBookie(String bookieName) {
+        BookieContainer container;
+        synchronized (this) {
+            container = bookieContainers.remove(bookieName);
+            if (null != container) {
+                container.stop();
+            }
+        }
+        return container;
+    }
+
+    /**
+     * Returns the bookie registered under the given name, creating, starting and registering it first
+     * if it does not exist yet.
+     *
+     * @param bookieName the name of the bookie, also used as its network alias
+     * @return the container registered under the given name
+     */
+    public synchronized BookieContainer createBookie(String bookieName) {
+        BookieContainer container = getBookie(bookieName);
+        if (null == container) {
+            container = (BookieContainer) new BookieContainer(clusterName, bookieName, ZKContainer.SERVICE_URI)
+                .withNetwork(network)
+                .withNetworkAliases(bookieName);
+            container.start();
+            bookieContainers.put(bookieName, container);
+        }
+        return container;
+    }
+
+    /**
+     * Creates {@code numBookies} bookies named {@code <prefix>-000}, {@code <prefix>-001}, ... and waits
+     * until all of them are started.
+     *
+     * <p>This method must not hold the monitor of this instance while it waits: the asynchronous tasks
+     * call the synchronized {@link #createBookie(String)} from other threads, so waiting for them inside
+     * a synchronized method would deadlock as soon as {@code numBookies} is greater than zero.
+     *
+     * @param bookieNamePrefix the prefix of the bookie names
+     * @param numBookies the number of bookies to create
+     * @return the created bookies, keyed by bookie name
+     * @throws Exception if any of the bookies fails to start
+     */
+    public Map<String, BookieContainer> createBookies(String bookieNamePrefix, int numBookies)
+            throws Exception {
+        List<CompletableFuture<Void>> startFutures = Lists.newArrayListWithExpectedSize(numBookies);
+        Map<String, BookieContainer> containers = Maps.newHashMap();
+        for (int i = 0; i < numBookies; i++) {
+            final int idx = i;
+            startFutures.add(
+                CompletableFuture.runAsync(() -> {
+                    String bookieName = String.format("%s-%03d", bookieNamePrefix, idx);
+                    BookieContainer container = createBookie(bookieName);
+                    // the result map is filled from several threads
+                    synchronized (containers) {
+                        containers.put(bookieName, container);
+                    }
+                }));
+        }
+        FutureUtils.result(FutureUtils.collect(startFutures));
+        return containers;
+    }
+}
diff --git a/tests/integration/cluster/src/test/resources/log4j.properties b/tests/integration/cluster/src/test/resources/log4j.properties
new file mode 100644
index 0000000..10ae6bf
--- /dev/null
+++ b/tests/integration/cluster/src/test/resources/log4j.properties
@@ -0,0 +1,42 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+#
+# Bookkeeper Logging Configuration
+#
+
+# Format is "<default threshold> (, <appender>)+"
+
+# DEFAULT: console appender only, level INFO
+bookkeeper.root.logger=INFO,CONSOLE
+log4j.rootLogger=${bookkeeper.root.logger}
+
+#
+# Log INFO level and above messages to the console
+#
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+#disable zookeeper logging
+log4j.logger.org.apache.zookeeper=OFF
+log4j.logger.org.apache.bookkeeper.bookie=INFO
+log4j.logger.org.apache.bookkeeper.meta=INFO
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index dbd061d..bcf3622 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -30,6 +30,7 @@
   <modules>
     <module>smoke</module>
     <module>standalone</module>
+    <module>cluster</module>
   </modules>
 
   <build>

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.