You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/03 03:03:42 UTC
[pulsar] branch master updated: Package management bookkeeper
storage implementation (#8744)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4a2b006 Package management bookkeeper storage implementation (#8744)
4a2b006 is described below
commit 4a2b006f854b36f042a845150acebcc3d7a50358
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Dec 3 11:03:23 2020 +0800
Package management bookkeeper storage implementation (#8744)
Master Issue: #8676
### Motivation
Support bookkeeper storage for packages management service.
### Modifications
- Add bookkeeper storage provider
- Add bookkeeper storage unit tests
---
.../{core => bookkeeper-storage}/pom.xml | 46 +-
.../bookkeeper/BookKeeperPackagesStorage.java | 176 +++++++
.../BookKeeperPackagesStorageConfiguration.java | 70 +++
.../BookKeeperPackagesStorageProvider.java} | 19 +-
.../storage/bookkeeper/DLInputStream.java | 94 ++++
.../storage/bookkeeper/DLOutputStream.java | 124 +++++
.../storage/bookkeeper/package-info.java} | 20 +-
.../bookkeeper/BookKeeperPackagesStorageTest.java | 181 ++++++++
.../storage/bookkeeper/DLInputStreamTest.java | 146 ++++++
.../storage/bookkeeper/DLOutputStreamTest.java | 130 ++++++
.../bookkeeper/test/BookKeeperClusterTestCase.java | 506 +++++++++++++++++++++
.../bookkeeper/test/MockedBookKeeperTestCase.java | 139 ++++++
.../bookkeeper/bookkeeper/test/PortManager.java | 127 ++++++
.../bookkeeper/bookkeeper/test/ZooKeeperUtil.java | 137 ++++++
pulsar-package-management/core/pom.xml | 2 +-
.../packages/management/core/PackagesStorage.java | 4 +
.../core/PackagesStorageConfiguration.java | 4 +-
.../impl/DefaultPackagesStorageConfiguration.java} | 17 +-
.../management/core/MockedPackagesStorage.java | 5 +
.../core/common/PackageMetadataSerdeTest.java | 4 +-
.../core/impl/PackagesManagementImplTest.java | 3 +-
pulsar-package-management/pom.xml | 1 +
22 files changed, 1904 insertions(+), 51 deletions(-)
diff --git a/pulsar-package-management/core/pom.xml b/pulsar-package-management/bookkeeper-storage/pom.xml
similarity index 51%
copy from pulsar-package-management/core/pom.xml
copy to pulsar-package-management/bookkeeper-storage/pom.xml
index 3c4af21..0274f0e 100644
--- a/pulsar-package-management/core/pom.xml
+++ b/pulsar-package-management/bookkeeper-storage/pom.xml
@@ -29,23 +29,53 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>core</artifactId>
- <name>Apache Pulsar :: Package Management :: Core</name>
+ <artifactId>bookkeeper-storage</artifactId>
+ <name>Apache Pulsar :: Package Management :: BookKeeper Storage</name>
<dependencies>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-package-core</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
+ <groupId>org.apache.distributedlog</groupId>
+ <artifactId>distributedlog-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>managed-ledger</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Test-only mocks: keep them off the compile/runtime classpath of this module. -->
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>testmocks</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
</dependency>
</dependencies>
+
</project>
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
new file mode 100644
index 0000000..f0409b4
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorage.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
+
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.exceptions.ZKException;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.metadata.DLMetadata;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.pulsar.packages.management.core.PackagesStorage;
+import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;
+import org.apache.zookeeper.KeeperException;
+
+
+/**
+ * Packages management storage implementation with bookkeeper.
+ */
+@Slf4j
+public class BookKeeperPackagesStorage implements PackagesStorage {
+
+ private final static String NS_CLIENT_ID = "packages-management";
+ final BookKeeperPackagesStorageConfiguration configuration;
+ private Namespace namespace;
+
+ BookKeeperPackagesStorage(PackagesStorageConfiguration configuration) {
+ this.configuration = new BookKeeperPackagesStorageConfiguration(configuration);
+ }
+
+ @Override
+ public void initialize() {
+ DistributedLogConfiguration conf = new DistributedLogConfiguration()
+ .setImmediateFlushEnabled(true)
+ .setOutputBufferSize(0)
+ .setWriteQuorumSize(configuration.getNumReplicas())
+ .setEnsembleSize(configuration.getNumReplicas())
+ .setAckQuorumSize(configuration.getNumReplicas())
+ .setLockTimeout(DistributedLogConstants.LOCK_IMMEDIATE);
+ if (!Strings.isNullOrEmpty(configuration.getBookkeeperClientAuthenticationPlugin())) {
+ conf.setProperty("bkc.clientAuthProviderFactoryClass",
+ configuration.getBookkeeperClientAuthenticationPlugin());
+ if (!Strings.isNullOrEmpty(configuration.getBookkeeperClientAuthenticationParametersName())) {
+ conf.setProperty("bkc." + configuration.getBookkeeperClientAuthenticationParametersName(),
+ configuration.getBookkeeperClientAuthenticationParameters());
+ }
+ }
+ try {
+ this.namespace = NamespaceBuilder.newBuilder()
+ .conf(conf).clientId(NS_CLIENT_ID).uri(initializeDlogNamespace()).build();
+ } catch (IOException e) {
+ throw new RuntimeException("Initialize distributed log for packages management service failed.", e);
+ }
+ log.info("Packages management bookKeeper storage initialized successfully");
+ }
+
+ /**
+  * Binds the DistributedLog namespace used by the packages management service.
+  *
+  * <p>The namespace URI is built from the configured ZooKeeper servers. If the
+  * namespace metadata already exists (ZooKeeper reports NODEEXISTS) the existing
+  * binding is reused.
+  *
+  * @return the DistributedLog namespace URI
+  * @throws IOException if the namespace metadata cannot be created for any reason
+  *                     other than it already existing
+  */
+ private URI initializeDlogNamespace() throws IOException {
+     BKDLConfig bkdlConfig = new BKDLConfig(configuration.getZkServers(), configuration.getLedgersRootPath());
+     DLMetadata dlMetadata = DLMetadata.create(bkdlConfig);
+     URI dlogURI = URI.create(String.format("distributedlog://%s/pulsar/packages", configuration.getZkServers()));
+     try {
+         dlMetadata.create(dlogURI);
+     } catch (ZKException e) {
+         // An already-bound namespace is expected on every start after the first one;
+         // any other ZooKeeper failure must surface instead of being silently dropped.
+         if (e.getKeeperExceptionCode() != KeeperException.Code.NODEEXISTS) {
+             throw e;
+         }
+     }
+     return dlogURI;
+ }
+
+ private CompletableFuture<DistributedLogManager> openLogManagerAsync(String path) {
+ CompletableFuture<DistributedLogManager> logFuture = new CompletableFuture<>();
+ CompletableFuture.runAsync(() -> {
+ try {
+ logFuture.complete(namespace.openLog(path));
+ } catch (IOException e) {
+ logFuture.completeExceptionally(e);
+ }
+ });
+ return logFuture;
+ }
+
+ @Override
+ public CompletableFuture<Void> writeAsync(String path, InputStream inputStream) {
+ return openLogManagerAsync(path)
+ .thenCompose(DLOutputStream::openWriterAsync)
+ .thenCompose(dlOutputStream -> dlOutputStream.writeAsync(inputStream))
+ .thenCompose(DLOutputStream::closeAsync);
+ }
+
+ @Override
+ public CompletableFuture<Void> readAsync(String path, OutputStream outputStream) {
+ return openLogManagerAsync(path)
+ .thenCompose(DLInputStream::openReaderAsync)
+ .thenCompose(dlInputStream -> dlInputStream.readAsync(outputStream))
+ .thenCompose(DLInputStream::closeAsync);
+ }
+
+ /**
+  * Deletes the log stream stored under {@code path}.
+  *
+  * @param path the package path whose log stream should be removed
+  * @return a future that completes when the log is deleted; it completes
+  *         exceptionally if the metadata store reports no location for the path
+  *         or if the delete itself fails
+  */
+ @Override
+ public CompletableFuture<Void> deleteAsync(String path) {
+     return namespace.getNamespaceDriver().getLogMetadataStore().getLogLocation(path)
+         .thenCompose(location -> {
+             if (!location.isPresent()) {
+                 // Returning null from thenCompose would surface as an opaque
+                 // NullPointerException; fail with a descriptive error instead.
+                 CompletableFuture<Void> notFound = new CompletableFuture<>();
+                 notFound.completeExceptionally(
+                     new IOException("No log location found for package path " + path));
+                 return notFound;
+             }
+             return namespace.getNamespaceDriver()
+                 .getLogStreamMetadataStore(NamespaceDriver.Role.WRITER)
+                 .deleteLog(location.get(), path);
+         });
+ }
+
+
+ @Override
+ public CompletableFuture<List<String>> listAsync(String path) {
+ return namespace.getNamespaceDriver().getLogMetadataStore().getLogs(path)
+ .thenApply(logs -> {
+ ArrayList<String> packages = new ArrayList<>();
+ logs.forEachRemaining(packages::add);
+ return packages;
+ });
+ }
+
+ @Override
+ public CompletableFuture<Boolean> existAsync(String path) {
+ CompletableFuture<Boolean> result = new CompletableFuture<>();
+ namespace.getNamespaceDriver().getLogMetadataStore().getLogLocation(path)
+ .whenComplete((uriOptional, throwable) -> {
+ if (throwable != null) {
+ result.complete(false);
+ return;
+ }
+
+ if (uriOptional.isPresent()) {
+ namespace.getNamespaceDriver()
+ .getLogStreamMetadataStore(NamespaceDriver.Role.WRITER)
+ .logExists(uriOptional.get(), path)
+ .whenComplete((ignore, e) -> {
+ if (e != null) {
+ result.complete(false);
+ } else {
+ result.complete(true);
+ }
+ });
+ } else {
+ result.complete(false);
+ }
+ });
+ return result; }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return CompletableFuture.runAsync(() -> this.namespace.close());
+ }
+}
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
new file mode 100644
index 0000000..74b031e
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageConfiguration.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
+
+import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;
+import org.apache.pulsar.packages.management.core.impl.DefaultPackagesStorageConfiguration;
+
+public class BookKeeperPackagesStorageConfiguration implements PackagesStorageConfiguration {
+
+ private final PackagesStorageConfiguration configuration;
+
+ BookKeeperPackagesStorageConfiguration() {
+ this.configuration = new DefaultPackagesStorageConfiguration();
+ }
+
+ BookKeeperPackagesStorageConfiguration(PackagesStorageConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ int getNumReplicas() {
+ return Integer.parseInt(getProperty("numReplicas"));
+ }
+
+ String getZkServers() {
+ return getProperty("zkServers");
+ }
+
+ String getLedgersRootPath() {
+ return getProperty("ledgerRootPath");
+ }
+
+ String getBookkeeperClientAuthenticationPlugin() {
+ return getProperty("bookkeeperClientAuthenticationPlugin");
+ }
+
+ String getBookkeeperClientAuthenticationParametersName() {
+ return getProperty("bookkeeperClientAuthenticationParametersName");
+ }
+
+ String getBookkeeperClientAuthenticationParameters() {
+ return getProperty("bookkeeperClientAuthenticationParameters");
+ }
+
+
+ @Override
+ public String getProperty(String key) {
+ return configuration.getProperty(key);
+ }
+
+ @Override
+ public void setProperty(String key, String value) {
+ configuration.setProperty(key, value);
+ }
+}
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageProvider.java
similarity index 62%
copy from pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java
copy to pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageProvider.java
index ada2d6c..e23ecaa 100644
--- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageProvider.java
@@ -16,20 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.packages.management.core;
+package org.apache.pulsar.packages.management.storage.bookkeeper;
-import java.util.HashMap;
-
-public class MockedPackagesStorageConfiguration implements PackagesStorageConfiguration {
- private HashMap<String, Object> properties = new HashMap<>();
-
- @Override
- public Object getProperty(String key) {
- return properties.get(key);
- }
+import org.apache.pulsar.packages.management.core.PackagesStorage;
+import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;
+import org.apache.pulsar.packages.management.core.PackagesStorageProvider;
+public class BookKeeperPackagesStorageProvider implements PackagesStorageProvider {
@Override
- public void setProperty(String key, Object value) {
- properties.put(key, value);
+ public PackagesStorage getStorage(PackagesStorageConfiguration config) {
+ return new BookKeeperPackagesStorage(config);
}
}
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLInputStream.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLInputStream.java
new file mode 100644
index 0000000..863c598
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLInputStream.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.exceptions.EndOfStreamException;
+
+/**
+ * DistributedLog Input Stream.
+ */
+class DLInputStream {
+
+ private final DistributedLogManager distributedLogManager;
+ private final AsyncLogReader reader;
+
+ private DLInputStream(DistributedLogManager distributedLogManager, AsyncLogReader reader) {
+ this.distributedLogManager = distributedLogManager;
+ this.reader = reader;
+ }
+
+ static CompletableFuture<DLInputStream> openReaderAsync(DistributedLogManager distributedLogManager) {
+ return distributedLogManager.openAsyncLogReader(DLSN.InitialDLSN)
+ .thenApply(r -> new DLInputStream(distributedLogManager, r));
+ }
+
+ /**
+ * Read data to output stream.
+ *
+ * @param outputStream the data write to
+ * @return
+ */
+ CompletableFuture<DLInputStream> readAsync(OutputStream outputStream) {
+ CompletableFuture<Void> outputFuture = new CompletableFuture<>();
+ read(outputStream, outputFuture, 10);
+ return outputFuture.thenApply(ignore -> this);
+ }
+
+ /**
+  * Reads records in batches and copies their payloads to the output stream until the
+  * end of the log stream is reached.
+  *
+  * <p>The reader signals the end of the stream with an {@link EndOfStreamException},
+  * which is treated as successful completion. Any other read failure, or any failure
+  * while writing to {@code outputStream}, completes {@code readFuture} exceptionally
+  * and stops further reading.
+  *
+  * @param outputStream the data write to
+  * @param readFuture a future that wait to read complete
+  * @param num how many entries read in one time
+  */
+ private void read(OutputStream outputStream, CompletableFuture<Void> readFuture, int num) {
+     reader.readBulk(num)
+         .whenComplete((logRecordWithDLSNS, throwable) -> {
+             if (null != throwable) {
+                 if (throwable instanceof EndOfStreamException) {
+                     readFuture.complete(null);
+                 } else {
+                     readFuture.completeExceptionally(throwable);
+                 }
+                 return;
+             }
+             CompletableFuture.runAsync(() -> {
+                 try {
+                     // Index loop so the first failed write aborts the rest of the batch.
+                     for (int i = 0; i < logRecordWithDLSNS.size(); i++) {
+                         outputStream.write(logRecordWithDLSNS.get(i).getPayload());
+                     }
+                 } catch (IOException e) {
+                     readFuture.completeExceptionally(e);
+                 }
+             }).whenComplete((ignore, writeError) -> {
+                 if (writeError != null) {
+                     // Unchecked failure in the write task: do not leave the caller hanging.
+                     readFuture.completeExceptionally(writeError);
+                 } else if (!readFuture.isDone()) {
+                     // Only keep reading while no failure (or cancellation) has been recorded.
+                     read(outputStream, readFuture, num);
+                 }
+             });
+         });
+ }
+
+ CompletableFuture<Void> closeAsync() {
+ return reader.asyncClose().thenCompose(ignore -> distributedLogManager.asyncClose());
+ }
+}
+
diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
new file mode 100644
index 0000000..e56fbb3
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+
+/**
+ * DistributedLog Output Stream.
+ */
+@Slf4j
+class DLOutputStream {
+
+ private final DistributedLogManager distributedLogManager;
+ private final AsyncLogWriter writer;
+ private long offset = 0L;
+
+ private DLOutputStream(DistributedLogManager distributedLogManager, AsyncLogWriter writer) {
+ this.distributedLogManager = distributedLogManager;
+ this.writer = writer;
+ }
+
+ static CompletableFuture<DLOutputStream> openWriterAsync(DistributedLogManager distributedLogManager) {
+ log.info("Open a dlog manager");
+ return distributedLogManager.openAsyncLogWriter().thenApply(w -> new DLOutputStream(distributedLogManager, w));
+ }
+
+ /**
+  * Drains the input stream into a list of log records, one record per chunk read.
+  *
+  * <p>The stream is read in chunks of up to 8192 bytes via
+  * {@link CompletableFuture#runAsync(Runnable)}. Each chunk is copied into its own
+  * buffer: the read buffer is reused on every iteration, so wrapping it directly
+  * would make every record alias the same bytes and corrupt any input larger than
+  * one chunk. Each record's transaction id is the cumulative byte offset after
+  * that chunk.
+  *
+  * @param inputStream the data to split into records
+  * @return a future holding the records in read order; it completes exceptionally
+  *         if reading the stream fails
+  */
+ private CompletableFuture<List<LogRecord>> getRecords(InputStream inputStream) {
+     CompletableFuture<List<LogRecord>> future = new CompletableFuture<>();
+     CompletableFuture.runAsync(() -> {
+         byte[] readBuffer = new byte[8192];
+         List<LogRecord> records = new ArrayList<>();
+         try {
+             int read = 0;
+             while ((read = inputStream.read(readBuffer)) != -1) {
+                 log.info("write something into the ledgers {}", offset);
+                 // Copy, do not wrap: readBuffer is overwritten by the next read().
+                 ByteBuf writeBuf = Unpooled.copiedBuffer(readBuffer, 0, read);
+                 offset += writeBuf.readableBytes();
+                 LogRecord record = new LogRecord(offset, writeBuf);
+                 records.add(record);
+             }
+             future.complete(records);
+         } catch (IOException e) {
+             log.error("Failed to get all records from the input stream", e);
+             future.completeExceptionally(e);
+         }
+     });
+     return future;
+ }
+
+ /**
+ * Write all input stream data to the distribute log.
+ *
+ * @param inputStream the data we need to write
+ * @return
+ */
+ CompletableFuture<DLOutputStream> writeAsync(InputStream inputStream) {
+ return getRecords(inputStream)
+ .thenCompose(this::writeAsync);
+ }
+
+ /**
+ * Write a ByteBuf data to the distribute log.
+ *
+ * @param data the data we need to write
+ * @return
+ */
+ private CompletableFuture<DLOutputStream> writeAsync(ByteBuf data) {
+ synchronized (this) {
+ offset += data.readableBytes();
+ LogRecord record = new LogRecord(offset, data);
+ log.info("execute write to the dlog " + offset);
+ return writer.write(record).whenComplete((dlsn, throwable) -> {
+ if (throwable != null) {
+ throwable.printStackTrace();
+ } else {
+ log.info("DLSN is {} {}", dlsn.toString(), offset);
+ }
+ }).thenApply(ignore -> this);
+ }
+ }
+
+ private CompletableFuture<DLOutputStream> writeAsync(List<LogRecord> records) {
+ return writer.writeBulk(records).thenApply(ignore -> this);
+ }
+
+ /**
+ * Every package will be a stream. So we need mark the stream as EndOfStream when the stream
+ * write done.
+ *
+ * @return
+ */
+ CompletableFuture<Void> closeAsync() {
+ return writer.markEndOfStream()
+ .thenCompose(ignore -> writer.asyncClose())
+ .thenCompose(ignore -> distributedLogManager.asyncClose());
+ }
+}
+
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/package-info.java
similarity index 64%
copy from pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java
copy to pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/package-info.java
index ada2d6c..d82dba7 100644
--- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java
+++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/package-info.java
@@ -16,20 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.packages.management.core;
-import java.util.HashMap;
-
-public class MockedPackagesStorageConfiguration implements PackagesStorageConfiguration {
- private HashMap<String, Object> properties = new HashMap<>();
-
- @Override
- public Object getProperty(String key) {
- return properties.get(key);
- }
-
- @Override
- public void setProperty(String key, Object value) {
- properties.put(key, value);
- }
-}
+/**
+ * Packages management storage implementation with bookkeeper.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
new file mode 100644
index 0000000..5e0954a
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/BookKeeperPackagesStorageTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
+
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.exceptions.ZKException;
+import org.apache.pulsar.packages.management.core.PackagesStorage;
+import org.apache.pulsar.packages.management.core.PackagesStorageProvider;
+import org.apache.pulsar.packages.management.core.impl.DefaultPackagesStorageConfiguration;
+import org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class BookKeeperPackagesStorageTest extends BookKeeperClusterTestCase {
+ private PackagesStorage storage;
+
+ public BookKeeperPackagesStorageTest() {
+ super(1);
+ }
+
+ @BeforeMethod()
+ public void setup() throws Exception {
+ PackagesStorageProvider provider = PackagesStorageProvider
+ .newProvider(BookKeeperPackagesStorageProvider.class.getName());
+ DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration();
+ configuration.setProperty("zkServers", zkUtil.getZooKeeperConnectString());
+ configuration.setProperty("numReplicas", "1");
+ configuration.setProperty("ledgerRootPath", "/ledgers");
+ storage = provider.getStorage(configuration);
+ storage.initialize();
+ }
+
+ @AfterMethod
+ public void teardown() throws Exception {
+ if (storage != null) {
+ storage.closeAsync().get();
+ }
+ }
+
+ @Test(timeOut = 60000)
+ public void testConfiguration() {
+ assertTrue(storage instanceof BookKeeperPackagesStorage);
+ BookKeeperPackagesStorage bkStorage = (BookKeeperPackagesStorage) storage;
+ assertEquals(bkStorage.configuration.getZkServers(), zkUtil.getZooKeeperConnectString());
+ assertEquals(bkStorage.configuration.getNumReplicas(), 1);
+ assertEquals(bkStorage.configuration.getLedgersRootPath(), "/ledgers");
+ }
+
+ @Test(timeOut = 60000)
+ public void testReadWriteOperations() throws ExecutionException, InterruptedException {
+ String testData = "test-data";
+ ByteArrayInputStream testDataStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
+ String testPath = "test-read-write";
+
+ // write some data to the dlog
+ storage.writeAsync(testPath, testDataStream).get();
+
+ // read the data from the dlog
+ ByteArrayOutputStream readData = new ByteArrayOutputStream();
+ storage.readAsync(testPath, readData).get();
+ String readResult = new String(readData.toByteArray(), StandardCharsets.UTF_8);
+
+ assertTrue(readResult.equals(testData));
+ }
+
+ @Test(timeOut = 60000)
+ public void testReadNonExistentData() {
+ String testPath = "non-existent-path";
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+ try {
+ storage.readAsync(testPath, outputStream).get();
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof LogNotFoundException);
+ }
+ }
+
+ @Test(timeOut = 60000)
+ public void testListOperation() throws ExecutionException, InterruptedException {
+ // write the data to different path
+ String rootPath = "pulsar";
+ String testData = "test-data";
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
+
+ List<String> writePaths = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String path = "test-" + i;
+ writePaths.add(path);
+ storage.writeAsync(rootPath + "/" + path, inputStream).get();
+ }
+
+ // list all path under the root path
+ List<String> paths = storage.listAsync(rootPath).get();
+
+ // verify the paths number
+ Assert.assertEquals(writePaths.size(), paths.size());
+ paths.forEach(p -> writePaths.remove(p));
+ Assert.assertEquals(0, writePaths.size());
+
+ // list non-existent path
+ try {
+ storage.listAsync("non-existent").get();
+ } catch (Exception e) {
+ // should not throw any exception
+ fail(e.getMessage());
+ }
+ }
+
+ @Test(timeOut = 60000)
+ public void testDeleteOperation() throws ExecutionException, InterruptedException {
+ String testPath = "test-delete-path";
+ String testData = "test-data";
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
+
+ // write the data to the test path
+ storage.writeAsync(testPath, inputStream).get();
+
+ // list path should have one file
+ List<String> paths = storage.listAsync("").get();
+ Assert.assertEquals(1, paths.size());
+ Assert.assertEquals(testPath, paths.get(0));
+
+ // delete the path
+ storage.deleteAsync(testPath).get();
+
+ // list again and not file under the path
+ paths= storage.listAsync("").get();
+ Assert.assertEquals(0, paths.size());
+
+
+ // delete non-existent path
+ try {
+ storage.deleteAsync("non-existent").get();
+ fail("should throw exception");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getCause() instanceof ZKException);
+ }
+ }
+
+ @Test(timeOut = 60000)
+ public void testExistOperation() throws ExecutionException, InterruptedException {
+ Boolean exist = storage.existAsync("test-path").get();
+ assertFalse(exist);
+
+ storage.writeAsync("test-path", new ByteArrayInputStream("test".getBytes())).get();
+
+ exist = storage.existAsync("test-path").get();
+ Assert.assertTrue(exist);
+ }
+
+}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLInputStreamTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLInputStreamTest.java
new file mode 100644
index 0000000..5406cd2
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLInputStreamTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
+
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.eq;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.exceptions.EndOfStreamException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+ public class DLInputStreamTest {
+     private DistributedLogManager dlm;
+     private AsyncLogReader reader;
+
+     /**
+      * Creates a mocked log manager and reader. Unless a test overrides it, every
+      * {@code readBulk} call fails with an {@link EndOfStreamException}.
+      */
+     @BeforeMethod
+     public void setup() {
+         dlm = mock(DistributedLogManager.class);
+         reader = mock(AsyncLogReader.class);
+
+         when(dlm.openAsyncLogReader(any(DLSN.class))).thenReturn(CompletableFuture.completedFuture(reader));
+         when(dlm.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
+         when(reader.readBulk(anyInt())).thenReturn(failedFuture(new EndOfStreamException("eos")));
+         when(reader.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
+     }
+
+     @AfterMethod
+     public void teardown() throws IOException {
+         if (dlm != null) {
+             dlm.close();
+         }
+     }
+
+     /**
+      * Test Case: reader hits eos (end of stream).
+      */
+     @Test
+     public void testReadEos() throws Exception {
+         OutputStream outputStream = new ByteArrayOutputStream();
+         try {
+             DLInputStream.openReaderAsync(dlm)
+                 .thenCompose(d -> d.readAsync(outputStream))
+                 .thenCompose(DLInputStream::closeAsync).get();
+         } catch (Exception e) {
+             // an end-of-stream failure is tolerated; anything else fails the test
+             if (!(e.getCause() instanceof EndOfStreamException)) {
+                 fail(e.getMessage());
+             }
+         }
+
+         verify(dlm, times(1)).openAsyncLogReader(eq(DLSN.InitialDLSN));
+         verify(reader, times(1)).readBulk(eq(10));
+         verify(reader, times(1)).asyncClose();
+         verify(dlm, times(1)).asyncClose();
+     }
+
+     /**
+      * Test Case: read records from the input stream and copy them to an output stream.
+      */
+     @Test
+     public void testReadToOutputStream() {
+         // prepare test data: one record, then end of stream
+         byte[] data = "test-read".getBytes();
+         LogRecordWithDLSN record = mock(LogRecordWithDLSN.class);
+         List<LogRecordWithDLSN> records = new ArrayList<>();
+         records.add(record);
+
+         when(record.getPayload()).thenReturn(data);
+         when(reader.readBulk(anyInt()))
+             .thenReturn(CompletableFuture.completedFuture(records))
+             .thenReturn(failedFuture(new EndOfStreamException("eos")));
+
+         // test code
+         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+         try {
+             DLInputStream.openReaderAsync(dlm)
+                 .thenCompose(d -> d.readAsync(outputStream))
+                 .thenCompose(DLInputStream::closeAsync).get();
+         } catch (Exception e) {
+             // an end-of-stream failure is tolerated; anything else fails the test
+             if (!(e.getCause() instanceof EndOfStreamException)) {
+                 fail(e.getMessage());
+             }
+         }
+
+         assertEquals("test-read", new String(outputStream.toByteArray()));
+     }
+
+     /**
+      * Test Case: opening the reader fails and the failure is reported to the caller.
+      */
+     @Test
+     public void openAsyncLogReaderFailed() {
+         when(dlm.openAsyncLogReader(any(DLSN.class))).thenReturn(failedFuture(new Exception("Open reader was failed")));
+
+         try {
+             DLInputStream.openReaderAsync(dlm).get();
+             // without this the test would pass even if no failure were propagated
+             fail("openReaderAsync should fail when the log reader cannot be opened");
+         } catch (Exception e) {
+             // AssertJUnit takes (expected, actual)
+             assertEquals("Open reader was failed", e.getCause().getMessage());
+         }
+     }
+
+     /** Returns a future that is already completed exceptionally with {@code throwable}. */
+     private <T> CompletableFuture<T> failedFuture(Throwable throwable) {
+         CompletableFuture<T> completableFuture = new CompletableFuture<>();
+         completableFuture.completeExceptionally(throwable);
+         return completableFuture;
+     }
+ }
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
new file mode 100644
index 0000000..6a23357
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+import static org.testng.AssertJUnit.assertEquals;
+
+ public class DLOutputStreamTest {
+
+     private DistributedLogManager dlm;
+     private AsyncLogWriter writer;
+
+     /**
+      * Creates a mocked log manager and writer whose operations all succeed unless a test
+      * overrides them.
+      */
+     @BeforeMethod
+     public void setup() {
+         dlm = mock(DistributedLogManager.class);
+         writer = mock(AsyncLogWriter.class);
+
+         when(dlm.openAsyncLogWriter()).thenReturn(CompletableFuture.completedFuture(writer));
+         when(dlm.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
+         when(writer.markEndOfStream()).thenReturn(CompletableFuture.completedFuture(null));
+         when(writer.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
+         when(writer.writeBulk(any(List.class)))
+             .thenReturn(CompletableFuture.completedFuture(Collections.singletonList(DLSN.InitialDLSN)));
+     }
+
+     @AfterMethod
+     public void teardown() throws IOException {
+         if (dlm != null) {
+             dlm.close();
+         }
+     }
+
+     /**
+      * Test Case: write data using input stream.
+      */
+     @Test
+     public void writeInputStreamData() throws ExecutionException, InterruptedException {
+         byte[] data = "test-write".getBytes();
+         DLOutputStream.openWriterAsync(dlm)
+             .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
+                 .thenCompose(DLOutputStream::closeAsync)).get();
+
+         verify(writer, times(1)).writeBulk(any(List.class));
+         verify(writer, times(1)).markEndOfStream();
+         verify(writer, times(1)).asyncClose();
+         verify(dlm, times(1)).asyncClose();
+     }
+
+     /**
+      * Test Case: write data with byte array.
+      */
+     // NOTE(review): this body is identical to writeInputStreamData and also writes through an
+     // InputStream; confirm whether a byte-array write path was meant to be exercised here.
+     @Test
+     public void writeBytesArrayData() throws ExecutionException, InterruptedException {
+         byte[] data = "test-write".getBytes();
+         DLOutputStream.openWriterAsync(dlm)
+             .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
+                 .thenCompose(DLOutputStream::closeAsync)).get();
+
+         verify(writer, times(1)).writeBulk(any(List.class));
+         verify(writer, times(1)).markEndOfStream();
+         verify(writer, times(1)).asyncClose();
+         verify(dlm, times(1)).asyncClose();
+     }
+
+     /**
+      * Test Case: opening the writer fails and the failure is reported to the caller.
+      */
+     @Test
+     public void openAsyncLogWriterFailed() {
+         when(dlm.openAsyncLogWriter()).thenReturn(failedFuture(new Exception("Open writer was failed")));
+
+         try {
+             DLOutputStream.openWriterAsync(dlm).get();
+             // without this the test would pass even if no failure were propagated
+             org.testng.AssertJUnit.fail("openWriterAsync should fail when the log writer cannot be opened");
+         } catch (Exception e) {
+             // AssertJUnit takes (expected, actual)
+             assertEquals("Open writer was failed", e.getCause().getMessage());
+         }
+     }
+
+     /**
+      * Test Case: the bulk write fails and the failure is reported to the caller.
+      */
+     @Test
+     public void writeRecordFailed() {
+         when(writer.writeBulk(any(List.class)))
+             .thenReturn(failedFuture(new Exception("Write data was failed")));
+
+         byte[] data = "test-write".getBytes();
+         try {
+             DLOutputStream.openWriterAsync(dlm)
+                 .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data)))
+                 .thenCompose(DLOutputStream::closeAsync).get();
+             // without this the test would pass even if no failure were propagated
+             org.testng.AssertJUnit.fail("writeAsync should fail when writeBulk fails");
+         } catch (Exception e) {
+             // AssertJUnit takes (expected, actual)
+             assertEquals("Write data was failed", e.getCause().getMessage());
+         }
+     }
+
+     /** Returns a future that is already completed exceptionally with {@code throwable}. */
+     private <T> CompletableFuture<T> failedFuture(Throwable throwable) {
+         CompletableFuture<T> completableFuture = new CompletableFuture<>();
+         completableFuture.completeExceptionally(throwable);
+         return completableFuture;
+     }
+ }
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
new file mode 100644
index 0000000..042e7fa
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -0,0 +1,506 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * This file is derived from BookKeeperClusterTestCase from Apache BookKeeper
+ * http://bookkeeper.apache.org
+ */
+
+package org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;
+
+import io.netty.buffer.ByteBufAllocator;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.metastore.InMemoryMetaStore;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.replication.AutoRecoveryMain;
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A class runs several bookie servers for testing.
+ */
+public abstract class BookKeeperClusterTestCase {
+
+ // Log under this class's own name rather than the upstream BookKeeper class it was derived from.
+ static final Logger LOG = LoggerFactory.getLogger(BookKeeperClusterTestCase.class);
+
+ // ZooKeeper related variables
+ protected ZooKeeperUtil zkUtil = new ZooKeeperUtil();
+ protected ZooKeeper zkc;
+
+ // BookKeeper related variables
+ protected List<File> tmpDirs = new LinkedList<File>();
+ protected List<BookieServer> bs = new LinkedList<BookieServer>();
+ protected List<ServerConfiguration> bsConfs = new LinkedList<ServerConfiguration>();
+ protected int numBookies;
+ protected BookKeeperTestClient bkc;
+
+ protected ServerConfiguration baseConf = new ServerConfiguration();
+ protected ClientConfiguration baseClientConf = new ClientConfiguration();
+
+ private final Map<BookieServer, AutoRecoveryMain> autoRecoveryProcesses = new HashMap<BookieServer, AutoRecoveryMain>();
+
+ private boolean isAutoRecoveryEnabled;
+
+ protected ExecutorService executor;
+
+ /**
+ * Creates a test case whose cluster is started with {@code numBookies} bookies.
+ *
+ * @param numBookies
+ * number of bookie servers started by {@link #startBKCluster()}
+ */
+ public BookKeeperClusterTestCase(int numBookies) {
+ this.numBookies = numBookies;
+ }
+
+ /**
+ * Prepares the cluster before each test method: creates the executor, resets the in-memory metastore, starts
+ * ZooKeeper and the bookies, and creates the {@code /pulsar} znode.
+ *
+ * @throws Exception
+ * any failure during startup; it is logged and rethrown
+ */
+ @BeforeMethod
+ public void setUp() throws Exception {
+ executor = Executors.newCachedThreadPool();
+ InMemoryMetaStore.reset();
+ setMetastoreImplClass(baseConf);
+ setMetastoreImplClass(baseClientConf);
+
+ try {
+ // start zookeeper service
+ startZKCluster();
+ // start bookkeeper service
+ startBKCluster();
+
+ zkc.create("/pulsar", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (Exception e) {
+ LOG.error("Error setting up", e);
+ throw e;
+ }
+ }
+
+ /**
+  * Stops the bookies, then ZooKeeper, then the executor. Each step runs even if an earlier one
+  * throws, so a failing bookie shutdown cannot leak the ZooKeeper server or the executor threads.
+  *
+  * @throws Exception
+  *             the failure raised while stopping the cluster, if any
+  */
+ @AfterMethod
+ public void tearDown() throws Exception {
+     try {
+         // stop bookkeeper service
+         stopBKCluster();
+     } finally {
+         try {
+             // stop zookeeper service
+             stopZKCluster();
+         } finally {
+             if (executor != null) {
+                 executor.shutdown();
+             }
+         }
+     }
+ }
+
+ /**
+ * Starts the ZooKeeper test server through {@code zkUtil} and stores its client handle in {@code zkc}.
+ *
+ * @throws Exception
+ * propagated from {@code zkUtil}
+ */
+ protected void startZKCluster() throws Exception {
+ zkUtil.startServer();
+ zkc = zkUtil.getZooKeeperClient();
+ }
+
+ /**
+ * Stops the ZooKeeper test server through {@code zkUtil}.
+ *
+ * @throws Exception
+ * propagated from {@code zkUtil}
+ */
+ protected void stopZKCluster() throws Exception {
+ zkUtil.killServer();
+ }
+
+ /**
+ * Starts the bookie cluster: points the base client configuration at the test ZooKeeper server, creates the
+ * BookKeeper test client when at least one bookie is requested, and starts {@code numBookies} bookies.
+ *
+ * NOTE(review): nothing visible in this class starts an auto recovery process here, whatever the value of
+ * isAutoRecoveryEnabled - confirm whether that behaviour was dropped on purpose.
+ *
+ * @throws Exception
+ * if the client or any bookie cannot be started
+ */
+ protected void startBKCluster() throws Exception {
+ baseClientConf.setZkServers(zkUtil.getZooKeeperConnectString());
+ baseClientConf.setUseV2WireProtocol(true);
+ baseClientConf.setEnableDigestTypeAutodetection(true);
+ baseClientConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
+ if (numBookies > 0) {
+ bkc = new BookKeeperTestClient(baseClientConf);
+ }
+
+ // Create the requested number of bookie servers
+ for (int i = 0; i < numBookies; i++) {
+ startNewBookie();
+ }
+ }
+
+ /**
+  * Stops the cluster: closes the client, shuts down every bookie (and its auto recovery process,
+  * when one is registered and isAutoRecoveryEnabled is true) and deletes the temporary directories.
+  *
+  * <p>All bookkeeping lists are cleared afterwards. The same test instance is reused for every
+  * test method, so leftover entries in {@code bsConfs} would no longer line up with {@code bs}
+  * (which {@link #killBookie(int)} and {@link #restartBookies()} rely on), and leftover entries in
+  * {@code tmpDirs} would be deleted again on every later teardown.
+  *
+  * @throws Exception
+  *             if the client, a bookie or a directory cannot be cleaned up
+  */
+ protected void stopBKCluster() throws Exception {
+     if (bkc != null) {
+         bkc.close();
+     }
+
+     for (BookieServer server : bs) {
+         server.shutdown();
+         AutoRecoveryMain autoRecovery = autoRecoveryProcesses.get(server);
+         if (autoRecovery != null && isAutoRecoveryEnabled()) {
+             autoRecovery.shutdown();
+             LOG.debug("Shutdown auto recovery for bookieserver:" + server.getLocalAddress());
+         }
+     }
+     bs.clear();
+     // keep the configuration list index-aligned with the (now empty) server list
+     bsConfs.clear();
+
+     for (File f : tmpDirs) {
+         FileUtils.deleteDirectory(f);
+     }
+     tmpDirs.clear();
+ }
+
+ /**
+  * Creates a configuration for a new bookie that uses a fresh temporary directory as both journal
+  * and ledger directory, and a free port obtained from {@link PortManager}.
+  *
+  * @return the configuration for the new bookie
+  * @throws Exception
+  *             if the temporary directory cannot be created
+  */
+ protected ServerConfiguration newServerConfiguration() throws Exception {
+     // createTempFile only reserves a unique name; swap the file for a directory of that name
+     File f = File.createTempFile("bookie", "test");
+     if (!f.delete() || !f.mkdir()) {
+         throw new IOException("Unable to create bookie directory " + f);
+     }
+     // register only real directories, since teardown deletes every entry as a directory
+     tmpDirs.add(f);
+
+     int port = PortManager.nextFreePort();
+     return newServerConfiguration(port, zkUtil.getZooKeeperConnectString(), f, new File[] { f });
+ }
+
+ /**
+ * Creates a client configuration initialised from {@code baseConf}.
+ *
+ * NOTE(review): this copies the server-side {@code baseConf}, not {@code baseClientConf} - confirm that is
+ * intended.
+ */
+ protected ClientConfiguration newClientConfiguration() {
+ return new ClientConfiguration(baseConf);
+ }
+
+ protected ServerConfiguration newServerConfiguration(int port, String zkServers, File journalDir,
+ File[] ledgerDirs) {
+ ServerConfiguration conf = new ServerConfiguration(baseConf);
+ conf.setBookiePort(port);
+ conf.setZkServers(zkServers);
+ conf.setJournalDirName(journalDir.getPath());
+ conf.setAllowLoopback(true);
+ conf.setFlushInterval(60 * 1000);
+ conf.setGcWaitTime(60 * 1000);
+ conf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
+ String[] ledgerDirNames = new String[ledgerDirs.length];
+ for (int i = 0; i < ledgerDirs.length; i++) {
+ ledgerDirNames[i] = ledgerDirs[i].getPath();
+ }
+ conf.setLedgerDirNames(ledgerDirNames);
+ conf.setLedgerStorageClass("org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage");
+ conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
+ conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
+ return conf;
+ }
+
+ /**
+  * Returns the address of the bookie at the given position in the list of running bookies.
+  *
+  * @param index
+  *            position of the bookie
+  * @return the local address of that bookie
+  * @throws IllegalArgumentException
+  *             if {@code index} is negative or not smaller than the number of bookies
+  */
+ public BookieSocketAddress getBookie(int index) throws Exception {
+     final boolean inRange = index >= 0 && index < bs.size();
+     if (!inRange) {
+         throw new IllegalArgumentException(
+                 "Invalid index, there are only " + bs.size() + " bookies. Asked for " + index);
+     }
+     final BookieServer selected = bs.get(index);
+     return selected.getLocalAddress();
+ }
+
+ /**
+  * Kill a bookie by its socket address. Also stops the auto recovery process registered for that
+  * bookie, if isAutoRecoveryEnabled is true.
+  *
+  * <p>The bookie's local address is a {@code BookieSocketAddress}, so it is converted with
+  * {@code getSocketAddress()} before being compared with {@code addr}; comparing the two
+  * different address types directly is not expected to match.
+  *
+  * @param addr
+  *            Socket Address
+  * @return the configuration of the killed bookie, or {@code null} if no bookie has that address
+  * @throws Exception
+  *             if the bookie or its auto recovery process cannot be shut down
+  */
+ public ServerConfiguration killBookie(InetSocketAddress addr) throws Exception {
+     for (int i = 0; i < bs.size(); i++) {
+         BookieServer server = bs.get(i);
+         if (server.getLocalAddress().getSocketAddress().equals(addr)) {
+             server.shutdown();
+             stopAutoRecoveryService(server);
+             // bs and bsConfs are index-aligned, so drop the same position from both
+             bs.remove(i);
+             return bsConfs.remove(i);
+         }
+     }
+     return null;
+ }
+
+ /**
+  * Kill the bookie at the given position. Also stops the auto recovery process registered for
+  * that bookie, if isAutoRecoveryEnabled is true.
+  *
+  * @param index
+  *            Bookie Index
+  * @return the configuration of the killed bookie
+  * @throws IOException
+  *             if {@code index} is not smaller than the number of bookies
+  */
+ public ServerConfiguration killBookie(int index) throws Exception {
+     if (bs.size() <= index) {
+         throw new IOException("Bookie does not exist");
+     }
+
+     final BookieServer victim = bs.get(index);
+     victim.shutdown();
+     stopAutoRecoveryService(victim);
+     bs.remove(victim);
+
+     final ServerConfiguration victimConf = bsConfs.remove(index);
+     return victimConf;
+ }
+
+ /**
+  * Suspends request processing on a bookie for a fixed time, on a background thread.
+  *
+  * <p>The bookie's local address is converted with {@code getSocketAddress()} before it is
+  * compared with {@code addr}. Processing is resumed even when the sleep is interrupted.
+  *
+  * @param addr
+  *            Socket Address
+  * @param seconds
+  *            Sleep seconds
+  * @return Count Down latch which is counted down once the bookie has been suspended
+  * @throws IOException
+  *             if no running bookie has the given address
+  */
+ public CountDownLatch sleepBookie(InetSocketAddress addr, final int seconds) throws Exception {
+     for (final BookieServer bookie : bs) {
+         if (bookie.getLocalAddress().getSocketAddress().equals(addr)) {
+             final CountDownLatch l = new CountDownLatch(1);
+             Thread sleeper = new Thread() {
+                 @Override
+                 public void run() {
+                     try {
+                         bookie.suspendProcessing();
+                         l.countDown();
+                         try {
+                             // long arithmetic: seconds * 1000 must not overflow int
+                             Thread.sleep(seconds * 1000L);
+                         } finally {
+                             bookie.resumeProcessing();
+                         }
+                     } catch (InterruptedException e) {
+                         Thread.currentThread().interrupt();
+                         LOG.error("Error suspending bookie", e);
+                     } catch (Exception e) {
+                         LOG.error("Error suspending bookie", e);
+                     }
+                 }
+             };
+             sleeper.start();
+             return l;
+         }
+     }
+     throw new IOException("Bookie not found");
+ }
+
+ /**
+  * Suspends request processing on a bookie, on a background thread, until the given latch is
+  * counted down.
+  *
+  * <p>The bookie's local address is converted with {@code getSocketAddress()} before it is
+  * compared with {@code addr}. Processing is resumed even when the wait is interrupted.
+  *
+  * @param addr
+  *            Socket Address
+  * @param l
+  *            Latch to wait on
+  * @throws IOException
+  *             if no running bookie has the given address
+  */
+ public void sleepBookie(InetSocketAddress addr, final CountDownLatch l) throws Exception {
+     for (final BookieServer bookie : bs) {
+         if (bookie.getLocalAddress().getSocketAddress().equals(addr)) {
+             Thread sleeper = new Thread() {
+                 @Override
+                 public void run() {
+                     try {
+                         bookie.suspendProcessing();
+                         try {
+                             l.await();
+                         } finally {
+                             bookie.resumeProcessing();
+                         }
+                     } catch (InterruptedException e) {
+                         Thread.currentThread().interrupt();
+                         LOG.error("Error suspending bookie", e);
+                     } catch (Exception e) {
+                         LOG.error("Error suspending bookie", e);
+                     }
+                 }
+             };
+             sleeper.start();
+             return;
+         }
+     }
+     throw new IOException("Bookie not found");
+ }
+
+ /**
+ * Restarts all bookie servers with their current configurations by delegating to
+ * {@link #restartBookies(ServerConfiguration)} with {@code null}.
+ *
+ * @throws Exception
+ * if a bookie cannot be stopped or started
+ */
+ public void restartBookies() throws Exception {
+ restartBookies(null);
+ }
+
+ /**
+ * Restarts all bookie servers, optionally loading new configuration settings into each bookie's existing
+ * configuration first. Each bookie's auto recovery process is stopped through
+ * {@code stopAutoRecoveryService}.
+ *
+ * @param newConf
+ * New Configuration Settings, or {@code null} to keep the existing settings
+ * @throws Exception
+ * if a bookie cannot be stopped or started
+ */
+ public void restartBookies(ServerConfiguration newConf) throws Exception {
+ // shut down bookie server
+ for (BookieServer server : bs) {
+ server.shutdown();
+ stopAutoRecoveryService(server);
+ }
+ bs.clear();
+ // NOTE(review): presumably gives the stopped bookies time to release their resources - confirm
+ Thread.sleep(1000);
+ // restart every bookie from a snapshot of the configurations; startBookie re-adds each one to bsConfs
+
+ List<ServerConfiguration> bsConfsCopy = new ArrayList<ServerConfiguration>(bsConfs);
+ bsConfs.clear();
+ for (ServerConfiguration conf : bsConfsCopy) {
+ if (null != newConf) {
+ conf.loadConf(newConf);
+ }
+ startBookie(conf);
+ }
+ }
+
+ /**
+ * Helper method to start a new bookie server with a configuration obtained from
+ * {@link #newServerConfiguration()}.
+ *
+ * @return the port of the new bookie server
+ * @throws Exception
+ * if the configuration cannot be created or the bookie cannot be started
+ */
+ public int startNewBookie() throws Exception {
+ ServerConfiguration conf = newServerConfiguration();
+ startBookie(conf);
+
+ return conf.getBookiePort();
+ }
+
+ /**
+ * Helper method to start a bookie server using a configuration object. The server and its configuration are
+ * appended to {@code bs} and {@code bsConfs}, and the method waits until the bookie's znode shows up under
+ * {@code /ledgers/available}.
+ *
+ * @param conf
+ * Server Configuration Object
+ * @return the started bookie server
+ */
+ protected BookieServer startBookie(ServerConfiguration conf) throws Exception {
+ BookieServer server = new BookieServer(conf);
+ bsConfs.add(conf);
+ bs.add(server);
+
+ server.start();
+
+ // the client is created lazily when the cluster was started without bookies
+ if (bkc == null) {
+ bkc = new BookKeeperTestClient(baseClientConf);
+ }
+
+ int port = conf.getBookiePort();
+ // poll every 500 ms until the bookie has registered itself; there is no upper bound on the wait
+ while (bkc.getZkHandle().exists(
+ "/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port, false) == null) {
+ Thread.sleep(500);
+ }
+
+ bkc.readBookiesBlocking();
+ LOG.info("New bookie on port " + port + " has been created.");
+
+ return server;
+ }
+
+ /**
+ * Starts a bookie server that uses the given bookie instance instead of creating its own, then waits until
+ * the bookie's znode shows up under {@code /ledgers/available}.
+ *
+ * NOTE(review): unlike {@link #startBookie(ServerConfiguration)}, this overload does not add the server to
+ * {@code bs} or {@code bsConfs} and does not create {@code bkc} when it is null - confirm that callers
+ * handle both.
+ *
+ * @param conf
+ * Server Configuration Object
+ * @param b
+ * bookie instance returned by the server's {@code newBookie} hook
+ * @return the started bookie server
+ */
+ protected BookieServer startBookie(ServerConfiguration conf, final Bookie b) throws Exception {
+ BookieServer server = new BookieServer(conf) {
+ protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator)
+ throws IOException, KeeperException, InterruptedException, BookieException {
+ return b;
+ }
+ };
+ server.start();
+
+ int port = conf.getBookiePort();
+ // poll every 500 ms until the bookie has registered itself; there is no upper bound on the wait
+ while (bkc.getZkHandle().exists(
+ "/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port, false) == null) {
+ Thread.sleep(500);
+ }
+
+ bkc.readBookiesBlocking();
+ LOG.info("New bookie on port " + port + " has been created.");
+ return server;
+ }
+
+ /**
+ * Configures the given configuration to use {@link InMemoryMetaStore} as its metastore implementation.
+ *
+ * @param conf
+ * configuration to update
+ */
+ public void setMetastoreImplClass(AbstractConfiguration conf) {
+ conf.setMetastoreImplClass(InMemoryMetaStore.class.getName());
+ }
+
+ /**
+ * Sets the flag that enables or disables auto recovery handling. In the code visible in this class the flag
+ * only decides whether registered auto recovery processes are shut down together with their bookie.
+ *
+ * @param isAutoRecoveryEnabled
+ * Value true enables the auto recovery handling. Value false disables it
+ */
+ public void setAutoRecoveryEnabled(boolean isAutoRecoveryEnabled) {
+ this.isAutoRecoveryEnabled = isAutoRecoveryEnabled;
+ }
+
+ /**
+ * Tells whether auto recovery handling is enabled. The flag is false until
+ * {@link #setAutoRecoveryEnabled(boolean)} is called with true.
+ *
+ * @return true, if auto recovery is enabled. Otherwise false.
+ */
+ public boolean isAutoRecoveryEnabled() {
+ return isAutoRecoveryEnabled;
+ }
+
+ private void stopAutoRecoveryService(BookieServer toRemove) throws Exception {
+ AutoRecoveryMain autoRecoveryMain = autoRecoveryProcesses.remove(toRemove);
+ if (null != autoRecoveryMain && isAutoRecoveryEnabled()) {
+ autoRecoveryMain.shutdown();
+ LOG.debug("Shutdown auto recovery for bookieserver:" + toRemove.getLocalAddress());
+ }
+ }
+
+ /**
+  * Shuts down every registered auto recovery process of the bookie cluster. Does nothing when
+  * isAutoRecoveryEnabled is false.
+  */
+ public void stopReplicationService() throws Exception {
+     if (isAutoRecoveryEnabled()) {
+         for (Entry<BookieServer, AutoRecoveryMain> registration : autoRecoveryProcesses.entrySet()) {
+             final AutoRecoveryMain recovery = registration.getValue();
+             recovery.shutdown();
+
+             final BookieServer bookie = registration.getKey();
+             LOG.debug("Shutdown Auditor Recovery for the bookie:" + bookie.getLocalAddress());
+         }
+     }
+ }
+}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java
new file mode 100644
index 0000000..f1d0f08
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;
+
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A class runs several bookie servers for testing.
+ */
+public abstract class MockedBookKeeperTestCase {
+
+ static final Logger LOG = LoggerFactory.getLogger(MockedBookKeeperTestCase.class);
+
+ // ZooKeeper related variables
+ protected MockZooKeeper zkc;
+
+ // BookKeeper related variables
+ protected PulsarMockBookKeeper bkc;
+ protected int numBookies;
+
+ protected ManagedLedgerFactoryImpl factory;
+
+ protected ClientConfiguration baseClientConf = new ClientConfiguration();
+
+ protected OrderedScheduler executor;
+ protected ExecutorService cachedExecutor;
+
+ public MockedBookKeeperTestCase() {
+ // By default start a 3 bookies cluster
+ this(3);
+ }
+
+ public MockedBookKeeperTestCase(int numBookies) {
+ this.numBookies = numBookies;
+ }
+
+ @BeforeMethod
+ public void setUp(Method method) throws Exception {
+ LOG.info(">>>>>> starting {}", method);
+ try {
+ // start bookkeeper service
+ startBookKeeper();
+ } catch (Exception e) {
+ LOG.error("Error setting up", e);
+ throw e;
+ }
+
+ ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
+ factory = new ManagedLedgerFactoryImpl(bkc, zkc, conf);
+
+ zkc.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown(Method method) {
+ try {
+ LOG.info("@@@@@@@@@ stopping " + method);
+ factory.shutdown();
+ factory = null;
+ stopBookKeeper();
+ stopZooKeeper();
+ LOG.info("--------- stopped {}", method);
+ } catch (Exception e) {
+ LOG.error("tearDown Error", e);
+ }
+ }
+
+ @BeforeClass
+ public void setUpClass() {
+ executor = OrderedScheduler.newSchedulerBuilder().numThreads(2).name("test").build();
+ cachedExecutor = Executors.newCachedThreadPool();
+ }
+
+ @AfterClass
+ public void tearDownClass() {
+ executor.shutdown();
+ cachedExecutor.shutdown();
+ }
+
+ /**
+ * Start cluster
+ *
+ * @throws Exception
+ */
+ protected void startBookKeeper() throws Exception {
+ zkc = MockZooKeeper.newInstance();
+ for (int i = 0; i < numBookies; i++) {
+ ZkUtils.createFullPathOptimistic(zkc, "/ledgers/available/192.168.1.1:" + (5000 + i), "".getBytes(), null,
+ null);
+ }
+
+ zkc.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(), null, null);
+
+ bkc = new PulsarMockBookKeeper(zkc, executor.chooseThread(this));
+ }
+
+ protected void stopBookKeeper() throws Exception {
+ bkc.shutdown();
+ }
+
+ protected void stopZooKeeper() throws Exception {
+ zkc.shutdown();
+ }
+
+}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java
new file mode 100644
index 0000000..daed597
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/PortManager.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;
+
+import lombok.Cleanup;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.Inet4Address;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * Port manager allows a base port to be specified on the commandline. Tests will then use ports, counting up from this
+ * base port. This allows multiple instances of the bookkeeper tests to run at once.
+ */
+public class PortManager {
+
+ private static final String lockFilename = System.getProperty("test.lockFilename",
+ "/tmp/pulsar-test-port-manager.lock");
+ private static final int basePort = Integer.parseInt(System.getProperty("test.basePort", "15000"));
+
+ private static final int maxPort = 32000;
+
+ /**
+ * Return a TCP port that is currently unused.
+ *
+ * Keeps track of assigned ports and avoid race condition between different processes
+ */
+ public synchronized static int nextFreePort() {
+ Path path = Paths.get(lockFilename);
+
+ try {
+ @Cleanup
+ FileChannel fileChannel = FileChannel.open(path,
+ StandardOpenOption.CREATE,
+ StandardOpenOption.WRITE,
+ StandardOpenOption.READ);
+
+ @Cleanup
+ FileLock lock = fileChannel.lock();
+
+ ByteBuffer buffer = ByteBuffer.allocate(32);
+ int len = fileChannel.read(buffer, 0L);
+ buffer.flip();
+
+ int lastUsedPort = basePort;
+ if (len > 0) {
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes);
+ String lastUsedPortStr = new String(bytes);
+ lastUsedPort = Integer.parseInt(lastUsedPortStr);
+ }
+
+ int freePort = probeFreePort(lastUsedPort + 1);
+
+ buffer.clear();
+ buffer.put(Integer.toString(freePort).getBytes());
+ buffer.flip();
+ fileChannel.write(buffer, 0L);
+ fileChannel.truncate(buffer.position());
+ fileChannel.force(true);
+
+ return freePort;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static final int MAX_PORT_CONFLICTS = 10;
+
+ private synchronized static int probeFreePort(int port) {
+ int exceptionCount = 0;
+ while (true) {
+ if (port == maxPort) {
+ // Rollover the port probe
+ port = basePort;
+ }
+
+ try (Socket s = new Socket()) {
+ s.connect(new InetSocketAddress(Inet4Address.getLoopbackAddress(), port), 100);
+
+ // If we succeed to connect it means the port is being used
+
+ } catch (ConnectException e) {
+ return port;
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ port++;
+ exceptionCount++;
+ if (exceptionCount > MAX_PORT_CONFLICTS) {
+ throw new RuntimeException("Failed to find an open port");
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ while (true) {
+ System.out.println("Port: " + nextFreePort());
+ Thread.sleep(100);
+ }
+ }
+}
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/ZooKeeperUtil.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/ZooKeeperUtil.java
new file mode 100644
index 0000000..a06134f
--- /dev/null
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/ZooKeeperUtil.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * This file is derived from ZooKeeperUtil from Apache BookKeeper
+ * http://bookkeeper.apache.org
+ */
+
+package org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;
+
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+
+ /**
+  * Test helper that runs an in-process ZooKeeper server on a port obtained from {@link PortManager} and
+  * keeps a client connected to it.
+  */
+ public class ZooKeeperUtil {
+     // Bound to this class so log output is attributed to the helper that actually produced it.
+     static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtil.class);
+
+     // ZooKeeper related variables
+     protected final static Integer zooKeeperPort = PortManager.nextFreePort();
+     private final InetSocketAddress zkaddr;
+
+     protected ZooKeeperServer zks;
+     protected ZooKeeper zkc; // zookeeper client
+     protected NIOServerCnxnFactory serverFactory;
+     protected File ZkTmpDir;
+     private final String connectString;
+
+     /** Prepares the listen address and connect string; the server is not started yet. */
+     public ZooKeeperUtil() {
+         zkaddr = new InetSocketAddress(zooKeeperPort);
+         connectString = "localhost:" + zooKeeperPort;
+     }
+
+     /**
+      * @return the client created by {@link #startServer()}, or {@code null} if the server was not started
+      */
+     public ZooKeeper getZooKeeperClient() {
+         return zkc;
+     }
+
+     /**
+      * @return the connect string of the server, in the form {@code localhost:<port>}
+      */
+     public String getZooKeeperConnectString() {
+         return connectString;
+     }
+
+     /**
+      * Starts the ZooKeeper server on a fresh temporary data directory, connects a client to it and
+      * creates the {@code /ledgers} and {@code /ledgers/available} nodes.
+      *
+      * @throws Exception if the server or the client cannot be started, or the nodes cannot be created
+      */
+     public void startServer() throws Exception {
+         // create a ZooKeeper server(dataDir, dataLogDir, port)
+         LOG.debug("Running ZK server");
+         ClientBase.setupTestEnv();
+         // Create the directory in one step instead of creating a temp file, deleting it and calling mkdir().
+         ZkTmpDir = java.nio.file.Files.createTempDirectory("zookeeper").toFile();
+
+         zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperServer.DEFAULT_TICK_TIME);
+         serverFactory = new NIOServerCnxnFactory();
+         serverFactory.configure(zkaddr, 100);
+         serverFactory.startup(zks);
+
+         boolean b = ClientBase.waitForServerUp(getZooKeeperConnectString(), ClientBase.CONNECTION_TIMEOUT);
+         LOG.debug("Server up: {}", b);
+
+         // create a zookeeper client
+         LOG.debug("Instantiate ZK Client");
+         zkc = ZooKeeperClient.newBuilder().connectString(getZooKeeperConnectString()).build();
+
+         // initialize the zk client with values
+         zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+         zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+     }
+
+     /**
+      * Suspends the thread whose name contains {@code SyncThread:0} for the given number of seconds.
+      *
+      * <p>The suspension happens on a separate thread; this method returns as soon as that thread has
+      * been started. The latch is counted down once the target thread has been suspended.
+      *
+      * @param seconds how long the thread stays suspended
+      * @param l latch that is counted down after the thread has been suspended
+      * @throws IOException if no thread with a matching name is found
+      * @throws InterruptedException declared for callers; not thrown by this implementation
+      */
+     @SuppressWarnings("deprecation")
+     public void sleepServer(final int seconds, final CountDownLatch l) throws InterruptedException, IOException {
+         Thread[] allthreads = new Thread[Thread.activeCount()];
+         Thread.enumerate(allthreads);
+         for (final Thread t : allthreads) {
+             // activeCount() is only an estimate: slots stay null when fewer threads were enumerated.
+             if (t != null && t.getName().contains("SyncThread:0")) {
+                 Thread sleeper = new Thread() {
+                     @Override
+                     public void run() {
+                         try {
+                             t.suspend();
+                             try {
+                                 l.countDown();
+                                 // long arithmetic so that large values of seconds cannot overflow
+                                 Thread.sleep(seconds * 1000L);
+                             } finally {
+                                 // Always resume, even if the sleep was interrupted.
+                                 t.resume();
+                             }
+                         } catch (Exception e) {
+                             LOG.error("Error suspending thread", e);
+                         }
+                     }
+                 };
+                 sleeper.start();
+                 return;
+             }
+         }
+         throw new IOException("ZooKeeper thread not found");
+     }
+
+     /**
+      * Closes the client, shuts the server down, waits for it to be down and deletes the temporary data
+      * directory. Parts that were never started are skipped.
+      *
+      * @throws Exception if closing a resource or deleting the directory fails
+      */
+     public void killServer() throws Exception {
+         if (zkc != null) {
+             zkc.close();
+         }
+
+         // shutdown ZK server
+         if (serverFactory != null) {
+             serverFactory.shutdown();
+             Assert.assertTrue(ClientBase.waitForServerDown(getZooKeeperConnectString(), ClientBase.CONNECTION_TIMEOUT),
+                     "waiting for server down");
+         }
+         if (zks != null) {
+             zks.getTxnLogFactory().close();
+         }
+         // ZkTmpDir is null when the server was never started.
+         if (ZkTmpDir != null) {
+             FileUtils.deleteDirectory(ZkTmpDir);
+         }
+     }
+ }
diff --git a/pulsar-package-management/core/pom.xml b/pulsar-package-management/core/pom.xml
index 3c4af21..cb54e1e 100644
--- a/pulsar-package-management/core/pom.xml
+++ b/pulsar-package-management/core/pom.xml
@@ -29,7 +29,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>core</artifactId>
+ <artifactId>pulsar-package-core</artifactId>
<name>Apache Pulsar :: Package Management :: Core</name>
<dependencies>
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorage.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorage.java
index e01e815..9672cf8 100644
--- a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorage.java
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorage.java
@@ -25,6 +25,10 @@ import java.util.concurrent.CompletableFuture;
public interface PackagesStorage {
/**
+ * Initialize the packages management service with the given storage.
+ */
+ void initialize();
+ /**
* Write a input stream to a path.
*
* @param path
diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java
index 4525a97..23e9028 100644
--- a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/PackagesStorageConfiguration.java
@@ -29,7 +29,7 @@ public interface PackagesStorageConfiguration {
* property key
* @return the value
*/
- Object getProperty(String key);
+ String getProperty(String key);
/**
* Set a property with the key.
@@ -39,5 +39,5 @@ public interface PackagesStorageConfiguration {
* @param value
* property value
*/
- void setProperty(String key, Object value);
+ void setProperty(String key, String value);
}
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java
similarity index 62%
rename from pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java
rename to pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java
index ada2d6c..92ff426 100644
--- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageConfiguration.java
+++ b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/DefaultPackagesStorageConfiguration.java
@@ -16,20 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.packages.management.core;
+package org.apache.pulsar.packages.management.core.impl;
-import java.util.HashMap;
+import java.util.Properties;
+import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;
-public class MockedPackagesStorageConfiguration implements PackagesStorageConfiguration {
- private HashMap<String, Object> properties = new HashMap<>();
+public class DefaultPackagesStorageConfiguration implements PackagesStorageConfiguration {
+ private final Properties properties = new Properties();
@Override
- public Object getProperty(String key) {
- return properties.get(key);
+ public String getProperty(String key) {
+ return (String) properties.get(key);
}
@Override
- public void setProperty(String key, Object value) {
- properties.put(key, value);
+ public void setProperty(String key, String value) {
+ properties.setProperty(key, value);
}
}
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
index c5a379e..8af3195 100644
--- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
+++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java
@@ -36,6 +36,11 @@ public class MockedPackagesStorage implements PackagesStorage {
}
@Override
+ public void initialize() {
+
+ }
+
+ @Override
public CompletableFuture<Void> writeAsync(String path, InputStream inputStream) {
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/common/PackageMetadataSerdeTest.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/common/PackageMetadataSerdeTest.java
index 940be34..98b2168 100644
--- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/common/PackageMetadataSerdeTest.java
+++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/common/PackageMetadataSerdeTest.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.packages.management.core.common;
import java.util.HashMap;
import org.apache.pulsar.packages.management.core.exceptions.PackagesManagementException;
-import org.junit.Test;
-import org.junit.Assert;
+import org.testng.Assert;
+import org.testng.annotations.Test;
public class PackageMetadataSerdeTest {
@Test
diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java
index bae08f1..ff150c4 100644
--- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java
+++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java
@@ -23,7 +23,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
-import org.apache.pulsar.packages.management.core.MockedPackagesStorageConfiguration;
import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
import org.apache.pulsar.packages.management.core.PackagesManagement;
import org.apache.pulsar.packages.management.core.PackagesStorage;
@@ -43,7 +42,7 @@ public class PackagesManagementImplTest {
@BeforeClass
public static void setup() throws IOException {
PackagesStorageProvider storageProvider = PackagesStorageProvider.newProvider(MockedPackagesStorageProvider.class.getName());
- MockedPackagesStorageConfiguration packagesStorageConfiguration = new MockedPackagesStorageConfiguration();
+ DefaultPackagesStorageConfiguration packagesStorageConfiguration = new DefaultPackagesStorageConfiguration();
storage = storageProvider.getStorage(packagesStorageConfiguration);
packagesManagement = new PackagesManagementImpl();
diff --git a/pulsar-package-management/pom.xml b/pulsar-package-management/pom.xml
index 98a9db5..232a058 100644
--- a/pulsar-package-management/pom.xml
+++ b/pulsar-package-management/pom.xml
@@ -35,6 +35,7 @@
<name>Apache Pulsar :: Package Management</name>
<modules>
<module>core</module>
+ <module>bookkeeper-storage</module>
</modules>
<build>