You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/03 03:03:42 UTC

[pulsar] branch master updated: Package management bookkeeper storage implementation (#8744)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a2b006  Package management bookkeeper storage implementation (#8744)
4a2b006 is described below

commit 4a2b006f854b36f042a845150acebcc3d7a50358
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Dec 3 11:03:23 2020 +0800

    Package management bookkeeper storage implementation (#8744)
    
    Master Issue: #8676
    
    ### Motivation
    
    Support bookkeeper storage for packages management service.
    
    ### Modifications
    
    - Add bookkeeper storage provider
    - Add bookkeeper storage unit tests
---
 .../{core => bookkeeper-storage}/pom.xml           |  46 +-
 .../bookkeeper/BookKeeperPackagesStorage.java      | 176 +++++++
 .../BookKeeperPackagesStorageConfiguration.java    |  70 +++
 .../BookKeeperPackagesStorageProvider.java}        |  19 +-
 .../storage/bookkeeper/DLInputStream.java          |  94 ++++
 .../storage/bookkeeper/DLOutputStream.java         | 124 +++++
 .../storage/bookkeeper/package-info.java}          |  20 +-
 .../bookkeeper/BookKeeperPackagesStorageTest.java  | 181 ++++++++
 .../storage/bookkeeper/DLInputStreamTest.java      | 146 ++++++
 .../storage/bookkeeper/DLOutputStreamTest.java     | 130 ++++++
 .../bookkeeper/test/BookKeeperClusterTestCase.java | 506 +++++++++++++++++++++
 .../bookkeeper/test/MockedBookKeeperTestCase.java  | 139 ++++++
 .../bookkeeper/bookkeeper/test/PortManager.java    | 127 ++++++
 .../bookkeeper/bookkeeper/test/ZooKeeperUtil.java  | 137 ++++++
 pulsar-package-management/core/pom.xml             |   2 +-
 .../packages/management/core/PackagesStorage.java  |   4 +
 .../core/PackagesStorageConfiguration.java         |   4 +-
 .../impl/DefaultPackagesStorageConfiguration.java} |  17 +-
 .../management/core/MockedPackagesStorage.java     |   5 +
 .../core/common/PackageMetadataSerdeTest.java      |   4 +-
 .../core/impl/PackagesManagementImplTest.java      |   3 +-
 pulsar-package-management/pom.xml                  |   1 +
 22 files changed, 1904 insertions(+), 51 deletions(-)

diff --git a/pulsar-package-management/core/pom.xml b/pulsar-package-management/bookkeeper-storage/pom.xml
similarity index 51%
copy from pulsar-package-management/core/pom.xml
copy to pulsar-package-management/bookkeeper-storage/pom.xml
index 3c4af21..0274f0e 100644
--- a/pulsar-package-management/core/pom.xml
+++ b/pulsar-package-management/bookkeeper-storage/pom.xml
@@ -29,23 +29,53 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>core</artifactId>
-    <name>Apache Pulsar :: Package Management :: Core</name>
+    <artifactId>bookkeeper-storage</artifactId>
+    <name>Apache Pulsar :: Package Management :: BookKeeper Storage</name>
 
     <dependencies>
         <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-package-core</artifactId>
+            <version>${project.version}</version>
         </dependency>
 
         <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
