You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/03 01:41:11 UTC

[pulsar] branch master updated: File system offload (#4403)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e807f1  File system offload (#4403)
0e807f1 is described below

commit 0e807f15c357476059383c80870abd9b8a59826c
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Jul 3 09:41:05 2019 +0800

    File system offload (#4403)
    
    Fixes #3216
    Implementation of offload to HDFS
    
    ### Motivation
    Implementation of offload to HDFS
    
    ### Verifying this change
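    Added tests for the file system offloader (FileSystemManagedLedgerOffloaderTest and the TestFileSystemOffload integration test).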
---
 conf/broker.conf                                   |   6 +
 .../filesystem_offload_core_site.xml               |  40 ++-
 .../offloaders/src/assemble/offloaders.xml         |   6 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   4 +-
 pom.xml                                            |   3 +
 .../{TestS3Offload.java => TestBaseOffload.java}   |  78 ++---
 .../integration/offload/TestFileSystemOffload.java |  57 ++++
 .../tests/integration/offload/TestS3Offload.java   | 288 ++----------------
 .../suites/PulsarTieredStorageTestSuite.java       |  21 +-
 tests/integration/src/test/resources/pulsar.xml    |   3 +-
 ...rage-suite.xml => tiered-file-system-suite.xml} |   4 +-
 ...d-storage.xml => tiered-filesystem-storage.xml} |   6 +-
 ...iered-storage.xml => tiered-jcloud-storage.xml} |   4 +-
 ...e-suite.xml => tiered-storage-jcloud-suite.xml} |   2 +-
 tiered-storage/file-system/pom.xml                 |  95 ++++++
 .../filesystem/FileSystemConfigurationData.java    |  68 +++++
 .../FileSystemLedgerOffloaderFactory.java          |  40 +++
 .../impl/FileStoreBackedReadHandleImpl.java        | 239 +++++++++++++++
 .../impl/FileSystemManagedLedgerOffloader.java     | 331 +++++++++++++++++++++
 .../META-INF/services/pulsar-offloader.yaml        |  22 ++
 .../offload/filesystem/FileStoreTestBase.java      |  64 ++++
 .../impl/FileSystemManagedLedgerOffloaderTest.java | 157 ++++++++++
 tiered-storage/pom.xml                             |   1 +
 23 files changed, 1182 insertions(+), 357 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 022085a..20f2576 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -742,6 +742,12 @@ gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576
 # For more details, see the "Service Accounts" section of https://support.google.com/googleapi/answer/6158849
 gcsManagedLedgerOffloadServiceAccountKeyFile=
 
+# For File System Storage: path to the file system profile (XML configuration file)
+fileSystemProfilePath=../conf/filesystem_offload_core_site.xml
+
+# For File System Storage: file system URI
+fileSystemURI=
+
 ### --- Deprecated config variables --- ###
 
 # Deprecated. Use configurationStoreServers
diff --git a/tests/integration/src/test/resources/pulsar.xml b/conf/filesystem_offload_core_site.xml
similarity index 54%
copy from tests/integration/src/test/resources/pulsar.xml
copy to conf/filesystem_offload_core_site.xml
index c0720e0..d26cec2 100644
--- a/tests/integration/src/test/resources/pulsar.xml
+++ b/conf/filesystem_offload_core_site.xml
@@ -18,15 +18,31 @@
     under the License.
 
 -->
-<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
-<!-- TODO: we have to put suite files in one file to avoid executing TESTNG test suites multiple times.
-           see {@link https://github.com/cbeust/testng/issues/508} -->
-<suite name="Pulsar Test Suite" parallel="instances" thread-count="1">
-    <suite-files>
-        <suite-file path="./pulsar-cli.xml" />
-        <suite-file path="./pulsar-process.xml" />
-        <suite-file path="./pulsar-sql.xml" />
-        <suite-file path="./pulsar-thread.xml" />
-        <suite-file path="./tiered-storage.xml" />
-    </suite-files>
-</suite>
+<configuration>
+    <!-- File system URI (required) -->
+    <property>
+        <name>fs.defaultFS</name>
+        <value></value>
+    </property>
+    <property>
+        <name>hadoop.tmp.dir</name>
+        <value>pulsar</value>
+    </property>
+    <property>
+        <name>io.file.buffer.size</name>
+        <value>4096</value>
+    </property>
+    <property>
+        <name>io.seqfile.compress.blocksize</name>
+        <value>1000000</value>
+    </property>
+    <property>
+        <name>io.seqfile.compression.type</name>
+        <value>BLOCK</value>
+    </property>
+    <property>
+        <name>io.map.index.interval</name>
+        <value>128</value>
+    </property>
+
+</configuration>
diff --git a/distribution/offloaders/src/assemble/offloaders.xml b/distribution/offloaders/src/assemble/offloaders.xml
index af73772..c6f4988 100644
--- a/distribution/offloaders/src/assemble/offloaders.xml
+++ b/distribution/offloaders/src/assemble/offloaders.xml
@@ -45,5 +45,11 @@
       <outputDirectory>offloaders</outputDirectory>
       <fileMode>644</fileMode>
     </file>
+
+    <file>
+      <source>${basedir}/../../tiered-storage/file-system/target/tiered-storage-file-system-${project.version}.nar</source>
+      <outputDirectory>offloaders</outputDirectory>
+      <fileMode>644</fileMode>
+    </file>
   </files>
 </assembly>
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index bdb062f..15b32a9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1426,8 +1426,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             if (info != null && info.hasOffloadContext() && info.getOffloadContext().getComplete()) {
                 UUID uid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb());
                 // TODO: improve this to load ledger offloader by driver name recorded in metadata
+                Map<String, String> offloadDriverMetadata = OffloadUtils.getOffloadDriverMetadata(info);
+                offloadDriverMetadata.put("ManagedLedgerName", name);
                 openFuture = config.getLedgerOffloader().readOffloaded(ledgerId, uid,
-                        OffloadUtils.getOffloadDriverMetadata(info));
+                        offloadDriverMetadata);
             } else {
                 openFuture = bookKeeper.newOpenLedgerOp().withRecovery(!isReadOnly()).withLedgerId(ledgerId)
                         .withDigestType(config.getDigestType()).withPassword(config.getPassword()).execute();
diff --git a/pom.xml b/pom.xml
index 9874e06..06e8910 100644
--- a/pom.xml
+++ b/pom.xml
@@ -186,6 +186,9 @@ flexible messaging model and an intuitive client API.</description>
     <jclouds.version>2.1.1</jclouds.version>
     <sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
     <mysql-jdbc.version>8.0.11</mysql-jdbc.version>
+    <hdfs-offload-version3>3.2.0</hdfs-offload-version3>
+    <org.eclipse.jetty-hdfs-offload>9.3.24.v20180605</org.eclipse.jetty-hdfs-offload>
+    <test-hdfs-offload-jetty>9.3.24.v20180605</test-hdfs-offload-jetty>
     <elasticsearch.version>6.3.2</elasticsearch.version>
     <presto.version>0.206</presto.version>
     <flink.version>1.6.0</flink.version>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java
similarity index 85%
copy from tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
copy to tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java
index 6a58f93..408dd42 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java
@@ -18,33 +18,25 @@
  */
 package org.apache.pulsar.tests.integration.offload;
 
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ClientConfiguration;
-
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
-import org.apache.pulsar.tests.integration.containers.S3Container;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.tests.integration.suites.PulsarTieredStorageTestSuite;
-
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
 
-@Slf4j
-public class TestS3Offload extends PulsarTieredStorageTestSuite {
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+@Slf4j
+public abstract class TestBaseOffload extends PulsarTieredStorageTestSuite {
     private static final int ENTRY_SIZE = 1024;
 
     private static byte[] buildEntry(String pattern) {
@@ -57,43 +49,23 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
         return entry;
     }
 
-    private S3Container s3Container;
-
-    @BeforeClass
-    public void setupS3() {
-        s3Container = new S3Container(
-            pulsarCluster.getClusterName(),
-            S3Container.NAME)
-            .withNetwork(pulsarCluster.getNetwork())
-            .withNetworkAliases(S3Container.NAME);
-        s3Container.start();
-    }
-
-    @AfterClass
-    public void teardownS3() {
-        if (null != s3Container) {
-            s3Container.stop();
-        }
-    }
-
-    @Test(dataProvider =  "ServiceAndAdminUrls")
     public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUrl) throws Exception {
-        final String tenant = "s3-offload-test-cli-" + randomName(4);
+        final String tenant = "offload-test-cli-" + randomName(4);
         final String namespace = tenant + "/ns1";
         final String topic = "persistent://" + namespace + "/topic1";
 
         pulsarCluster.runAdminCommandOnAnyBroker( "tenants",
-            "create", "--allowed-clusters", pulsarCluster.getClusterName(),
-            "--admin-roles", "offload-admin", tenant);
+                "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+                "--admin-roles", "offload-admin", tenant);
 
         pulsarCluster.runAdminCommandOnAnyBroker(
-             "namespaces",
-            "create", "--clusters", pulsarCluster.getClusterName(), namespace);
+                "namespaces",
+                "create", "--clusters", pulsarCluster.getClusterName(), namespace);
 
         long firstLedger = -1;
         try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             Producer<byte[]> producer = client.newProducer().topic(topic)
-                .blockIfQueueFull(true).enableBatching(false).create();) {
+                    .blockIfQueueFull(true).enableBatching(false).create();) {
             client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
 
             // write enough to topic to make it roll
@@ -115,7 +87,7 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
             Assert.assertTrue(output.contains("Nothing to offload"));
 
             output = pulsarCluster.runAdminCommandOnAnyBroker( "topics",
-                "offload-status", topic).getStdout();
+                    "offload-status", topic).getStdout();
             Assert.assertTrue(output.contains("Offload has not been run"));
 
             // offload with a low threshold
@@ -124,7 +96,7 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
             Assert.assertTrue(output.contains("Offload triggered"));
 
             output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
-                "offload-status", "-w", topic).getStdout();
+                    "offload-status", "-w", topic).getStdout();
             Assert.assertTrue(output.contains("Offload was a success"));
 
             // delete the first ledger, so that we cannot possibly read from it
@@ -149,9 +121,8 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
         }
     }
 
-    @Test(dataProvider =  "ServiceAndAdminUrls")
     public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String adminUrl) throws Exception {
-        final String tenant = "s3-offload-test-threshold-" + randomName(4);
+        final String tenant = "offload-test-threshold-" + randomName(4);
         final String namespace = tenant + "/ns1";
         final String topic = "persistent://" + namespace + "/topic1";
 
@@ -168,8 +139,8 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
         long firstLedger = 0;
         try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             Producer<byte[]> producer = client.newProducer().topic(topic)
-                .blockIfQueueFull(true).enableBatching(false).create();
-            ) {
+                    .blockIfQueueFull(true).enableBatching(false).create();
+        ) {
 
             client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
 
@@ -222,18 +193,18 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
         }
     }
 
-    private boolean ledgerOffloaded(List<LedgerInfo> ledgers, long ledgerId) {
+    private boolean ledgerOffloaded(List<PersistentTopicInternalStats.LedgerInfo> ledgers, long ledgerId) {
         return ledgers.stream().filter(l -> l.ledgerId == ledgerId)
-            .map(l -> l.offloaded).findFirst().get();
+                .map(l -> l.offloaded).findFirst().get();
     }
 
     private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic) throws Exception {
         try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             Producer producer = client.newProducer().topic(topic)
-                .blockIfQueueFull(true).enableBatching(false).create();
+                    .blockIfQueueFull(true).enableBatching(false).create();
             PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
 
-            List<LedgerInfo> ledgers = admin.topics().getInternalStats(topic).ledgers;
+            List<PersistentTopicInternalStats.LedgerInfo> ledgers = admin.topics().getInternalStats(topic).ledgers;
             long currentLedger = ledgers.get(ledgers.size() - 1).ledgerId;
 
             client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
@@ -269,9 +240,8 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
         }
     }
 
-    @Test(dataProvider =  "ServiceAndAdminUrls")
     public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl) throws Exception {
-        final String tenant = "s3-offload-test-deletion-lag-" + randomName(4);
+        final String tenant = "offload-test-deletion-lag-" + randomName(4);
         final String namespace = tenant + "/ns1";
         final String topic = "persistent://" + namespace + "/topic1";
 
@@ -284,7 +254,7 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
 
         // set threshold to offload runs immediately after role
         pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
-                                                 "set-offload-threshold", "--size", "0", namespace);
+                "set-offload-threshold", "--size", "0", namespace);
 
         String output = pulsarCluster.runAdminCommandOnAnyBroker(
                 "namespaces", "get-offload-deletion-lag", namespace).getStdout();
@@ -297,7 +267,7 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
         Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
 
         pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag", namespace,
-                                                 "--lag", "0m");
+                "--lag", "0m");
         output = pulsarCluster.runAdminCommandOnAnyBroker(
                 "namespaces", "get-offload-deletion-lag", namespace).getStdout();
         Assert.assertTrue(output.contains("0 minute(s)"));
@@ -325,6 +295,4 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
         Thread.sleep(5000);
         Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
     }
-
-
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java
new file mode 100644
index 0000000..c0ddfbc
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestFileSystemOffload.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.offload;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class TestFileSystemOffload extends TestBaseOffload {
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUrl) throws Exception {
+        super.testPublishOffloadAndConsumeViaCLI(serviceUrl, adminUrl);
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String adminUrl) throws Exception {
+        super.testPublishOffloadAndConsumeViaThreshold(serviceUrl, adminUrl);
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl) throws Exception {
+        super.testPublishOffloadAndConsumeDeletionLag(serviceUrl, adminUrl);
+
+    }
+
+
+    @Override
+    protected Map<String, String> getEnv() {
+        Map<String, String> result = new HashMap<>();
+        result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER));
+        result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
+        result.put("managedLedgerOffloadDriver", "filesystem");
+        result.put("fileSystemURI", "file:///");
+
+        return result;
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
index 6a58f93..0631664 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
@@ -18,54 +18,27 @@
  */
 package org.apache.pulsar.tests.integration.offload;
 
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.Map;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.BookKeeperAdmin;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
 import org.apache.pulsar.tests.integration.containers.S3Container;
-import org.apache.pulsar.tests.integration.suites.PulsarTieredStorageTestSuite;
-
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
-public class TestS3Offload extends PulsarTieredStorageTestSuite {
-
-    private static final int ENTRY_SIZE = 1024;
-
-    private static byte[] buildEntry(String pattern) {
-        byte[] entry = new byte[ENTRY_SIZE];
-        byte[] patternBytes = pattern.getBytes();
-
-        for (int i = 0; i < entry.length; i++) {
-            entry[i] = patternBytes[i % patternBytes.length];
-        }
-        return entry;
-    }
+public class TestS3Offload extends TestBaseOffload {
 
     private S3Container s3Container;
 
     @BeforeClass
     public void setupS3() {
         s3Container = new S3Container(
-            pulsarCluster.getClusterName(),
-            S3Container.NAME)
-            .withNetwork(pulsarCluster.getNetwork())
-            .withNetworkAliases(S3Container.NAME);
+                pulsarCluster.getClusterName(),
+                S3Container.NAME)
+                .withNetwork(pulsarCluster.getNetwork())
+                .withNetworkAliases(S3Container.NAME);
         s3Container.start();
     }
 
@@ -78,252 +51,31 @@ public class TestS3Offload extends PulsarTieredStorageTestSuite {
 
     @Test(dataProvider =  "ServiceAndAdminUrls")
     public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUrl) throws Exception {
-        final String tenant = "s3-offload-test-cli-" + randomName(4);
-        final String namespace = tenant + "/ns1";
-        final String topic = "persistent://" + namespace + "/topic1";
-
-        pulsarCluster.runAdminCommandOnAnyBroker( "tenants",
-            "create", "--allowed-clusters", pulsarCluster.getClusterName(),
-            "--admin-roles", "offload-admin", tenant);
-
-        pulsarCluster.runAdminCommandOnAnyBroker(
-             "namespaces",
-            "create", "--clusters", pulsarCluster.getClusterName(), namespace);
-
-        long firstLedger = -1;
-        try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Producer<byte[]> producer = client.newProducer().topic(topic)
-                .blockIfQueueFull(true).enableBatching(false).create();) {
-            client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
-
-            // write enough to topic to make it roll
-            int i = 0;
-            for (; i < ENTRIES_PER_LEDGER * 1.5; i++) {
-                producer.sendAsync(buildEntry("offload-message" + i));
-            }
-            producer.flush();
-        }
-
-        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
-            // read managed ledger info, check ledgers exist
-            firstLedger = admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;
-
-            // first offload with a high threshold, nothing should offload
-
-            String output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
-                    "offload", "--size-threshold", "100G", topic).getStdout();
-            Assert.assertTrue(output.contains("Nothing to offload"));
-
-            output = pulsarCluster.runAdminCommandOnAnyBroker( "topics",
-                "offload-status", topic).getStdout();
-            Assert.assertTrue(output.contains("Offload has not been run"));
-
-            // offload with a low threshold
-            output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
-                    "offload", "--size-threshold", "1M", topic).getStdout();
-            Assert.assertTrue(output.contains("Offload triggered"));
-
-            output = pulsarCluster.runAdminCommandOnAnyBroker("topics",
-                "offload-status", "-w", topic).getStdout();
-            Assert.assertTrue(output.contains("Offload was a success"));
-
-            // delete the first ledger, so that we cannot possibly read from it
-            ClientConfiguration bkConf = new ClientConfiguration();
-            bkConf.setZkServers(pulsarCluster.getZKConnString());
-            try (BookKeeper bk = new BookKeeper(bkConf)) {
-                bk.deleteLedger(firstLedger);
-            }
-
-            // Unload topic to clear all caches, open handles, etc
-            admin.topics().unload(topic);
-        }
-
-        log.info("Read back the data (which would be in that first ledger)");
-        try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
-            // read back from topic
-            for (int i = 0; i < ENTRIES_PER_LEDGER * 1.5; i++) {
-                Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
-                Assert.assertEquals(buildEntry("offload-message" + i), m.getData());
-            }
-        }
+        super.testPublishOffloadAndConsumeViaCLI(serviceUrl, adminUrl);
     }
 
     @Test(dataProvider =  "ServiceAndAdminUrls")
     public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String adminUrl) throws Exception {
-        final String tenant = "s3-offload-test-threshold-" + randomName(4);
-        final String namespace = tenant + "/ns1";
-        final String topic = "persistent://" + namespace + "/topic1";
-
-        pulsarCluster.runAdminCommandOnAnyBroker("tenants",
-                "create", "--allowed-clusters", pulsarCluster.getClusterName(),
-                "--admin-roles", "offload-admin", tenant);
-
-        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
-                "create", "--clusters", pulsarCluster.getClusterName(), namespace);
-
-        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
-                "set-offload-threshold", "--size", "1M", namespace);
-
-        long firstLedger = 0;
-        try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Producer<byte[]> producer = client.newProducer().topic(topic)
-                .blockIfQueueFull(true).enableBatching(false).create();
-            ) {
-
-            client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
-
-            // write enough to topic to make it roll twice
-            for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
-                producer.sendAsync(buildEntry("offload-message" + i));
-            }
-
-            producer.flush();
-        }
-
-        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
-            firstLedger = admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;
-
-            // wait up to 30 seconds for offload to occur
-            for (int i = 0; i < 300 && !admin.topics().getInternalStats(topic).ledgers.get(0).offloaded; i++) {
-                Thread.sleep(100);
-            }
-            Assert.assertTrue(admin.topics().getInternalStats(topic).ledgers.get(0).offloaded);
-
-            // delete the first ledger, so that we cannot possibly read from it
-            ClientConfiguration bkConf = new ClientConfiguration();
-            bkConf.setZkServers(pulsarCluster.getZKConnString());
-            try (BookKeeper bk = new BookKeeper(bkConf)) {
-                bk.deleteLedger(firstLedger);
-            }
-
-            // Unload topic to clear all caches, open handles, etc
-            admin.topics().unload(topic);
-        }
-
-        log.info("Read back the data (which would be in that first ledger)");
-        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
-             Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
-            // read back from topic
-            for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
-                Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
-                Assert.assertEquals(buildEntry("offload-message" + i), m.getData());
-            }
-        }
-
-        // try disabling
-        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
-                "set-offload-threshold", "--size", "-1", namespace);
-
-        // hard to validate that it has been disabled as we'd be waiting for
-        // something _not_ to happen (i.e. waiting for ages), so just check
-        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
-            Assert.assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1L);
-        }
-    }
-
-    private boolean ledgerOffloaded(List<LedgerInfo> ledgers, long ledgerId) {
-        return ledgers.stream().filter(l -> l.ledgerId == ledgerId)
-            .map(l -> l.offloaded).findFirst().get();
-    }
-
-    private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic) throws Exception {
-        try(PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Producer producer = client.newProducer().topic(topic)
-                .blockIfQueueFull(true).enableBatching(false).create();
-            PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
-
-            List<LedgerInfo> ledgers = admin.topics().getInternalStats(topic).ledgers;
-            long currentLedger = ledgers.get(ledgers.size() - 1).ledgerId;
-
-            client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
-
-            // write enough to topic to make it roll twice
-            for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
-                producer.sendAsync(buildEntry("offload-message" + i));
-            }
-            producer.send(buildEntry("final-offload-message"));
-
-            // wait up to 30 seconds for offload to occur
-            for (int i = 0;
-                 i < 300 && !ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers, currentLedger);
-                 i++) {
-                Thread.sleep(100);
-            }
-            Assert.assertTrue(ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers, currentLedger));
-
-            return currentLedger;
-        }
-    }
-
-    public boolean ledgerExistsInBookKeeper(long ledgerId) throws Exception {
-        ClientConfiguration bkConf = new ClientConfiguration();
-        bkConf.setZkServers(pulsarCluster.getZKConnString());
-        try (BookKeeperAdmin bk = new BookKeeperAdmin(bkConf)) {
-            try {
-                bk.openLedger(ledgerId).close();
-                return true;
-            } catch (BKException.BKNoSuchLedgerExistsException e) {
-                return false;
-            }
-        }
+        super.testPublishOffloadAndConsumeViaThreshold(serviceUrl, adminUrl);
     }
 
     @Test(dataProvider =  "ServiceAndAdminUrls")
     public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl) throws Exception {
-        final String tenant = "s3-offload-test-deletion-lag-" + randomName(4);
-        final String namespace = tenant + "/ns1";
-        final String topic = "persistent://" + namespace + "/topic1";
-
-        pulsarCluster.runAdminCommandOnAnyBroker("tenants",
-                "create", "--allowed-clusters", pulsarCluster.getClusterName(),
-                "--admin-roles", "offload-admin", tenant);
-
-        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
-                "create", "--clusters", pulsarCluster.getClusterName(), namespace);
+        super.testPublishOffloadAndConsumeDeletionLag(serviceUrl, adminUrl);
 
-        // set threshold to offload runs immediately after role
-        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
-                                                 "set-offload-threshold", "--size", "0", namespace);
-
-        String output = pulsarCluster.runAdminCommandOnAnyBroker(
-                "namespaces", "get-offload-deletion-lag", namespace).getStdout();
-        Assert.assertTrue(output.contains("Unset for namespace"));
-
-        long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
-        // give it up to 5 seconds to delete, it shouldn't
-        // so we wait this every time
-        Thread.sleep(5000);
-        Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
-
-        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag", namespace,
-                                                 "--lag", "0m");
-        output = pulsarCluster.runAdminCommandOnAnyBroker(
-                "namespaces", "get-offload-deletion-lag", namespace).getStdout();
-        Assert.assertTrue(output.contains("0 minute(s)"));
-
-        offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
-        // wait up to 10 seconds for ledger to be deleted
-        for (int i = 0; i < 10 && ledgerExistsInBookKeeper(offloadedLedger); i++) {
-            writeAndWaitForOffload(serviceUrl, adminUrl, topic);
-            Thread.sleep(1000);
-        }
-        Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger));
-
-        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "clear-offload-deletion-lag", namespace);
-
-        Thread.sleep(5); // wait 5 seconds to allow broker to see update
+    }
 
-        output = pulsarCluster.runAdminCommandOnAnyBroker(
-                "namespaces", "get-offload-deletion-lag", namespace).getStdout();
-        Assert.assertTrue(output.contains("Unset for namespace"));
 
-        offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+    @Override
+    protected Map<String, String> getEnv() {
+        Map<String, String> result = new HashMap<>();
+        result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER));
+        result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
+        result.put("managedLedgerOffloadDriver", "s3");
+        result.put("s3ManagedLedgerOffloadBucket", "pulsar-integtest");
+        result.put("s3ManagedLedgerOffloadServiceEndpoint", "http://" + S3Container.NAME + ":9090");
 
-        // give it up to 5 seconds to delete, it shouldn't
-        // so we wait this every time
-        Thread.sleep(5000);
-        Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
+        return result;
     }
 
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
index d248dce..67fe145 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java
@@ -18,12 +18,8 @@
  */
 package org.apache.pulsar.tests.integration.suites;
 
-import static java.util.stream.Collectors.joining;
-
-import java.util.stream.Stream;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
-import org.apache.pulsar.tests.integration.containers.S3Container;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
@@ -31,8 +27,13 @@ import org.testng.ITest;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
 
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.joining;
+
 @Slf4j
-public class PulsarTieredStorageTestSuite extends PulsarClusterTestBase implements ITest {
+public abstract class PulsarTieredStorageTestSuite extends PulsarClusterTestBase implements ITest {
 
     protected static final int ENTRIES_PER_LEDGER = 1024;
 
@@ -51,15 +52,9 @@ public class PulsarTieredStorageTestSuite extends PulsarClusterTestBase implemen
 
         log.info("Setting up cluster {} with {} bookies, {} brokers",
                 spec.clusterName(), spec.numBookies(), spec.numBrokers());
-
         pulsarCluster = PulsarCluster.forSpec(spec);
-
         for(BrokerContainer brokerContainer : pulsarCluster.getBrokers()){
-            brokerContainer.withEnv("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER));
-            brokerContainer.withEnv("managedLedgerMinLedgerRolloverTimeMinutes", "0");
-            brokerContainer.withEnv("managedLedgerOffloadDriver", "s3");
-            brokerContainer.withEnv("s3ManagedLedgerOffloadBucket", "pulsar-integtest");
-            brokerContainer.withEnv("s3ManagedLedgerOffloadServiceEndpoint", "http://" + S3Container.NAME + ":9090");
+            getEnv().forEach(brokerContainer::withEnv);
         }
 
         pulsarCluster.start();
@@ -77,4 +72,6 @@ public class PulsarTieredStorageTestSuite extends PulsarClusterTestBase implemen
     public String getTestName() {
         return "tiered-storage-test-suite";
     }
+
+    protected abstract Map<String, String> getEnv();
 }
diff --git a/tests/integration/src/test/resources/pulsar.xml b/tests/integration/src/test/resources/pulsar.xml
index c0720e0..79bf744 100644
--- a/tests/integration/src/test/resources/pulsar.xml
+++ b/tests/integration/src/test/resources/pulsar.xml
@@ -27,6 +27,7 @@
         <suite-file path="./pulsar-process.xml" />
         <suite-file path="./pulsar-sql.xml" />
         <suite-file path="./pulsar-thread.xml" />
-        <suite-file path="./tiered-storage.xml" />
+        <suite-file path="./tiered-jcloud-storage.xml" />
+        <suite-file path="./tiered-filesystem-storage.xml"/>
     </suite-files>
 </suite>
diff --git a/tests/integration/src/test/resources/tiered-storage-suite.xml b/tests/integration/src/test/resources/tiered-file-system-suite.xml
similarity index 87%
copy from tests/integration/src/test/resources/tiered-storage-suite.xml
copy to tests/integration/src/test/resources/tiered-file-system-suite.xml
index 5b299b7..e5ec338 100644
--- a/tests/integration/src/test/resources/tiered-storage-suite.xml
+++ b/tests/integration/src/test/resources/tiered-file-system-suite.xml
@@ -21,8 +21,8 @@
 <!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
 <!-- TODO: we have to put suite files in one file to avoid executing TESTNG test suites multiple times.
            see {@link https://github.com/cbeust/testng/issues/508} -->
-<suite name="Pulsar Test Suite" parallel="instances" thread-count="1">
+<suite name="Pulsar (Filesystem Tiered Storage) Test Suite" parallel="instances" thread-count="1">
     <suite-files>
-        <suite-file path="./tiered-storage.xml" />
+        <suite-file path="./tiered-filesystem-storage.xml" />
     </suite-files>
 </suite>
diff --git a/tests/integration/src/test/resources/tiered-storage.xml b/tests/integration/src/test/resources/tiered-filesystem-storage.xml
similarity index 82%
copy from tests/integration/src/test/resources/tiered-storage.xml
copy to tests/integration/src/test/resources/tiered-filesystem-storage.xml
index 8cbdaa7..b6279e3 100644
--- a/tests/integration/src/test/resources/tiered-storage.xml
+++ b/tests/integration/src/test/resources/tiered-filesystem-storage.xml
@@ -19,10 +19,10 @@
 
 -->
 <!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
-<suite name="Pulsar (Tiered Storage) Integration Tests" verbose="2" annotations="JDK">
-    <test name="tiered-storage-test-suite" preserve-order="true">
+<suite name="Pulsar (Filesystem Tiered Storage) Integration Tests" verbose="2" annotations="JDK">
+    <test name="tiered-storage-filesystem-test-suite" preserve-order="true">
         <classes>
-            <class name="org.apache.pulsar.tests.integration.offload.TestS3Offload" />
+            <class name="org.apache.pulsar.tests.integration.offload.TestFileSystemOffload" />
         </classes>
     </test>
 </suite>
\ No newline at end of file
diff --git a/tests/integration/src/test/resources/tiered-storage.xml b/tests/integration/src/test/resources/tiered-jcloud-storage.xml
similarity index 85%
rename from tests/integration/src/test/resources/tiered-storage.xml
rename to tests/integration/src/test/resources/tiered-jcloud-storage.xml
index 8cbdaa7..a727144 100644
--- a/tests/integration/src/test/resources/tiered-storage.xml
+++ b/tests/integration/src/test/resources/tiered-jcloud-storage.xml
@@ -19,8 +19,8 @@
 
 -->
 <!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
-<suite name="Pulsar (Tiered Storage) Integration Tests" verbose="2" annotations="JDK">
-    <test name="tiered-storage-test-suite" preserve-order="true">
+<suite name="Pulsar (Jcloud Tiered Storage) Integration Tests" verbose="2" annotations="JDK">
+    <test name="tiered-storage-jcloud-test-suite" preserve-order="true">
         <classes>
             <class name="org.apache.pulsar.tests.integration.offload.TestS3Offload" />
         </classes>
diff --git a/tests/integration/src/test/resources/tiered-storage-suite.xml b/tests/integration/src/test/resources/tiered-storage-jcloud-suite.xml
similarity index 95%
rename from tests/integration/src/test/resources/tiered-storage-suite.xml
rename to tests/integration/src/test/resources/tiered-storage-jcloud-suite.xml
index 5b299b7..bd49803 100644
--- a/tests/integration/src/test/resources/tiered-storage-suite.xml
+++ b/tests/integration/src/test/resources/tiered-storage-jcloud-suite.xml
@@ -23,6 +23,6 @@
            see {@link https://github.com/cbeust/testng/issues/508} -->
 <suite name="Pulsar Test Suite" parallel="instances" thread-count="1">
     <suite-files>
-        <suite-file path="./tiered-storage.xml" />
+        <suite-file path="./tiered-jcloud-storage.xml" />
     </suite-files>
 </suite>
diff --git a/tiered-storage/file-system/pom.xml b/tiered-storage/file-system/pom.xml
new file mode 100644
index 0000000..9e76101
--- /dev/null
+++ b/tiered-storage/file-system/pom.xml
@@ -0,0 +1,95 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.pulsar</groupId>
+        <artifactId>tiered-storage-parent</artifactId>
+        <version>2.4.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>tiered-storage-file-system</artifactId>
+    <name>Apache Pulsar :: Tiered Storage :: File System</name>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>managed-ledger-original</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <version>${hdfs-offload-version3}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hdfs-offload-version3}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf3.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>managed-ledger-original</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <version>${hdfs-offload-version3}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+            <version>${org.eclipse.jetty-hdfs-offload}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-servlet</artifactId>
+            <version>${org.eclipse.jetty-hdfs-offload}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-util</artifactId>
+            <version>${org.eclipse.jetty-hdfs-offload}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-nar-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java
new file mode 100644
index 0000000..899887b
--- /dev/null
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemConfigurationData.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.filesystem;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Properties;
+
+import static org.apache.pulsar.common.util.FieldParser.value;
+
+/**
+ * Configuration for the file system ledger offloader.
+ *
+ * <p>Instances are populated reflectively by {@link #create(Properties)}: each property whose key equals
+ * the name of a field declared here is converted and stored into that field.
+ */
+@Data
+public class FileSystemConfigurationData implements Serializable, Cloneable {
+
+    /**** --- Ledger Offloading --- ****/
+    // Driver to use to offload old data to long term storage
+    private String managedLedgerOffloadDriver = null;
+
+    // NOTE(review): presumably the path of the Hadoop configuration file to load -- confirm against the offloader
+    private String fileSystemProfilePath = null;
+
+    // NOTE(review): presumably the URI of the target file system -- confirm against the offloader
+    private String fileSystemURI = null;
+
+    // Stays at 2 unless a property with this name is supplied
+    private int managedLedgerOffloadMaxThreads = 2;
+
+    /**
+     * Build a file system offload configuration from the supplied {@code properties}.
+     *
+     * @param properties the configuration properties, keyed by field name
+     * @return the populated configuration; fields without a matching property keep their defaults
+     * @throws IllegalArgumentException if a property value cannot be converted or assigned to its field
+     */
+    public static FileSystemConfigurationData create(Properties properties) {
+        FileSystemConfigurationData config = new FileSystemConfigurationData();
+        for (Field field : FileSystemConfigurationData.class.getDeclaredFields()) {
+            // Properties that do not name a declared field are ignored, and vice versa.
+            if (!properties.containsKey(field.getName())) {
+                continue;
+            }
+            try {
+                field.setAccessible(true);
+                field.set(config, value((String) properties.get(field.getName()), field));
+            } catch (Exception e) {
+                throw new IllegalArgumentException(String.format("failed to initialize %s field while setting value %s",
+                        field.getName(), properties.get(field.getName())), e);
+            }
+        }
+        return config;
+    }
+}
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java
new file mode 100644
index 0000000..cd52197
--- /dev/null
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.filesystem;
+
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+public class FileSystemLedgerOffloaderFactory implements LedgerOffloaderFactory<FileSystemManagedLedgerOffloader> {
+    @Override
+    public boolean isDriverSupported(String driverName) {
+        return FileSystemManagedLedgerOffloader.driverSupported(driverName);
+    }
+
+    @Override
+    public FileSystemManagedLedgerOffloader create(Properties properties, Map<String, String> userMetadata, OrderedScheduler scheduler) throws IOException {
+        FileSystemConfigurationData data = FileSystemConfigurationData.create(properties);
+        return FileSystemManagedLedgerOffloader.create(data, scheduler);
+    }
+}
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
new file mode 100644
index 0000000..b1663d2
--- /dev/null
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.filesystem.impl;
+
+import com.google.common.collect.Maps;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerMetadataBuilder;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.DataFormats;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class FileStoreBackedReadHandleImpl implements ReadHandle {
+    private static final Logger log = LoggerFactory.getLogger(FileStoreBackedReadHandleImpl.class);
+    private final ExecutorService executor;
+    private final MapFile.Reader reader;
+    private final long ledgerId;
+    private final LedgerMetadata ledgerMetadata;
+
+    /**
+     * Creates the handle and eagerly loads the ledger metadata stored in {@code reader} under
+     * {@code FileSystemManagedLedgerOffloader.METADATA_KEY_INDEX}.
+     *
+     * @param executor executor used by the asynchronous read and close operations
+     * @param reader MapFile reader over the offloaded ledger
+     * @param ledgerId id of the offloaded ledger
+     * @throws IOException if the metadata entry cannot be read or parsed
+     */
+    private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId)
+            throws IOException {
+        this.ledgerId = ledgerId;
+        this.executor = executor;
+        this.reader = reader;
+        LongWritable key = new LongWritable(FileSystemManagedLedgerOffloader.METADATA_KEY_INDEX);
+        BytesWritable value = new BytesWritable();
+        try {
+            reader.get(key, value);
+            this.ledgerMetadata = parseLedgerMetadata(value.copyBytes());
+        } catch (IOException e) {
+            // Report the ledger id (the key only holds the metadata index) and keep the cause so the
+            // underlying file system failure is not lost.
+            log.error("Fail to read LedgerMetadata for ledgerId {}", ledgerId, e);
+            throw new IOException("Fail to read LedgerMetadata for ledgerId " + ledgerId, e);
+        }
+    }
+
+    /** Returns the id of the ledger this handle was created for. */
+    @Override
+    public long getId() {
+        return ledgerId;
+    }
+
+    /** Returns the ledger metadata that was parsed from the MapFile when this handle was constructed. */
+    @Override
+    public LedgerMetadata getLedgerMetadata() {
+        return ledgerMetadata;
+
+    }
+
+    /**
+     * Closes the underlying MapFile reader on the executor.
+     *
+     * @return a future that completes once the reader has been closed, or fails with the
+     *         {@link IOException} raised while closing it
+     */
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        executor.submit(() -> {
+            try {
+                reader.close();
+                // Complete on success as well; otherwise callers waiting on the future never return.
+                promise.complete(null);
+            } catch (IOException t) {
+                promise.completeExceptionally(t);
+            }
+        });
+        return promise;
+    }
+
+    /**
+     * Reads the entries {@code firstEntry..lastEntry} (inclusive) from the MapFile on the executor.
+     *
+     * <p>The returned future fails with {@link BKException.BKIncorrectParameterException} when the range
+     * is invalid, and with {@link BKException.BKUnexpectedConditionException} when the file ends, or
+     * yields an id beyond {@code lastEntry}, before the whole range was read.
+     *
+     * @param firstEntry id of the first entry to read
+     * @param lastEntry id of the last entry to read
+     * @return a future holding the requested entries in the order they were read
+     */
+    @Override
+    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
+        if (log.isDebugEnabled()) {
+            log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
+        }
+        CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
+        executor.submit(() -> {
+            if (firstEntry > lastEntry
+                    || firstEntry < 0
+                    || lastEntry > getLastAddConfirmed()) {
+                promise.completeExceptionally(new BKException.BKIncorrectParameterException());
+                return;
+            }
+            long entriesToRead = (lastEntry - firstEntry) + 1;
+            List<LedgerEntry> entries = new ArrayList<>();
+            long nextExpectedId = firstEntry;
+            LongWritable key = new LongWritable();
+            BytesWritable value = new BytesWritable();
+            try {
+                // Seek to the key preceding the first wanted entry; the loop then expects next() to
+                // yield firstEntry.
+                key.set(nextExpectedId - 1);
+                reader.seek(key);
+                while (entriesToRead > 0) {
+                    // next() returns false at the end of the map. Without this check the key would keep
+                    // its previous value and a truncated file would make this loop spin forever.
+                    if (!reader.next(key, value)) {
+                        log.info("Expected to read {}, but reached the end of the file for ledger {}",
+                                nextExpectedId, ledgerId);
+                        throw new BKException.BKUnexpectedConditionException();
+                    }
+                    int length = value.getLength();
+                    long entryId = key.get();
+                    if (entryId == nextExpectedId) {
+                        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
+                        // Added to the list first so that the catch block below closes it if the copy fails.
+                        entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
+                        // Copy straight from the backing array; only the first `length` bytes are valid.
+                        buf.writeBytes(value.getBytes(), 0, length);
+                        entriesToRead--;
+                        nextExpectedId++;
+                    } else if (entryId > lastEntry) {
+                        log.info("Expected to read {}, but read {}, which is greater than last entry {}",
+                                nextExpectedId, entryId, lastEntry);
+                        throw new BKException.BKUnexpectedConditionException();
+                    }
+                }
+                promise.complete(LedgerEntriesImpl.create(entries));
+            } catch (Throwable t) {
+                promise.completeExceptionally(t);
+                entries.forEach(LedgerEntry::close);
+            }
+        });
+        return promise;
+    }
+
+    /** Delegates to {@link #readAsync(long, long)}; this handle treats unconfirmed reads like normal reads. */
+    @Override
+    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
+        return readAsync(firstEntry, lastEntry);
+    }
+
+    /** Returns an already-completed future holding {@link #getLastAddConfirmed()}. */
+    @Override
+    public CompletableFuture<Long> readLastAddConfirmedAsync() {
+        return CompletableFuture.completedFuture(getLastAddConfirmed());
+    }
+
+    /** Returns an already-completed future holding {@link #getLastAddConfirmed()}. */
+    @Override
+    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+        return CompletableFuture.completedFuture(getLastAddConfirmed());
+    }
+
+    /** Returns the last entry id recorded in the ledger metadata. */
+    @Override
+    public long getLastAddConfirmed() {
+        return getLedgerMetadata().getLastEntryId();
+    }
+
+    /** Returns the ledger length recorded in the ledger metadata. */
+    @Override
+    public long getLength() {
+        return getLedgerMetadata().getLength();
+    }
+
+    /**
+     * Returns the closed flag of the ledger metadata; {@code parseLedgerMetadata} always builds that
+     * metadata with {@code withClosedState()}.
+     */
+    @Override
+    public boolean isClosed() {
+        return getLedgerMetadata().isClosed();
+    }
+
+    /**
+     * Not supported by this handle: always returns a future that has already failed with
+     * {@link UnsupportedOperationException}; the arguments are ignored.
+     */
+    @Override
+    public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId,
+                                                                                      long timeOutInMillis,
+                                                                                      boolean parallel) {
+        final CompletableFuture<LastConfirmedAndEntry> unsupported = new CompletableFuture<>();
+        unsupported.completeExceptionally(new UnsupportedOperationException());
+        return unsupported;
+    }
+
+    /**
+     * Opens a read handle over an offloaded ledger stored in a MapFile.
+     *
+     * @param executor executor on which the handle runs its reads and its close
+     * @param reader MapFile reader over the offloaded ledger
+     * @param ledgerId id of the offloaded ledger
+     * @return the new read handle
+     * @throws IOException if the ledger metadata cannot be read from {@code reader}
+     */
+    public static ReadHandle open(ScheduledExecutorService executor, MapFile.Reader reader, long ledgerId) throws IOException {
+            return new FileStoreBackedReadHandleImpl(executor, reader, ledgerId);
+    }
+
+    private static LedgerMetadata parseLedgerMetadata(byte[] bytes) throws IOException {
+        DataFormats.LedgerMetadataFormat ledgerMetadataFormat = DataFormats.LedgerMetadataFormat.newBuilder().mergeFrom(bytes).build();
+        LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
+                .withLastEntryId(ledgerMetadataFormat.getLastEntryId())
+                .withPassword(ledgerMetadataFormat.getPassword().toByteArray())
+                .withClosedState()
+                .withMetadataFormatVersion(2)
+                .withLength(ledgerMetadataFormat.getLength())
+                .withAckQuorumSize(ledgerMetadataFormat.getAckQuorumSize())
+                .withCreationTime(ledgerMetadataFormat.getCtime())
+                .withWriteQuorumSize(ledgerMetadataFormat.getQuorumSize())
+                .withEnsembleSize(ledgerMetadataFormat.getEnsembleSize());
+        ledgerMetadataFormat.getSegmentList().forEach(segment -> {
+            ArrayList<BookieSocketAddress> addressArrayList = new ArrayList<>();
+            segment.getEnsembleMemberList().forEach(address -> {
+                try {
+                    addressArrayList.add(new BookieSocketAddress(address));
+                } catch (IOException e) {
+                    log.error("Exception when create BookieSocketAddress. ", e);
+                }
+            });
+            builder.newEnsembleEntry(segment.getFirstEntryId(), addressArrayList);
+        });
+
+        if (ledgerMetadataFormat.getCustomMetadataCount() > 0) {
+            Map<String, byte[]> customMetadata = Maps.newHashMap();
+            ledgerMetadataFormat.getCustomMetadataList().forEach(
+                    entry -> customMetadata.put(entry.getKey(), entry.getValue().toByteArray()));
+            builder.withCustomMetadata(customMetadata);
+        }
+
+        switch (ledgerMetadataFormat.getDigestType()) {
+            case HMAC:
+                builder.withDigestType(DigestType.MAC);
+                break;
+            case CRC32:
+                builder.withDigestType(DigestType.CRC32);
+                break;
+            case CRC32C:
+                builder.withDigestType(DigestType.CRC32C);
+                break;
+            case DUMMY:
+                builder.withDigestType(DigestType.DUMMY);
+                break;
+            default:
+                throw new IllegalArgumentException("Unable to convert digest type " + ledgerMetadataFormat.getDigestType());
+        }
+
+        return builder.build();
+    }
+
+}
diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
new file mode 100644
index 0000000..309076c
--- /dev/null
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.filesystem.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.ByteString;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.offload.filesystem.FileSystemLedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.offload.filesystem.FileSystemConfigurationData;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.DataFormats;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+
+public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
+
+    private static final Logger log = LoggerFactory.getLogger(FileSystemManagedLedgerOffloader.class);
+    private static final String STORAGE_BASE_PATH = "storageBasePath";
+    private static final String DRIVER_NAMES = "filesystem";
+    private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName";
+    static final long METADATA_KEY_INDEX = -1;
+    private final Configuration configuration;
+    private final String driverName;
+    private final String storageBasePath;
+    private final FileSystem fileSystem;
+    private OrderedScheduler scheduler;
+    private static final long ENTRIES_PER_READ = 100;
+    private OrderedScheduler assignmentScheduler;
+    public static boolean driverSupported(String driver) {
+        return DRIVER_NAMES.equals(driver);
+    }
+    @Override
+    public String getOffloadDriverName() {
+        return driverName;
+    }
+
+    public static FileSystemManagedLedgerOffloader create(FileSystemConfigurationData conf, OrderedScheduler scheduler) throws IOException {
+        return new FileSystemManagedLedgerOffloader(conf, scheduler);
+    }
+
+    private FileSystemManagedLedgerOffloader(FileSystemConfigurationData conf, OrderedScheduler scheduler) throws IOException {
+        this.configuration = new Configuration();
+        if (conf.getFileSystemProfilePath() != null) {
+            String[] paths = conf.getFileSystemProfilePath().split(",");
+            for (int i =0 ; i < paths.length; i++) {
+                configuration.addResource(new Path(paths[i]));
+            }
+        }
+        if (!"".equals(conf.getFileSystemURI()) && conf.getFileSystemURI() != null) {
+            configuration.set("fs.defaultFS", conf.getFileSystemURI());
+        }
+        if (configuration.get("fs.hdfs.impl") == null) {
+            this.configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        }
+
+        if (configuration.get("fs.file.impl") == null) {
+            this.configuration.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+        }
+
+        this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader());
+        this.driverName = conf.getManagedLedgerOffloadDriver();
+        this.storageBasePath = configuration.get("hadoop.tmp.dir");
+        this.scheduler = scheduler;
+        this.fileSystem = FileSystem.get(configuration);
+        this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
+                .numThreads(conf.getManagedLedgerOffloadMaxThreads())
+                .name("offload-assignment").build();
+    }
+    @VisibleForTesting
+    public FileSystemManagedLedgerOffloader(FileSystemConfigurationData conf, OrderedScheduler scheduler, String testHDFSPath, String baseDir) throws IOException {
+        this.configuration = new Configuration();
+        this.configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        this.configuration.set("fs.defaultFS", testHDFSPath);
+        this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader());
+        this.driverName = conf.getManagedLedgerOffloadDriver();
+        this.configuration.set("hadoop.tmp.dir", baseDir);
+        this.storageBasePath = baseDir;
+        this.scheduler = scheduler;
+        this.fileSystem = FileSystem.get(configuration);
+        this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
+                .numThreads(conf.getManagedLedgerOffloadMaxThreads())
+                .name("offload-assignment").build();
+    }
+
+    @Override
+    public Map<String, String> getOffloadDriverMetadata() {
+        String path = storageBasePath == null ? "null" : storageBasePath;
+        return ImmutableMap.of(
+                STORAGE_BASE_PATH, path
+        );
+    }
+
+    /*
+    * ledgerMetadata stored in an index of -1
+    * */
+    @Override
+    public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        scheduler.chooseThread(readHandle.getId()).submit(new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, configuration, assignmentScheduler));
+        return promise;
+    }
+
+    private static class LedgerReader implements Runnable {
+
+        private final ReadHandle readHandle;
+        private final UUID uuid;
+        private final Map<String, String> extraMetadata;
+        private final CompletableFuture<Void> promise;
+        private final String storageBasePath;
+        private final Configuration configuration;
+        volatile Exception fileSystemWriteException = null;
+        private OrderedScheduler assignmentScheduler;
+
+        private LedgerReader(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata, CompletableFuture<Void> promise,
+                             String storageBasePath, Configuration configuration, OrderedScheduler assignmentScheduler) {
+            this.readHandle = readHandle;
+            this.uuid = uuid;
+            this.extraMetadata = extraMetadata;
+            this.promise = promise;
+            this.storageBasePath = storageBasePath;
+            this.configuration = configuration;
+            this.assignmentScheduler = assignmentScheduler;
+        }
+
+        @Override
+        public void run() {
+            if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) {
+                promise.completeExceptionally(
+                        new IllegalArgumentException("An empty or open ledger should never be offloaded"));
+                return;
+            }
+            long ledgerId = readHandle.getId();
+            String storagePath = getStoragePath(storageBasePath, extraMetadata.get(MANAGED_LEDGER_NAME));
+            String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
+            LongWritable key = new LongWritable();
+            BytesWritable value = new BytesWritable();
+            try {
+                MapFile.Writer dataWriter = new MapFile.Writer(configuration,
+                        new Path(dataFilePath),
+                        MapFile.Writer.keyClass(LongWritable.class),
+                        MapFile.Writer.valueClass(BytesWritable.class));
+                //store the ledgerMetadata in -1 index
+                key.set(METADATA_KEY_INDEX);
+                byte[] ledgerMetadata = buildLedgerMetadataFormat(readHandle.getLedgerMetadata());
+                value.set(buildLedgerMetadataFormat(readHandle.getLedgerMetadata()), 0, ledgerMetadata.length);
+                dataWriter.append(key, value);
+                AtomicLong haveOffloadEntryNumber = new AtomicLong(0);
+                long needToOffloadFirstEntryNumber = 0;
+                CountDownLatch countDownLatch;
+                do {
+                    long end = Math.min(needToOffloadFirstEntryNumber + ENTRIES_PER_READ - 1, readHandle.getLastAddConfirmed());
+                    log.debug("read ledger entries. start: {}, end: {}", needToOffloadFirstEntryNumber, end);
+                    LedgerEntries ledgerEntriesOnce = readHandle.readAsync(needToOffloadFirstEntryNumber, end).get();
+                    countDownLatch = new CountDownLatch(1);
+                    assignmentScheduler.chooseThread(ledgerId).submit(new FileSystemWriter(ledgerEntriesOnce, dataWriter,
+                            countDownLatch, haveOffloadEntryNumber, this)).addListener(() -> {}, Executors.newSingleThreadExecutor());
+                    needToOffloadFirstEntryNumber = end + 1;
+                } while (needToOffloadFirstEntryNumber - 1 != readHandle.getLastAddConfirmed() && fileSystemWriteException == null);
+                countDownLatch.await();
+                if (fileSystemWriteException != null) {
+                    throw fileSystemWriteException;
+                }
+                IOUtils.closeStream(dataWriter);
+                promise.complete(null);
+            } catch (Exception e) {
+                log.error("Exception when get CompletableFuture<LedgerEntries> : ManagerLedgerName: {}, " +
+                        "LedgerId: {}, UUID: {} ", extraMetadata.get(MANAGED_LEDGER_NAME), ledgerId, uuid, e);
+                if (e instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+                promise.completeExceptionally(e);
+            }
+        }
+    }
+
+    private static class FileSystemWriter implements Runnable {
+
+        private final LedgerEntries ledgerEntriesOnce;
+
+        private final LongWritable key = new LongWritable();
+        private final BytesWritable value = new BytesWritable();
+
+        private final MapFile.Writer dataWriter;
+        private final CountDownLatch countDownLatch;
+        private final AtomicLong haveOffloadEntryNumber;
+        private final LedgerReader ledgerReader;
+
+
+        private FileSystemWriter(LedgerEntries ledgerEntriesOnce, MapFile.Writer dataWriter,
+                                 CountDownLatch countDownLatch, AtomicLong haveOffloadEntryNumber, LedgerReader ledgerReader) {
+            this.ledgerEntriesOnce = ledgerEntriesOnce;
+            this.dataWriter = dataWriter;
+            this.countDownLatch = countDownLatch;
+            this.haveOffloadEntryNumber = haveOffloadEntryNumber;
+            this.ledgerReader = ledgerReader;
+        }
+
+        @Override
+        public void run() {
+            if (ledgerReader.fileSystemWriteException == null) {
+                Iterator<LedgerEntry> iterator = ledgerEntriesOnce.iterator();
+                while (iterator.hasNext()) {
+                    LedgerEntry entry = iterator.next();
+                    long entryId = entry.getEntryId();
+                    key.set(entryId);
+                    try {
+                        value.set(entry.getEntryBytes(), 0, entry.getEntryBytes().length);
+                        dataWriter.append(key, value);
+                    } catch (IOException e) {
+                        ledgerReader.fileSystemWriteException = e;
+                        break;
+                    }
+                    haveOffloadEntryNumber.incrementAndGet();
+                }
+            }
+            countDownLatch.countDown();
+        }
+    }
+
+    @Override
+    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid, Map<String, String> offloadDriverMetadata) {
+
+        CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
+        String storagePath = getStoragePath(storageBasePath, offloadDriverMetadata.get(MANAGED_LEDGER_NAME));
+        String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
+        scheduler.chooseThread(ledgerId).submit(() -> {
+            try {
+                MapFile.Reader reader = new MapFile.Reader(new Path(dataFilePath),
+                        configuration);
+                promise.complete(FileStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId), reader, ledgerId));
+            } catch (Throwable t) {
+                log.error("Failed to open FileStoreBackedReadHandleImpl: ManagerLedgerName: {}, " +
+                        "LegerId: {}, UUID: {}", offloadDriverMetadata.get(MANAGED_LEDGER_NAME), ledgerId, uuid, t);
+                promise.completeExceptionally(t);
+            }
+        });
+        return promise;
+    }
+
+    private static String getStoragePath(String storageBasePath, String managedLedgerName) {
+        return storageBasePath == null ? managedLedgerName + "/" : storageBasePath + "/" + managedLedgerName + "/";
+    }
+
+    private static String getDataFilePath(String storagePath, long ledgerId, UUID uuid) {
+        return storagePath + ledgerId + "-" + uuid.toString();
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
+        String storagePath = getStoragePath(storageBasePath, offloadDriverMetadata.get(MANAGED_LEDGER_NAME));
+        String dataFilePath = getDataFilePath(storagePath, ledgerId, uid);
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        try {
+            fileSystem.delete(new Path(dataFilePath), true);
+            promise.complete(null);
+        } catch (IOException e) {
+            log.error("Failed to delete Offloaded: ", e);
+            promise.completeExceptionally(e);
+        }
+        return promise;
+    }
+
+    private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) {
+        DataFormats.LedgerMetadataFormat.Builder builder = DataFormats.LedgerMetadataFormat.newBuilder();
+        builder.setQuorumSize(metadata.getWriteQuorumSize())
+                .setAckQuorumSize(metadata.getAckQuorumSize())
+                .setEnsembleSize(metadata.getEnsembleSize())
+                .setLength(metadata.getLength())
+                .setState(metadata.isClosed() ? DataFormats.LedgerMetadataFormat.State.CLOSED : DataFormats.LedgerMetadataFormat.State.OPEN)
+                .setLastEntryId(metadata.getLastEntryId())
+                .setCtime(metadata.getCtime())
+                .setDigestType(BookKeeper.DigestType.toProtoDigestType(
+                        BookKeeper.DigestType.fromApiDigestType(metadata.getDigestType())));
+
+        for (Map.Entry<String, byte[]> e : metadata.getCustomMetadata().entrySet()) {
+            builder.addCustomMetadataBuilder()
+                    .setKey(e.getKey()).setValue(ByteString.copyFrom(e.getValue()));
+        }
+
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : metadata.getAllEnsembles().entrySet()) {
+            builder.addSegmentBuilder()
+                    .setFirstEntryId(e.getKey())
+                    .addAllEnsembleMember(e.getValue().stream().map(a -> a.toString()).collect(Collectors.toList()));
+        }
+
+        return builder.build().toByteArray();
+    }
+}
diff --git a/tiered-storage/file-system/src/main/resources/META-INF/services/pulsar-offloader.yaml b/tiered-storage/file-system/src/main/resources/META-INF/services/pulsar-offloader.yaml
new file mode 100644
index 0000000..d7f7ed2
--- /dev/null
+++ b/tiered-storage/file-system/src/main/resources/META-INF/services/pulsar-offloader.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: filesystem
+description: fileSystem based offloader implementation
+offloaderFactoryClass: org.apache.bookkeeper.mledger.offload.filesystem.FileSystemLedgerOffloaderFactory
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
new file mode 100644
index 0000000..4abff16
--- /dev/null
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.filesystem;
+
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Properties;
+
+public class FileStoreTestBase {
+    protected FileSystemManagedLedgerOffloader fileSystemManagedLedgerOffloader;
+    protected OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
+    protected final String basePath = "pulsar";
+    private MiniDFSCluster hdfsCluster;
+    private String hdfsURI;
+
+    @BeforeMethod
+    public void start() throws Exception {
+        File baseDir = Files.createTempDirectory(basePath).toFile().getAbsoluteFile();
+        Configuration conf = new Configuration();
+        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+        hdfsCluster = builder.build();
+
+        hdfsURI = "hdfs://localhost:"+ hdfsCluster.getNameNodePort() + "/";
+        Properties properties = new Properties();
+        fileSystemManagedLedgerOffloader = new FileSystemManagedLedgerOffloader(
+                FileSystemConfigurationData.create(properties),
+                scheduler, hdfsURI, basePath);
+    }
+
+    @AfterMethod
+    public void tearDown() {
+        hdfsCluster.shutdown(true, true);
+        hdfsCluster.close();
+    }
+
+    public String getURI() {
+        return hdfsURI;
+    }
+}
diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
new file mode 100644
index 0000000..e03c5e1
--- /dev/null
+++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.filesystem.impl;
+
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.offload.filesystem.FileStoreTestBase;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase {
+    private final PulsarMockBookKeeper bk;
+    private String topic = "public/default/persistent/testOffload";
+    private String storagePath = createStoragePath(topic);
+    private LedgerHandle lh;
+    private ReadHandle toWrite;
+    private final int numberOfEntries = 601;
+    private  Map<String, String> map = new HashMap<>();
+
+    public FileSystemManagedLedgerOffloaderTest() throws Exception {
+        this.bk = new PulsarMockBookKeeper(createMockZooKeeper(), scheduler.chooseThread(this));
+        this.toWrite = buildReadHandle();
+        map.put("ManagedLedgerName", topic);
+    }
+
+    private static MockZooKeeper createMockZooKeeper() throws Exception {
+        MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
+        List<ACL> dummyAclList = new ArrayList<ACL>(0);
+
+        ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000,
+                "".getBytes(UTF_8), dummyAclList, CreateMode.PERSISTENT);
+
+        zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(UTF_8), dummyAclList,
+                CreateMode.PERSISTENT);
+        return zk;
+    }
+
+    private ReadHandle buildReadHandle() throws Exception {
+
+        lh = bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, "foobar".getBytes());
+
+        int i = 0;
+        int blocksWritten = 1;
+        while (blocksWritten <= numberOfEntries) {
+            byte[] entry = ("foobar"+i).getBytes();
+            blocksWritten++;
+            lh.addEntry(entry);
+            i++;
+        }
+        lh.close();
+
+        return bk.newOpenLedgerOp().withLedgerId(lh.getId())
+                .withPassword("foobar".getBytes()).withDigestType(DigestType.CRC32).execute().get();
+    }
+
+    @Test
+    public void testOffloadAndRead() throws Exception {
+        LedgerOffloader offloader = fileSystemManagedLedgerOffloader;
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, map).get();
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, map).get();
+        Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
+        LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1);
+        LedgerEntries toWriteEntries = toWrite.read(0,numberOfEntries - 1);
+        Iterator<LedgerEntry> toTestIter = toTestEntries.iterator();
+        Iterator<LedgerEntry> toWriteIter = toWriteEntries.iterator();
+        while(toTestIter.hasNext()) {
+            LedgerEntry toWriteEntry = toWriteIter.next();
+            LedgerEntry toTestEntry = toTestIter.next();
+
+            Assert.assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId());
+            Assert.assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId());
+            Assert.assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
+            Assert.assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
+        }
+        toTestEntries = toTest.read(1, numberOfEntries - 1);
+        toWriteEntries = toWrite.read(1,numberOfEntries - 1);
+        toTestIter = toTestEntries.iterator();
+        toWriteIter = toWriteEntries.iterator();
+        while(toTestIter.hasNext()) {
+            LedgerEntry toWriteEntry = toWriteIter.next();
+            LedgerEntry toTestEntry = toTestIter.next();
+
+            Assert.assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId());
+            Assert.assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId());
+            Assert.assertEquals(toWriteEntry.getLength(), toTestEntry.getLength());
+            Assert.assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer());
+        }
+    }
+
+    @Test
+    public void testDeleteOffload() throws Exception {
+        LedgerOffloader offloader = fileSystemManagedLedgerOffloader;
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, map).get();
+        Configuration configuration = new Configuration();
+        FileSystem fileSystem = FileSystem.get(new URI(getURI()), configuration);
+        Assert.assertEquals(true, fileSystem.exists(new Path(createDataFilePath(storagePath, lh.getId(), uuid))));
+        Assert.assertEquals(true, fileSystem.exists(new Path(createIndexFilePath(storagePath, lh.getId(), uuid))));
+        offloader.deleteOffloaded(lh.getId(), uuid, map);
+        Assert.assertEquals(false, fileSystem.exists(new Path(createDataFilePath(storagePath, lh.getId(), uuid))));
+        Assert.assertEquals(false, fileSystem.exists(new Path(createIndexFilePath(storagePath, lh.getId(), uuid))));
+    }
+
+    private String createStoragePath(String managedLedgerName) {
+        return basePath == null ? managedLedgerName + "/" : basePath + "/" +  managedLedgerName + "/";
+    }
+
+    private String createIndexFilePath(String storagePath, long ledgerId, UUID uuid) {
+        return storagePath + ledgerId + "-" + uuid + "/index";
+    }
+
+    private String createDataFilePath(String storagePath, long ledgerId, UUID uuid) {
+        return storagePath + ledgerId + "-" + uuid + "/data";
+    }
+}
diff --git a/tiered-storage/pom.xml b/tiered-storage/pom.xml
index b8390ad..fc4cd51 100644
--- a/tiered-storage/pom.xml
+++ b/tiered-storage/pom.xml
@@ -38,5 +38,6 @@
 
   <modules>
     <module>jcloud</module>
+    <module>file-system</module>
   </modules>
 </project>