You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/03 01:41:11 UTC
[pulsar] branch master updated: File system offload (#4403)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0e807f1 File system offload (#4403)
0e807f1 is described below
commit 0e807f15c357476059383c80870abd9b8a59826c
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Jul 3 09:41:05 2019 +0800
File system offload (#4403)
Fixes #3216
Implementation of offload to HDFS
### Motivation
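Allow tiered storage to offload ledgers to HDFS (via a file system offloader), in addition to the existing S3 and GCS offload drivers.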
### Verifying this change
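Added a unit test (FileSystemManagedLedgerOffloaderTest) and an integration test (TestFileSystemOffload) for the file system offloader.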
---
conf/broker.conf | 6 +
.../filesystem_offload_core_site.xml | 40 ++-
.../offloaders/src/assemble/offloaders.xml | 6 +
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 +-
pom.xml | 3 +
.../{TestS3Offload.java => TestBaseOffload.java} | 78 ++---
.../integration/offload/TestFileSystemOffload.java | 57 ++++
.../tests/integration/offload/TestS3Offload.java | 288 ++----------------
.../suites/PulsarTieredStorageTestSuite.java | 21 +-
tests/integration/src/test/resources/pulsar.xml | 3 +-
...rage-suite.xml => tiered-file-system-suite.xml} | 4 +-
...d-storage.xml => tiered-filesystem-storage.xml} | 6 +-
...iered-storage.xml => tiered-jcloud-storage.xml} | 4 +-
...e-suite.xml => tiered-storage-jcloud-suite.xml} | 2 +-
tiered-storage/file-system/pom.xml | 95 ++++++
.../filesystem/FileSystemConfigurationData.java | 68 +++++
.../FileSystemLedgerOffloaderFactory.java | 40 +++
.../impl/FileStoreBackedReadHandleImpl.java | 239 +++++++++++++++
.../impl/FileSystemManagedLedgerOffloader.java | 331 +++++++++++++++++++++
.../META-INF/services/pulsar-offloader.yaml | 22 ++
.../offload/filesystem/FileStoreTestBase.java | 64 ++++
.../impl/FileSystemManagedLedgerOffloaderTest.java | 157 ++++++++++
tiered-storage/pom.xml | 1 +
23 files changed, 1182 insertions(+), 357 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 022085a..20f2576 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -742,6 +742,12 @@ gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576
# For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849
gcsManagedLedgerOffloadServiceAccountKeyFile=
+# For File System Storage, path to the file system profile (Hadoop core-site style XML file)
+fileSystemProfilePath=../conf/filesystem_offload_core_site.xml
+
+# For File System Storage, file system URI
+fileSystemURI=
+
### --- Deprecated config variables --- ###
# Deprecated. Use configurationStoreServers
diff --git a/tests/integration/src/test/resources/pulsar.xml b/conf/filesystem_offload_core_site.xml
similarity index 54%
copy from tests/integration/src/test/resources/pulsar.xml
copy to conf/filesystem_offload_core_site.xml
index c0720e0..d26cec2 100644
--- a/tests/integration/src/test/resources/pulsar.xml
+++ b/conf/filesystem_offload_core_site.xml
@@ -18,15 +18,31 @@
under the License.
-->
-<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
-<!-- TODO: we have to put suite files in one file to avoid executing TESTNG test suites multiple times.
- see {@link https://github.com/cbeust/testng/issues/508} -->
-<suite name="Pulsar Test Suite" parallel="instances" thread-count="1">
- <suite-files>
- <suite-file path="./pulsar-cli.xml" />
- <suite-file path="./pulsar-process.xml" />
- <suite-file path="./pulsar-sql.xml" />
- <suite-file path="./pulsar-thread.xml" />
- <suite-file path="./tiered-storage.xml" />
- </suite-files>
-</suite>
+<configuration>
+ <!-- File system URI (required) -->
+ <property>
+ <name>fs.defaultFS</name>
+ <value></value>
+ </property>
+ <property>
+ <name>hadoop.tmp.dir</name>
+ <value>pulsar</value>
+ </property>
+ <property>
+ <name>io.file.buffer.size</name>
+ <value>4096</value>
+ </property>
+ <property>
+ <name>io.seqfile.compress.blocksize</name>
+ <value>1000000</value>
+ </property>
+ <property>
+ <name>io.seqfile.compression.type</name>
+ <value>BLOCK</value>
+ </property>
+ <property>
+ <name>io.map.index.interval</name>
+ <value>128</value>
+ </property>
+
+</configuration>
diff --git a/distribution/offloaders/src/assemble/offloaders.xml b/distribution/offloaders/src/assemble/offloaders.xml
index af73772..c6f4988 100644
--- a/distribution/offloaders/src/assemble/offloaders.xml
+++ b/distribution/offloaders/src/assemble/offloaders.xml
@@ -45,5 +45,11 @@
<outputDirectory>offloaders</outputDirectory>
<fileMode>644</fileMode>
</file>
+
+ <file>
+ <source>${basedir}/../../tiered-storage/file-system/target/tiered-storage-file-system-${project.version}.nar</source>
+ <outputDirectory>offloaders</outputDirectory>
+ <fileMode>644</fileMode>
+ </file>
</files>
</assembly>
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index bdb062f..15b32a9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1426,8 +1426,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (info != null && info.hasOffloadContext() && info.getOffloadContext().getComplete()) {
UUID uid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb());
// TODO: improve this to load ledger offloader by driver name recorded in metadata
+ Map<String, String> offloadDriverMetadata = OffloadUtils.getOffloadDriverMetadata(info);
+ offloadDriverMetadata.put("ManagedLedgerName", name);
openFuture = config.getLedgerOffloader().readOffloaded(ledgerId, uid,
- OffloadUtils.getOffloadDriverMetadata(info));
+ offloadDriverMetadata);
} else {
openFuture = bookKeeper.newOpenLedgerOp().withRecovery(!isReadOnly()).withLedgerId(ledgerId)
.withDigestType(config.getDigestType()).withPassword(config.getPassword()).execute();
diff --git a/pom.xml b/pom.xml
index 9874e06..06e8910 100644
--- a/pom.xml
+++ b/pom.xml
@@ -186,6 +186,9 @@ flexible messaging model and an intuitive client API.</description>
<jclouds.version>2.1.1</jclouds.version>
<sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
<mysql-jdbc.version>8.0.11</mysql-jdbc.version>
+ <hdfs-offload-version3>3.2.0</hdfs-offload-version3>
+ <org.eclipse.jetty-hdfs-offload>9.3.24.v20180605</org.eclipse.jetty-hdfs-offload>
+ <test-hdfs-offload-jetty>9.3.24.v20180605</test-hdfs-offload-jetty>
<elasticsearch.version>6.3.2</elasticsearch.version>
<presto.version>0.206</presto.version>
<flink.version>1.6.0</flink.version>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java
similarity index 85%
copy from tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
copy to tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java
index 6a58f93..408dd42 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java
@@ -18,33 +18,25 @@
*/
package org.apache.pulsar.tests.integration.offload;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.conf.ClientConfiguration;
-
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
-import org.apache.pulsar.tests.integration.containers.S3Container;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.tests.integration.suites.PulsarTieredStorageTestSuite;
-
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-@Slf4j
-public class TestS3Offload extends PulsarTieredStorageTestSuite {
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+@Slf4j
+public abstract class TestBaseOffload extends PulsarTieredStorageTestSuite {
private static final int ENTRY_SIZE = 1024;
private static byte[] buildEntry(String pattern) {
@@ -57,43 +49,23 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
return entry;
}
- private S3Container s3Container;
-
- @BeforeClass
- public void setupS3() {
- s3Container = new S3Container(
- pulsarCluster.getClusterName(),
- S3Container.NAME)
- .withNetwork(pulsarCluster.getNetwork())
- .withNetworkAliases(S3Container.NAME);
- s3Container.start();
- }
-
- @AfterClass
- public void teardownS3() {
- if (null != s3Container) {
- s3Container.stop();
- }
- }
-
- @Test(dataProvider = "ServiceAndAdminUrls")
public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUrl) throws Exception {
- final String tenant = "s3-offload-test-cli-" + randomName(4);
+ final String tenant = "offload-test-cli-" + randomName(4);
final String namespace = tenant + "/ns1";
final String topic = "persistent://" + namespace + "/topic1";
pulsarCluster.runAdminCommandOnAnyBroker( "tenants",
- "create", "--allowed-clusters", pulsarCluster.getClusterName(),
- "--admin-roles", "offload-admin", tenant);
+ "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+ "--admin-roles", "offload-admin", tenant);
pulsarCluster.runAdminCommandOnAnyBroker(
- "namespaces",
- "create", "--clusters", pulsarCluster.getClusterName(), namespace);
+ "namespaces",
+ "create", "--clusters", pulsarCluster.getClusterName(), namespace);
long firstLedger = -1;
try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
Producer<byte[]> producer = client.newProducer().topic(topic)
- .blockIfQueueFull(true).enableBatching(false).create();) {
+ .blockIfQueueFull(true).enableBatching(false).create();) {
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
// write enough to topic to make it roll
@@ -115,7 +87,7 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
Assert.assertTrue(output.contains("Nothing to offload"));
output = pulsarCluster.runAdminCommandOnAnyBroker( "topics",
- "offload-status", topic).getStdout();
+ "offload-status", topic).getStdout();
Assert.assertTrue(output.contains("Offload has not been run"));
// offload with a low threshold
@@ -124,7 +96,7 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
Assert.assertTrue(output.contains("Offload triggered"));
output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
- "offload-status", "-w", topic).getStdout();
+ "offload-status", "-w", topic).getStdout();
Assert.assertTrue(output.contains("Offload was a success"));
// delete the first ledger, so that we cannot possibly read from it
@@ -149,9 +121,8 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
}
}
- @Test(dataProvider = "ServiceAndAdminUrls")
public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String adminUrl) throws Exception {
- final String tenant = "s3-offload-test-threshold-" + randomName(4);
+ final String tenant = "offload-test-threshold-" + randomName(4);
final String namespace = tenant + "/ns1";
final String topic = "persistent://" + namespace + "/topic1";
@@ -168,8 +139,8 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
long firstLedger = 0;
try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
Producer<byte[]> producer = client.newProducer().topic(topic)
- .blockIfQueueFull(true).enableBatching(false).create();
- ) {
+ .blockIfQueueFull(true).enableBatching(false).create();
+ ) {
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
@@ -222,18 +193,18 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
}
}
- private boolean ledgerOffloaded(List<LedgerInfo> ledgers, long ledgerId) {
+ private boolean ledgerOffloaded(List<PersistentTopicInternalStats.LedgerInfo> ledgers, long ledgerId) {
return ledgers.stream().filter(l -> l.ledgerId == ledgerId)
- .map(l -> l.offloaded).findFirst().get();
+ .map(l -> l.offloaded).findFirst().get();
}
private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic) throws Exception {
try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
Producer producer = client.newProducer().topic(topic)
- .blockIfQueueFull(true).enableBatching(false).create();
+ .blockIfQueueFull(true).enableBatching(false).create();
PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
- List<LedgerInfo> ledgers = admin.topics().getInternalStats(topic).ledgers;
+ List<PersistentTopicInternalStats.LedgerInfo> ledgers = admin.topics().getInternalStats(topic).ledgers;
long currentLedger = ledgers.get(ledgers.size() - 1).ledgerId;
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
@@ -269,9 +240,8 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
}
}
- @Test(dataProvider = "ServiceAndAdminUrls")
public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl) throws Exception {
- final String tenant = "s3-offload-test-deletion-lag-" + randomName(4);
+ final String tenant = "offload-test-deletion-lag-" + randomName(4);
final String namespace = tenant + "/ns1";
final String topic = "persistent://" + namespace + "/topic1";
@@ -284,7 +254,7 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
// set threshold to offload runs immediately after role
pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
- "set-offload-threshold", "--size", "0", namespace);
+ "set-offload-threshold", "--size", "0", namespace);
String output = pulsarCluster.runAdminCommandOnAnyBroker(
"namespaces", "get-offload-deletion-lag", namespace).getStdout();
@@ -297,7 +267,7 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag", namespace,
- "--lag", "0m");
+ "--lag", "0m");
output = pulsarCluster.runAdminCommandOnAnyBroker(
"namespaces", "get-offload-deletion-lag", namespace).getStdout();
Assert.assertTrue(output.contains("0 minute(s)"));
@@ -325,6 +295,4 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
Thread.sleep(5000);
Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
}
-
-
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java
new file mode 100644
index 0000000..c0ddfbc
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.offload;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class TestFileSystemOffload extends TestBaseOffload {
+
+ @Test(dataProvider = "ServiceAndAdminUrls")
+ public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUrl) throws Exception {
+ super.testPublishOffloadAndConsumeViaCLI(serviceUrl, adminUrl);
+ }
+
+ @Test(dataProvider = "ServiceAndAdminUrls")
+ public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String adminUrl) throws Exception {
+ super.testPublishOffloadAndConsumeViaThreshold(serviceUrl, adminUrl);
+ }
+
+ @Test(dataProvider = "ServiceAndAdminUrls")
+ public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl) throws Exception {
+ super.testPublishOffloadAndConsumeDeletionLag(serviceUrl, adminUrl);
+
+ }
+
+
+ @Override
+ protected Map<String, String> getEnv() {
+ Map<String, String> result = new HashMap<>();
+ result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER));
+ result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
+ result.put("managedLedgerOffloadDriver", "filesystem");
+ result.put("fileSystemURI", "file:///");
+
+ return result;
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
index 6a58f93..0631664 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
@@ -18,54 +18,27 @@
*/
package org.apache.pulsar.tests.integration.offload;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.Map;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.BookKeeperAdmin;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
import org.apache.pulsar.tests.integration.containers.S3Container;
-import org.apache.pulsar.tests.integration.suites.PulsarTieredStorageTestSuite;
-
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
-public class TestS3Offload extends PulsarTieredStorageTestSuite {
-
- private static final int ENTRY_SIZE = 1024;
-
- private static byte[] buildEntry(String pattern) {
- byte[] entry = new byte[ENTRY_SIZE];
- byte[] patternBytes = pattern.getBytes();
-
- for (int i = 0; i < entry.length; i++) {
- entry[i] = patternBytes[i % patternBytes.length];
- }
- return entry;
- }
+public class TestS3Offload extends TestBaseOffload {
private S3Container s3Container;
@BeforeClass
public void setupS3() {
s3Container = new S3Container(
- pulsarCluster.getClusterName(),
- S3Container.NAME)
- .withNetwork(pulsarCluster.getNetwork())
- .withNetworkAliases(S3Container.NAME);
+ pulsarCluster.getClusterName(),
+ S3Container.NAME)
+ .withNetwork(pulsarCluster.getNetwork())
+ .withNetworkAliases(S3Container.NAME);
s3Container.start();
}
@@ -78,252 +51,31 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
@Test(dataProvider = "ServiceAndAdminUrls")
public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUrl) throws Exception {
- final String tenant = "s3-offload-test-cli-" + randomName(4);
- final String namespace = tenant + "/ns1";
- final String topic = "persistent://" + namespace + "/topic1";
-
- pulsarCluster.runAdminCommandOnAnyBroker( "tenants",
- "create", "--allowed-clusters", pulsarCluster.getClusterName(),
- "--admin-roles", "offload-admin", tenant);
-
- pulsarCluster.runAdminCommandOnAnyBroker(
- "namespaces",
- "create", "--clusters", pulsarCluster.getClusterName(), namespace);
-
- long firstLedger = -1;
- try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
- Producer<byte[]> producer = client.newProducer().topic(topic)
- .blockIfQueueFull(true).enableBatching(false).create();) {
- client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
-
- // write enough to topic to make it roll
- int i = 0;
- for (; i < ENTRIES_PER_LEDGER * 1.5; i++) {
- producer.sendAsync(buildEntry("offload-message" + i));
- }
- producer.flush();
- }
-
- try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
- // read managed ledger info, check ledgers exist
- firstLedger = admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;
-
- // first offload with a high threshold, nothing should offload
-
- String output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
- "offload", "--size-threshold", "100G", topic).getStdout();
- Assert.assertTrue(output.contains("Nothing to offload"));
-
- output = pulsarCluster.runAdminCommandOnAnyBroker( "topics",
- "offload-status", topic).getStdout();
- Assert.assertTrue(output.contains("Offload has not been run"));
-
- // offload with a low threshold
- output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
- "offload", "--size-threshold", "1M", topic).getStdout();
- Assert.assertTrue(output.contains("Offload triggered"));
-
- output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
- "offload-status", "-w", topic).getStdout();
- Assert.assertTrue(output.contains("Offload was a success"));
-
- // delete the first ledger, so that we cannot possibly read from it
- ClientConfiguration bkConf = new ClientConfiguration();
- bkConf.setZkServers(pulsarCluster.getZKConnString());
- try (BookKeeper bk = new BookKeeper(bkConf)) {
- bk.deleteLedger(firstLedger);
- }
-
- // Unload topic to clear all caches, open handles, etc
- admin.topics().unload(topic);
- }
-
- log.info("Read back the data (which would be in that first ledger)");
- try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
- Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
- // read back from topic
- for (int i = 0; i < ENTRIES_PER_LEDGER * 1.5; i++) {
- Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
- Assert.assertEquals(buildEntry("offload-message" + i), m.getData());
- }
- }
+ super.testPublishOffloadAndConsumeViaCLI(serviceUrl, adminUrl);
}
@Test(dataProvider = "ServiceAndAdminUrls")
public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String adminUrl) throws Exception {
- final String tenant = "s3-offload-test-threshold-" + randomName(4);
- final String namespace = tenant + "/ns1";
- final String topic = "persistent://" + namespace + "/topic1";
-
- pulsarCluster.runAdminCommandOnAnyBroker("tenants",
- "create", "--allowed-clusters", pulsarCluster.getClusterName(),
- "--admin-roles", "offload-admin", tenant);
-
- pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
- "create", "--clusters", pulsarCluster.getClusterName(), namespace);
-
- pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
- "set-offload-threshold", "--size", "1M", namespace);
-
- long firstLedger = 0;
- try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
- Producer<byte[]> producer = client.newProducer().topic(topic)
- .blockIfQueueFull(true).enableBatching(false).create();
- ) {
-
- client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
-
- // write enough to topic to make it roll twice
- for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
- producer.sendAsync(buildEntry("offload-message" + i));
- }
-
- producer.flush();
- }
-
- try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
- firstLedger = admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;
-
- // wait up to 30 seconds for offload to occur
- for (int i = 0; i < 300 && !admin.topics().getInternalStats(topic).ledgers.get(0).offloaded; i++) {
- Thread.sleep(100);
- }
- Assert.assertTrue(admin.topics().getInternalStats(topic).ledgers.get(0).offloaded);
-
- // delete the first ledger, so that we cannot possibly read from it
- ClientConfiguration bkConf = new ClientConfiguration();
- bkConf.setZkServers(pulsarCluster.getZKConnString());
- try (BookKeeper bk = new BookKeeper(bkConf)) {
- bk.deleteLedger(firstLedger);
- }
-
- // Unload topic to clear all caches, open handles, etc
- admin.topics().unload(topic);
- }
-
- log.info("Read back the data (which would be in that first ledger)");
- try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
- Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
- // read back from topic
- for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
- Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
- Assert.assertEquals(buildEntry("offload-message" + i), m.getData());
- }
- }
-
- // try disabling
- pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
- "set-offload-threshold", "--size", "-1", namespace);
-
- // hard to validate that it has been disabled as we'd be waiting for
- // something _not_ to happen (i.e. waiting for ages), so just check
- try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
- Assert.assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1L);
- }
- }
-
- private boolean ledgerOffloaded(List<LedgerInfo> ledgers, long ledgerId) {
- return ledgers.stream().filter(l -> l.ledgerId == ledgerId)
- .map(l -> l.offloaded).findFirst().get();
- }
-
- private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic) throws Exception {
- try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
- Producer producer = client.newProducer().topic(topic)
- .blockIfQueueFull(true).enableBatching(false).create();
- PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
-
- List<LedgerInfo> ledgers = admin.topics().getInternalStats(topic).ledgers;
- long currentLedger = ledgers.get(ledgers.size() - 1).ledgerId;
-
- client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
-
- // write enough to topic to make it roll twice
- for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
- producer.sendAsync(buildEntry("offload-message" + i));
- }
- producer.send(buildEntry("final-offload-message"));
-
- // wait up to 30 seconds for offload to occur
- for (int i = 0;
- i < 300 && !ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers, currentLedger);
- i++) {
- Thread.sleep(100);
- }
- Assert.assertTrue(ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers, currentLedger));
-
- return currentLedger;
- }
- }
-
- public boolean ledgerExistsInBookKeeper(long ledgerId) throws Exception {
- ClientConfiguration bkConf = new ClientConfiguration();
- bkConf.setZkServers(pulsarCluster.getZKConnString());
- try (BookKeeperAdmin bk = new BookKeeperAdmin(bkConf)) {
- try {
- bk.openLedger(ledgerId).close();
- return true;
- } catch (BKException.BKNoSuchLedgerExistsException e) {
- return false;
- }
- }
+ super.testPublishOffloadAndConsumeViaThreshold(serviceUrl, adminUrl);
}
@Test(dataProvider = "ServiceAndAdminUrls")
public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl) throws Exception {
- final String tenant = "s3-offload-test-deletion-lag-" + randomName(4);
- final String namespace = tenant + "/ns1";
- final String topic = "persistent://" + namespace + "/topic1";
-
- pulsarCluster.runAdminCommandOnAnyBroker("tenants",
- "create", "--allowed-clusters", pulsarCluster.getClusterName(),
- "--admin-roles", "offload-admin", tenant);
-
- pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
- "create", "--clusters", pulsarCluster.getClusterName(), namespace);
+ super.testPublishOffloadAndConsumeDeletionLag(serviceUrl, adminUrl);
- // set threshold to offload runs immediately after role
- pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
- "set-offload-threshold", "--size", "0", namespace);
-
- String output = pulsarCluster.runAdminCommandOnAnyBroker(
- "namespaces", "get-offload-deletion-lag", namespace).getStdout();
- Assert.assertTrue(output.contains("Unset for namespace"));
-
- long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
- // give it up to 5 seconds to delete, it shouldn't
- // so we wait this every time
- Thread.sleep(5000);
- Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
-
- pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag", namespace,
- "--lag", "0m");
- output = pulsarCluster.runAdminCommandOnAnyBroker(
- "namespaces", "get-offload-deletion-lag", namespace).getStdout();
- Assert.assertTrue(output.contains("0 minute(s)"));
-
- offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
- // wait up to 10 seconds for ledger to be deleted
- for (int i = 0; i < 10 && ledgerExistsInBookKeeper(offloadedLedger); i++) {
- writeAndWaitForOffload(serviceUrl, adminUrl, topic);
- Thread.sleep(1000);
- }
- Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger));
-
- pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "clear-offload-deletion-lag", namespace);
-
- Thread.sleep(5); // wait 5 seconds to allow broker to see update
+ }
- output = pulsarCluster.runAdminCommandOnAnyBroker(
- "namespaces", "get-offload-deletion-lag", namespace).getStdout();
- Assert.assertTrue(output.contains("Unset for namespace"));
- offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+ @Override
+ protected Map<String, String> getEnv() {
+ Map<String, String> result = new HashMap<>();
+ result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER));
+ result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
+ result.put("managedLedgerOffloadDriver", "s3");
+ result.put("s3ManagedLedgerOffloadBucket", "pulsar-integtest");
+ result.put("s3ManagedLedgerOffloadServiceEndpoint", "http://" + S3Container.NAME + ":9090");
- // give it up to 5 seconds to delete, it shouldn't
- // so we wait this every time
- Thread.sleep(5000);
- Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
+ return result;
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
index d248dce..67fe145 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
@@ -18,12 +18,8 @@
*/
package org.apache.pulsar.tests.integration.suites;
-import static java.util.stream.Collectors.joining;
-
-import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
-import org.apache.pulsar.tests.integration.containers.S3Container;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
@@ -31,8 +27,13 @@ import org.testng.ITest;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.joining;
+
@Slf4j
-public class PulsarTieredStorageTestSuite extends PulsarClusterTestBase implements ITest {
+public abstract class PulsarTieredStorageTestSuite extends PulsarClusterTestBase implements ITest {
protected static final int ENTRIES_PER_LEDGER = 1024;
@@ -51,15 +52,9 @@ public class PulsarTieredStorageTestSuite extends PulsarClusterTestBase implemen
log.info("Setting up cluster {} with {} bookies, {} brokers",
spec.clusterName(), spec.numBookies(), spec.numBrokers());
-
pulsarCluster = PulsarCluster.forSpec(spec);
-
for(BrokerContainer brokerContainer : pulsarCluster.getBrokers()){
- brokerContainer.withEnv("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER));
- brokerContainer.withEnv("managedLedgerMinLedgerRolloverTimeMinutes", "0");
- brokerContainer.withEnv("managedLedgerOffloadDriver", "s3");
- brokerContainer.withEnv("s3ManagedLedgerOffloadBucket", "pulsar-integtest");
- brokerContainer.withEnv("s3ManagedLedgerOffloadServiceEndpoint", "http://" + S3Container.NAME + ":9090");
+ getEnv().forEach(brokerContainer::withEnv);
}
pulsarCluster.start();
@@ -77,4 +72,6 @@ public class PulsarTieredStorageTestSuite extends PulsarClusterTestBase implemen
public String getTestName() {
return "tiered-storage-test-suite";
}
+
+ protected abstract Map<String, String> getEnv();
}
diff --git a/tests/integration/src/test/resources/pulsar.xml b/tests/integration/src/test/resources/pulsar.xml
index c0720e0..79bf744 100644
--- a/tests/integration/src/test/resources/pulsar.xml
+++ b/tests/integration/src/test/resources/pulsar.xml
@@ -27,6 +27,7 @@
<suite-file path="./pulsar-process.xml" />
<suite-file path="./pulsar-sql.xml" />
<suite-file path="./pulsar-thread.xml" />
- <suite-file path="./tiered-storage.xml" />
+ <suite-file path="./tiered-jcloud-storage.xml" />
+ <suite-file path="./tiered-filesystem-storage.xml"/>
</suite-files>
</suite>
diff --git a/tests/integration/src/test/resources/tiered-storage-suite.xml b/tests/integration/src/test/resources/tiered-file-system-suite.xml
similarity index 87%
copy from tests/integration/src/test/resources/tiered-storage-suite.xml
copy to tests/integration/src/test/resources/tiered-file-system-suite.xml
index 5b299b7..e5ec338 100644
--- a/tests/integration/src/test/resources/tiered-storage-suite.xml
+++ b/tests/integration/src/test/resources/tiered-file-system-suite.xml
@@ -21,8 +21,8 @@
<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
<!-- TODO: we have to put suite files in one file to avoid executing TESTNG test suites multiple times.
see {@link https://github.com/cbeust/testng/issues/508} -->
-<suite name="Pulsar Test Suite" parallel="instances" thread-count="1">
+<suite name="Pulsar (Filesystem Tiered Storage) Test Suite" parallel="instances" thread-count="1">
<suite-files>
- <suite-file path="./tiered-storage.xml" />
+ <suite-file path="./tiered-filesystem-storage.xml" />
</suite-files>
</suite>
diff --git a/tests/integration/src/test/resources/tiered-storage.xml b/tests/integration/src/test/resources/tiered-filesystem-storage.xml
similarity index 82%
copy from tests/integration/src/test/resources/tiered-storage.xml
copy to tests/integration/src/test/resources/tiered-filesystem-storage.xml
index 8cbdaa7..b6279e3 100644
--- a/tests/integration/src/test/resources/tiered-storage.xml
+++ b/tests/integration/src/test/resources/tiered-filesystem-storage.xml
@@ -19,10 +19,10 @@
-->
<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
-<suite name="Pulsar (Tiered Storage) Integration Tests" verbose="2" annotations="JDK">
- <test name="tiered-storage-test-suite" preserve-order="true">
+<suite name="Pulsar (Filesystem Tiered Storage) Integration Tests" verbose="2" annotations="JDK">
+ <test name="tiered-storage-filesystem-test-suite" preserve-order="true">
<classes>
- <class name="org.apache.pulsar.tests.integration.offload.TestS3Offload" />
+ <class name="org.apache.pulsar.tests.integration.offload.TestFileSystemOffload" />
</classes>
</test>
</suite>
\ No newline at end of file
diff --git a/tests/integration/src/test/resources/tiered-storage.xml b/tests/integration/src/test/resources/tiered-jcloud-storage.xml
similarity index 85%
rename from tests/integration/src/test/resources/tiered-storage.xml
rename to tests/integration/src/test/resources/tiered-jcloud-storage.xml
index 8cbdaa7..a727144 100644
--- a/tests/integration/src/test/resources/tiered-storage.xml
+++ b/tests/integration/src/test/resources/tiered-jcloud-storage.xml
@@ -19,8 +19,8 @@
-->
<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
-<suite name="Pulsar (Tiered Storage) Integration Tests" verbose="2" annotations="JDK">
- <test name="tiered-storage-test-suite" preserve-order="true">
+<suite name="Pulsar (Jcloud Tiered Storage) Integration Tests" verbose="2" annotations="JDK">
+ <test name="tiered-storage-jcloud-test-suite" preserve-order="true">
<classes>
<class name="org.apache.pulsar.tests.integration.offload.TestS3Offload" />
</classes>
diff --git a/tests/integration/src/test/resources/tiered-storage-suite.xml b/tests/integration/src/test/resources/tiered-storage-jcloud-suite.xml
similarity index 95%
rename from tests/integration/src/test/resources/tiered-storage-suite.xml
rename to tests/integration/src/test/resources/tiered-storage-jcloud-suite.xml
index 5b299b7..bd49803 100644
--- a/tests/integration/src/test/resources/tiered-storage-suite.xml
+++ b/tests/integration/src/test/resources/tiered-storage-jcloud-suite.xml
@@ -23,6 +23,6 @@
see {@link https://github.com/cbeust/testng/issues/508} -->
<suite name="Pulsar Test Suite" parallel="instances" thread-count="1">
<suite-files>
- <suite-file path="./tiered-storage.xml" />
+ <suite-file path="./tiered-jcloud-storage.xml" />
</suite-files>
</suite>
diff --git a/tiered-storage/file-system/pom.xml b/tiered-storage/file-system/pom.xml
new file mode 100644
index 0000000..9e76101
--- /dev/null
+++ b/tiered-storage/file-system/pom.xml
@@ -0,0 +1,95 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>tiered-storage-parent</artifactId>
+ <version>2.4.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>tiered-storage-file-system</artifactId>
+ <name>Apache Pulsar :: Tiered Storage :: File System</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>managed-ledger-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <version>${hdfs-offload-version3}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hdfs-offload-version3}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf3.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>managed-ledger-original</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>${hdfs-offload-version3}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>${org.eclipse.jetty-hdfs-offload}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>${org.eclipse.jetty-hdfs-offload}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ <version>${org.eclipse.jetty-hdfs-offload}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java
new file mode 100644
index 0000000..899887b
--- /dev/null
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.filesystem;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Properties;
+
+import static org.apache.pulsar.common.util.FieldParser.value;
+
+/**
+ * Configuration for file system.
+ */
+@Data
+public class FileSystemConfigurationData implements Serializable, Cloneable {
+
+    /**** --- Ledger Offloading --- ****/
+    // Name of the driver used to offload old data to long term storage
+    private String managedLedgerOffloadDriver = null;
+
+    // Read by the offloader as a comma-separated list of Hadoop configuration resources
+    private String fileSystemProfilePath = null;
+
+    // When non-empty, applied by the offloader as the Hadoop "fs.defaultFS" setting
+    private String fileSystemURI = null;
+
+    // Thread count of the scheduler the offloader creates for writing offloaded entries
+    private int managedLedgerOffloadMaxThreads = 2;
+
+    /**
+     * Builds a configuration object from <tt>properties</tt>. Every declared field of this
+     * class whose name is a key of <tt>properties</tt> is populated from the matching value;
+     * all other fields keep their defaults.
+     *
+     * @param properties the configuration properties
+     * @return the populated file system offload configuration
+     * @throws IllegalArgumentException if a value cannot be converted or assigned to its field
+     */
+    public static FileSystemConfigurationData create(Properties properties) {
+        FileSystemConfigurationData config = new FileSystemConfigurationData();
+        Arrays.stream(FileSystemConfigurationData.class.getDeclaredFields())
+                .filter(field -> properties.containsKey(field.getName()))
+                .forEach(field -> applyProperty(config, field, properties));
+        return config;
+    }
+
+    /**
+     * Converts the property named after <tt>field</tt> and stores it into <tt>target</tt>.
+     */
+    private static void applyProperty(FileSystemConfigurationData target, Field field, Properties properties) {
+        try {
+            field.setAccessible(true);
+            field.set(target, value((String) properties.get(field.getName()), field));
+        } catch (Exception e) {
+            throw new IllegalArgumentException(String.format("failed to initialize %s field while setting value %s",
+                    field.getName(), properties.get(field.getName())), e);
+        }
+    }
+}
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java
new file mode 100644
index 0000000..cd52197
--- /dev/null
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.filesystem;
+
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * {@link LedgerOffloaderFactory} producing {@link FileSystemManagedLedgerOffloader} instances.
+ */
+public class FileSystemLedgerOffloaderFactory implements LedgerOffloaderFactory<FileSystemManagedLedgerOffloader> {
+
+    /**
+     * Delegates the driver-name check to {@link FileSystemManagedLedgerOffloader#driverSupported(String)}.
+     */
+    @Override
+    public boolean isDriverSupported(String driverName) {
+        return FileSystemManagedLedgerOffloader.driverSupported(driverName);
+    }
+
+    /**
+     * Creates an offloader configured from <tt>properties</tt>.
+     * The <tt>userMetadata</tt> argument is not used by this factory.
+     */
+    @Override
+    public FileSystemManagedLedgerOffloader create(Properties properties, Map<String, String> userMetadata,
+                                                   OrderedScheduler scheduler) throws IOException {
+        return FileSystemManagedLedgerOffloader.create(FileSystemConfigurationData.create(properties), scheduler);
+    }
+}
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
new file mode 100644
index 0000000..b1663d2
--- /dev/null
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.filesystem.impl;
+
+import com.google.common.collect.Maps;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerMetadataBuilder;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.DataFormats;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class FileStoreBackedReadHandleImpl implements ReadHandle {
+ private static final Logger log = LoggerFactory.getLogger(FileStoreBackedReadHandleImpl.class);
+ private final ExecutorService executor;
+ private final MapFile.Reader reader;
+ private final long ledgerId;
+ private final LedgerMetadata ledgerMetadata;
+
+ /**
+  * Creates a read handle over an offloaded ledger stored in a {@link MapFile}.
+  * The ledger metadata is loaded eagerly from the record stored under
+  * {@link FileSystemManagedLedgerOffloader#METADATA_KEY_INDEX}.
+  *
+  * @param executor executor on which reads and the close are run
+  * @param reader   reader opened on the map file of the ledger
+  * @param ledgerId id of the offloaded ledger
+  * @throws IOException if the metadata record is missing or cannot be read or parsed
+  */
+ private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId) throws IOException {
+     this.ledgerId = ledgerId;
+     this.executor = executor;
+     this.reader = reader;
+     LongWritable key = new LongWritable();
+     BytesWritable value = new BytesWritable();
+     try {
+         key.set(FileSystemManagedLedgerOffloader.METADATA_KEY_INDEX);
+         // MapFile.Reader#get returns null when the key is absent; fail clearly
+         // instead of trying to parse an empty value.
+         if (reader.get(key, value) == null) {
+             throw new IOException("No LedgerMetadata record found in the data file");
+         }
+         this.ledgerMetadata = parseLedgerMetadata(value.copyBytes());
+     } catch (IOException e) {
+         log.error("Fail to read LedgerMetadata for ledgerId {}", ledgerId, e);
+         // Report the ledger id (not the metadata key) and keep the root cause.
+         throw new IOException("Fail to read LedgerMetadata for ledgerId " + ledgerId, e);
+     }
+ }
+
+ /** Returns the ledger id this handle was opened with. */
+ @Override
+ public long getId() {
+ return ledgerId;
+ }
+
+ /** Returns the ledger metadata parsed from the data file when this handle was constructed. */
+ @Override
+ public LedgerMetadata getLedgerMetadata() {
+ return ledgerMetadata;
+
+ }
+
+ /**
+  * Closes the underlying map file reader on this handle's executor.
+  *
+  * @return a future completed with {@code null} once the reader has been closed, or
+  *         completed exceptionally with the {@link IOException} raised by the close
+  */
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+     CompletableFuture<Void> promise = new CompletableFuture<>();
+     executor.submit(() -> {
+         try {
+             reader.close();
+             // Signal success; otherwise anyone waiting on the future would never be released.
+             promise.complete(null);
+         } catch (IOException t) {
+             promise.completeExceptionally(t);
+         }
+     });
+     return promise;
+ }
+
+ /**
+  * Reads the entries {@code firstEntry} to {@code lastEntry} (both inclusive) from the
+  * map file. The work runs on this handle's executor.
+  *
+  * @param firstEntry id of the first entry to read
+  * @param lastEntry  id of the last entry to read
+  * @return a future holding the entries; it fails with
+  *         {@link BKException.BKIncorrectParameterException} when the range is invalid, with
+  *         {@link BKException.BKUnexpectedConditionException} when the file ends or moves
+  *         past {@code lastEntry} before all requested entries were found, or with the
+  *         error raised while reading
+  */
+ @Override
+ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
+     if (log.isDebugEnabled()) {
+         log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
+     }
+     CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
+     executor.submit(() -> {
+         if (firstEntry > lastEntry
+                 || firstEntry < 0
+                 || lastEntry > getLastAddConfirmed()) {
+             promise.completeExceptionally(new BKException.BKIncorrectParameterException());
+             return;
+         }
+         long entriesToRead = (lastEntry - firstEntry) + 1;
+         List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
+         long nextExpectedId = firstEntry;
+         LongWritable key = new LongWritable();
+         BytesWritable value = new BytesWritable();
+         try {
+             // Seek to the record preceding the first requested entry (the metadata
+             // record when firstEntry is 0); presumably the following next() then
+             // yields firstEntry -- TODO confirm against MapFile.Reader semantics.
+             key.set(nextExpectedId - 1);
+             reader.seek(key);
+             while (entriesToRead > 0) {
+                 // next() returns false at end of file and leaves key/value untouched;
+                 // without this check the loop would never terminate.
+                 if (!reader.next(key, value)) {
+                     log.warn("Reached the end of the data file of ledger {} while expecting entry {}",
+                             ledgerId, nextExpectedId);
+                     throw new BKException.BKUnexpectedConditionException();
+                 }
+                 int length = value.getLength();
+                 long entryId = key.get();
+                 if (entryId == nextExpectedId) {
+                     ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
+                     // Registered before being filled so the buffer is released on failure.
+                     entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
+                     // Copy straight from the backing array; only the first 'length' bytes are valid.
+                     buf.writeBytes(value.getBytes(), 0, length);
+                     entriesToRead--;
+                     nextExpectedId++;
+                 } else if (entryId > lastEntry) {
+                     log.info("Expected to read {}, but read {}, which is greater than last entry {}",
+                             nextExpectedId, entryId, lastEntry);
+                     throw new BKException.BKUnexpectedConditionException();
+                 }
+             }
+             promise.complete(LedgerEntriesImpl.create(entries));
+         } catch (Throwable t) {
+             promise.completeExceptionally(t);
+             entries.forEach(LedgerEntry::close);
+         }
+     });
+     return promise;
+ }
+
+ /** Same as {@link #readAsync(long, long)}; this handle makes no distinction for unconfirmed reads. */
+ @Override
+ public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
+ return readAsync(firstEntry, lastEntry);
+ }
+
+ /** Returns an already-completed future holding {@link #getLastAddConfirmed()}. */
+ @Override
+ public CompletableFuture<Long> readLastAddConfirmedAsync() {
+ return CompletableFuture.completedFuture(getLastAddConfirmed());
+ }
+
+ /** Returns an already-completed future holding {@link #getLastAddConfirmed()}. */
+ @Override
+ public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+ return CompletableFuture.completedFuture(getLastAddConfirmed());
+ }
+
+ /** Returns the last entry id recorded in the ledger metadata. */
+ @Override
+ public long getLastAddConfirmed() {
+ return getLedgerMetadata().getLastEntryId();
+ }
+
+ /** Returns the ledger length recorded in the ledger metadata. */
+ @Override
+ public long getLength() {
+ return getLedgerMetadata().getLength();
+ }
+
+ /** Returns the closed flag of the ledger metadata. */
+ @Override
+ public boolean isClosed() {
+ return getLedgerMetadata().isClosed();
+ }
+
+ /** Not supported: the returned future always fails with {@link UnsupportedOperationException}. */
+ @Override
+ public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId,
+ long timeOutInMillis,
+ boolean parallel) {
+ CompletableFuture<LastConfirmedAndEntry> promise = new CompletableFuture<>();
+ promise.completeExceptionally(new UnsupportedOperationException());
+ return promise;
+ }
+
+ /**
+  * Opens a read handle on an offloaded ledger.
+  *
+  * @param executor executor used for reads and for closing the reader
+  * @param reader   reader opened on the map file of the ledger
+  * @param ledgerId id of the offloaded ledger
+  * @throws IOException if the ledger metadata cannot be read from the file
+  */
+ public static ReadHandle open(ScheduledExecutorService executor, MapFile.Reader reader, long ledgerId) throws IOException {
+ return new FileStoreBackedReadHandleImpl(executor, reader, ledgerId);
+ }
+
+ private static LedgerMetadata parseLedgerMetadata(byte[] bytes) throws IOException {
+ DataFormats.LedgerMetadataFormat ledgerMetadataFormat = DataFormats.LedgerMetadataFormat.newBuilder().mergeFrom(bytes).build();
+ LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
+ .withLastEntryId(ledgerMetadataFormat.getLastEntryId())
+ .withPassword(ledgerMetadataFormat.getPassword().toByteArray())
+ .withClosedState()
+ .withMetadataFormatVersion(2)
+ .withLength(ledgerMetadataFormat.getLength())
+ .withAckQuorumSize(ledgerMetadataFormat.getAckQuorumSize())
+ .withCreationTime(ledgerMetadataFormat.getCtime())
+ .withWriteQuorumSize(ledgerMetadataFormat.getQuorumSize())
+ .withEnsembleSize(ledgerMetadataFormat.getEnsembleSize());
+ ledgerMetadataFormat.getSegmentList().forEach(segment -> {
+ ArrayList<BookieSocketAddress> addressArrayList = new ArrayList<>();
+ segment.getEnsembleMemberList().forEach(address -> {
+ try {
+ addressArrayList.add(new BookieSocketAddress(address));
+ } catch (IOException e) {
+ log.error("Exception when create BookieSocketAddress. ", e);
+ }
+ });
+ builder.newEnsembleEntry(segment.getFirstEntryId(), addressArrayList);
+ });
+
+ if (ledgerMetadataFormat.getCustomMetadataCount() > 0) {
+ Map<String, byte[]> customMetadata = Maps.newHashMap();
+ ledgerMetadataFormat.getCustomMetadataList().forEach(
+ entry -> customMetadata.put(entry.getKey(), entry.getValue().toByteArray()));
+ builder.withCustomMetadata(customMetadata);
+ }
+
+ switch (ledgerMetadataFormat.getDigestType()) {
+ case HMAC:
+ builder.withDigestType(DigestType.MAC);
+ break;
+ case CRC32:
+ builder.withDigestType(DigestType.CRC32);
+ break;
+ case CRC32C:
+ builder.withDigestType(DigestType.CRC32C);
+ break;
+ case DUMMY:
+ builder.withDigestType(DigestType.DUMMY);
+ break;
+ default:
+ throw new IllegalArgumentException("Unable to convert digest type " + ledgerMetadataFormat.getDigestType());
+ }
+
+ return builder.build();
+ }
+
+}
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
new file mode 100644
index 0000000..309076c
--- /dev/null
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.filesystem.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.ByteString;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.offload.filesystem.FileSystemLedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.offload.filesystem.FileSystemConfigurationData;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.DataFormats;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+
+public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
+
+ private static final Logger log = LoggerFactory.getLogger(FileSystemManagedLedgerOffloader.class);
+ private static final String STORAGE_BASE_PATH = "storageBasePath";
+ private static final String DRIVER_NAMES = "filesystem";
+ private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName";
+ static final long METADATA_KEY_INDEX = -1;
+ private final Configuration configuration;
+ private final String driverName;
+ private final String storageBasePath;
+ private final FileSystem fileSystem;
+ private OrderedScheduler scheduler;
+ private static final long ENTRIES_PER_READ = 100;
+ private OrderedScheduler assignmentScheduler;
+ /** Returns true when {@code driver} equals the driver name handled by this offloader ("filesystem"). */
+ public static boolean driverSupported(String driver) {
+ return DRIVER_NAMES.equals(driver);
+ }
+ /** Returns the driver name taken from the configuration this offloader was built with. */
+ @Override
+ public String getOffloadDriverName() {
+ return driverName;
+ }
+
+ /**
+  * Creates an offloader from the given configuration.
+  *
+  * @param conf      file system offload configuration
+  * @param scheduler scheduler on which offload tasks are submitted
+  * @throws IOException propagated from the private constructor
+  */
+ public static FileSystemManagedLedgerOffloader create(FileSystemConfigurationData conf, OrderedScheduler scheduler) throws IOException {
+ return new FileSystemManagedLedgerOffloader(conf, scheduler);
+ }
+
+ private FileSystemManagedLedgerOffloader(FileSystemConfigurationData conf, OrderedScheduler scheduler) throws IOException {
+ this.configuration = new Configuration();
+ if (conf.getFileSystemProfilePath() != null) {
+ String[] paths = conf.getFileSystemProfilePath().split(",");
+ for (int i =0 ; i < paths.length; i++) {
+ configuration.addResource(new Path(paths[i]));
+ }
+ }
+ if (!"".equals(conf.getFileSystemURI()) && conf.getFileSystemURI() != null) {
+ configuration.set("fs.defaultFS", conf.getFileSystemURI());
+ }
+ if (configuration.get("fs.hdfs.impl") == null) {
+ this.configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+ }
+
+ if (configuration.get("fs.file.impl") == null) {
+ this.configuration.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+ }
+
+ this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader());
+ this.driverName = conf.getManagedLedgerOffloadDriver();
+ this.storageBasePath = configuration.get("hadoop.tmp.dir");
+ this.scheduler = scheduler;
+ this.fileSystem = FileSystem.get(configuration);
+ this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
+ .numThreads(conf.getManagedLedgerOffloadMaxThreads())
+ .name("offload-assignment").build();
+ }
+ /**
+  * Test-only constructor: targets the file system at {@code testHDFSPath} and uses
+  * {@code baseDir} both as "hadoop.tmp.dir" and as the storage base path.
+  *
+  * @param conf         supplies the driver name and the writer thread count
+  * @param scheduler    scheduler on which offload tasks are submitted
+  * @param testHDFSPath value set as "fs.defaultFS"
+  * @param baseDir      base directory for offloaded data
+  * @throws IOException declared for the Hadoop calls, notably {@code FileSystem.get}
+  */
+ @VisibleForTesting
+ public FileSystemManagedLedgerOffloader(FileSystemConfigurationData conf, OrderedScheduler scheduler, String testHDFSPath, String baseDir) throws IOException {
+ this.configuration = new Configuration();
+ this.configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+ this.configuration.set("fs.defaultFS", testHDFSPath);
+ this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader());
+ this.driverName = conf.getManagedLedgerOffloadDriver();
+ this.configuration.set("hadoop.tmp.dir", baseDir);
+ this.storageBasePath = baseDir;
+ this.scheduler = scheduler;
+ this.fileSystem = FileSystem.get(configuration);
+ this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
+ .numThreads(conf.getManagedLedgerOffloadMaxThreads())
+ .name("offload-assignment").build();
+ }
+
+ /**
+  * Returns the driver metadata: a single entry mapping "storageBasePath" to the
+  * configured base path, or to the literal string "null" when no base path is set.
+  */
+ @Override
+ public Map<String, String> getOffloadDriverMetadata() {
+     if (storageBasePath == null) {
+         return ImmutableMap.of(STORAGE_BASE_PATH, "null");
+     }
+     return ImmutableMap.of(STORAGE_BASE_PATH, storageBasePath);
+ }
+
+ /**
+  * Submits the offload of a ledger to the scheduler thread chosen by its ledger id.
+  * In the resulting data file the ledger metadata is stored under index -1.
+  *
+  * @param readHandle    handle on the ledger to offload
+  * @param uuid          unique id of this offload attempt
+  * @param extraMetadata additional metadata; the managed ledger name is read from it
+  * @return a future completed by the offload task when it finishes or fails
+  */
+ @Override
+ public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata) {
+     final CompletableFuture<Void> offloadResult = new CompletableFuture<>();
+     final LedgerReader task = new LedgerReader(readHandle, uuid, extraMetadata, offloadResult,
+             storageBasePath, configuration, assignmentScheduler);
+     scheduler.chooseThread(readHandle.getId()).submit(task);
+     return offloadResult;
+ }
+
+ /**
+  * Task that copies one closed ledger into a {@link MapFile}. It writes the ledger
+  * metadata under {@code METADATA_KEY_INDEX}, then reads the ledger in batches of
+  * {@code ENTRIES_PER_READ} entries and hands each batch to a {@code FileSystemWriter}
+  * submitted on the assignment scheduler. The promise is completed once the data file
+  * has been closed, or completed exceptionally on the first failure.
+  */
+ private static class LedgerReader implements Runnable {
+
+     private final ReadHandle readHandle;
+     private final UUID uuid;
+     private final Map<String, String> extraMetadata;
+     private final CompletableFuture<Void> promise;
+     private final String storageBasePath;
+     private final Configuration configuration;
+     // Checked after each batch; presumably set by FileSystemWriter when a write fails.
+     volatile Exception fileSystemWriteException = null;
+     private OrderedScheduler assignmentScheduler;
+
+     private LedgerReader(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata, CompletableFuture<Void> promise,
+                          String storageBasePath, Configuration configuration, OrderedScheduler assignmentScheduler) {
+         this.readHandle = readHandle;
+         this.uuid = uuid;
+         this.extraMetadata = extraMetadata;
+         this.promise = promise;
+         this.storageBasePath = storageBasePath;
+         this.configuration = configuration;
+         this.assignmentScheduler = assignmentScheduler;
+     }
+
+     @Override
+     public void run() {
+         if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) {
+             promise.completeExceptionally(
+                     new IllegalArgumentException("An empty or open ledger should never be offloaded"));
+             return;
+         }
+         long ledgerId = readHandle.getId();
+         String storagePath = getStoragePath(storageBasePath, extraMetadata.get(MANAGED_LEDGER_NAME));
+         String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
+         LongWritable key = new LongWritable();
+         BytesWritable value = new BytesWritable();
+         MapFile.Writer dataWriter = null;
+         boolean closeAttempted = false;
+         try {
+             dataWriter = new MapFile.Writer(configuration,
+                     new Path(dataFilePath),
+                     MapFile.Writer.keyClass(LongWritable.class),
+                     MapFile.Writer.valueClass(BytesWritable.class));
+             //store the ledgerMetadata in -1 index
+             key.set(METADATA_KEY_INDEX);
+             // Serialize the metadata once and reuse the same array for content and length.
+             byte[] ledgerMetadata = buildLedgerMetadataFormat(readHandle.getLedgerMetadata());
+             value.set(ledgerMetadata, 0, ledgerMetadata.length);
+             dataWriter.append(key, value);
+             AtomicLong haveOffloadEntryNumber = new AtomicLong(0);
+             long needToOffloadFirstEntryNumber = 0;
+             CountDownLatch countDownLatch;
+             do {
+                 long end = Math.min(needToOffloadFirstEntryNumber + ENTRIES_PER_READ - 1, readHandle.getLastAddConfirmed());
+                 log.debug("read ledger entries. start: {}, end: {}", needToOffloadFirstEntryNumber, end);
+                 LedgerEntries ledgerEntriesOnce = readHandle.readAsync(needToOffloadFirstEntryNumber, end).get();
+                 countDownLatch = new CountDownLatch(1);
+                 // No listener is attached: the previous no-op listener needed a new
+                 // single-thread executor per batch that was never shut down.
+                 assignmentScheduler.chooseThread(ledgerId).submit(new FileSystemWriter(ledgerEntriesOnce, dataWriter,
+                         countDownLatch, haveOffloadEntryNumber, this));
+                 needToOffloadFirstEntryNumber = end + 1;
+             } while (needToOffloadFirstEntryNumber - 1 != readHandle.getLastAddConfirmed() && fileSystemWriteException == null);
+             // Only the latch of the last submitted batch is awaited; all batches of this
+             // ledger are submitted with the same ordering key.
+             countDownLatch.await();
+             Exception writeFailure = fileSystemWriteException;
+             if (writeFailure != null) {
+                 throw writeFailure;
+             }
+             // Close explicitly so that a failed close fails the offload instead of being
+             // swallowed and reported as success.
+             closeAttempted = true;
+             dataWriter.close();
+             promise.complete(null);
+         } catch (Exception e) {
+             log.error("Exception when get CompletableFuture<LedgerEntries> : ManagerLedgerName: {}, " +
+                     "LedgerId: {}, UUID: {} ", extraMetadata.get(MANAGED_LEDGER_NAME), ledgerId, uuid, e);
+             if (e instanceof InterruptedException) {
+                 Thread.currentThread().interrupt();
+             }
+             if (!closeAttempted) {
+                 releaseWriter(dataWriter, ledgerId);
+             }
+             promise.completeExceptionally(e);
+         }
+     }
+
+     /**
+      * Best-effort release of the data file writer after a failed offload. The close is
+      * submitted with the same ordering key as the write tasks so that, assuming the
+      * scheduler runs tasks of one key in submission order, it happens after any batch
+      * still being written. If the submission is rejected the writer is closed inline.
+      */
+     private void releaseWriter(MapFile.Writer writer, long ledgerId) {
+         if (writer == null) {
+             return;
+         }
+         try {
+             assignmentScheduler.chooseThread(ledgerId).submit(() -> IOUtils.closeStream(writer));
+         } catch (RuntimeException rejected) {
+             IOUtils.closeStream(writer);
+         }
+     }
+ }
+
+ /**
+  * Appends one batch of ledger entries to the shared {@link MapFile.Writer}, keyed by entry id.
+  *
+  * <p>The batch is skipped when the owning {@link LedgerReader} already recorded a failure. Any failure
+  * while writing is recorded in {@code ledgerReader.fileSystemWriteException}. The batch is always released
+  * and the latch is always counted down, so the reader can never wait forever on this task.
+  */
+ private static class FileSystemWriter implements Runnable {
+
+     private final LedgerEntries ledgerEntriesOnce;
+
+     private final LongWritable key = new LongWritable();
+     private final BytesWritable value = new BytesWritable();
+
+     private final MapFile.Writer dataWriter;
+     private final CountDownLatch countDownLatch;
+     private final AtomicLong haveOffloadEntryNumber;
+     private final LedgerReader ledgerReader;
+
+
+     private FileSystemWriter(LedgerEntries ledgerEntriesOnce, MapFile.Writer dataWriter,
+                              CountDownLatch countDownLatch, AtomicLong haveOffloadEntryNumber, LedgerReader ledgerReader) {
+         this.ledgerEntriesOnce = ledgerEntriesOnce;
+         this.dataWriter = dataWriter;
+         this.countDownLatch = countDownLatch;
+         this.haveOffloadEntryNumber = haveOffloadEntryNumber;
+         this.ledgerReader = ledgerReader;
+     }
+
+     @Override
+     public void run() {
+         try {
+             if (ledgerReader.fileSystemWriteException == null) {
+                 for (LedgerEntry entry : ledgerEntriesOnce) {
+                     key.set(entry.getEntryId());
+                     // Fetch the payload once and reuse it for both the bytes and their length.
+                     byte[] payload = entry.getEntryBytes();
+                     value.set(payload, 0, payload.length);
+                     dataWriter.append(key, value);
+                     haveOffloadEntryNumber.incrementAndGet();
+                 }
+             }
+         } catch (IOException | RuntimeException e) {
+             // Record unexpected runtime failures as well: otherwise the reader would see no error
+             // and report a partially written ledger as successfully offloaded.
+             ledgerReader.fileSystemWriteException = e;
+         } finally {
+             try {
+                 // The batch is owned by this task; release it whether or not it was written.
+                 ledgerEntriesOnce.close();
+             } finally {
+                 countDownLatch.countDown();
+             }
+         }
+     }
+ }
+
+ /**
+  * Opens the offloaded copy of a ledger for reading.
+  *
+  * <p>The data file path is derived from the storage base path, the managed ledger name found in
+  * {@code offloadDriverMetadata} under {@code MANAGED_LEDGER_NAME}, the ledger id and the UUID. The file is
+  * opened on {@code scheduler.chooseThread(ledgerId)}.
+  *
+  * @param ledgerId id of the offloaded ledger
+  * @param uuid UUID that was used when the ledger was offloaded
+  * @param offloadDriverMetadata driver metadata; must contain the managed ledger name
+  * @return a future completed with the read handle, or completed exceptionally if the file cannot be opened
+  */
+ @Override
+ public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid, Map<String, String> offloadDriverMetadata) {
+
+     CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
+     String storagePath = getStoragePath(storageBasePath, offloadDriverMetadata.get(MANAGED_LEDGER_NAME));
+     String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
+     scheduler.chooseThread(ledgerId).submit(() -> {
+         MapFile.Reader reader = null;
+         try {
+             reader = new MapFile.Reader(new Path(dataFilePath),
+                     configuration);
+             promise.complete(FileStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId), reader, ledgerId));
+         } catch (Throwable t) {
+             log.error("Failed to open FileStoreBackedReadHandleImpl: ManagerLedgerName: {}, " +
+                     "LegerId: {}, UUID: {}", offloadDriverMetadata.get(MANAGED_LEDGER_NAME), ledgerId, uuid, t);
+             // No handle took ownership of the reader, so release it here (a null reader is ignored).
+             IOUtils.closeStream(reader);
+             promise.completeExceptionally(t);
+         }
+     });
+     return promise;
+ }
+
+ /**
+  * Builds the directory prefix under which the ledgers of one managed ledger are stored.
+  *
+  * @param storageBasePath optional base path; when {@code null} the prefix is relative
+  * @param managedLedgerName name of the managed ledger
+  * @return the prefix, always ending with {@code "/"}
+  */
+ private static String getStoragePath(String storageBasePath, String managedLedgerName) {
+     if (storageBasePath == null) {
+         return managedLedgerName + "/";
+     }
+     return storageBasePath + "/" + managedLedgerName + "/";
+ }
+
+ /**
+  * Builds the path of the file holding one offloaded ledger: {@code <storagePath><ledgerId>-<uuid>}.
+  *
+  * @param storagePath directory prefix, expected to already end with a separator
+  * @param ledgerId id of the ledger
+  * @param uuid UUID of the offload operation
+  * @return the data file path
+  */
+ private static String getDataFilePath(String storagePath, long ledgerId, UUID uuid) {
+     String fileName = ledgerId + "-" + uuid.toString();
+     return storagePath + fileName;
+ }
+
+ /**
+  * Deletes the offloaded copy of a ledger, recursively, on the calling thread.
+  *
+  * @param ledgerId id of the offloaded ledger
+  * @param uid UUID that was used when the ledger was offloaded
+  * @param offloadDriverMetadata driver metadata; read for the managed ledger name
+  * @return an already-completed future: successful, or failed with the {@link IOException} of the delete
+  */
+ @Override
+ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
+     String ledgerDirectory = getStoragePath(storageBasePath, offloadDriverMetadata.get(MANAGED_LEDGER_NAME));
+     String target = getDataFilePath(ledgerDirectory, ledgerId, uid);
+     CompletableFuture<Void> result = new CompletableFuture<>();
+     try {
+         fileSystem.delete(new Path(target), true);
+     } catch (IOException e) {
+         log.error("Failed to delete Offloaded: ", e);
+         result.completeExceptionally(e);
+         return result;
+     }
+     result.complete(null);
+     return result;
+ }
+
+ /**
+  * Serializes ledger metadata into the bytes of a {@code DataFormats.LedgerMetadataFormat} message.
+  *
+  * <p>Copies the quorum sizes, ensemble size, length, open/closed state, last entry id, creation time and
+  * digest type, followed by every custom metadata entry and every ensemble segment.
+  *
+  * @param metadata metadata of the ledger being offloaded
+  * @return the serialized message
+  */
+ private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) {
+     DataFormats.LedgerMetadataFormat.Builder format = DataFormats.LedgerMetadataFormat.newBuilder();
+     format.setQuorumSize(metadata.getWriteQuorumSize());
+     format.setAckQuorumSize(metadata.getAckQuorumSize());
+     format.setEnsembleSize(metadata.getEnsembleSize());
+     format.setLength(metadata.getLength());
+     if (metadata.isClosed()) {
+         format.setState(DataFormats.LedgerMetadataFormat.State.CLOSED);
+     } else {
+         format.setState(DataFormats.LedgerMetadataFormat.State.OPEN);
+     }
+     format.setLastEntryId(metadata.getLastEntryId());
+     format.setCtime(metadata.getCtime());
+     format.setDigestType(BookKeeper.DigestType.toProtoDigestType(
+             BookKeeper.DigestType.fromApiDigestType(metadata.getDigestType())));
+
+     // One custom-metadata element per key/value pair.
+     metadata.getCustomMetadata().forEach((name, content) ->
+             format.addCustomMetadataBuilder().setKey(name).setValue(ByteString.copyFrom(content)));
+
+     // One segment per ensemble change; bookie addresses are stored in their string form.
+     for (Map.Entry<Long, ? extends List<BookieSocketAddress>> segment : metadata.getAllEnsembles().entrySet()) {
+         List<String> members = segment.getValue().stream()
+                 .map(address -> address.toString())
+                 .collect(Collectors.toList());
+         format.addSegmentBuilder()
+                 .setFirstEntryId(segment.getKey())
+                 .addAllEnsembleMember(members);
+     }
+
+     return format.build().toByteArray();
+ }
+}
diff --git a/tiered-storage/file-system/src/main/resources/META-INF/services/pulsar-offloader.yaml b/tiered-storage/file-system/src/main/resources/META-INF/services/pulsar-offloader.yaml
new file mode 100644
index 0000000..d7f7ed2
--- /dev/null
+++ b/tiered-storage/file-system/src/main/resources/META-INF/services/pulsar-offloader.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: filesystem
+description: fileSystem based offloader implementation
+offloaderFactoryClass: org.apache.bookkeeper.mledger.offload.filesystem.FileSystemLedgerOffloaderFactory
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
new file mode 100644
index 0000000..4abff16
--- /dev/null
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.filesystem;
+
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Properties;
+
+public class FileStoreTestBase {
+ protected FileSystemManagedLedgerOffloader fileSystemManagedLedgerOffloader;
+ protected OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
+ protected final String basePath = "pulsar";
+ private MiniDFSCluster hdfsCluster;
+ private String hdfsURI;
+
+ @BeforeMethod
+ public void start() throws Exception {
+ File baseDir = Files.createTempDirectory(basePath).toFile().getAbsoluteFile();
+ Configuration conf = new Configuration();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+ hdfsCluster = builder.build();
+
+ hdfsURI = "hdfs://localhost:"+ hdfsCluster.getNameNodePort() + "/";
+ Properties properties = new Properties();
+ fileSystemManagedLedgerOffloader = new FileSystemManagedLedgerOffloader(
+ FileSystemConfigurationData.create(properties),
+ scheduler, hdfsURI, basePath);
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ hdfsCluster.shutdown(true, true);
+ hdfsCluster.close();
+ }
+
+ public String getURI() {
+ return hdfsURI;
+ }
+}
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
new file mode 100644
index 0000000..e03c5e1
--- /dev/null
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.filesystem.impl;
+
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.offload.filesystem.FileStoreTestBase;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
+ private final PulsarMockBookKeeper bk;
+ private String topic = "public/default/persistent/testOffload";
+ private String storagePath = createStoragePath(topic);
+ private LedgerHandle lh;
+ private ReadHandle toWrite;
+ private final int numberOfEntries = 601;
+ private Map<String, String> map = new HashMap<>();
+
+ public FileSystemManagedLedgerOffloaderTest() throws Exception {
+ this.bk = new PulsarMockBookKeeper(createMockZooKeeper(), scheduler.chooseThread(this));
+ this.toWrite = buildReadHandle();
+ map.put("ManagedLedgerName", topic);
+ }
+
+ private static MockZooKeeper createMockZooKeeper() throws Exception {
+ MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
+ List<ACL> dummyAclList = new ArrayList<ACL>(0);
+
+ ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000,
+ "".getBytes(UTF_8), dummyAclList, CreateMode.PERSISTENT);
+
+ zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(UTF_8), dummyAclList,
+ CreateMode.PERSISTENT);
+ return zk;
+ }
+
+ private ReadHandle buildReadHandle() throws Exception {
+
+ lh = bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, "foobar".getBytes());
+
+ int i = 0;
+ int blocksWritten = 1;
+ while (blocksWritten <= numberOfEntries) {
+ byte[] entry = ("foobar"+i).getBytes();
+ blocksWritten++;
+ lh.addEntry(entry);
+ i++;
+ }
+ lh.close();
+
+ return bk.newOpenLedgerOp().withLedgerId(lh.getId())
+ .withPassword("foobar".getBytes()).withDigestType(DigestType.CRC32).execute().get();
+ }
+
+ @Test
+ public void testOffloadAndRead() throws Exception {
+ LedgerOffloader offloader = fileSystemManagedLedgerOffloader;
+ UUID uuid = UUID.randomUUID();
+ offloader.offload(toWrite, uuid, map).get();
+ ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, map).get();
+ Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
+ LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1);
+ LedgerEntries toWriteEntries = toWrite.read(0,numberOfEntries - 1);
+ Iterator<LedgerEntry> toTestIter = toTestEntries.iterator();
+ Iterator<LedgerEntry> toWriteIter = toWriteEntries.iterator();
+ while(toTestIter.hasNext()) {
+ LedgerEntry toWriteEntry = toWriteIter.next();
+ LedgerEntry toTestEntry = toTestIter.next();
+
+ Assert.assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId());
+ Assert.assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId());
+ Assert.assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
+ Assert.assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
+ }
+ toTestEntries = toTest.read(1, numberOfEntries - 1);
+ toWriteEntries = toWrite.read(1,numberOfEntries - 1);
+ toTestIter = toTestEntries.iterator();
+ toWriteIter = toWriteEntries.iterator();
+ while(toTestIter.hasNext()) {
+ LedgerEntry toWriteEntry = toWriteIter.next();
+ LedgerEntry toTestEntry = toTestIter.next();
+
+ Assert.assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId());
+ Assert.assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId());
+ Assert.assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
+ Assert.assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
+ }
+ }
+
+ @Test
+ public void testDeleteOffload() throws Exception {
+ LedgerOffloader offloader = fileSystemManagedLedgerOffloader;
+ UUID uuid = UUID.randomUUID();
+ offloader.offload(toWrite, uuid, map).get();
+ Configuration configuration = new Configuration();
+ FileSystem fileSystem = FileSystem.get(new URI(getURI()), configuration);
+ Assert.assertEquals(true, fileSystem.exists(new Path(createDataFilePath(storagePath, lh.getId(), uuid))));
+ Assert.assertEquals(true, fileSystem.exists(new Path(createIndexFilePath(storagePath, lh.getId(), uuid))));
+ offloader.deleteOffloaded(lh.getId(), uuid, map);
+ Assert.assertEquals(false, fileSystem.exists(new Path(createDataFilePath(storagePath, lh.getId(), uuid))));
+ Assert.assertEquals(false, fileSystem.exists(new Path(createIndexFilePath(storagePath, lh.getId(), uuid))));
+ }
+
+ private String createStoragePath(String managedLedgerName) {
+ return basePath == null ? managedLedgerName + "/" : basePath + "/" + managedLedgerName + "/";
+ }
+
+ private String createIndexFilePath(String storagePath, long ledgerId, UUID uuid) {
+ return storagePath + ledgerId + "-" + uuid + "/index";
+ }
+
+ private String createDataFilePath(String storagePath, long ledgerId, UUID uuid) {
+ return storagePath + ledgerId + "-" + uuid + "/data";
+ }
+}
diff --git a/tiered-storage/pom.xml b/tiered-storage/pom.xml
index b8390ad..fc4cd51 100644
--- a/tiered-storage/pom.xml
+++ b/tiered-storage/pom.xml
@@ -38,5 +38,6 @@
<modules>
<module>jcloud</module>
+ <module>file-system</module>
</modules>
</project>