+            <groupId>org.apache.distributedlog</groupId>
+            <artifactId>distributedlog-core</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>net.jpountz.lz4</groupId>
+                    <artifactId>lz4</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>managed-ledger</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>testmocks</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
+
 </project>
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
new file mode 100644
index 0000000..f0409b4
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
+
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.exceptions.ZKException;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.metadata.DLMetadata;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.pulsar.packages.management.core.PackagesStorage;
+import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;
+import org.apache.zookeeper.KeeperException;
+
+
+/**
+ * Packages management storage implementation with bookkeeper.
+ */
+@Slf4j
+public class BookKeeperPackagesStorage implements PackagesStorage {
+
+    private final static String NS_CLIENT_ID = "packages-management";
+    final BookKeeperPackagesStorageConfiguration configuration;
+    private Namespace namespace;
+
+    BookKeeperPackagesStorage(PackagesStorageConfiguration configuration) {
+        this.configuration = new BookKeeperPackagesStorageConfiguration(configuration);
+    }
+
+    @Override
+    public void initialize() {
+        DistributedLogConfiguration conf = new DistributedLogConfiguration()
+            .setImmediateFlushEnabled(true)
+            .setOutputBufferSize(0)
+            .setWriteQuorumSize(configuration.getNumReplicas())
+            .setEnsembleSize(configuration.getNumReplicas())
+            .setAckQuorumSize(configuration.getNumReplicas())
+            .setLockTimeout(DistributedLogConstants.LOCK_IMMEDIATE);
+        if (!Strings.isNullOrEmpty(configuration.getBookkeeperClientAuthenticationPlugin())) {
+            conf.setProperty("bkc.clientAuthProviderFactoryClass",
+                configuration.getBookkeeperClientAuthenticationPlugin());
+            if (!Strings.isNullOrEmpty(configuration.getBookkeeperClientAuthenticationParametersName())) {
+                conf.setProperty("bkc." + configuration.getBookkeeperClientAuthenticationParametersName(),
+                    configuration.getBookkeeperClientAuthenticationParameters());
+            }
+        }
+        try {
+            this.namespace = NamespaceBuilder.newBuilder()
+                .conf(conf).clientId(NS_CLIENT_ID).uri(initializeDlogNamespace()).build();
+        } catch (IOException e) {
+            throw new RuntimeException("Initialize distributed log for packages management service failed.", e);
+        }
+        log.info("Packages management bookKeeper storage initialized successfully");
+    }
+
+    private URI initializeDlogNamespace() throws IOException {
+        BKDLConfig bkdlConfig = new BKDLConfig(configuration.getZkServers(), configuration.getLedgersRootPath());
+        DLMetadata dlMetadata = DLMetadata.create(bkdlConfig);
+        URI dlogURI = URI.create(String.format("distributedlog://%s/pulsar/packages", configuration.getZkServers()));
+        try {
+            dlMetadata.create(dlogURI);
+        } catch (ZKException e) {
+            if (e.getKeeperExceptionCode() == KeeperException.Code.NODEEXISTS) {
+                return dlogURI;
+            }
+        }
+        return dlogURI;
+    }
+
+    private CompletableFuture<DistributedLogManager> openLogManagerAsync(String path) {
+        CompletableFuture<DistributedLogManager> logFuture = new CompletableFuture<>();
+        CompletableFuture.runAsync(() -> {
+            try {
+                logFuture.complete(namespace.openLog(path));
+            } catch (IOException e) {
+                logFuture.completeExceptionally(e);
+            }
+        });
+        return logFuture;
+    }
+
+    @Override
+    public CompletableFuture<Void> writeAsync(String path, InputStream inputStream) {
+        return openLogManagerAsync(path)
+            .thenCompose(DLOutputStream::openWriterAsync)
+            .thenCompose(dlOutputStream -> dlOutputStream.writeAsync(inputStream))
+            .thenCompose(DLOutputStream::closeAsync);
+    }
+
+    @Override
+    public CompletableFuture<Void> readAsync(String path, OutputStream outputStream) {
+        return openLogManagerAsync(path)
+            .thenCompose(DLInputStream::openReaderAsync)
+            .thenCompose(dlInputStream -> dlInputStream.readAsync(outputStream))
+            .thenCompose(DLInputStream::closeAsync);
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteAsync(String path) {
+        return namespace.getNamespaceDriver().getLogMetadataStore().getLogLocation(path)
+            .thenCompose(uri -> uri.map(value -> namespace.getNamespaceDriver()
+                .getLogStreamMetadataStore(NamespaceDriver.Role.WRITER).deleteLog(value, path))
+                .orElse(null));
+    }
+
+
+    @Override
+    public CompletableFuture<List<String>> listAsync(String path) {
+        return namespace.getNamespaceDriver().getLogMetadataStore().getLogs(path)
+            .thenApply(logs -> {
+                ArrayList<String> packages = new ArrayList<>();
+                logs.forEachRemaining(packages::add);
+                return packages;
+            });
+    }
+
+    @Override
+    public CompletableFuture<Boolean> existAsync(String path) {
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
+        namespace.getNamespaceDriver().getLogMetadataStore().getLogLocation(path)
+            .whenComplete((uriOptional, throwable) -> {
+                if (throwable != null) {
+                    result.complete(false);
+                    return;
+                }
+
+                if (uriOptional.isPresent()) {
+                    namespace.getNamespaceDriver()
+                        .getLogStreamMetadataStore(NamespaceDriver.Role.WRITER)
+                        .logExists(uriOptional.get(), path)
+                        .whenComplete((ignore, e) -> {
+                            if (e != null) {
+                                result.complete(false);
+                            } else {
+                                result.complete(true);
+                            }
+                        });
+                } else {
+                    result.complete(false);
+                }
+            });
+        return result;    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return CompletableFuture.runAsync(() -> this.namespace.close());
+    }
+}
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
new file mode 100644
index 0000000..74b031e
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
+
+import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;
+import org.apache.pulsar.packages.management.core.impl.DefaultPackagesStorageConfiguration;
+
+public class BookKeeperPackagesStorageConfiguration implements PackagesStorageConfiguration {
+
+    private final PackagesStorageConfiguration configuration;
+
+    BookKeeperPackagesStorageConfiguration() {
+        this.configuration = new DefaultPackagesStorageConfiguration();
+    }
+
+    BookKeeperPackagesStorageConfiguration(PackagesStorageConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    int getNumReplicas() {
+        return Integer.parseInt(getProperty("numReplicas"));
+    }
+
+    String getZkServers() {
+        return getProperty("zkServers");
+    }
+
+    String getLedgersRootPath() {
+        return getProperty("ledgerRootPath");
+    }
+
+    String getBookkeeperClientAuthenticationPlugin() {
+        return getProperty("bookkeeperClientAuthenticationPlugin");
+    }
+
+    String getBookkeeperClientAuthenticationParametersName() {
+        return getProperty("bookkeeperClientAuthenticationParametersName");
+    }
+
+    String getBookkeeperClientAuthenticationParameters() {
+        return getProperty("bookkeeperClientAuthenticationParameters");
+    }
+
+
+    @Override
+    public String getProperty(String key) {
+        return configuration.getProperty(key);
+    }
+
+    @Override
+    public void setProperty(String key, String value) {
+        configuration.setProperty(key, value);
+    }
+}
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageProvider.java
similarity index 62%
copy from pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java
copy to pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageProvider.java
index ada2d6c..e23ecaa 100644
--- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageProvider.java
@@ -16,20 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.packages.management.core;
+package org.apache.pulsar.packages.management.storage.bookkeeper;
 
-import java.util.HashMap;
-
-public class MockedPackagesStorageConfiguration implements PackagesStorageConfiguration {
-    private HashMap<String, Object> properties = new HashMap<>();
-
-    @Override
-    public Object getProperty(String key) {
-        return properties.get(key);
-    }
+import org.apache.pulsar.packages.management.core.PackagesStorage;
+import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;
+import org.apache.pulsar.packages.management.core.PackagesStorageProvider;
 
+public class BookKeeperPackagesStorageProvider implements PackagesStorageProvider {
     @Override
-    public void setProperty(String key, Object value) {
-        properties.put(key, value);
+    public PackagesStorage getStorage(PackagesStorageConfiguration config) {
+        return new BookKeeperPackagesStorage(config);
     }
 }
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLInputStream.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLInputStream.java
new file mode 100644
index 0000000..863c598
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLInputStream.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.exceptions.EndOfStreamException;
+
+/**
+ * DistributedLog Input Stream.
+ */
+class DLInputStream {
+
+    private final DistributedLogManager distributedLogManager;
+    private final AsyncLogReader reader;
+
+    private DLInputStream(DistributedLogManager distributedLogManager, AsyncLogReader reader) {
+        this.distributedLogManager = distributedLogManager;
+        this.reader = reader;
+    }
+
+    static CompletableFuture<DLInputStream> openReaderAsync(DistributedLogManager distributedLogManager) {
+        return distributedLogManager.openAsyncLogReader(DLSN.InitialDLSN)
+            .thenApply(r -> new DLInputStream(distributedLogManager, r));
+    }
+
+    /**
+     * Read data to output stream.
+     *
+     * @param outputStream the data write to
+     * @return
+     */
+    CompletableFuture<DLInputStream> readAsync(OutputStream outputStream) {
+        CompletableFuture<Void> outputFuture = new CompletableFuture<>();
+        read(outputStream, outputFuture, 10);
+        return outputFuture.thenApply(ignore -> this);
+    }
+
+    /**
+     * When reading the end of a stream, it will throw an EndOfStream exception. So we can use this to
+     * check if we read to the end.
+     *
+     * @param outputStream the data write to
+     * @param readFuture a future that wait to read complete
+     * @param num how many entries read in one time
+     */
+    private void read(OutputStream outputStream, CompletableFuture<Void> readFuture, int num) {
+        reader.readBulk(num)
+            .whenComplete((logRecordWithDLSNS, throwable) -> {
+                if (null != throwable) {
+                    if (throwable instanceof EndOfStreamException) {
+                        readFuture.complete(null);
+                    } else {
+                        readFuture.completeExceptionally(throwable);
+                    }
+                    return;
+                }
+                CompletableFuture.runAsync(() -> {
+                    logRecordWithDLSNS.forEach(logRecord -> {
+                        try {
+                            outputStream.write(logRecord.getPayload());
+                        } catch (IOException e) {
+                            readFuture.completeExceptionally(e);
+                        }
+                    });
+                }).thenRun(() -> read(outputStream, readFuture, num));
+            });
+    }
+
+    CompletableFuture<Void> closeAsync() {
+        return reader.asyncClose().thenCompose(ignore -> distributedLogManager.asyncClose());
+    }
+}
+
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
new file mode 100644
index 0000000..e56fbb3
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+
+/**
+ * DistributedLog Output Stream.
+ */
+@Slf4j
+class DLOutputStream {
+
+    private final DistributedLogManager distributedLogManager;
+    private final AsyncLogWriter writer;
+    private long offset = 0L;
+
+    private DLOutputStream(DistributedLogManager distributedLogManager, AsyncLogWriter writer) {
+        this.distributedLogManager = distributedLogManager;
+        this.writer = writer;
+    }
+
+    static CompletableFuture<DLOutputStream> openWriterAsync(DistributedLogManager distributedLogManager) {
+        log.info("Open a dlog manager");
+        return distributedLogManager.openAsyncLogWriter().thenApply(w -> new DLOutputStream(distributedLogManager, w));
+    }
+
+    private CompletableFuture<List<LogRecord>> getRecords(InputStream inputStream) {
+        CompletableFuture<List<LogRecord>> future = new CompletableFuture<>();
+        CompletableFuture.runAsync(() -> {
+            byte[] readBuffer = new byte[8192];
+            List<LogRecord> records = new ArrayList<>();
+            try {
+                int read = 0;
+                while ((read = inputStream.read(readBuffer)) != -1) {
+                    log.info("write something into the ledgers " + offset);
+                    ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0, read);
+                    offset += writeBuf.readableBytes();
+                    LogRecord record = new LogRecord(offset, writeBuf);
+                    records.add(record);
+                }
+                future.complete(records);
+            } catch (IOException e) {
+                log.error("Failed to get all records from the input stream", e);
+                future.completeExceptionally(e);
+            }
+        });
+        return future;
+    }
+
+    /**
+     * Write all input stream data to the distribute log.
+     *
+     * @param inputStream the data we need to write
+     * @return
+     */
+    CompletableFuture<DLOutputStream> writeAsync(InputStream inputStream) {
+        return getRecords(inputStream)
+            .thenCompose(this::writeAsync);
+    }
+
+    /**
+     * Write a ByteBuf data to the distribute log.
+     *
+     * @param data the data we need to write
+     * @return
+     */
+    private CompletableFuture<DLOutputStream> writeAsync(ByteBuf data) {
+        synchronized (this) {
+            offset += data.readableBytes();
+            LogRecord record = new LogRecord(offset, data);
+            log.info("execute write to the dlog " + offset);
+            return writer.write(record).whenComplete((dlsn, throwable) -> {
+                if (throwable != null) {
+                    throwable.printStackTrace();
+                } else {
+                    log.info("DLSN is {} {}", dlsn.toString(), offset);
+                }
+            }).thenApply(ignore -> this);
+        }
+    }
+
+    private CompletableFuture<DLOutputStream> writeAsync(List<LogRecord> records) {
+        return writer.writeBulk(records).thenApply(ignore -> this);
+    }
+
+    /**
+     * Every package will be a stream. So we need mark the stream as EndOfStream when the stream
+     * write done.
+     *
+     * @return
+     */
+    CompletableFuture<Void> closeAsync() {
+        return writer.markEndOfStream()
+            .thenCompose(ignore -> writer.asyncClose())
+            .thenCompose(ignore -> distributedLogManager.asyncClose());
+    }
+}
+
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/package-info.java
similarity index 64%
copy from pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java
copy to pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/package-info.java
index ada2d6c..d82dba7 100644
--- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/package-info.java
@@ -16,20 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.packages.management.core;
 
-import java.util.HashMap;
-
-public class MockedPackagesStorageConfiguration implements PackagesStorageConfiguration {
-    private HashMap<String, Object> properties = new HashMap<>();
-
-    @Override
-    public Object getProperty(String key) {
-        return properties.get(key);
-    }
-
-    @Override
-    public void setProperty(String key, Object value) {
-        properties.put(key, value);
-    }
-}
+/**
+ * Packages management storage implementation with bookkeeper.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
new file mode 100644
index 0000000..5e0954a
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
+
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.exceptions.ZKException;
+import org.apache.pulsar.packages.management.core.PackagesStorage;
+import org.apache.pulsar.packages.management.core.PackagesStorageProvider;
+import org.apache.pulsar.packages.management.core.impl.DefaultPackagesStorageConfiguration;
+import org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase {
+    private PackagesStorage storage;
+
+    public BookKeeperPackagesStorageTest() {
+        super(1);
+    }
+
+    @BeforeMethod()
+    public void setup() throws Exception {
+        PackagesStorageProvider provider = PackagesStorageProvider
+            .newProvider(BookKeeperPackagesStorageProvider.class.getName());
+        DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration();
+        configuration.setProperty("zkServers", zkUtil.getZooKeeperConnectString());
+        configuration.setProperty("numReplicas", "1");
+        configuration.setProperty("ledgerRootPath", "/ledgers");
+        storage = provider.getStorage(configuration);
+        storage.initialize();
+    }
+
+    @AfterMethod
+    public void teardown() throws Exception {
+        if (storage != null) {
+            storage.closeAsync().get();
+        }
+    }
+
+    @Test(timeOut = 60000)
+    public void testConfiguration() {
+        assertTrue(storage instanceof BookKeeperPackagesStorage);
+        BookKeeperPackagesStorage bkStorage = (BookKeeperPackagesStorage) storage;
+        assertEquals(bkStorage.configuration.getZkServers(), zkUtil.getZooKeeperConnectString());
+        assertEquals(bkStorage.configuration.getNumReplicas(), 1);
+        assertEquals(bkStorage.configuration.getLedgersRootPath(), "/ledgers");
+    }
+
+    @Test(timeOut = 60000)
+    public void testReadWriteOperations() throws ExecutionException, InterruptedException {
+        String testData = "test-data";
+        ByteArrayInputStream testDataStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
+        String testPath = "test-read-write";
+
+        // write some data to the dlog
+        storage.writeAsync(testPath, testDataStream).get();
+
+        // read the data from the dlog
+        ByteArrayOutputStream readData = new ByteArrayOutputStream();
+        storage.readAsync(testPath, readData).get();
+        String readResult = new String(readData.toByteArray(), StandardCharsets.UTF_8);
+
+        assertTrue(readResult.equals(testData));
+    }
+
+    @Test(timeOut = 60000)
+    public void testReadNonExistentData() {
+        String testPath = "non-existent-path";
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+        try {
+            storage.readAsync(testPath, outputStream).get();
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof LogNotFoundException);
+        }
+    }
+
+    @Test(timeOut = 60000)
+    public void testListOperation() throws ExecutionException, InterruptedException {
+        // write the data to different path
+        String rootPath = "pulsar";
+        String testData = "test-data";
+        ByteArrayInputStream inputStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
+
+        List<String> writePaths = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            String path = "test-" + i;
+            writePaths.add(path);
+            storage.writeAsync(rootPath + "/" + path, inputStream).get();
+        }
+
+        // list all path under the root path
+        List<String> paths = storage.listAsync(rootPath).get();
+
+        // verify the paths number
+        Assert.assertEquals(writePaths.size(), paths.size());
+        paths.forEach(p -> writePaths.remove(p));
+        Assert.assertEquals(0, writePaths.size());
+
+        // list non-existent path
+        try {
+            storage.listAsync("non-existent").get();
+        } catch (Exception e) {
+            // should not throw any exception
+            fail(e.getMessage());
+        }
+    }
+
+    @Test(timeOut = 60000)
+    public void testDeleteOperation() throws ExecutionException, InterruptedException {
+        String testPath = "test-delete-path";
+        String testData = "test-data";
+        ByteArrayInputStream inputStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
+
+        // write the data to the test path
+        storage.writeAsync(testPath, inputStream).get();
+
+        // list path should have one file
+        List<String> paths = storage.listAsync("").get();
+        Assert.assertEquals(1, paths.size());
+        Assert.assertEquals(testPath, paths.get(0));
+
+        // delete the path
+        storage.deleteAsync(testPath).get();
+
+        // list again and not file under the path
+        paths= storage.listAsync("").get();
+        Assert.assertEquals(0, paths.size());
+
+
+        // delete non-existent path
+        try {
+            storage.deleteAsync("non-existent").get();
+            fail("should throw exception");
+        } catch (Exception e) {
+            Assert.assertTrue(e.getCause() instanceof ZKException);
+        }
+    }
+
+    @Test(timeOut = 60000)
+    public void testExistOperation() throws ExecutionException, InterruptedException {
+        Boolean exist = storage.existAsync("test-path").get();
+        assertFalse(exist);
+
+        storage.writeAsync("test-path", new ByteArrayInputStream("test".getBytes())).get();
+
+        exist = storage.existAsync("test-path").get();
+        Assert.assertTrue(exist);
+    }
+
+}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLInputStreamTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLInputStreamTest.java
new file mode 100644
index 0000000..5406cd2
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLInputStreamTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
+
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.eq;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.exceptions.EndOfStreamException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class DLInputStreamTest {
+    private DistributedLogManager dlm;
+    private AsyncLogReader reader;
+
+    @BeforeMethod
+    public void setup() {
+        dlm = mock(DistributedLogManager.class);
+        reader = mock(AsyncLogReader.class);
+
+        when(dlm.openAsyncLogReader(any(DLSN.class))).thenReturn(CompletableFuture.completedFuture(reader));
+        when(dlm.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
+        when(reader.readBulk(anyInt())).thenReturn(failedFuture(new EndOfStreamException("eos")));
+        when(reader.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
+    }
+
+    @AfterMethod
+    public void teardown() throws IOException {
+        if (dlm != null) {
+            dlm.close();
+        }
+    }
+
+    /**
+     * Test Case: reader hits eos (end of stream)
+     */
+    @Test
+    public void testReadEos() throws Exception {
+        OutputStream outputStream = new ByteArrayOutputStream();
+        try {
+            DLInputStream.openReaderAsync(dlm)
+                .thenCompose(d -> d.readAsync(outputStream))
+                .thenCompose(DLInputStream::closeAsync).get();
+        } catch (Exception e) {
+            if (e.getCause() instanceof EndOfStreamException) {
+                // no-op
+            } else {
+                fail(e.getMessage());
+            }
+        }
+
+        verify(dlm, times(1)).openAsyncLogReader(eq(DLSN.InitialDLSN));
+        verify(reader, times(1)).readBulk(eq(10));
+        verify(reader, times(1)).asyncClose();
+        verify(dlm, times(1)).asyncClose();
+    }
+
+
+    /**
+     * Test Case: read records from the input stream. And output it to a output stream.
+     */
+    @Test
+    public void testReadToOutputStream() {
+        // prepare test data
+        byte[] data = "test-read".getBytes();
+        LogRecordWithDLSN record = mock(LogRecordWithDLSN.class);
+        List<LogRecordWithDLSN> records = new ArrayList<LogRecordWithDLSN>();
+        records.add(record);
+
+        when(record.getPayload()).thenReturn(data);
+        when(reader.readBulk(anyInt()))
+            .thenReturn(CompletableFuture.completedFuture(records))
+            .thenReturn(failedFuture(new EndOfStreamException("eos")));
+
+
+        // test code
+        OutputStream outputStream = new ByteArrayOutputStream();
+        try {
+            DLInputStream.openReaderAsync(dlm)
+                .thenCompose(d -> d.readAsync(outputStream))
+                .thenCompose(DLInputStream::closeAsync).get();
+        } catch (Exception e) {
+            if (e.getCause() instanceof EndOfStreamException) {
+                // no-op
+            } else {
+                fail(e.getMessage());
+            }
+        }
+
+        byte[] result = ((ByteArrayOutputStream) outputStream).toByteArray();
+        assertEquals("test-read", new String(result));
+
+    }
+
+    @Test
+    public void openAsyncLogReaderFailed() {
+        when(dlm.openAsyncLogReader(any(DLSN.class))).thenReturn(failedFuture(new Exception("Open reader was failed")));
+
+        try {
+            DLInputStream.openReaderAsync(dlm).get();
+        } catch (Exception e) {
+            assertEquals(e.getCause().getMessage(), "Open reader was failed");
+        }
+    }
+
+    private <T> CompletableFuture<T> failedFuture(Throwable throwable) {
+        CompletableFuture<T> completableFuture = new CompletableFuture<>();
+        completableFuture.completeExceptionally(throwable);
+        return completableFuture;
+    }
+}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
new file mode 100644
index 0000000..6a23357
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+import static org.testng.AssertJUnit.assertEquals;
+
+public class DLOutputStreamTest {
+
+    private DistributedLogManager dlm;
+    private AsyncLogWriter writer;
+
+    @BeforeMethod
+    public void setup() {
+        dlm = mock(DistributedLogManager.class);
+        writer = mock(AsyncLogWriter.class);
+
+        when(dlm.openAsyncLogWriter()).thenReturn(CompletableFuture.completedFuture(writer));
+        when(dlm.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
+        when(writer.markEndOfStream()).thenReturn(CompletableFuture.completedFuture(null));
+        when(writer.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
+        when(writer.writeBulk(any(List.class)))
+            .thenReturn(CompletableFuture.completedFuture(Collections.singletonList(DLSN.InitialDLSN))); }
+
+    @AfterMethod
+    public void teardown() throws IOException {
+        if (dlm != null) {
+            dlm.close();
+        }
+    }
+
+    /**
+     * Test Case: write data using input stream.
+     */
+    @Test
+    public void writeInputStreamData() throws ExecutionException, InterruptedException {
+        byte[] data = "test-write".getBytes();
+        DLOutputStream.openWriterAsync(dlm)
+            .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
+                .thenCompose(DLOutputStream::closeAsync)).get();
+
+        verify(writer, times(1)).writeBulk(any(List.class));
+        verify(writer, times(1)).markEndOfStream();
+        verify(writer, times(1)).asyncClose();
+        verify(dlm, times(1)).asyncClose();
+    }
+
+    /**
+     * Test Case: write data with byte array.
+     */
+    @Test
+    public void writeBytesArrayData() throws ExecutionException, InterruptedException {
+        byte[] data = "test-write".getBytes();
+        DLOutputStream.openWriterAsync(dlm)
+            .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
+                .thenCompose(DLOutputStream::closeAsync)).get();
+
+        verify(writer, times(1)).writeBulk(any(List.class));
+        verify(writer, times(1)).markEndOfStream();
+        verify(writer, times(1)).asyncClose();
+        verify(dlm, times(1)).asyncClose();
+    }
+
+    @Test
+    public void openAsyncLogWriterFailed() {
+        when(dlm.openAsyncLogWriter()).thenReturn(failedFuture(new Exception("Open writer was failed")));
+
+        try {
+            DLOutputStream.openWriterAsync(dlm).get();
+        } catch (Exception e) {
+            assertEquals(e.getCause().getMessage(), "Open writer was failed");
+        }
+    }
+
+    @Test
+    public void writeRecordFailed() {
+        when(writer.writeBulk(any(List.class)))
+            .thenReturn(failedFuture(new Exception("Write data was failed")));
+
+        byte[] data = "test-write".getBytes();
+        try {
+            DLOutputStream.openWriterAsync(dlm)
+                .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data)))
+                .thenCompose(DLOutputStream::closeAsync).get();
+        } catch (Exception e) {
+            assertEquals(e.getCause().getMessage(), "Write data was failed");
+        }
+    }
+
+    private <T> CompletableFuture<T> failedFuture(Throwable throwable) {
+        CompletableFuture<T> completableFuture = new CompletableFuture<>();
+        completableFuture.completeExceptionally(throwable);
+        return completableFuture;
+    }
+}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
new file mode 100644
index 0000000..042e7fa
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -0,0 +1,506 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * This file is derived from BookKeeperClusterTestCase from Apache BookKeeper
+ * http://bookkeeper.apache.org
+ */
+
+package org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;
+
+import io.netty.buffer.ByteBufAllocator;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.metastore.InMemoryMetaStore;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.replication.AutoRecoveryMain;
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A class runs several bookie servers for testing.
+ */
+public abstract class BookKeeperClusterTestCase {
+
+    static final Logger LOG = LoggerFactory.getLogger(org.apache.bookkeeper.test.BookKeeperClusterTestCase.class);
+
+    // ZooKeeper related variables
+    protected ZooKeeperUtil zkUtil = new ZooKeeperUtil();
+    protected ZooKeeper zkc;
+
+    // BookKeeper related variables
+    protected List<File> tmpDirs = new LinkedList<File>();
+    protected List<BookieServer> bs = new LinkedList<BookieServer>();
+    protected List<ServerConfiguration> bsConfs = new LinkedList<ServerConfiguration>();
+    protected int numBookies;
+    protected BookKeeperTestClient bkc;
+
+    protected ServerConfiguration baseConf = new ServerConfiguration();
+    protected ClientConfiguration baseClientConf = new ClientConfiguration();
+
+    private final Map<BookieServer, AutoRecoveryMain> autoRecoveryProcesses = new HashMap<BookieServer, AutoRecoveryMain>();
+
+    private boolean isAutoRecoveryEnabled;
+
+    protected ExecutorService executor;
+
+    public BookKeeperClusterTestCase(int numBookies) {
+        this.numBookies = numBookies;
+    }
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        executor = Executors.newCachedThreadPool();
+        InMemoryMetaStore.reset();
+        setMetastoreImplClass(baseConf);
+        setMetastoreImplClass(baseClientConf);
+
+        try {
+            // start zookeeper service
+            startZKCluster();
+            // start bookkeeper service
+            startBKCluster();
+
+            zkc.create("/pulsar", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (Exception e) {
+            LOG.error("Error setting up", e);
+            throw e;
+        }
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        // stop bookkeeper service
+        stopBKCluster();
+        // stop zookeeper service
+        stopZKCluster();
+        executor.shutdown();
+    }
+
+    /**
+     * Start zookeeper cluster
+     *
+     * @throws Exception
+     */
+    protected void startZKCluster() throws Exception {
+        zkUtil.startServer();
+        zkc = zkUtil.getZooKeeperClient();
+    }
+
+    /**
+     * Stop zookeeper cluster
+     *
+     * @throws Exception
+     */
+    protected void stopZKCluster() throws Exception {
+        zkUtil.killServer();
+    }
+
+    /**
+     * Start cluster. Also, starts the auto recovery process for each bookie, if isAutoRecoveryEnabled is true.
+     *
+     * @throws Exception
+     */
+    protected void startBKCluster() throws Exception {
+        baseClientConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        baseClientConf.setUseV2WireProtocol(true);
+        baseClientConf.setEnableDigestTypeAutodetection(true);
+        baseClientConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
+        if (numBookies > 0) {
+            bkc = new BookKeeperTestClient(baseClientConf);
+        }
+
+        // Create Bookie Servers (B1, B2, B3)
+        for (int i = 0; i < numBookies; i++) {
+            startNewBookie();
+        }
+    }
+
+    /**
+     * Stop cluster. Also, stops all the auto recovery processes for the bookie cluster, if isAutoRecoveryEnabled is
+     * true.
+     *
+     * @throws Exception
+     */
+    protected void stopBKCluster() throws Exception {
+        if (bkc != null) {
+            bkc.close();
+        }
+
+        for (BookieServer server : bs) {
+            server.shutdown();
+            AutoRecoveryMain autoRecovery = autoRecoveryProcesses.get(server);
+            if (autoRecovery != null && isAutoRecoveryEnabled()) {
+                autoRecovery.shutdown();
+                LOG.debug("Shutdown auto recovery for bookieserver:" + server.getLocalAddress());
+            }
+        }
+        bs.clear();
+        for (File f : tmpDirs) {
+            FileUtils.deleteDirectory(f);
+        }
+    }
+
+    protected ServerConfiguration newServerConfiguration() throws Exception {
+        File f = File.createTempFile("bookie", "test");
+        tmpDirs.add(f);
+        f.delete();
+        f.mkdir();
+
+        int port = PortManager.nextFreePort();
+        return newServerConfiguration(port, zkUtil.getZooKeeperConnectString(), f, new File[] { f });
+    }
+
+    protected ClientConfiguration newClientConfiguration() {
+        return new ClientConfiguration(baseConf);
+    }
+
+    protected ServerConfiguration newServerConfiguration(int port, String zkServers, File journalDir,
+                                                         File[] ledgerDirs) {
+        ServerConfiguration conf = new ServerConfiguration(baseConf);
+        conf.setBookiePort(port);
+        conf.setZkServers(zkServers);
+        conf.setJournalDirName(journalDir.getPath());
+        conf.setAllowLoopback(true);
+        conf.setFlushInterval(60 * 1000);
+        conf.setGcWaitTime(60 * 1000);
+        conf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
+        String[] ledgerDirNames = new String[ledgerDirs.length];
+        for (int i = 0; i < ledgerDirs.length; i++) {
+            ledgerDirNames[i] = ledgerDirs[i].getPath();
+        }
+        conf.setLedgerDirNames(ledgerDirNames);
+        conf.setLedgerStorageClass("org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage");
+        conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
+        conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
+        return conf;
+    }
+
+    /**
+     * Get bookie address for bookie at index
+     */
+    public BookieSocketAddress getBookie(int index) throws Exception {
+        if (bs.size() <= index || index < 0) {
+            throw new IllegalArgumentException(
+                    "Invalid index, there are only " + bs.size() + " bookies. Asked for " + index);
+        }
+        return bs.get(index).getLocalAddress();
+    }
+
+    /**
+     * Kill a bookie by its socket address. Also, stops the autorecovery process for the corresponding bookie server, if
+     * isAutoRecoveryEnabled is true.
+     *
+     * @param addr
+     *            Socket Address
+     * @return the configuration of killed bookie
+     * @throws InterruptedException
+     */
+    public ServerConfiguration killBookie(InetSocketAddress addr) throws Exception {
+        BookieServer toRemove = null;
+        int toRemoveIndex = 0;
+        for (BookieServer server : bs) {
+            if (server.getLocalAddress().equals(addr)) {
+                server.shutdown();
+                toRemove = server;
+                break;
+            }
+            ++toRemoveIndex;
+        }
+        if (toRemove != null) {
+            stopAutoRecoveryService(toRemove);
+            bs.remove(toRemove);
+            return bsConfs.remove(toRemoveIndex);
+        }
+        return null;
+    }
+
+    /**
+     * Kill a bookie by index. Also, stops the respective auto recovery process for this bookie, if
+     * isAutoRecoveryEnabled is true.
+     *
+     * @param index
+     *            Bookie Index
+     * @return the configuration of killed bookie
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    public ServerConfiguration killBookie(int index) throws Exception {
+        if (index >= bs.size()) {
+            throw new IOException("Bookie does not exist");
+        }
+        BookieServer server = bs.get(index);
+        server.shutdown();
+        stopAutoRecoveryService(server);
+        bs.remove(server);
+        return bsConfs.remove(index);
+    }
+
+    /**
+     * Sleep a bookie
+     *
+     * @param addr
+     *            Socket Address
+     * @param seconds
+     *            Sleep seconds
+     * @return Count Down latch which will be counted down when sleep finishes
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    public CountDownLatch sleepBookie(InetSocketAddress addr, final int seconds) throws Exception {
+        for (final BookieServer bookie : bs) {
+            if (bookie.getLocalAddress().equals(addr)) {
+                final CountDownLatch l = new CountDownLatch(1);
+                Thread sleeper = new Thread() {
+                    @Override
+                    public void run() {
+                        try {
+                            bookie.suspendProcessing();
+                            l.countDown();
+                            Thread.sleep(seconds * 1000);
+                            bookie.resumeProcessing();
+                        } catch (Exception e) {
+                            LOG.error("Error suspending bookie", e);
+                        }
+                    }
+                };
+                sleeper.start();
+                return l;
+            }
+        }
+        throw new IOException("Bookie not found");
+    }
+
+    /**
+     * Sleep a bookie until I count down the latch
+     *
+     * @param addr
+     *            Socket Address
+     * @param latch
+     *            Latch to wait on
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    public void sleepBookie(InetSocketAddress addr, final CountDownLatch l) throws Exception {
+        for (final BookieServer bookie : bs) {
+            if (bookie.getLocalAddress().equals(addr)) {
+                Thread sleeper = new Thread() {
+                    @Override
+                    public void run() {
+                        try {
+                            bookie.suspendProcessing();
+                            l.await();
+                            bookie.resumeProcessing();
+                        } catch (Exception e) {
+                            LOG.error("Error suspending bookie", e);
+                        }
+                    }
+                };
+                sleeper.start();
+                return;
+            }
+        }
+        throw new IOException("Bookie not found");
+    }
+
+    /**
+     * Restart bookie servers. Also restarts all the respective auto recovery process, if isAutoRecoveryEnabled is true.
+     *
+     * @throws InterruptedException
+     * @throws IOException
+     * @throws KeeperException
+     * @throws BookieException
+     */
+    public void restartBookies() throws Exception {
+        restartBookies(null);
+    }
+
+    /**
+     * Restart bookie servers using new configuration settings. Also restart the respective auto recovery process, if
+     * isAutoRecoveryEnabled is true.
+     *
+     * @param newConf
+     *            New Configuration Settings
+     * @throws InterruptedException
+     * @throws IOException
+     * @throws KeeperException
+     * @throws BookieException
+     */
+    public void restartBookies(ServerConfiguration newConf) throws Exception {
+        // shut down bookie server
+        for (BookieServer server : bs) {
+            server.shutdown();
+            stopAutoRecoveryService(server);
+        }
+        bs.clear();
+        Thread.sleep(1000);
+        // restart them to ensure we can't
+
+        List<ServerConfiguration> bsConfsCopy = new ArrayList<ServerConfiguration>(bsConfs);
+        bsConfs.clear();
+        for (ServerConfiguration conf : bsConfsCopy) {
+            if (null != newConf) {
+                conf.loadConf(newConf);
+            }
+            startBookie(conf);
+        }
+    }
+
+    /**
+     * Helper method to startup a new bookie server with the indicated port number. Also, starts the auto recovery
+     * process, if the isAutoRecoveryEnabled is set true.
+     *
+     * @param port
+     *            Port to start the new bookie server on
+     * @throws IOException
+     */
+    public int startNewBookie() throws Exception {
+        ServerConfiguration conf = newServerConfiguration();
+        startBookie(conf);
+
+        return conf.getBookiePort();
+    }
+
+    /**
+     * Helper method to startup a bookie server using a configuration object. Also, starts the auto recovery process if
+     * isAutoRecoveryEnabled is true.
+     *
+     * @param conf
+     *            Server Configuration Object
+     *
+     */
+    protected BookieServer startBookie(ServerConfiguration conf) throws Exception {
+        BookieServer server = new BookieServer(conf);
+        bsConfs.add(conf);
+        bs.add(server);
+
+        server.start();
+
+        if (bkc == null) {
+            bkc = new BookKeeperTestClient(baseClientConf);
+        }
+
+        int port = conf.getBookiePort();
+        while (bkc.getZkHandle().exists(
+                "/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port, false) == null) {
+            Thread.sleep(500);
+        }
+
+        bkc.readBookiesBlocking();
+        LOG.info("New bookie on port " + port + " has been created.");
+
+        return server;
+    }
+
+    /**
+     * Start a bookie with the given bookie instance. Also, starts the auto recovery for this bookie, if
+     * isAutoRecoveryEnabled is true.
+     */
+    protected BookieServer startBookie(ServerConfiguration conf, final Bookie b) throws Exception {
+        BookieServer server = new BookieServer(conf) {
+            protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator)
+                    throws IOException, KeeperException, InterruptedException, BookieException {
+                return b;
+            }
+        };
+        server.start();
+
+        int port = conf.getBookiePort();
+        while (bkc.getZkHandle().exists(
+                "/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port, false) == null) {
+            Thread.sleep(500);
+        }
+
+        bkc.readBookiesBlocking();
+        LOG.info("New bookie on port " + port + " has been created.");
+        return server;
+    }
+
+    public void setMetastoreImplClass(AbstractConfiguration conf) {
+        conf.setMetastoreImplClass(InMemoryMetaStore.class.getName());
+    }
+
+    /**
+     * Flags used to enable/disable the auto recovery process. If it is enabled, starting the bookie server will starts
+     * the auto recovery process for that bookie. Also, stopping bookie will stops the respective auto recovery process.
+     *
+     * @param isAutoRecoveryEnabled
+     *            Value true will enable the auto recovery process. Value false will disable the auto recovery process
+     */
+    public void setAutoRecoveryEnabled(boolean isAutoRecoveryEnabled) {
+        this.isAutoRecoveryEnabled = isAutoRecoveryEnabled;
+    }
+
+    /**
+     * Flag used to check whether auto recovery process is enabled/disabled. By default the flag is false.
+     *
+     * @return true, if the auto recovery is enabled. Otherwise return false.
+     */
+    public boolean isAutoRecoveryEnabled() {
+        return isAutoRecoveryEnabled;
+    }
+
+    private void stopAutoRecoveryService(BookieServer toRemove) throws Exception {
+        AutoRecoveryMain autoRecoveryMain = autoRecoveryProcesses.remove(toRemove);
+        if (null != autoRecoveryMain && isAutoRecoveryEnabled()) {
+            autoRecoveryMain.shutdown();
+            LOG.debug("Shutdown auto recovery for bookieserver:" + toRemove.getLocalAddress());
+        }
+    }
+
+    /**
+     * Will stops all the auto recovery processes for the bookie cluster, if isAutoRecoveryEnabled is true.
+     */
+    public void stopReplicationService() throws Exception {
+        if (false == isAutoRecoveryEnabled()) {
+            return;
+        }
+        for (Entry<BookieServer, AutoRecoveryMain> autoRecoveryProcess : autoRecoveryProcesses.entrySet()) {
+            autoRecoveryProcess.getValue().shutdown();
+            LOG.debug("Shutdown Auditor Recovery for the bookie:" + autoRecoveryProcess.getKey().getLocalAddress());
+        }
+    }
+}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java
new file mode 100644
index 0000000..f1d0f08
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;
+
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A class runs several bookie servers for testing.
+ */
+public abstract class MockedBookKeeperTestCase {
+
+    static final Logger LOG = LoggerFactory.getLogger(MockedBookKeeperTestCase.class);
+
+    // ZooKeeper related variables
+    protected MockZooKeeper zkc;
+
+    // BookKeeper related variables
+    protected PulsarMockBookKeeper bkc;
+    protected int numBookies;
+
+    protected ManagedLedgerFactoryImpl factory;
+
+    protected ClientConfiguration baseClientConf = new ClientConfiguration();
+
+    protected OrderedScheduler executor;
+    protected ExecutorService cachedExecutor;
+
+    public MockedBookKeeperTestCase() {
+        // By default start a 3 bookies cluster
+        this(3);
+    }
+
+    public MockedBookKeeperTestCase(int numBookies) {
+        this.numBookies = numBookies;
+    }
+
+    @BeforeMethod
+    public void setUp(Method method) throws Exception {
+        LOG.info(">>>>>> starting {}", method);
+        try {
+            // start bookkeeper service
+            startBookKeeper();
+        } catch (Exception e) {
+            LOG.error("Error setting up", e);
+            throw e;
+        }
+
+        ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
+        factory = new ManagedLedgerFactoryImpl(bkc, zkc, conf);
+
+        zkc.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown(Method method) {
+        try {
+            LOG.info("@@@@@@@@@ stopping " + method);
+            factory.shutdown();
+            factory = null;
+            stopBookKeeper();
+            stopZooKeeper();
+            LOG.info("--------- stopped {}", method);
+        } catch (Exception e) {
+            LOG.error("tearDown Error", e);
+        }
+    }
+
+    @BeforeClass
+    public void setUpClass() {
+        executor = OrderedScheduler.newSchedulerBuilder().numThreads(2).name("test").build();
+        cachedExecutor = Executors.newCachedThreadPool();
+    }
+
+    @AfterClass
+    public void tearDownClass() {
+        executor.shutdown();
+        cachedExecutor.shutdown();
+    }
+
+    /**
+     * Start cluster
+     *
+     * @throws Exception
+     */
+    protected void startBookKeeper() throws Exception {
+        zkc = MockZooKeeper.newInstance();
+        for (int i = 0; i < numBookies; i++) {
+            ZkUtils.createFullPathOptimistic(zkc, "/ledgers/available/192.168.1.1:" + (5000 + i), "".getBytes(), null,
+                    null);
+        }
+
+        zkc.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(), null, null);
+
+        bkc = new PulsarMockBookKeeper(zkc, executor.chooseThread(this));
+    }
+
+    protected void stopBookKeeper() throws Exception {
+        bkc.shutdown();
+    }
+
+    protected void stopZooKeeper() throws Exception {
+        zkc.shutdown();
+    }
+
+}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java
new file mode 100644
index 0000000..daed597
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;
+
+import lombok.Cleanup;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.Inet4Address;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * Port manager allows a base port to be specified on the commandline. Tests will then use ports, counting up from this
+ * base port. This allows multiple instances of the bookkeeper tests to run at once.
+ */
+public class PortManager {
+
+    private static final String lockFilename = System.getProperty("test.lockFilename",
+            "/tmp/pulsar-test-port-manager.lock");
+    private static final int basePort = Integer.parseInt(System.getProperty("test.basePort", "15000"));
+
+    private static final int maxPort = 32000;
+
+    /**
+     * Return a TCP port that is currently unused.
+     *
+     * Keeps track of assigned ports and avoid race condition between different processes
+     */
+    public synchronized static int nextFreePort() {
+        Path path = Paths.get(lockFilename);
+
+        try {
+            @Cleanup
+            FileChannel fileChannel = FileChannel.open(path,
+                    StandardOpenOption.CREATE,
+                    StandardOpenOption.WRITE,
+                    StandardOpenOption.READ);
+
+            @Cleanup
+            FileLock lock = fileChannel.lock();
+
+            ByteBuffer buffer = ByteBuffer.allocate(32);
+            int len = fileChannel.read(buffer, 0L);
+            buffer.flip();
+
+            int lastUsedPort = basePort;
+            if (len > 0) {
+                byte[] bytes = new byte[buffer.remaining()];
+                buffer.get(bytes);
+                String lastUsedPortStr = new String(bytes);
+                lastUsedPort = Integer.parseInt(lastUsedPortStr);
+            }
+
+            int freePort = probeFreePort(lastUsedPort + 1);
+
+            buffer.clear();
+            buffer.put(Integer.toString(freePort).getBytes());
+            buffer.flip();
+            fileChannel.write(buffer, 0L);
+            fileChannel.truncate(buffer.position());
+            fileChannel.force(true);
+
+            return freePort;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static final int MAX_PORT_CONFLICTS = 10;
+
+    private synchronized static int probeFreePort(int port) {
+        int exceptionCount = 0;
+        while (true) {
+            if (port == maxPort) {
+                // Rollover the port probe
+                port = basePort;
+            }
+
+            try (Socket s = new Socket()) {
+                s.connect(new InetSocketAddress(Inet4Address.getLoopbackAddress(), port), 100);
+
+                // If we succeed to connect it means the port is being used
+
+            } catch (ConnectException e) {
+                return port;
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+            port++;
+            exceptionCount++;
+            if (exceptionCount > MAX_PORT_CONFLICTS) {
+                throw new RuntimeException("Failed to find an open port");
+            }
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        while (true) {
+            System.out.println("Port: " + nextFreePort());
+            Thread.sleep(100);
+        }
+    }
+}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/ZooKeeperUtil.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/ZooKeeperUtil.java
new file mode 100644
index 0000000..a06134f
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/ZooKeeperUtil.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * This file is derived from ZooKeeperUtil from Apache BookKeeper
+ * http://bookkeeper.apache.org
+ */
+
+package org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;
+
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+
+public class ZooKeeperUtil {
+    static final Logger LOG = LoggerFactory.getLogger(org.apache.bookkeeper.test.ZooKeeperUtil.class);
+
+    // ZooKeeper related variables
+    protected final static Integer zooKeeperPort = PortManager.nextFreePort();
+    private final InetSocketAddress zkaddr;
+
+    protected ZooKeeperServer zks;
+    protected ZooKeeper zkc; // zookeeper client
+    protected NIOServerCnxnFactory serverFactory;
+    protected File ZkTmpDir;
+    private final String connectString;
+
+    public ZooKeeperUtil() {
+        zkaddr = new InetSocketAddress(zooKeeperPort);
+        connectString = "localhost:" + zooKeeperPort;
+    }
+
+    public ZooKeeper getZooKeeperClient() {
+        return zkc;
+    }
+
+    public String getZooKeeperConnectString() {
+        return connectString;
+    }
+
+    public void startServer() throws Exception {
+        // create a ZooKeeper server(dataDir, dataLogDir, port)
+        LOG.debug("Running ZK server");
+        // ServerStats.registerAsConcrete();
+        ClientBase.setupTestEnv();
+        ZkTmpDir = File.createTempFile("zookeeper", "test");
+        ZkTmpDir.delete();
+        ZkTmpDir.mkdir();
+
+        zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperServer.DEFAULT_TICK_TIME);
+        serverFactory = new NIOServerCnxnFactory();
+        serverFactory.configure(zkaddr, 100);
+        serverFactory.startup(zks);
+
+        boolean b = ClientBase.waitForServerUp(getZooKeeperConnectString(), ClientBase.CONNECTION_TIMEOUT);
+        LOG.debug("Server up: " + b);
+
+        // create a zookeeper client
+        LOG.debug("Instantiate ZK Client");
+        zkc = ZooKeeperClient.newBuilder().connectString(getZooKeeperConnectString()).build();
+
+        // initialize the zk client with values
+        zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+
+    @SuppressWarnings("deprecation")
+    public void sleepServer(final int seconds, final CountDownLatch l) throws InterruptedException, IOException {
+        Thread[] allthreads = new Thread[Thread.activeCount()];
+        Thread.enumerate(allthreads);
+        for (final Thread t : allthreads) {
+            if (t.getName().contains("SyncThread:0")) {
+                Thread sleeper = new Thread() {
+                    public void run() {
+                        try {
+                            t.suspend();
+                            l.countDown();
+                            Thread.sleep(seconds * 1000);
+                            t.resume();
+                        } catch (Exception e) {
+                            LOG.error("Error suspending thread", e);
+                        }
+                    }
+                };
+                sleeper.start();
+                return;
+            }
+        }
+        throw new IOException("ZooKeeper thread not found");
+    }
+
+    public void killServer() throws Exception {
+        if (zkc != null) {
+            zkc.close();
+        }
+
+        // shutdown ZK server
+        if (serverFactory != null) {
+            serverFactory.shutdown();
+            Assert.assertTrue(ClientBase.waitForServerDown(getZooKeeperConnectString(), ClientBase.CONNECTION_TIMEOUT),
+                    "waiting for server down");
+        }
+        if (zks != null) {
+            zks.getTxnLogFactory().close();
+        }
+        // ServerStats.unregister();
+        FileUtils.deleteDirectory(ZkTmpDir);
+    }
+}
diff --git a/pulsar-package-management/core/pom.xml b/pulsar-package-management/core/pom.xml
index 3c4af21..cb54e1e 100644
--- a/pulsar-package-management/core/pom.xml
+++ b/pulsar-package-management/core/pom.xml
@@ -29,7 +29,7 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>core</artifactId>
+    <artifactId>pulsar-package-core</artifactId>
     <name>Apache Pulsar :: Package Management :: Core</name>
 
     <dependencies>
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorage.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorage.java
index e01e815..9672cf8 100644
--- a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorage.java
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorage.java
@@ -25,6 +25,10 @@ import java.util.concurrent.CompletableFuture;
 
 public interface PackagesStorage {
     /**
+     * Initialize the packages management service with the given storage.
+     */
+    void initialize();
+    /**
      * Write a input stream to a path.
      *
      * @param path
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java
index 4525a97..23e9028 100644
--- a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java
@@ -29,7 +29,7 @@ public interface PackagesStorageConfiguration {
      *          property key
      * @return the value
      */
-    Object getProperty(String key);
+    String getProperty(String key);
 
     /**
      * Set a property with the key.
@@ -39,5 +39,5 @@ public interface PackagesStorageConfiguration {
      * @param value
      *          property value
      */
-    void setProperty(String key, Object value);
+    void setProperty(String key, String value);
 }
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java
similarity index 62%
rename from pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java
rename to pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java
index ada2d6c..92ff426 100644
--- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java
@@ -16,20 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.packages.management.core;
+package org.apache.pulsar.packages.management.core.impl;
 
-import java.util.HashMap;
+import java.util.Properties;
+import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;
 
-public class MockedPackagesStorageConfiguration implements PackagesStorageConfiguration {
-    private HashMap<String, Object> properties = new HashMap<>();
+public class DefaultPackagesStorageConfiguration implements PackagesStorageConfiguration {
+    private final Properties properties = new Properties();
 
     @Override
-    public Object getProperty(String key) {
-        return properties.get(key);
+    public String getProperty(String key) {
+        return (String) properties.get(key);
     }
 
     @Override
-    public void setProperty(String key, Object value) {
-        properties.put(key, value);
+    public void setProperty(String key, String value) {
+        properties.setProperty(key, value);
     }
 }
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
index c5a379e..8af3195 100644
--- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
+++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
@@ -36,6 +36,11 @@ public class MockedPackagesStorage implements PackagesStorage {
     }
 
     @Override
+    public void initialize() {
+
+    }
+
+    @Override
     public CompletableFuture<Void> writeAsync(String path, InputStream inputStream) {
         CompletableFuture<Void> future = new CompletableFuture<>();
         CompletableFuture.runAsync(() -> {
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/common/PackageMetadataSerdeTest.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/common/PackageMetadataSerdeTest.java
index 940be34..98b2168 100644
--- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/common/PackageMetadataSerdeTest.java
+++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/common/PackageMetadataSerdeTest.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.packages.management.core.common;
 
 import java.util.HashMap;
 import org.apache.pulsar.packages.management.core.exceptions.PackagesManagementException;
-import org.junit.Test;
-import org.junit.Assert;
+import org.testng.Assert;
+import org.testng.annotations.Test;
 
 public class PackageMetadataSerdeTest {
     @Test
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java
index bae08f1..ff150c4 100644
--- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java
+++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java
@@ -23,7 +23,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
-import org.apache.pulsar.packages.management.core.MockedPackagesStorageConfiguration;
 import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
 import org.apache.pulsar.packages.management.core.PackagesManagement;
 import org.apache.pulsar.packages.management.core.PackagesStorage;
@@ -43,7 +42,7 @@ public class PackagesManagementImplTest {
     @BeforeClass
     public static void setup() throws IOException {
         PackagesStorageProvider storageProvider = PackagesStorageProvider.newProvider(MockedPackagesStorageProvider.class.getName());
-        MockedPackagesStorageConfiguration packagesStorageConfiguration = new MockedPackagesStorageConfiguration();
+        DefaultPackagesStorageConfiguration packagesStorageConfiguration = new DefaultPackagesStorageConfiguration();
         storage = storageProvider.getStorage(packagesStorageConfiguration);
 
         packagesManagement = new PackagesManagementImpl();
diff --git a/pulsar-package-management/pom.xml b/pulsar-package-management/pom.xml
index 98a9db5..232a058 100644
--- a/pulsar-package-management/pom.xml
+++ b/pulsar-package-management/pom.xml
@@ -35,6 +35,7 @@
     <name>Apache Pulsar :: Package Management</name>
     <modules>
         <module>core</module>
+        <module>bookkeeper-storage</module>
     </modules>
 
     <build>