Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/26 02:29:58 UTC

[pulsar] branch master updated: PIP 76: Streaming Offload(Part I) (#9096)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 17f399e  PIP 76: Streaming Offload(Part I) (#9096)
17f399e is described below

commit 17f399ebebaaf03b969deea971f3320595398c54
Author: Renkai Ge <ga...@gmail.com>
AuthorDate: Tue Jan 26 10:29:17 2021 +0800

    PIP 76: Streaming Offload(Part I) (#9096)
    
    This PR contains the new interface and an implementation of the offloader
    described in the PIP below; a short usage sketch follows the PIP link.
    
    Unit tests are still in progress.
    
    - [x] StreamingDataBlockHeaderImpl
    - [x] StreamingBlobStoreBackedReadHandleImpl
    - [x] BufferedOffloadStream
    - [x] BlobStoreManagedLedgerOffloader
    - [x] StreamingOffloadIndexBlock
    
    PIP 76: https://github.com/apache/pulsar/wiki/PIP-76:-Streaming-Offload
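    
    A minimal sketch of how the new streaming API is meant to be driven,
    assuming an entry source and an already-constructed offloader (both outside
    the scope of this change); entry release follows the OfferEntryResult note
    in LedgerOffloader:
    
        import java.util.HashMap;
        import java.util.UUID;
        import org.apache.bookkeeper.mledger.Entry;
        import org.apache.bookkeeper.mledger.LedgerOffloader;
        import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle;
        import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle.OfferEntryResult;
        import org.apache.bookkeeper.mledger.ManagedLedger;
    
        class StreamingOffloadSketch {
            static void drive(LedgerOffloader offloader, ManagedLedger ml,
                              Iterable<Entry> entrySource) throws Exception {
                OffloadHandle handle = offloader
                        .streamingOffload(ml, UUID.randomUUID(), 0L, 0L, new HashMap<>())
                        .get();
                for (Entry entry : entrySource) {
                    try {
                        if (handle.offerEntry(entry) != OfferEntryResult.SUCCESS) {
                            break; // buffer full, segment closed, or non-consecutive entry
                        }
                    } finally {
                        entry.release(); // the caller releases no matter what the result is
                    }
                }
                handle.close(); // manually close the segment
                System.out.println("offload result: " + handle.getOffloadResultAsync().get());
            }
        }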
---
 .../apache/bookkeeper/mledger/LedgerOffloader.java |  97 ++++-
 .../apache/bookkeeper/mledger/ManagedLedger.java   |   9 +-
 .../org/apache/bookkeeper/mledger/Position.java    |   4 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  10 +-
 .../mledger/impl/OffloadSegmentInfoImpl.java       |  61 +++
 .../bookkeeper/mledger/impl/PositionImpl.java      |   1 -
 managed-ledger/src/main/proto/MLDataFormats.proto  |  11 +
 .../mledger/impl/OffloadPrefixReadTest.java        |  10 +-
 .../mledger/offload/jcloud/DataBlockHeader.java    |   2 +-
 .../mledger/offload/jcloud/OffloadIndexBlock.java  |  15 +-
 .../offload/jcloud/OffloadIndexBlockBuilder.java   |   6 +-
 ...oadIndexBlock.java => OffloadIndexBlockV2.java} |  32 +-
 ...uilder.java => OffloadIndexBlockV2Builder.java} |  23 +-
 .../mledger/offload/jcloud/OffloadIndexEntry.java  |   6 +-
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java |  15 +-
 .../impl/BlobStoreBackedReadHandleImplV2.java      | 302 ++++++++++++++
 .../impl/BlobStoreManagedLedgerOffloader.java      | 341 +++++++++++++++-
 .../impl/BlockAwareSegmentInputStreamImpl.java     |   3 +-
 .../offload/jcloud/impl/BufferedOffloadStream.java | 135 +++++++
 .../offload/jcloud/impl/DataBlockHeaderImpl.java   |   4 +-
 .../offload/jcloud/impl/DataBlockUtils.java        |   6 +-
 .../jcloud/impl/OffloadIndexBlockBuilderImpl.java  |  97 -----
 .../offload/jcloud/impl/OffloadIndexBlockImpl.java |  22 +-
 .../impl/OffloadIndexBlockV2BuilderImpl.java       | 150 +++++++
 .../jcloud/impl/OffloadIndexBlockV2Impl.java       | 379 ++++++++++++++++++
 .../offload/jcloud/impl/OffloadIndexEntryImpl.java |   3 +
 ...Impl.java => StreamingDataBlockHeaderImpl.java} |  82 ++--
 .../provider/TieredStorageConfiguration.java       |  50 +++
 .../impl/BlobStoreManagedLedgerOffloaderBase.java  |  42 +-
 ...obStoreManagedLedgerOffloaderStreamingTest.java | 445 +++++++++++++++++++++
 .../jcloud/impl/BufferedOffloadStreamTest.java     | 196 +++++++++
 .../offload/jcloud/impl/DataBlockHeaderV2Test.java |  87 ++++
 .../offload/jcloud/impl/MockManagedLedger.java     | 328 +++++++++++++++
 .../offload/jcloud/impl/OffloadIndexTest.java      |  25 +-
 .../offload/jcloud/impl/OffloadIndexV2Test.java    | 327 +++++++++++++++
 .../jcloud/src/test/resources/log4j2-test.yml      |  61 +++
 36 files changed, 3121 insertions(+), 266 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
index 4552199..32fbc9c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
@@ -22,19 +22,71 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-
+import lombok.ToString;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 
 /**
- * Interface for offloading ledgers to long-term storage
+ * Interface for offloading ledgers to long-term storage.
  */
 @InterfaceAudience.LimitedPrivate
 @InterfaceStability.Evolving
 public interface LedgerOffloader {
 
+    @ToString
+    class OffloadResult {
+        public final long beginLedger;
+        public final long beginEntry;
+        public final long endLedger;
+        public final long endEntry;
+
+        public OffloadResult(long beginLedger, long beginEntry, long endLedger, long endEntry) {
+            this.beginLedger = beginLedger;
+            this.beginEntry = beginEntry;
+            this.endLedger = endLedger;
+            this.endEntry = endEntry;
+        }
+    }
+
+    /**
+     * Used to store driver info, buffer entries, mark progress, etc.
+     * Create one per segment.
+     */
+    interface OffloadHandle {
+        enum OfferEntryResult {
+            SUCCESS,
+            FAIL_BUFFER_FULL,
+            FAIL_SEGMENT_CLOSED,
+            FAIL_NOT_CONSECUTIVE
+        }
+
+        Position lastOffered();
+
+        CompletableFuture<Position> lastOfferedAsync();
+
+        /**
+         * The caller should manually release the entry no matter what the offer result is.
+         */
+        OfferEntryResult offerEntry(Entry entry);
+
+        CompletableFuture<OfferEntryResult> offerEntryAsync(Entry entry);
+
+        CompletableFuture<OffloadResult> getOffloadResultAsync();
+
+        /**
+         * Manually close the current offloading segment.
+         * @return true if the segment was not already closed
+         */
+        boolean close();
+
+        default CompletableFuture<Boolean> asyncClose() {
+            return CompletableFuture.completedFuture(close());
+        }
+    }
+
     // TODO: improve the user metadata in subsequent changes
     String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion";
     String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha";
@@ -63,7 +115,7 @@ public interface LedgerOffloader {
      * alongside the ledger data.
      *
      * When the returned future completes, the ledger has been persisted to the
-     * loadterm storage, so it is safe to delete the original copy in bookkeeper.
+     * longterm storage, so it is safe to delete the original copy in bookkeeper.
      *
      * The uid is used to identify an attempt to offload. The implementation should
      * use this to deterministically generate a unique name for the offloaded object.
@@ -86,6 +138,35 @@ public interface LedgerOffloader {
                                     Map<String, String> extraMetadata);
 
     /**
+     * Begin offloading the passed-in ledgers to longterm storage; the offload
+     * finishes when a segment reaches its size or time limit.
+     * Should only be called once for a LedgerOffloader instance.
+     * Metadata passed in is for inspection purposes only and should be stored
+     * alongside the segment data.
+     *
+     * When the returned OffloadHandle.getOffloadResultAsync completes, the corresponding
+     * ledgers have been persisted to the longterm storage, so it is safe to
+     * delete the original copy in bookkeeper.
+     *
+     * The uid is used to identify an attempt to offload. The implementation should
+     * use this to deterministically generate a unique name for the offloaded object.
+     * This uid will be stored in the managed ledger metadata before attempting the
+     * call to streamingOffload(). If a subsequent or concurrent call to streamingOffload() finds
+     * a uid in the metadata, it will attempt to clean up this attempt with a call
+     * to #deleteOffloaded(ReadHandle,UUID). Once the offload attempt completes,
+     * the managed ledger will update its metadata again, to record the completion,
+     * ensuring that subsequent calls will not attempt to offload the same ledger
+     * again.
+     *
+     * @return an OffloadHandle; the offload has succeeded once its getOffloadResultAsync() future completes.
+     */
+    default CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uid, long beginLedger,
+                                                              long beginEntry,
+                                                              Map<String, String> driverMetadata) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
      * Create a ReadHandle which can be used to read a ledger back from longterm
      * storage.
      *
@@ -115,6 +196,16 @@ public interface LedgerOffloader {
     CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
                                             Map<String, String> offloadDriverMetadata);
 
+    default CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.OffloadContext ledgerContext,
+                                                        Map<String, String> offloadDriverMetadata) {
+        throw new UnsupportedOperationException();
+    }
+
+    default CompletableFuture<Void> deleteOffloaded(UUID uid,
+                                                    Map<String, String> offloadDriverMetadata) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * Get offload policies of this LedgerOffloader
      *
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 4f4c226..a2e0ba0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -19,10 +19,8 @@
 package org.apache.bookkeeper.mledger;
 
 import io.netty.buffer.ByteBuf;
-
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -33,6 +31,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 
 /**
@@ -595,4 +594,10 @@ public interface ManagedLedger {
      * Get the ManagedLedgerInterceptor for ManagedLedger.
      * */
     ManagedLedgerInterceptor getManagedLedgerInterceptor();
+
+    /**
+     * Get basic ledger summary.
+     * The future completes with null if the corresponding ledger does not exist.
+     */
+    CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId);
 }
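
A small sketch of consuming the new getLedgerInfo, assuming a ManagedLedger at
hand and the generated LedgerInfo accessor getEntries(); per the javadoc the
future may complete with null:

    import java.util.concurrent.CompletableFuture;
    import org.apache.bookkeeper.mledger.ManagedLedger;
    import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;

    class LedgerInfoSketch {
        // entry count of a ledger, or 0 when the ledger does not exist
        static CompletableFuture<Long> entryCount(ManagedLedger ml, long ledgerId) {
            return ml.getLedgerInfo(ledgerId)
                    .thenApply((LedgerInfo info) -> info == null ? 0L : info.getEntries());
        }
    }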
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Position.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Position.java
index 1f34c8c..c76e7b7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Position.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Position.java
@@ -34,4 +34,8 @@ public interface Position {
      * @return the position of the next logical entry
      */
     Position getNext();
+
+    long getLedgerId();
+
+    long getEntryId();
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7241ee5..1c4b08e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -34,6 +34,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
+import io.netty.util.ReferenceCountUtil;
 import java.time.Clock;
 import java.util.Collections;
 import java.util.HashMap;
@@ -64,7 +65,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import io.netty.util.ReferenceCountUtil;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -1683,6 +1683,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return getLedgerHandle(ledgerId).thenApply(rh -> rh.getLedgerMetadata().toSafeString());
     }
 
+    @Override
+    public CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId) {
+        CompletableFuture<LedgerInfo> result = new CompletableFuture<>();
+        final LedgerInfo ledgerInfo = ledgers.get(ledgerId);
+        result.complete(ledgerInfo);
+        return result;
+    }
+
     CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
         CompletableFuture<ReadHandle> ledgerHandle = ledgerCache.get(ledgerId);
         if (ledgerHandle != null) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OffloadSegmentInfoImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OffloadSegmentInfoImpl.java
new file mode 100644
index 0000000..cca1676
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OffloadSegmentInfoImpl.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+
+import java.util.Map;
+import java.util.UUID;
+import lombok.ToString;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+
+@ToString
+public class OffloadSegmentInfoImpl {
+    public OffloadSegmentInfoImpl(UUID uuid, long beginLedgerId, long beginEntryId, String driverName,
+                                  Map<String, String> driverMetadata) {
+        this.uuid = uuid;
+        this.beginLedgerId = beginLedgerId;
+        this.beginEntryId = beginEntryId;
+        this.driverName = driverName;
+        this.driverMetadata = driverMetadata;
+    }
+
+
+    public final UUID uuid;
+    public final long beginLedgerId;
+    public final long beginEntryId;
+    public final String driverName;
+    volatile private long endLedgerId;
+    volatile private long endEntryId;
+    volatile boolean closed = false;
+    public final Map<String, String> driverMetadata;
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public void closeSegment(long endLedger, long endEntry) {
+        this.endLedgerId = endLedger;
+        this.endEntryId = endEntry;
+        this.closed = true;
+    }
+
+    public LedgerOffloader.OffloadResult result() {
+        return new LedgerOffloader.OffloadResult(beginLedgerId, beginEntryId, endLedgerId, endEntryId);
+    }
+}
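
A brief lifecycle sketch for the new segment info holder; the driver name and
the ids are illustrative only:

    import java.util.Collections;
    import java.util.UUID;
    import org.apache.bookkeeper.mledger.LedgerOffloader;
    import org.apache.bookkeeper.mledger.impl.OffloadSegmentInfoImpl;

    class SegmentInfoSketch {
        static LedgerOffloader.OffloadResult example() {
            OffloadSegmentInfoImpl info = new OffloadSegmentInfoImpl(
                    UUID.randomUUID(), 7L, 0L, "aws-s3", Collections.emptyMap());
            info.closeSegment(7L, 99L); // segment ends at ledger 7, entry 99
            return info.result();       // begin/end positions of the offloaded segment
        }
    }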
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
index ef445f9..50382f9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
@@ -19,7 +19,6 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-
 import com.google.common.base.Objects;
 import com.google.common.collect.ComparisonChain;
 import org.apache.bookkeeper.mledger.Position;
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index db3b4ff..a5be8e4 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -38,6 +38,17 @@ message OffloadContext {
     optional bool bookkeeperDeleted = 4;
     optional int64 timestamp = 5;
     optional OffloadDriverMetadata driverMetadata = 6;
+    repeated OffloadSegment offloadSegment = 7;
+}
+
+message OffloadSegment {
+    optional int64 uidMsb = 1;
+    optional int64 uidLsb = 2;
+    optional bool complete = 3;
+    optional int64 assignedTimestamp = 4; //timestamp in millisecond
+    optional int64 offloadedTimestamp = 5; //timestamp in millisecond
+    optional int64 endEntryId = 6;
+    optional OffloadDriverMetadata driverMetadata = 7;
 }
 
 message ManagedLedgerInfo {
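
A sketch of filling the new OffloadSegment message, assuming standard
protobuf-java codegen for MLDataFormats and the segment uid split into the two
int64 fields:

    import java.util.UUID;
    import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadSegment;

    class OffloadSegmentSketch {
        // a not-yet-complete segment: uid as msb/lsb, assigned timestamp in ms
        static OffloadSegment assigned(UUID uid, long assignedAtMillis) {
            return OffloadSegment.newBuilder()
                    .setUidMsb(uid.getMostSignificantBits())
                    .setUidLsb(uid.getLeastSignificantBits())
                    .setComplete(false)
                    .setAssignedTimestamp(assignedAtMillis)
                    .build();
        }
    }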
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index 69011cac..a3d2283 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -97,21 +97,21 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
             assertEquals(new String(e.getData()), "entry-" + i++);
         }
         verify(offloader, times(1))
-            .readOffloaded(anyLong(), any(), anyMap());
+                .readOffloaded(anyLong(), (UUID) any(), anyMap());
         verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID), anyMap());
 
         for (Entry e : cursor.readEntries(10)) {
             assertEquals(new String(e.getData()), "entry-" + i++);
         }
         verify(offloader, times(2))
-                .readOffloaded(anyLong(), any(), anyMap());
+                .readOffloaded(anyLong(), (UUID) any(), anyMap());
         verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap());
 
         for (Entry e : cursor.readEntries(5)) {
             assertEquals(new String(e.getData()), "entry-" + i++);
         }
         verify(offloader, times(2))
-                .readOffloaded(anyLong(), any(), anyMap());
+                .readOffloaded(anyLong(), (UUID) any(), anyMap());
     }
 
     @Test
@@ -164,7 +164,7 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
         }
         // For offloaded first and not deleted ledgers, they should be read from bookkeeper.
         verify(offloader, never())
-                .readOffloaded(anyLong(), any(), anyMap());
+                .readOffloaded(anyLong(), (UUID) any(), anyMap());
 
         // Delete offladed message from bookkeeper
         assertEventuallyTrue(() -> bkc.getLedgers().contains(firstLedger.getLedgerId()));
@@ -186,7 +186,7 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
 
         // Ledgers deleted from bookkeeper, now should read from offloader
         verify(offloader, atLeastOnce())
-                .readOffloaded(anyLong(), any(), anyMap());
+                .readOffloaded(anyLong(), (UUID) any(), anyMap());
         verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap());
 
     }
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/DataBlockHeader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/DataBlockHeader.java
index 6a7c1eb..0a3565c 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/DataBlockHeader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/DataBlockHeader.java
@@ -22,7 +22,7 @@ import java.io.InputStream;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
 
 /**
- * The data block header in code storage for each data block.
+ * The data block header in tiered storage for each data block.
  *
  * <p>Currently, It is in format:
  * [ magic_word -- int ][ block_len -- int ][ first_entry_id  -- long][padding]
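
Given that layout, a sketch that reads the three named fields; the field widths
follow the javadoc, and treating the remainder of the header as padding is an
assumption of this sketch:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    class DataBlockHeaderSketch {
        // [ magic_word -- int ][ block_len -- int ][ first_entry_id -- long ][ padding ]
        static long readFirstEntryId(InputStream in, int headerLength) throws IOException {
            DataInputStream dis = new DataInputStream(in);
            int magicWord = dis.readInt();
            int blockLen = dis.readInt();
            long firstEntryId = dis.readLong();
            dis.skipBytes(headerLength - (4 + 4 + 8)); // skip the padding
            return firstEntryId;
        }
    }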
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlock.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlock.java
index 9899740..01b610a 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlock.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlock.java
@@ -29,7 +29,7 @@ import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
  * The Index block abstraction used for offload a ledger to long term storage.
  */
 @Unstable
-public interface OffloadIndexBlock extends Closeable {
+public interface OffloadIndexBlock extends Closeable, OffloadIndexBlockV2 {
 
     /**
      * Get the content of the index block as InputStream.
@@ -86,5 +86,18 @@ public interface OffloadIndexBlock extends Closeable {
             return streamSize;
         }
     }
+
+    default OffloadIndexEntry getIndexEntryForEntry(long ledgerId, long messageEntryId) throws IOException {
+        return getIndexEntryForEntry(messageEntryId);
+    }
+
+    default long getStartEntryId(long ledgerId) {
+        return 0; // offload index block v1 always starts with 0
+    }
+
+    default LedgerMetadata getLedgerMetadata(long ledgerId) {
+        return getLedgerMetadata();
+    }
+
 }
 
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockBuilder.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockBuilder.java
index 7c398e3..41a7009 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockBuilder.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockBuilder.java
@@ -23,7 +23,7 @@ import java.io.InputStream;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
-import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffloadIndexBlockBuilderImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffloadIndexBlockV2BuilderImpl;
 
 /**
  * Interface for builder of index block used for offload a ledger to long term storage.
@@ -71,12 +71,12 @@ public interface OffloadIndexBlockBuilder {
     /**
      * Construct OffloadIndex from an InputStream.
      */
-    OffloadIndexBlock fromStream(InputStream is) throws IOException;
+    OffloadIndexBlockV2 fromStream(InputStream is) throws IOException;
 
     /**
      * create an OffloadIndexBlockBuilder.
      */
     static OffloadIndexBlockBuilder create() {
-        return new OffloadIndexBlockBuilderImpl();
+        return new OffloadIndexBlockV2BuilderImpl();
     }
 }
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlock.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockV2.java
similarity index 72%
copy from tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlock.java
copy to tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockV2.java
index 9899740..793d974 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlock.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockV2.java
@@ -19,9 +19,7 @@
 package org.apache.bookkeeper.mledger.offload.jcloud;
 
 import java.io.Closeable;
-import java.io.FilterInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
 
@@ -29,7 +27,7 @@ import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
  * The Index block abstraction used for offload a ledger to long term storage.
  */
 @Unstable
-public interface OffloadIndexBlock extends Closeable {
+public interface OffloadIndexBlockV2 extends Closeable {
 
     /**
      * Get the content of the index block as InputStream.
@@ -37,7 +35,7 @@ public interface OffloadIndexBlock extends Closeable {
      *   | index_magic_header | index_block_len | index_entry_count |
      *   | data_object_size | segment_metadata_length | segment metadata | index entries ... |
      */
-    IndexInputStream toStream() throws IOException;
+    OffloadIndexBlock.IndexInputStream toStream() throws IOException;
 
     /**
      * Get the related OffloadIndexEntry that contains the given messageEntryId.
@@ -46,7 +44,9 @@ public interface OffloadIndexBlock extends Closeable {
      *                      the entry id of message
      * @return the offload index entry
      */
-    OffloadIndexEntry getIndexEntryForEntry(long messageEntryId) throws IOException;
+    OffloadIndexEntry getIndexEntryForEntry(long ledgerId, long messageEntryId) throws IOException;
+
+    long getStartEntryId(long ledgerId);
 
     /**
      * Get the entry count that contained in this index Block.
@@ -55,8 +55,9 @@ public interface OffloadIndexBlock extends Closeable {
 
     /**
      * Get LedgerMetadata.
+     * @return the ledger metadata of the given ledger
      */
-    LedgerMetadata getLedgerMetadata();
+    LedgerMetadata getLedgerMetadata(long ledgerId);
 
     /**
      * Get the total size of the data object.
@@ -67,24 +68,5 @@ public interface OffloadIndexBlock extends Closeable {
      * Get the length of the header in the blocks in the data object.
      */
     long getDataBlockHeaderLength();
-
-    /**
-     * An input stream which knows the size of the stream upfront.
-     */
-    class IndexInputStream extends FilterInputStream {
-        final long streamSize;
-
-        public IndexInputStream(InputStream in, long streamSize) {
-            super(in);
-            this.streamSize = streamSize;
-        }
-
-        /**
-         * @return the number of bytes in the stream.
-         */
-        public long getStreamSize() {
-            return streamSize;
-        }
-    }
 }
 
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockBuilder.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockV2Builder.java
similarity index 76%
copy from tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockBuilder.java
copy to tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockV2Builder.java
index 7c398e3..a79bc9f 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockBuilder.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockV2Builder.java
@@ -20,24 +20,25 @@ package org.apache.bookkeeper.mledger.offload.jcloud;
 
 import java.io.IOException;
 import java.io.InputStream;
-import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
-import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffloadIndexBlockBuilderImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffloadIndexBlockV2BuilderImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 
 /**
  * Interface for builder of index block used for offload a ledger to long term storage.
  */
 @Unstable
 @LimitedPrivate
-public interface OffloadIndexBlockBuilder {
+public interface OffloadIndexBlockV2Builder {
 
     /**
      * Build index block with the passed in ledger metadata.
      *
+     * @param ledgerId
      * @param metadata the ledger metadata
      */
-    OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata metadata);
+    OffloadIndexBlockV2Builder addLedgerMeta(Long ledgerId, LedgerInfo metadata);
 
     /**
      * Add one payload block related information into index block.
@@ -49,34 +50,34 @@ public interface OffloadIndexBlockBuilder {
      * @param partId the payload block Id
      * @param blockSize the payload block size
      */
-    OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize);
+    OffloadIndexBlockV2Builder addBlock(long ledgerId, long firstEntryId, int partId, int blockSize);
 
     /**
      * Specify the length of data object this index is associated with.
      * @param dataObjectLength the length of the data object
      */
-    OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength);
+    OffloadIndexBlockV2Builder withDataObjectLength(long dataObjectLength);
 
     /**
      * Specify the length of the block headers in the data object.
      * @param dataHeaderLength the length of the headers
      */
-    OffloadIndexBlockBuilder withDataBlockHeaderLength(long dataHeaderLength);
+    OffloadIndexBlockV2Builder withDataBlockHeaderLength(long dataHeaderLength);
 
     /**
      * Finalize the immutable OffloadIndexBlock.
      */
-    OffloadIndexBlock build();
+    OffloadIndexBlockV2 buildV2();
 
     /**
      * Construct OffloadIndex from an InputStream.
      */
-    OffloadIndexBlock fromStream(InputStream is) throws IOException;
+    OffloadIndexBlockV2 fromStream(InputStream is) throws IOException;
 
     /**
      * create an OffloadIndexBlockBuilder.
      */
-    static OffloadIndexBlockBuilder create() {
-        return new OffloadIndexBlockBuilderImpl();
+    static OffloadIndexBlockV2Builder create() {
+        return new OffloadIndexBlockV2BuilderImpl();
     }
 }
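
A sketch of the V2 builder flow defined above; the LedgerInfo and the block
sizing are assumptions of the sketch:

    import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
    import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
    import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;

    class IndexV2BuilderSketch {
        static OffloadIndexBlockV2 buildIndex(long ledgerId, LedgerInfo info,
                                              long dataObjectLen, long blockHeaderLen) {
            return OffloadIndexBlockV2Builder.create()
                    .addLedgerMeta(ledgerId, info)
                    .withDataObjectLength(dataObjectLen)
                    .withDataBlockHeaderLength(blockHeaderLen)
                    // a single payload block starting at entry 0 of the ledger, partId 1
                    .addBlock(ledgerId, 0L, 1, 64 * 1024 * 1024)
                    .buildV2();
        }
    }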
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexEntry.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexEntry.java
index e51e928..99d2206 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexEntry.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexEntry.java
@@ -23,8 +23,8 @@ import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
 
 /**
  * The Index Entry in OffloadIndexBlock.
- * It consists of the message entry id, the code storage block part id for this message entry,
- * and the offset in code storage block for this message id.
+ * It consists of the message entry id, the tiered storage block part id for this message entry,
+ * and the offset in tiered storage block for this message id.
  */
 @Unstable
 @LimitedPrivate
@@ -36,7 +36,7 @@ public interface OffloadIndexEntry {
     long getEntryId();
 
     /**
-     * Get the block part id of code storage.
+     * Get the block part id of tiered storage.
      */
     int getPartId();
 
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 16368fa..e3fd181 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -21,12 +21,12 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 import io.netty.buffer.ByteBuf;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
-import lombok.val;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -134,8 +134,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
                                      nextExpectedId, entryId, lastEntry);
                             throw new BKException.BKUnexpectedConditionException();
                         } else {
-                            val skipped = inputStream.skip(length);
-                            log.info("Skipped {} bytes.", skipped);
+                            long ignored = inputStream.skip(length);
                         }
                     }
 
@@ -195,12 +194,14 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
         Blob blob = blobStore.getBlob(bucket, indexKey);
         versionCheck.check(indexKey, blob);
         OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
-        OffloadIndexBlock index = indexBuilder.fromStream(blob.getPayload().openStream());
+        final InputStream payloadStream = blob.getPayload().openStream();
+        OffloadIndexBlock index = (OffloadIndexBlock) indexBuilder.fromStream(payloadStream);
+        payloadStream.close();
 
         BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
-            versionCheck,
-            index.getDataObjectLength(),
-            readBufferSize);
+                versionCheck,
+                index.getDataObjectLength(),
+                readBufferSize);
         return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor);
     }
 }
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
new file mode 100644
index 0000000..d8749f3
--- /dev/null
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.val;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
+    private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImplV2.class);
+
+    private final long ledgerId;
+    private final List<OffloadIndexBlockV2> indices;
+    private final List<BackedInputStream> inputStreams;
+    private final List<DataInputStream> dataStreams;
+    private final ExecutorService executor;
+
+    static class GroupedReader {
+        @Override
+        public String toString() {
+            return "GroupedReader{" +
+                    "ledgerId=" + ledgerId +
+                    ", firstEntry=" + firstEntry +
+                    ", lastEntry=" + lastEntry +
+                    '}';
+        }
+
+        public final long ledgerId;
+        public final long firstEntry;
+        public final long lastEntry;
+        OffloadIndexBlockV2 index;
+        BackedInputStream inputStream;
+        DataInputStream dataStream;
+
+        public GroupedReader(long ledgerId, long firstEntry, long lastEntry,
+                             OffloadIndexBlockV2 index,
+                             BackedInputStream inputStream, DataInputStream dataStream) {
+            this.ledgerId = ledgerId;
+            this.firstEntry = firstEntry;
+            this.lastEntry = lastEntry;
+            this.index = index;
+            this.inputStream = inputStream;
+            this.dataStream = dataStream;
+        }
+    }
+
+    private BlobStoreBackedReadHandleImplV2(long ledgerId, List<OffloadIndexBlockV2> indices,
+                                            List<BackedInputStream> inputStreams,
+                                            ExecutorService executor) {
+        this.ledgerId = ledgerId;
+        this.indices = indices;
+        this.inputStreams = inputStreams;
+        this.dataStreams = new LinkedList<>();
+        for (BackedInputStream inputStream : inputStreams) {
+            dataStreams.add(new DataInputStream(inputStream));
+        }
+        this.executor = executor;
+    }
+
+    @Override
+    public long getId() {
+        return ledgerId;
+    }
+
+    @Override
+    public LedgerMetadata getLedgerMetadata() {
+        //get the most complete one
+        return indices.get(indices.size() - 1).getLedgerMetadata(ledgerId);
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        executor.submit(() -> {
+            try {
+                for (OffloadIndexBlockV2 indexBlock : indices) {
+                    indexBlock.close();
+                }
+                for (DataInputStream dataStream : dataStreams) {
+                    dataStream.close();
+                }
+                promise.complete(null);
+            } catch (IOException t) {
+                promise.completeExceptionally(t);
+            }
+        });
+        return promise;
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
+        log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
+        CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
+        if (firstEntry > lastEntry
+                || firstEntry < 0
+                || lastEntry > getLastAddConfirmed()) {
+            promise.completeExceptionally(new IllegalArgumentException());
+            return promise;
+        }
+        executor.submit(() -> {
+            List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
+            List<GroupedReader> groupedReaders = null;
+            try {
+                groupedReaders = getGroupedReader(firstEntry, lastEntry);
+            } catch (Exception e) {
+                promise.completeExceptionally(e);
+                return;
+            }
+
+            for (GroupedReader groupedReader : groupedReaders) {
+                long entriesToRead = (groupedReader.lastEntry - groupedReader.firstEntry) + 1;
+                long nextExpectedId = groupedReader.firstEntry;
+                try {
+                    while (entriesToRead > 0) {
+                        int length = groupedReader.dataStream.readInt();
+                        if (length < 0) { // hit padding or new block
+                            groupedReader.inputStream
+                                    .seek(groupedReader.index
+                                            .getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+                                            .getDataOffset());
+                            continue;
+                        }
+                        long entryId = groupedReader.dataStream.readLong();
+
+                        if (entryId == nextExpectedId) {
+                            ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
+                            entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
+                            int toWrite = length;
+                            while (toWrite > 0) {
+                                toWrite -= buf.writeBytes(groupedReader.dataStream, toWrite);
+                            }
+                            entriesToRead--;
+                            nextExpectedId++;
+                        } else if (entryId > nextExpectedId) {
+                            groupedReader.inputStream
+                                    .seek(groupedReader.index
+                                            .getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+                                            .getDataOffset());
+                            continue;
+                        } else if (entryId < nextExpectedId
+                                && !groupedReader.index.getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+                                .equals(
+                                        groupedReader.index.getIndexEntryForEntry(groupedReader.ledgerId, entryId))) {
+                            groupedReader.inputStream
+                                    .seek(groupedReader.index
+                                            .getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+                                            .getDataOffset());
+                            continue;
+                        } else if (entryId > groupedReader.lastEntry) {
+                            log.info("Expected to read {}, but read {}, which is greater than last entry {}",
+                                    nextExpectedId, entryId, groupedReader.lastEntry);
+                            throw new BKException.BKUnexpectedConditionException();
+                        } else {
+                            val skipped = groupedReader.inputStream.skip(length);
+                        }
+                    }
+                } catch (Throwable t) {
+                    promise.completeExceptionally(t);
+                    entries.forEach(LedgerEntry::close);
+                    return;
+                }
+            }
+
+            // complete only after all grouped readers have been drained
+            promise.complete(LedgerEntriesImpl.create(entries));
+        });
+        return promise;
+    }
+
+    private List<GroupedReader> getGroupedReader(long firstEntry, long lastEntry) throws Exception {
+        List<GroupedReader> groupedReaders = new LinkedList<>();
+        for (int i = indices.size() - 1; i >= 0 && firstEntry <= lastEntry; i--) {
+            final OffloadIndexBlockV2 index = indices.get(i);
+            final long startEntryId = index.getStartEntryId(ledgerId);
+            if (startEntryId > lastEntry) {
+                log.debug("entries are in earlier indices, skip this segment ledger id: {}, begin entry id: {}",
+                        ledgerId, startEntryId);
+            } else {
+                groupedReaders.add(new GroupedReader(ledgerId, startEntryId, lastEntry, index, inputStreams.get(i),
+                        dataStreams.get(i)));
+                lastEntry = startEntryId - 1;
+            }
+        }
+
+        Preconditions.checkArgument(firstEntry > lastEntry);
+        for (int i = 0; i < groupedReaders.size() - 1; i++) {
+            final GroupedReader readerI = groupedReaders.get(i);
+            final GroupedReader readerII = groupedReaders.get(i + 1);
+            Preconditions.checkArgument(readerI.ledgerId == readerII.ledgerId);
+            Preconditions.checkArgument(readerI.firstEntry >= readerII.lastEntry);
+        }
+        return groupedReaders;
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
+        return readAsync(firstEntry, lastEntry);
+    }
+
+    @Override
+    public CompletableFuture<Long> readLastAddConfirmedAsync() {
+        return CompletableFuture.completedFuture(getLastAddConfirmed());
+    }
+
+    @Override
+    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+        return CompletableFuture.completedFuture(getLastAddConfirmed());
+    }
+
+    @Override
+    public long getLastAddConfirmed() {
+        return getLedgerMetadata().getLastEntryId();
+    }
+
+    @Override
+    public long getLength() {
+        return getLedgerMetadata().getLength();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return getLedgerMetadata().isClosed();
+    }
+
+    @Override
+    public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId,
+                                                                                      long timeOutInMillis,
+                                                                                      boolean parallel) {
+        CompletableFuture<LastConfirmedAndEntry> promise = new CompletableFuture<>();
+        promise.completeExceptionally(new UnsupportedOperationException());
+        return promise;
+    }
+
+    public static ReadHandle open(ScheduledExecutorService executor,
+                                  BlobStore blobStore, String bucket, List<String> keys, List<String> indexKeys,
+                                  VersionCheck versionCheck,
+                                  long ledgerId, int readBufferSize)
+            throws IOException {
+        List<BackedInputStream> inputStreams = new LinkedList<>();
+        List<OffloadIndexBlockV2> indices = new LinkedList<>();
+        for (int i = 0; i < indexKeys.size(); i++) {
+            String indexKey = indexKeys.get(i);
+            String key = keys.get(i);
+            log.debug("open bucket: {} index key: {}", bucket, indexKey);
+            Blob blob = blobStore.getBlob(bucket, indexKey);
+            log.debug("indexKey blob: {} {}", indexKey, blob);
+            versionCheck.check(indexKey, blob);
+            OffloadIndexBlockV2Builder indexBuilder = OffloadIndexBlockV2Builder.create();
+            final InputStream payloadStream = blob.getPayload().openStream();
+            OffloadIndexBlockV2 index = indexBuilder.fromStream(payloadStream);
+            payloadStream.close();
+
+            BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
+                    versionCheck,
+                    index.getDataObjectLength(),
+                    readBufferSize);
+            inputStreams.add(inputStream);
+            indices.add(index);
+        }
+        return new BlobStoreBackedReadHandleImplV2(ledgerId, indices, inputStreams, executor);
+    }
+}
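
A sketch of opening the new V2 read handle over several offloaded segments;
the blob store, bucket, and per-segment keys are assumed, as is
DataBlockUtils.VERSION_CHECK for the version check (mirroring the v1 read
path):

    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import org.apache.bookkeeper.client.api.LedgerEntries;
    import org.apache.bookkeeper.client.api.ReadHandle;
    import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImplV2;
    import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils;
    import org.jclouds.blobstore.BlobStore;

    class ReadHandleV2Sketch {
        // keys and indexKeys hold one entry per offloaded segment, oldest first
        static void readAll(BlobStore blobStore, String bucket, List<String> keys,
                            List<String> indexKeys, long ledgerId) throws Exception {
            ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
            ReadHandle handle = BlobStoreBackedReadHandleImplV2.open(executor, blobStore,
                    bucket, keys, indexKeys, DataBlockUtils.VERSION_CHECK, ledgerId, 1 << 20);
            try (LedgerEntries entries = handle.readAsync(0, handle.getLastAddConfirmed()).get()) {
                entries.forEach(e -> System.out.println("read entry " + e.getEntryId()));
            }
            executor.shutdown();
        }
    }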
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 8607e00..c76864d 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -22,22 +22,39 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.time.Duration;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle.OfferEntryResult;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.OffloadSegmentInfoImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock.IndexInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.BlobStoreLocation;
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.Blob;
@@ -50,6 +67,7 @@ import org.jclouds.domain.LocationBuilder;
 import org.jclouds.domain.LocationScope;
 import org.jclouds.io.Payload;
 import org.jclouds.io.Payloads;
+import org.jclouds.io.payloads.InputStreamPayload;
 
 /**
  * Tiered Storage Offloader that is backed by a JCloud Blob Store.
@@ -70,10 +88,24 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
     private final Map<String, String> userMetadata;
 
     private final ConcurrentMap<BlobStoreLocation, BlobStore> blobStores = new ConcurrentHashMap<>();
+    private OffloadSegmentInfoImpl segmentInfo;
+    private AtomicLong bufferLength = new AtomicLong(0);
+    private AtomicLong segmentLength = new AtomicLong(0);
+    private final long maxBufferLength;
+    private final ConcurrentLinkedQueue<Entry> offloadBuffer = new ConcurrentLinkedQueue<>();
+    private CompletableFuture<OffloadResult> offloadResult;
+    private volatile PositionImpl lastOfferedPosition = PositionImpl.latest;
+    private final Duration maxSegmentCloseTime;
+    private final long minSegmentCloseTimeMillis;
+    private final long segmentBeginTimeMillis;
+    private final long maxSegmentLength;
+    private final int streamingBlockSize;
+    private volatile ManagedLedger ml;
+    private OffloadIndexBlockV2Builder streamingIndexBuilder;
 
     public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration config,
-            Map<String, String> userMetadata,
-            OrderedScheduler scheduler) throws IOException {
+                                                         Map<String, String> userMetadata,
+                                                         OrderedScheduler scheduler) throws IOException {
 
         return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata);
     }
@@ -84,13 +116,20 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
         this.scheduler = scheduler;
         this.userMetadata = userMetadata;
         this.config = config;
+        this.streamingBlockSize = config.getMinBlockSizeInBytes();
+        this.maxSegmentCloseTime = Duration.ofSeconds(config.getMaxSegmentTimeInSecond());
+        this.maxSegmentLength = config.getMaxSegmentSizeInBytes();
+        this.minSegmentCloseTimeMillis = Duration.ofSeconds(config.getMinSegmentTimeInSecond()).toMillis();
+        //ensure buffer can have enough content to fill a block
+        this.maxBufferLength = Math.max(config.getWriteBufferSizeInBytes(), config.getMinBlockSizeInBytes());
+        this.segmentBeginTimeMillis = System.currentTimeMillis();
 
         if (!Strings.isNullOrEmpty(config.getRegion())) {
             this.writeLocation = new LocationBuilder()
-                .scope(LocationScope.REGION)
-                .id(config.getRegion())
-                .description(config.getRegion())
-                .build();
+                    .scope(LocationScope.REGION)
+                    .id(config.getRegion())
+                    .description(config.getRegion())
+                    .build();
         } else {
             this.writeLocation = null;
         }
@@ -201,7 +240,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
 
             // upload index block
             try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build();
-                 OffloadIndexBlock.IndexInputStream indexStream = index.toStream()) {
+                 IndexInputStream indexStream = index.toStream()) {
                 // write the index block
                 BlobBuilder blobBuilder = writeBlobStore.blobBuilder(indexBlockKey);
                 DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
@@ -210,8 +249,8 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
                 indexPayload.getContentMetadata().setContentType("application/octet-stream");
 
                 Blob blob = blobBuilder
-                    .payload(indexPayload)
-                    .contentLength((long) indexStream.getStreamSize())
+                        .payload(indexPayload)
+                        .contentLength((long) indexStream.getStreamSize())
                     .build();
 
                 writeBlobStore.putBlob(config.getBucket(), blob);
@@ -230,6 +269,214 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
         return promise;
     }
 
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(@NonNull ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        if (this.ml != null) {
+            log.error("streamingOffload should only be called once");
+            final CompletableFuture<OffloadHandle> result = new CompletableFuture<>();
+            result.completeExceptionally(new RuntimeException("streamingOffload should only be called once"));
+            return result;
+        }
+
+        this.ml = ml;
+        this.segmentInfo = new OffloadSegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(),
+                driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = OffloadIndexBlockV2Builder.create();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
+
+        return CompletableFuture.completedFuture(new OffloadHandle() {
+            @Override
+            public Position lastOffered() {
+                return BlobStoreManagedLedgerOffloader.this.lastOffered();
+            }
+
+            @Override
+            public CompletableFuture<Position> lastOfferedAsync() {
+                return CompletableFuture.completedFuture(lastOffered());
+            }
+
+            @Override
+            public OfferEntryResult offerEntry(Entry entry) {
+                return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+            }
+
+            @Override
+            public CompletableFuture<OfferEntryResult> offerEntryAsync(Entry entry) {
+                return CompletableFuture.completedFuture(offerEntry(entry));
+            }
+
+            @Override
+            public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+                return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+            }
+
+            @Override
+            public boolean close() {
+                return BlobStoreManagedLedgerOffloader.this.closeSegment();
+            }
+        });
+    }
+
+    private void streamingOffloadLoop(int partId, int dataObjectLength) {
+        log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            // buildIndexAndCompleteResult completes offloadResult itself
+            buildIndexAndCompleteResult(dataObjectLength);
+        } else if ((segmentInfo.isClosed() && !offloadBuffer.isEmpty())
+                // last time to build and upload block
+                || bufferLength.get() >= streamingBlockSize
+            // buffer size full, build and upload block
+        ) {
+            List<Entry> entries = new LinkedList<>();
+            final Entry firstEntry = offloadBuffer.poll();
+            entries.add(firstEntry);
+            long blockLedgerId = firstEntry.getLedgerId();
+            long blockEntryId = firstEntry.getEntryId();
+            // account for the first entry as well, otherwise bufferLength drifts upward
+            // and blockEntrySize under-counts the block payload
+            int blockEntrySize = firstEntry.getLength();
+            bufferLength.addAndGet(-firstEntry.getLength());
+
+            while (!offloadBuffer.isEmpty() && offloadBuffer.peek().getLedgerId() == blockLedgerId
+                    && blockEntrySize <= streamingBlockSize) {
+                final Entry entryInBlock = offloadBuffer.poll();
+                final int entrySize = entryInBlock.getLength();
+                bufferLength.addAndGet(-entrySize);
+                blockEntrySize += entrySize;
+                entries.add(entryInBlock);
+            }
+            final int blockSize = BufferedOffloadStream
+                    .calculateBlockSize(streamingBlockSize, entries.size(), blockEntrySize);
+            buildBlockAndUpload(blockSize, entries, blockLedgerId, blockEntryId, partId);
+            streamingOffloadLoop(partId + 1, dataObjectLength + blockSize);
+        } else {
+            log.debug("not enough data, delay schedule for part: {} length: {}", partId, dataObjectLength);
+            scheduler.chooseThread(segmentInfo)
+                    .schedule(() -> {
+                        streamingOffloadLoop(partId, dataObjectLength);
+                    }, 100, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private void buildBlockAndUpload(int blockSize, List<Entry> entries, long blockLedgerId, long beginEntryId,
+                                     int partId) {
+        try (final BufferedOffloadStream payloadStream = new BufferedOffloadStream(blockSize, entries,
+                blockLedgerId, beginEntryId)) {
+            log.debug("begin upload payload: {} {}", blockLedgerId, beginEntryId);
+            Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+            partPayload.getContentMetadata().setContentType("application/octet-stream");
+            streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId, partPayload));
+            streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+            streamingIndexBuilder.addBlock(blockLedgerId, beginEntryId, partId, blockSize);
+            final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ml.getLedgerInfo(blockLedgerId).get();
+            final MLDataFormats.ManagedLedgerInfo.LedgerInfo.Builder ledgerInfoBuilder = MLDataFormats.ManagedLedgerInfo.LedgerInfo
+                    .newBuilder();
+            if (ledgerInfo != null) {
+                ledgerInfoBuilder.mergeFrom(ledgerInfo);
+            }
+            if (ledgerInfoBuilder.getEntries() == 0) {
+                // the ledger is not closed yet, so derive the entry count from the
+                // last entry id written into this block
+                ledgerInfoBuilder.setEntries(payloadStream.getEndEntryId() + 1);
+            }
+            streamingIndexBuilder.addLedgerMeta(blockLedgerId, ledgerInfoBuilder.build());
+            log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
+                    config.getBucket(), streamingDataBlockKey, partId, streamingMpu.id());
+        } catch (Throwable e) {
+            blobStore.abortMultipartUpload(streamingMpu);
+            offloadResult.completeExceptionally(e);
+            return;
+        }
+    }
+
+    private void buildIndexAndCompleteResult(long dataObjectLength) {
+        try {
+            blobStore.completeMultipartUpload(streamingMpu, streamingParts);
+            streamingIndexBuilder.withDataObjectLength(dataObjectLength);
+            final BlobBuilder indexBlobBuilder = blobStore.blobBuilder(streamingDataIndexKey);
+            DataBlockUtils.addVersionInfo(indexBlobBuilder, userMetadata);
+            try (final OffloadIndexBlockV2 index = streamingIndexBuilder.buildV2();
+                 final IndexInputStream indexStream = index.toStream();
+                 final InputStreamPayload indexPayLoad = Payloads.newInputStreamPayload(indexStream)) {
+                indexPayLoad.getContentMetadata().setContentLength(indexStream.getStreamSize());
+                indexPayLoad.getContentMetadata().setContentType("application/octet-stream");
+                final Blob indexBlob = indexBlobBuilder.payload(indexPayLoad)
+                        .contentLength(indexStream.getStreamSize())
+                        .build();
+                blobStore.putBlob(config.getBucket(), indexBlob);
+
+                final OffloadResult result = segmentInfo.result();
+                offloadResult.complete(result);
+                log.debug("offload segment completed {}", result);
+            }
+        } catch (Exception e) {
+            log.error("streaming offload failed", e);
+            offloadResult.completeExceptionally(e);
+        }
+    }
+
+    private CompletableFuture<OffloadResult> getOffloadResultAsync() {
+        return this.offloadResult;
+    }
+
+    private synchronized OfferEntryResult offerEntry(Entry entry) {
+
+        if (segmentInfo.isClosed()) {
+            log.debug("Segment already closed {}", segmentInfo);
+            return OfferEntryResult.FAIL_SEGMENT_CLOSED;
+        } else if (maxBufferLength <= bufferLength.get()) {
+            //the buffer is allowed to overshoot maxBufferLength slightly with the last
+            //accepted entry, so there is always enough content to build a full block
+            return OfferEntryResult.FAIL_BUFFER_FULL;
+        } else {
+            final EntryImpl entryImpl = EntryImpl
+                    .create(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer());
+            offloadBuffer.add(entryImpl);
+            bufferLength.getAndAdd(entryImpl.getLength());
+            segmentLength.getAndAdd(entryImpl.getLength());
+            lastOfferedPosition = entryImpl.getPosition();
+            if (segmentLength.get() >= maxSegmentLength
+                    && System.currentTimeMillis() - segmentBeginTimeMillis >= minSegmentCloseTimeMillis) {
+                closeSegment();
+            }
+            return OfferEntryResult.SUCCESS;
+        }
+    }
+
+    private synchronized boolean closeSegment() {
+        final boolean result = !segmentInfo.isClosed();
+        log.debug("close segment {} {}", lastOfferedPosition.getLedgerId(), lastOfferedPosition.getEntryId());
+        this.segmentInfo.closeSegment(lastOfferedPosition.getLedgerId(), lastOfferedPosition.getEntryId());
+        return result;
+    }
+
+    private PositionImpl lastOffered() {
+        return lastOfferedPosition;
+    }
+
     /**
      * Attempts to create a BlobStoreLocation from the values in the offloadDriverMetadata,
      * however, if no values are available, it defaults to the currently configured
@@ -255,17 +502,50 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
         String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid);
         String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid);
         scheduler.chooseThread(ledgerId).submit(() -> {
-                try {
-                    promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
-                                                                 readBlobstore,
-                                                                 readBucket, key, indexKey,
-                                                                 DataBlockUtils.VERSION_CHECK,
-                                                                 ledgerId, config.getReadBufferSizeInBytes()));
-                } catch (Throwable t) {
-                    log.error("Failed readOffloaded: ", t);
-                    promise.completeExceptionally(t);
-                }
-            });
+            try {
+                promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
+                        readBlobstore,
+                        readBucket, key, indexKey,
+                        DataBlockUtils.VERSION_CHECK,
+                        ledgerId, config.getReadBufferSizeInBytes()));
+            } catch (Throwable t) {
+                log.error("Failed readOffloaded: ", t);
+                promise.completeExceptionally(t);
+            }
+        });
+        return promise;
+    }
+
+    @Override
+    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.OffloadContext ledgerContext,
+                                                       Map<String, String> offloadDriverMetadata) {
+        BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
+        String readBucket = bsKey.getBucket();
+        BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());
+        CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
+        final List<MLDataFormats.OffloadSegment> offloadSegmentList = ledgerContext.getOffloadSegmentList();
+        List<String> keys = Lists.newLinkedList();
+        List<String> indexKeys = Lists.newLinkedList();
+        offloadSegmentList.forEach(seg -> {
+            final UUID uuid = new UUID(seg.getUidMsb(), seg.getUidLsb());
+            final String key = uuid.toString();
+            final String indexKey = DataBlockUtils.indexBlockOffloadKey(uuid);
+            keys.add(key);
+            indexKeys.add(indexKey);
+        });
+
+        scheduler.chooseThread(ledgerId).submit(() -> {
+            try {
+                promise.complete(BlobStoreBackedReadHandleImplV2.open(scheduler.chooseThread(ledgerId),
+                        readBlobstore,
+                        readBucket, keys, indexKeys,
+                        DataBlockUtils.VERSION_CHECK,
+                        ledgerId, config.getReadBufferSizeInBytes()));
+            } catch (Throwable t) {
+                log.error("Failed readOffloaded: ", t);
+                promise.completeExceptionally(t);
+            }
+        });
         return promise;
     }
 
@@ -292,6 +572,27 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
         return promise;
     }
 
+    @Override
+    public CompletableFuture<Void> deleteOffloaded(UUID uid, Map<String, String> offloadDriverMetadata) {
+        BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
+        String readBucket = bsKey.getBucket(offloadDriverMetadata);
+        BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());
+
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        scheduler.submit(() -> {
+            try {
+                readBlobstore.removeBlobs(readBucket,
+                        ImmutableList.of(uid.toString(),
+                                DataBlockUtils.indexBlockOffloadKey(uid)));
+                promise.complete(null);
+            } catch (Throwable t) {
+                log.error("Failed delete Blob", t);
+                promise.completeExceptionally(t);
+            }
+        });
+
+        return promise;
+    }
 
     @Override
     public OffloadPolicies getOffloadPolicies() {
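
For reference, a caller is expected to drive the returned OffloadHandle roughly as
sketched below. This is an illustrative sketch only; `offloader`, `ml`, `beginLedger`,
`beginEntry`, `driverMetadata` and `nextEntry()` are placeholders assumed to exist in
the caller's scope, and error handling is elided.

    OffloadHandle handle = offloader
            .streamingOffload(ml, UUID.randomUUID(), beginLedger, beginEntry, driverMetadata)
            .get();
    Entry entry = nextEntry();
    while (entry != null) {
        OfferEntryResult r = handle.offerEntry(entry);
        if (r == OfferEntryResult.FAIL_BUFFER_FULL) {
            Thread.sleep(10);   // back off; the offload loop drains the buffer into blocks
            continue;           // retry the same entry
        }
        entry = nextEntry();
    }
    handle.close();             // seals the current segment
    OffloadResult result = handle.getOffloadResultAsync().get();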
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
index dcc693a..a4ffdea 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
@@ -19,7 +19,6 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import static com.google.common.base.Preconditions.checkState;
-
 import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
@@ -44,7 +43,7 @@ import org.slf4j.LoggerFactory;
 public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStream {
     private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);
 
-    private static final int[] BLOCK_END_PADDING = new int[] { 0xFE, 0xDC, 0xDE, 0xAD };
+    static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD };
 
     private final ReadHandle ledger;
     private final long startEntryId;
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStream.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStream.java
new file mode 100644
index 0000000..3a323ce
--- /dev/null
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStream.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+
+@Slf4j
+public class BufferedOffloadStream extends InputStream {
+    static final int[] BLOCK_END_PADDING = BlockAwareSegmentInputStreamImpl.BLOCK_END_PADDING;
+
+    private final long ledgerId;
+    private final long beginEntryId;
+
+    public BufferedOffloadStream(int blockSize, List<Entry> entries, long ledgerId, long beginEntryId) {
+        this.ledgerId = ledgerId;
+        this.beginEntryId = beginEntryId;
+        this.endEntryId = beginEntryId;
+        this.blockSize = blockSize;
+        this.entryBuffer = entries;
+        this.blockHead = StreamingDataBlockHeaderImpl.of(blockSize, ledgerId, beginEntryId)
+                .toStream();
+    }
+
+    public long getEndEntryId() {
+        return endEntryId;
+    }
+
+    private volatile long endEntryId;
+    static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
+    private final long blockSize;
+    private final List<Entry> entryBuffer;
+    private final InputStream blockHead;
+    int offset = 0;
+    static final int NOT_INITIALIZED = -1;
+    int validDataOffset = NOT_INITIALIZED;
+    CompositeByteBuf currentEntry;
+
+    public long getLedgerId() {
+        return ledgerId;
+    }
+
+    public long getBeginEntryId() {
+        return beginEntryId;
+    }
+
+    public long getBlockSize() {
+        return blockSize;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (blockHead.available() > 0) {
+            offset++;
+            return blockHead.read();
+        }
+        //if current exists, use current first
+        if (currentEntry != null) {
+            if (currentEntry.readableBytes() > 0) {
+                offset += 1;
+                return currentEntry.readUnsignedByte();
+            } else {
+                currentEntry.release();
+                currentEntry = null;
+            }
+        }
+
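+        // Block exhausted: signal EOF. If the payload already ended (validDataOffset
+        // is set), pad the remainder of the block with the repeating BLOCK_END_PADDING pattern.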
+        if (blockSize <= offset) {
+            return -1;
+        } else if (validDataOffset != NOT_INITIALIZED) {
+            return BLOCK_END_PADDING[(offset++ - validDataOffset) % BLOCK_END_PADDING.length];
+        }
+
+
+        if (entryBuffer.isEmpty()) {
+            validDataOffset = offset;
+            return read();
+        }
+
+        Entry headEntry = entryBuffer.remove(0);
+
+        // the offload loop starts a new block whenever the ledger changes, so a single
+        // block must never contain entries from more than one ledger
+        if (headEntry.getLedgerId() != this.ledgerId) {
+            throw new RuntimeException(
+                    String.format("there should not be multiple ledgers in one block: %s != %s",
+                            headEntry.getLedgerId(), this.ledgerId));
+        }
+
+        final int entryLength = headEntry.getLength();
+        final long entryId = headEntry.getEntryId();
+        CompositeByteBuf entryBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(2);
+        ByteBuf entryHeaderBuf = PulsarByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE);
+        entryHeaderBuf.writeInt(entryLength).writeLong(entryId);
+        entryBuf.addComponents(true, entryHeaderBuf, headEntry.getDataBuffer().retain());
+        endEntryId = headEntry.getEntryId();
+        headEntry.release();
+        currentEntry = entryBuf;
+        return read();
+
+    }
+
+    @Override
+    public void close() throws IOException {
+        blockHead.close();
+    }
+
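+    // The effective block size is the configured streaming block size or, when larger,
+    // the exact size of the 128-byte block header plus every entry with its 12-byte entry header.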
+    public static int calculateBlockSize(int streamingBlockSize, int entryCount, int entrySize) {
+        int validDataSize = (entryCount * ENTRY_HEADER_SIZE
+                + entrySize
+                + StreamingDataBlockHeaderImpl.getDataStartOffset());
+        return Math.max(streamingBlockSize, validDataSize);
+    }
+}
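
To make the on-wire layout concrete: a streaming block is the 128-byte
StreamingDataBlockHeaderImpl header, followed by each entry prefixed with a 12-byte
entry header (4-byte length + 8-byte entry id), padded out to the block size with the
repeating BLOCK_END_PADDING sequence. A minimal sketch of the arithmetic, with made-up
numbers:

    int streamingBlockSize = 64 * 1024 * 1024;   // configured minimum block size
    int entryCount = 1_000;
    int entryPayloadBytes = 500_000;             // sum of all entry payload sizes
    int validDataSize = entryCount * 12          // per-entry header: length(4) + entryId(8)
            + entryPayloadBytes
            + 128;                               // StreamingDataBlockHeaderImpl.getDataStartOffset()
    int blockSize = Math.max(streamingBlockSize, validDataSize);
    // Anything between validDataSize and blockSize reads back as the
    // 0xFE 0xDC 0xDE 0xAD padding pattern.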
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
index 9239ec2..cd46c52 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
@@ -19,10 +19,8 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import com.google.common.io.CountingInputStream;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.PooledByteBufAllocator;
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -31,7 +29,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 
 /**
- * The data block header in code storage for each data block.
+ * The data block header in tiered storage for each data block.
  */
 public class DataBlockHeaderImpl implements DataBlockHeader {
     // Magic Word for data block.
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockUtils.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockUtils.java
index abbb032..440221f 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockUtils.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockUtils.java
@@ -19,11 +19,9 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import com.google.common.collect.ImmutableMap;
-
 import java.io.IOException;
 import java.util.Map;
 import java.util.UUID;
-
 import org.jclouds.blobstore.domain.Blob;
 import org.jclouds.blobstore.domain.BlobBuilder;
 
@@ -59,6 +57,10 @@ public class DataBlockUtils {
         return String.format("%s-ledger-%d-index", uuid.toString(), ledgerId);
     }
 
+    public static String indexBlockOffloadKey(UUID uuid) {
+        return String.format("%s-index", uuid.toString());
+    }
+
     public static void addVersionInfo(BlobBuilder blobBuilder, Map<String, String> userMetadata) {
         ImmutableMap.Builder<String, String> metadataBuilder = ImmutableMap.builder();
         metadataBuilder.putAll(userMetadata);
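
Streaming offload keys objects by segment UUID rather than by ledger id, since a
segment can span multiple ledgers. A small sketch of the naming convention (the
`uuid` value is illustrative):

    UUID uuid = UUID.randomUUID();
    String dataKey = uuid.toString();                            // streaming data object
    String indexKey = DataBlockUtils.indexBlockOffloadKey(uuid); // "<uuid>-index"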
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockBuilderImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockBuilderImpl.java
deleted file mode 100644
index 42f4567..0000000
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockBuilderImpl.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.bookkeeper.mledger.offload.jcloud.impl;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import org.apache.bookkeeper.client.api.LedgerMetadata;
-import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
-import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
-
-/**
- * Interface for builder of index block used for offload a ledger to long term storage.
- */
-public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
-
-    private LedgerMetadata ledgerMetadata;
-    private long dataObjectLength;
-    private long dataHeaderLength;
-    private List<OffloadIndexEntryImpl> entries;
-    private int lastBlockSize;
-
-    public OffloadIndexBlockBuilderImpl() {
-        this.entries = Lists.newArrayList();
-    }
-
-    @Override
-    public OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength) {
-        this.dataObjectLength = dataObjectLength;
-        return this;
-    }
-
-    @Override
-    public OffloadIndexBlockBuilder withDataBlockHeaderLength(long dataHeaderLength) {
-        this.dataHeaderLength = dataHeaderLength;
-        return this;
-    }
-
-    @Override
-    public OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata metadata) {
-        this.ledgerMetadata = metadata;
-        return this;
-    }
-
-    @Override
-    public OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize) {
-        checkState(dataHeaderLength > 0);
-
-        // we should added one by one.
-        long offset;
-        if (firstEntryId == 0) {
-            checkState(entries.size() == 0);
-            offset = 0;
-        } else {
-            checkState(entries.size() > 0);
-            offset = entries.get(entries.size() - 1).getOffset() + lastBlockSize;
-        }
-        lastBlockSize = blockSize;
-
-        this.entries.add(OffloadIndexEntryImpl.of(firstEntryId, partId, offset, dataHeaderLength));
-        return this;
-    }
-
-    @Override
-    public OffloadIndexBlock fromStream(InputStream is) throws IOException {
-        return OffloadIndexBlockImpl.get(is);
-    }
-
-    @Override
-    public OffloadIndexBlock build() {
-        checkState(ledgerMetadata != null);
-        checkState(!entries.isEmpty());
-        checkState(dataObjectLength > 0);
-        checkState(dataHeaderLength > 0);
-        return OffloadIndexBlockImpl.get(ledgerMetadata, dataObjectLength, dataHeaderLength, entries);
-    }
-
-}
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
index 864a942..e525871 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
@@ -27,7 +27,6 @@ import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -80,7 +79,11 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
         return block;
     }
 
-    public static OffloadIndexBlockImpl get(InputStream stream) throws IOException {
+    public static OffloadIndexBlockImpl get(int magic, DataInputStream stream) throws IOException {
+        if (magic != INDEX_MAGIC_WORD) {
+            throw new IOException(String.format("Invalid MagicWord. read: 0x%x  expected: 0x%x",
+                    magic, INDEX_MAGIC_WORD));
+        }
         OffloadIndexBlockImpl block = RECYCLER.get();
         block.indexEntries = Maps.newTreeMap();
         block.fromStream(stream);
@@ -164,7 +167,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
 
         // write entries
         this.indexEntries.entrySet().forEach(entry ->
-            out.writeLong(entry.getValue().getEntryId())
+                out.writeLong(entry.getValue().getEntryId())
                 .writeInt(entry.getValue().getPartId())
                 .writeLong(entry.getValue().getOffset()));
 
@@ -198,7 +201,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
 
             if (ledgerMetadataFormat.getCustomMetadataCount() > 0) {
                 ledgerMetadataFormat.getCustomMetadataList().forEach(
-                    entry -> this.customMetadata.put(entry.getKey(), entry.getValue().toByteArray()));
+                        entry -> this.customMetadata.put(entry.getKey(), entry.getValue().toByteArray()));
             }
 
             ledgerMetadataFormat.getSegmentList().forEach(segment -> {
@@ -327,13 +330,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
         return new InternalLedgerMetadata(builder.build());
     }
 
-    private OffloadIndexBlock fromStream(InputStream stream) throws IOException {
-        DataInputStream dis = new DataInputStream(stream);
-        int magic = dis.readInt();
-        if (magic != this.INDEX_MAGIC_WORD) {
-            throw new IOException(String.format("Invalid MagicWord. read: 0x%x  expected: 0x%x",
-                                                magic, INDEX_MAGIC_WORD));
-        }
+    private OffloadIndexBlock fromStream(DataInputStream dis) throws IOException {
         dis.readInt(); // no used index block length
         this.dataObjectLength = dis.readLong();
         this.dataHeaderLength = dis.readLong();
@@ -351,9 +348,8 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
         for (int i = 0; i < indexEntryCount; i++) {
             long entryId = dis.readLong();
             this.indexEntries.putIfAbsent(entryId, OffloadIndexEntryImpl.of(entryId, dis.readInt(),
-                                                                            dis.readLong(), dataHeaderLength));
+                    dis.readLong(), dataHeaderLength));
         }
-
         return this;
     }
 
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockV2BuilderImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockV2BuilderImpl.java
new file mode 100644
index 0000000..fa8bc14
--- /dev/null
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockV2BuilderImpl.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+import com.google.common.collect.Lists;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+
+/**
+ * Builder of index blocks (V1 and V2) used when offloading a ledger to long-term storage.
+ */
+public class OffloadIndexBlockV2BuilderImpl implements OffloadIndexBlockBuilder, OffloadIndexBlockV2Builder {
+
+    private final Map<Long, LedgerInfo> ledgerMetadataMap;
+    private LedgerMetadata ledgerMetadata;
+    private long dataObjectLength;
+    private long dataHeaderLength;
+    private List<OffloadIndexEntryImpl> entries;
+    private int lastBlockSize;
+    private int lastStreamingBlockSize;
+    private long streamingOffset = 0;
+    private final SortedMap<Long, List<OffloadIndexEntryImpl>> entryMap = new TreeMap<>();
+
+
+    public OffloadIndexBlockV2BuilderImpl() {
+        this.entries = Lists.newArrayList();
+        this.ledgerMetadataMap = new HashMap<>();
+    }
+
+    @Override
+    public OffloadIndexBlockV2BuilderImpl withDataObjectLength(long dataObjectLength) {
+        this.dataObjectLength = dataObjectLength;
+        return this;
+    }
+
+    @Override
+    public OffloadIndexBlockV2BuilderImpl withDataBlockHeaderLength(long dataHeaderLength) {
+        this.dataHeaderLength = dataHeaderLength;
+        return this;
+    }
+
+    @Override
+    public OffloadIndexBlockV2BuilderImpl withLedgerMetadata(LedgerMetadata metadata) {
+        this.ledgerMetadata = metadata;
+        return this;
+    }
+
+    @Override
+    public OffloadIndexBlockV2BuilderImpl addLedgerMeta(Long ledgerId, LedgerInfo metadata) {
+        this.ledgerMetadataMap.put(ledgerId, metadata);
+        return this;
+    }
+
+    @Override
+    public OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize) {
+        checkState(dataHeaderLength > 0);
+
+        // blocks must be added one at a time, in order.
+        long offset;
+        if (firstEntryId == 0) {
+            checkState(entries.size() == 0);
+            offset = 0;
+        } else {
+            checkState(entries.size() > 0);
+            offset = entries.get(entries.size() - 1).getOffset() + lastBlockSize;
+        }
+        lastBlockSize = blockSize;
+
+        this.entries.add(OffloadIndexEntryImpl.of(firstEntryId, partId, offset, dataHeaderLength));
+        return this;
+    }
+
+    @Override
+    public OffloadIndexBlockV2Builder addBlock(long ledgerId, long firstEntryId, int partId, int blockSize) {
+        checkState(dataHeaderLength > 0);
+
+        streamingOffset = streamingOffset + lastStreamingBlockSize;
+        lastStreamingBlockSize = blockSize;
+
+        final List<OffloadIndexEntryImpl> list = entryMap.getOrDefault(ledgerId, new LinkedList<>());
+        list.add(OffloadIndexEntryImpl.of(firstEntryId, partId, streamingOffset, dataHeaderLength));
+        entryMap.put(ledgerId, list);
+        return this;
+    }
+
+    @Override
+    public OffloadIndexBlockV2 fromStream(InputStream is) throws IOException {
+        final DataInputStream dataInputStream = new DataInputStream(is);
+        final int magic = dataInputStream.readInt();
+        if (magic == OffloadIndexBlockImpl.getIndexMagicWord()) {
+            return OffloadIndexBlockImpl.get(magic, dataInputStream);
+        } else if (magic == OffloadIndexBlockV2Impl.getIndexMagicWord()) {
+            return OffloadIndexBlockV2Impl.get(magic, dataInputStream);
+        } else {
+            throw new IOException(String.format("Invalid MagicWord. read: 0x%x  expected: 0x%x or 0x%x",
+                    magic, OffloadIndexBlockImpl.getIndexMagicWord(),
+                    OffloadIndexBlockV2Impl.getIndexMagicWord()));
+        }
+    }
+
+    @Override
+    public OffloadIndexBlock build() {
+        checkState(ledgerMetadata != null);
+        checkState(!entries.isEmpty());
+        checkState(dataObjectLength > 0);
+        checkState(dataHeaderLength > 0);
+        return OffloadIndexBlockImpl.get(ledgerMetadata, dataObjectLength, dataHeaderLength, entries);
+    }
+
+    @Override
+    public OffloadIndexBlockV2 buildV2() {
+        checkState(!ledgerMetadataMap.isEmpty());
+        checkState(!entryMap.isEmpty());
+        checkState(dataObjectLength > 0);
+        checkState(dataHeaderLength > 0);
+        return OffloadIndexBlockV2Impl.get(ledgerMetadataMap, dataObjectLength, dataHeaderLength, entryMap);
+    }
+
+}
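
A sketch of how the V2 builder is intended to be used (the ledgerId, blockSize,
ledgerInfo and totalUploadedBytes values are placeholders, not part of this patch):

    OffloadIndexBlockV2Builder builder = OffloadIndexBlockV2Builder.create();
    builder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
    builder.addBlock(ledgerId, /* firstEntryId */ 0, /* partId */ 1, blockSize);
    builder.addLedgerMeta(ledgerId, ledgerInfo); // MLDataFormats.ManagedLedgerInfo.LedgerInfo
    builder.withDataObjectLength(totalUploadedBytes);
    try (OffloadIndexBlockV2 index = builder.buildV2();
         IndexInputStream indexStream = index.toStream()) {
        // upload indexStream next to the data object, e.g. under "<uuid>-index"
    }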
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockV2Impl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockV2Impl.java
new file mode 100644
index 0000000..d58a4c1
--- /dev/null
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockV2Impl.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock.IndexInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OffloadIndexBlockV2Impl implements OffloadIndexBlockV2 {
+    private static final Logger log = LoggerFactory.getLogger(OffloadIndexBlockImpl.class);
+
+    private static final int INDEX_MAGIC_WORD = 0x3D1FB0BC;
+
+    private Map<Long, LedgerInfo> segmentMetadata;
+    private final Map<Long, LedgerMetadata> compatibleMetadata = Maps.newTreeMap();
+    private long dataObjectLength;
+    private long dataHeaderLength;
+    private Map<Long, TreeMap<Long, OffloadIndexEntryImpl>> indexEntries;
+
+
+    private final Handle<OffloadIndexBlockV2Impl> recyclerHandle;
+
+    private static final Recycler<OffloadIndexBlockV2Impl> RECYCLER = new Recycler<OffloadIndexBlockV2Impl>() {
+        @Override
+        protected OffloadIndexBlockV2Impl newObject(Handle<OffloadIndexBlockV2Impl> handle) {
+            return new OffloadIndexBlockV2Impl(handle);
+        }
+    };
+
+    private OffloadIndexBlockV2Impl(Handle<OffloadIndexBlockV2Impl> recyclerHandle) {
+        this.recyclerHandle = recyclerHandle;
+    }
+
+    public static OffloadIndexBlockV2Impl get(Map<Long, LedgerInfo> metadata, long dataObjectLength,
+                                              long dataHeaderLength,
+                                              Map<Long, List<OffloadIndexEntryImpl>> entries) {
+        OffloadIndexBlockV2Impl block = RECYCLER.get();
+        block.indexEntries = new HashMap<>();
+        entries.forEach((ledgerId, list) -> {
+            final TreeMap<Long, OffloadIndexEntryImpl> inLedger = block.indexEntries
+                    .getOrDefault(ledgerId, new TreeMap<>());
+            list.forEach(indexEntry -> {
+                inLedger.put(indexEntry.getEntryId(), indexEntry);
+            });
+            block.indexEntries.put(ledgerId, inLedger);
+        });
+
+        block.segmentMetadata = metadata;
+        block.dataObjectLength = dataObjectLength;
+        block.dataHeaderLength = dataHeaderLength;
+        return block;
+    }
+
+    public static OffloadIndexBlockV2Impl get(int magic, DataInputStream stream) throws IOException {
+        if (magic != INDEX_MAGIC_WORD) {
+            throw new IOException(String.format("Invalid MagicWord. read: 0x%x  expected: 0x%x",
+                    magic, INDEX_MAGIC_WORD));
+        }
+        // only take an instance from the recycler once the magic word is validated,
+        // so a failed parse does not leak a recyclable object
+        OffloadIndexBlockV2Impl block = RECYCLER.get();
+        block.indexEntries = Maps.newTreeMap();
+        block.segmentMetadata = Maps.newTreeMap();
+        block.fromStream(stream);
+        return block;
+    }
+
+    public void recycle() {
+        dataObjectLength = -1;
+        dataHeaderLength = -1;
+        segmentMetadata = null;
+        indexEntries.clear();
+        indexEntries = null;
+        if (recyclerHandle != null) {
+            recyclerHandle.recycle(this);
+        }
+    }
+
+    @Override
+    public OffloadIndexEntry getIndexEntryForEntry(long ledgerId, long messageEntryId) throws IOException {
+        if (messageEntryId > getLedgerMetadata(ledgerId).getLastEntryId()) {
+            log.warn("Try to get entry: {}, which beyond lastEntryId {}, return null",
+                    messageEntryId, getLedgerMetadata(ledgerId).getLastEntryId());
+            throw new IndexOutOfBoundsException("Entry index: " + messageEntryId
+                    + " beyond lastEntryId: " + getLedgerMetadata(ledgerId).getLastEntryId());
+        }
+        // find the greatest mapping Id whose entryId <= messageEntryId
+        return this.indexEntries.get(ledgerId).floorEntry(messageEntryId).getValue();
+    }
+
+    public long getStartEntryId(long ledgerId) {
+        return this.indexEntries.get(ledgerId).firstEntry().getValue().getEntryId();
+    }
+
+    @Override
+    public int getEntryCount() {
+        int ans = 0;
+        for (TreeMap<Long, OffloadIndexEntryImpl> v : this.indexEntries.values()) {
+            ans += v.size();
+        }
+
+        return ans;
+    }
+
+    @Override
+    public LedgerMetadata getLedgerMetadata(long ledgerId) {
+        if (compatibleMetadata.containsKey(ledgerId)) {
+            return compatibleMetadata.get(ledgerId);
+        } else if (segmentMetadata.containsKey(ledgerId)) {
+            final CompatibleMetadata result = new CompatibleMetadata(segmentMetadata.get(ledgerId));
+            compatibleMetadata.put(ledgerId, result);
+            return result;
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public long getDataObjectLength() {
+        return this.dataObjectLength;
+    }
+
+    @Override
+    public long getDataBlockHeaderLength() {
+        return this.dataHeaderLength;
+    }
+
+    /**
+     * Get the content of the index block as InputStream.
+     * Read out in format:
+     *   | index_magic_header | index_block_len | data_object_len | data_header_len |
+     * followed, repeated once per ledger, by:
+     *   | ledger_id | index_entry_count | segment_metadata_len | segment metadata | index entries... |
+     */
+    @Override
+    public IndexInputStream toStream() throws IOException {
+
+        int indexBlockLength = 4 /* magic header */
+                + 4 /* index block length */
+                + 8 /* data object length */
+                + 8;/* data header length */
+
+        Map<Long, byte[]> metaBytesMap = new HashMap<>();
+        for (Map.Entry<Long, TreeMap<Long, OffloadIndexEntryImpl>> e : this.indexEntries.entrySet()) {
+            Long ledgerId = e.getKey();
+            TreeMap<Long, OffloadIndexEntryImpl> ledgerIndexEntries = e.getValue();
+            int indexEntryCount = ledgerIndexEntries.size();
+            byte[] ledgerMetadataByte = this.segmentMetadata.get(ledgerId).toByteArray();
+            int segmentMetadataLength = ledgerMetadataByte.length;
+            indexBlockLength += 8 /* ledger id length */
+                    + 4 /* index entry count */
+                    + 4 /* segment metadata length */
+                    + segmentMetadataLength
+                    + indexEntryCount * (8 + 4 + 8);
+            metaBytesMap.put(ledgerId, ledgerMetadataByte);
+        }
+
+        ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength);
+
+        out.writeInt(INDEX_MAGIC_WORD)
+                .writeInt(indexBlockLength)
+                .writeLong(dataObjectLength)
+                .writeLong(dataHeaderLength);
+
+        for (Map.Entry<Long, TreeMap<Long, OffloadIndexEntryImpl>> e : this.indexEntries.entrySet()) {
+            Long ledgerId = e.getKey();
+            TreeMap<Long, OffloadIndexEntryImpl> ledgerIndexEntries = e.getValue();
+            int indexEntryCount = ledgerIndexEntries.size();
+            byte[] ledgerMetadataByte = metaBytesMap.get(ledgerId);
+            out.writeLong(ledgerId)
+                    .writeInt(indexEntryCount)
+                    .writeInt(ledgerMetadataByte.length)
+                    .writeBytes(ledgerMetadataByte);
+            ledgerIndexEntries.values().forEach(idxEntry -> {
+                out.writeLong(idxEntry.getEntryId())
+                        .writeInt(idxEntry.getPartId())
+                        .writeLong(idxEntry.getOffset());
+            });
+        }
+
+        return new IndexInputStream(new ByteBufInputStream(out, true), indexBlockLength);
+    }
+
+    private static LedgerInfo parseLedgerInfo(byte[] bytes) throws IOException {
+        return LedgerInfo.newBuilder().mergeFrom(bytes).build();
+    }
+
+    private OffloadIndexBlockV2 fromStream(DataInputStream dis) throws IOException {
+
+        dis.readInt(); // no used index block length
+        this.dataObjectLength = dis.readLong();
+        this.dataHeaderLength = dis.readLong();
+        while (dis.available() > 0) {
+            long ledgerId = dis.readLong();
+            int indexEntryCount = dis.readInt();
+            int segmentMetadataLength = dis.readInt();
+
+            byte[] metadataBytes = new byte[segmentMetadataLength];
+            // read(byte[]) may legally return fewer bytes than requested;
+            // readFully either fills the whole array or throws EOFException
+            dis.readFully(metadataBytes);
+            final LedgerInfo ledgerInfo = parseLedgerInfo(metadataBytes);
+            this.segmentMetadata.put(ledgerId, ledgerInfo);
+            final TreeMap<Long, OffloadIndexEntryImpl> indexEntries = new TreeMap<>();
+
+            for (int i = 0; i < indexEntryCount; i++) {
+                long entryId = dis.readLong();
+                indexEntries.putIfAbsent(entryId, OffloadIndexEntryImpl.of(entryId, dis.readInt(),
+                        dis.readLong(), dataHeaderLength));
+            }
+            this.indexEntries.put(ledgerId, indexEntries);
+        }
+
+        return this;
+    }
+
+    public static int getIndexMagicWord() {
+        return INDEX_MAGIC_WORD;
+    }
+
+    @Override
+    public void close() {
+        recycle();
+    }
+
+    @VisibleForTesting
+    static class CompatibleMetadata implements LedgerMetadata {
+        LedgerInfo ledgerInfo;
+
+        public CompatibleMetadata(LedgerInfo ledgerInfo) {
+            this.ledgerInfo = ledgerInfo;
+        }
+
+        @Override
+        public long getLedgerId() {
+            return ledgerInfo.getLedgerId();
+        }
+
+        @Override
+        public int getEnsembleSize() {
+            return 0;
+        }
+
+        @Override
+        public int getWriteQuorumSize() {
+            return 0;
+        }
+
+        @Override
+        public int getAckQuorumSize() {
+            return 0;
+        }
+
+        @Override
+        public long getLastEntryId() {
+            return ledgerInfo.getEntries() - 1;
+        }
+
+        @Override
+        public long getLength() {
+            return ledgerInfo.getSize();
+        }
+
+        @Override
+        public boolean hasPassword() {
+            return false;
+        }
+
+        @Override
+        public byte[] getPassword() {
+            return new byte[0];
+        }
+
+        @Override
+        public DigestType getDigestType() {
+            return null;
+        }
+
+        @Override
+        public long getCtime() {
+            return 0;
+        }
+
+        @Override
+        public boolean isClosed() {
+            return true;
+        }
+
+        @Override
+        public Map<String, byte[]> getCustomMetadata() {
+            return null;
+        }
+
+        @Override
+        public List<BookieId> getEnsembleAt(long entryId) {
+            return null;
+        }
+
+        @Override
+        public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() {
+            return null;
+        }
+
+        @Override
+        public State getState() {
+            return null;
+        }
+
+        @Override
+        public String toSafeString() {
+            return null;
+        }
+
+        @Override
+        public int getMetadataFormatVersion() {
+            return 0;
+        }
+
+        @Override
+        public long getCToken() {
+            return 0;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            CompatibleMetadata that = (CompatibleMetadata) o;
+            return ledgerInfo.equals(that.ledgerInfo);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(ledgerInfo);
+        }
+    }
+}
+
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
index 10408fe..0a5ebb1 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexEntryImpl.java
@@ -37,14 +37,17 @@ public class OffloadIndexEntryImpl implements OffloadIndexEntry {
     public long getEntryId() {
         return entryId;
     }
+
     @Override
     public int getPartId() {
         return partId;
     }
+
     @Override
     public long getOffset() {
         return offset;
     }
+
     @Override
     public long getDataOffset() {
         return offset + blockHeaderSize;
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingDataBlockHeaderImpl.java
similarity index 71%
copy from tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
copy to tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingDataBlockHeaderImpl.java
index 9239ec2..ed383a6 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingDataBlockHeaderImpl.java
@@ -19,10 +19,8 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import com.google.common.io.CountingInputStream;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.PooledByteBufAllocator;
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -31,44 +29,30 @@ import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 
 /**
- * The data block header in code storage for each data block.
+ * The data block header in tiered storage for each data block.
  */
-public class DataBlockHeaderImpl implements DataBlockHeader {
-    // Magic Word for data block.
+public class StreamingDataBlockHeaderImpl implements DataBlockHeader {
+    // Magic Word for streaming data block.
     // It is a sequence of bytes used to identify the start of a block.
-    static final int MAGIC_WORD = 0xFBDBABCB;
+    static final int MAGIC_WORD = 0x26A66D32;
     // This is bigger than header size. Leaving some place for alignment and future enhancement.
     // Payload use this as the start offset.
-    private static final int HEADER_MAX_SIZE = 128;
+    public static final int HEADER_MAX_SIZE = 128;
     private static final int HEADER_BYTES_USED = 4 /* magic */
-                                               + 8 /* header len */
-                                               + 8 /* block len */
-                                               + 8 /* first entry id */;
+            + 8 /* header len */
+            + 8 /* block len */
+            + 8 /* first entry id */
+            + 8 /* ledger id */;
     private static final byte[] PADDING = new byte[HEADER_MAX_SIZE - HEADER_BYTES_USED];
 
-    public static DataBlockHeaderImpl of(int blockLength, long firstEntryId) {
-        return new DataBlockHeaderImpl(HEADER_MAX_SIZE, blockLength, firstEntryId);
+    public long getLedgerId() {
+        return ledgerId;
     }
 
-    // Construct DataBlockHeader from InputStream, which contains `HEADER_MAX_SIZE` bytes readable.
-    public static DataBlockHeader fromStream(InputStream stream) throws IOException {
-        CountingInputStream countingStream = new CountingInputStream(stream);
-        DataInputStream dis = new DataInputStream(countingStream);
-        int magic = dis.readInt();
-        if (magic != MAGIC_WORD) {
-            throw new IOException("Data block header magic word not match. read: " + magic
-                    + " expected: " + MAGIC_WORD);
-        }
-
-        long headerLen = dis.readLong();
-        long blockLen = dis.readLong();
-        long firstEntryId = dis.readLong();
-        long toSkip = headerLen - countingStream.getCount();
-        if (dis.skip(toSkip) != toSkip) {
-            throw new EOFException("Header was too small");
-        }
+    private final long ledgerId;
 
-        return new DataBlockHeaderImpl(headerLen, blockLen, firstEntryId);
+    public static StreamingDataBlockHeaderImpl of(int blockLength, long ledgerId, long firstEntryId) {
+        return new StreamingDataBlockHeaderImpl(HEADER_MAX_SIZE, blockLength, ledgerId, firstEntryId);
     }
 
     private final long headerLength;
@@ -98,10 +82,33 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
         return this.firstEntryId;
     }
 
-    public DataBlockHeaderImpl(long headerLength, long blockLength, long firstEntryId) {
+    public StreamingDataBlockHeaderImpl(long headerLength, long blockLength, long ledgerId, long firstEntryId) {
         this.headerLength = headerLength;
         this.blockLength = blockLength;
         this.firstEntryId = firstEntryId;
+        this.ledgerId = ledgerId;
+    }
+
+    // Construct DataBlockHeader from InputStream, which contains `HEADER_MAX_SIZE` readable bytes.
+    public static StreamingDataBlockHeaderImpl fromStream(InputStream stream) throws IOException {
+        CountingInputStream countingStream = new CountingInputStream(stream);
+        DataInputStream dis = new DataInputStream(countingStream);
+        int magic = dis.readInt();
+        if (magic != MAGIC_WORD) {
+            throw new IOException("Data block header magic word not match. read: " + magic
+                    + " expected: " + MAGIC_WORD);
+        }
+
+        long headerLen = dis.readLong();
+        long blockLen = dis.readLong();
+        long firstEntryId = dis.readLong();
+        long ledgerId = dis.readLong();
+        long toSkip = headerLen - countingStream.getCount();
+        if (dis.skip(toSkip) != toSkip) {
+            throw new EOFException("Header was too small");
+        }
+
+        return new StreamingDataBlockHeaderImpl(headerLen, blockLen, ledgerId, firstEntryId);
     }
 
     /**
@@ -113,10 +120,11 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
     public InputStream toStream() {
         ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
         out.writeInt(MAGIC_WORD)
-            .writeLong(headerLength)
-            .writeLong(blockLength)
-            .writeLong(firstEntryId)
-            .writeBytes(PADDING);
+                .writeLong(headerLength)
+                .writeLong(blockLength)
+                .writeLong(firstEntryId)
+                .writeLong(ledgerId)
+                .writeBytes(PADDING);
 
         // true means the input stream will release the ByteBuf on close
         return new ByteBufInputStream(out, true);
@@ -124,8 +132,8 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
 
     @Override
     public String toString() {
-        return String.format("DataBlockHeader(len:%d,hlen:%d,firstEntry:%d)",
-                blockLength, headerLength, firstEntryId);
+        return String.format("StreamingDataBlockHeader(len:%d,hlen:%d,firstEntry:%d,ledger:%d)",
+                blockLength, headerLength, firstEntryId, ledgerId);
     }
 }
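
For reference, the V2 header added above round-trips with just the methods in this patch. A minimal sketch (the sample values and the assert are illustrative only); note the fixed fields occupy 4 + 8 + 8 + 8 + 8 = 36 bytes, and the remaining 92 bytes of the 128-byte HEADER_MAX_SIZE are padding, so payload data always starts at offset 128:

    import java.io.InputStream;
    import org.apache.bookkeeper.mledger.offload.jcloud.impl.StreamingDataBlockHeaderImpl;

    public class StreamingHeaderRoundTrip {
        public static void main(String[] args) throws Exception {
            // of() takes (blockLength, ledgerId, firstEntryId); values here are samples.
            StreamingDataBlockHeaderImpl header =
                    StreamingDataBlockHeaderImpl.of(1024 * 1024, 7L, 0L);
            try (InputStream in = header.toStream()) {
                // fromStream() validates the magic word and skips the padding.
                StreamingDataBlockHeaderImpl parsed = StreamingDataBlockHeaderImpl.fromStream(in);
                assert parsed.getLedgerId() == 7L && parsed.getFirstEntryId() == 0L;
            }
        }
    }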
 
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
index 7d7d7fa..5aa5765 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
@@ -61,8 +61,16 @@ public class TieredStorageConfiguration {
     public static final String METADATA_FIELD_REGION = "region";
     public static final String METADATA_FIELD_ENDPOINT = "serviceEndpoint";
     public static final String METADATA_FIELD_MAX_BLOCK_SIZE = "maxBlockSizeInBytes";
+    public static final String METADATA_FIELD_MIN_BLOCK_SIZE = "minBlockSizeInBytes";
     public static final String METADATA_FIELD_READ_BUFFER_SIZE = "readBufferSizeInBytes";
+    public static final String METADATA_FIELD_WRITE_BUFFER_SIZE = "writeBufferSizeInBytes";
     public static final String OFFLOADER_PROPERTY_PREFIX = "managedLedgerOffload";
+    public static final String MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC = "maxOffloadSegmentRolloverTimeInSeconds";
+    public static final String MIN_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC = "minOffloadSegmentRolloverTimeInSeconds";
+    public static final long DEFAULT_MAX_SEGMENT_TIME_IN_SECOND = 600;
+    public static final long DEFAULT_MIN_SEGMENT_TIME_IN_SECOND = 0;
+    public static final String MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES = "maxOffloadSegmentSizeInBytes";
+    public static final long DEFAULT_MAX_SEGMENT_SIZE_IN_BYTES = 1024 * 1024 * 1024;
 
     protected static final int MB = 1024 * 1024;
 
@@ -177,6 +185,30 @@ public class TieredStorageConfiguration {
         return null;
     }
 
+    public long getMaxSegmentTimeInSecond() {
+        if (configProperties.containsKey(MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC)) {
+            return Long.parseLong(configProperties.get(MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC));
+        } else {
+            return DEFAULT_MAX_SEGMENT_TIME_IN_SECOND;
+        }
+    }
+
+    public long getMinSegmentTimeInSecond() {
+        if (configProperties.containsKey(MIN_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC)) {
+            return Long.parseLong(configProperties.get(MIN_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC));
+        } else {
+            return DEFAULT_MIN_SEGMENT_TIME_IN_SECOND;
+        }
+    }
+
+    public long getMaxSegmentSizeInBytes() {
+        if (configProperties.containsKey(MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES)) {
+            return Long.parseLong(configProperties.get(MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES));
+        } else {
+            return DEFAULT_MAX_SEGMENT_SIZE_IN_BYTES;
+        }
+    }
+
     public void setServiceEndpoint(String s) {
         configProperties.put(getKeyName(METADATA_FIELD_ENDPOINT), s);
     }
@@ -213,6 +245,15 @@ public class TieredStorageConfiguration {
         return new Integer(64 * MB);
     }
 
+    public Integer getMinBlockSizeInBytes() {
+        for (String key : getKeys(METADATA_FIELD_MIN_BLOCK_SIZE)) {
+            if (configProperties.containsKey(key)) {
+                return Integer.valueOf(configProperties.get(key));
+            }
+        }
+        return 5 * MB;
+    }
+
     public Integer getReadBufferSizeInBytes() {
         for (String key : getKeys(METADATA_FIELD_READ_BUFFER_SIZE)) {
             if (configProperties.containsKey(key)) {
@@ -222,6 +263,15 @@ public class TieredStorageConfiguration {
         return new Integer(MB);
     }
 
+    public Integer getWriteBufferSizeInBytes() {
+        for (String key : getKeys(METADATA_FIELD_WRITE_BUFFER_SIZE)) {
+            if (configProperties.containsKey(key)) {
+                return Integer.valueOf(configProperties.get(key));
+            }
+        }
+        return 10 * MB;
+    }
+
     public Supplier<Credentials> getProviderCredentials() {
         if (credentials == null) {
             getProvider().buildCredentials(this);
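
The new rollover settings are plain entries in configProperties and fall back to the defaults above when a key is absent. A minimal sketch of reading them (the provider key and the "transient" driver value are assumptions made only to keep the example runnable; the segment keys are the constants added in this patch):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;

    public class SegmentRolloverConfigExample {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            // Assumption: create() expects a provider; "transient" is the in-memory driver.
            props.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, "transient");
            props.put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
            props.put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
            TieredStorageConfiguration cfg = TieredStorageConfiguration.create(props);
            System.out.println(cfg.getMaxSegmentSizeInBytes());   // 1000
            System.out.println(cfg.getMaxSegmentTimeInSecond());  // 600
            System.out.println(cfg.getMinSegmentTimeInSecond());  // 0, the default
        }
    }
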
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
index a2944f5..a646999 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
@@ -19,21 +19,17 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.lang.reflect.Method;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.client.api.DigestType;
-import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockHeaderImpl;
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -42,29 +38,22 @@ import org.apache.pulsar.jcloud.shade.com.google.common.base.Supplier;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.data.ACL;
-
 import org.jclouds.blobstore.BlobStore;
-import org.jclouds.blobstore.domain.Blob;
 import org.jclouds.domain.Credentials;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 
-import com.google.common.util.concurrent.MoreExecutors;
-
 public abstract class BlobStoreManagedLedgerOffloaderBase {
 
     public final static String BUCKET = "pulsar-unittest";
     protected static final int DEFAULT_BLOCK_SIZE = 5*1024*1024;
     protected static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024;
-    
+
     protected final OrderedScheduler scheduler;
     protected final PulsarMockBookKeeper bk;
     protected final JCloudBlobStoreProvider provider;
     protected TieredStorageConfiguration config;
     protected BlobStore blobStore = null;
-    
+
     protected BlobStoreManagedLedgerOffloaderBase() throws Exception {
         scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(5).name("offloader").build();
         bk = new PulsarMockBookKeeper(createMockZooKeeper(), scheduler.chooseThread(this));
@@ -82,7 +71,11 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
                 CreateMode.PERSISTENT);
         return zk;
     }
-    
+
+    protected static MockManagedLedger createMockManagedLedger() {
+        return new MockManagedLedger();
+    }
+
     /*
      * Determine which BlobStore Provider to test based on the System properties
      */
@@ -95,7 +88,7 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
             return JCloudBlobStoreProvider.TRANSIENT;
         }
     }
-    
+
     /*
      * Get the credentials to use for the JCloud provider
      * based on the System properties.
@@ -108,7 +101,7 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
              *      props.setProperty("S3Key", "HXXXXXß");
              */
             return () -> new Credentials(System.getProperty("S3ID"), System.getProperty("S3Key"));
-                    
+
         } else if (Boolean.parseBoolean(System.getProperty("testRealGCS", "false"))) {
             /*
              * To use this, must config credentials using "client_email" as GCSID and "private_key" as GCSKey.
@@ -121,17 +114,24 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
             return null;
         }
     }
-    
+
     protected TieredStorageConfiguration getConfiguration(String bucket) {
-        Map<String, String> metaData = new HashMap<String, String> ();
+        return getConfiguration(bucket, null);
+    }
+
+    protected TieredStorageConfiguration getConfiguration(String bucket, Map<String, String> additionalConfig) {
+        Map<String, String> metaData = new HashMap<String, String>();
+        if (additionalConfig != null) {
+            metaData.putAll(additionalConfig);
+        }
         metaData.put(TieredStorageConfiguration.BLOB_STORE_PROVIDER_KEY, provider.getDriver());
         metaData.put(getConfigKey(TieredStorageConfiguration.METADATA_FIELD_REGION), "");
         metaData.put(getConfigKey(TieredStorageConfiguration.METADATA_FIELD_BUCKET), bucket);
         metaData.put(getConfigKey(TieredStorageConfiguration.METADATA_FIELD_ENDPOINT), "");
-        
+
         TieredStorageConfiguration config = TieredStorageConfiguration.create(metaData);
         config.setProviderCredentials(getBlobStoreCredentials());
-        
+
         return config;
     }
 
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
new file mode 100644
index 0000000..32ad498
--- /dev/null
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
+import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
+import org.jclouds.blobstore.BlobStore;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class BlobStoreManagedLedgerOffloaderStreamingTest extends BlobStoreManagedLedgerOffloaderBase {
+
+    private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderStreamingTest.class);
+    private TieredStorageConfiguration mockedConfig;
+    private static final Random random = new Random();
+
+    BlobStoreManagedLedgerOffloaderStreamingTest() throws Exception {
+        super();
+        config = getConfiguration(BUCKET);
+        JCloudBlobStoreProvider provider = getBlobStoreProvider();
+        assertNotNull(provider);
+        provider.validate(config);
+        blobStore = provider.getBlobStore(config);
+    }
+
+    private BlobStoreManagedLedgerOffloader getOffloader(Map<String, String> additionalConfig) throws IOException {
+        return getOffloader(BUCKET, additionalConfig);
+    }
+
+    private BlobStoreManagedLedgerOffloader getOffloader(BlobStore mockedBlobStore,
+                                                         Map<String, String> additionalConfig) throws IOException {
+        return getOffloader(BUCKET, mockedBlobStore, additionalConfig);
+    }
+
+    private BlobStoreManagedLedgerOffloader getOffloader(String bucket, Map<String, String> additionalConfig) throws
+            IOException {
+        mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket, additionalConfig)));
+        Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use the REAL blobStore
+        BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader
+                .create(mockedConfig, new HashMap<String, String>(), scheduler);
+        return offloader;
+    }
+
+    private BlobStoreManagedLedgerOffloader getOffloader(String bucket, BlobStore mockedBlobStore,
+                                                         Map<String, String> additionalConfig) throws IOException {
+        mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket, additionalConfig)));
+        Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
+        BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader
+                .create(mockedConfig, new HashMap<String, String>(), scheduler);
+        return offloader;
+    }
+
+    @Test
+    public void testHappyCase() throws Exception {
+        LedgerOffloader offloader = getOffloader(new HashMap<String, String>() {{
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+            put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+        }});
+        ManagedLedger ml = createMockManagedLedger();
+        UUID uuid = UUID.randomUUID();
+        long beginLedger = 0;
+        long beginEntry = 0;
+        log.error("try begin offload");
+        OffloadHandle offloadHandle = offloader
+                .streamingOffload(ml, uuid, beginLedger, beginEntry, new HashMap<>()).get();
+        // Segment should close because the max segment size in bytes is reached
+        for (int i = 0; i < 10; i++) {
+            final byte[] data = new byte[100];
+            random.nextBytes(data);
+            final OffloadHandle.OfferEntryResult offerEntryResult = offloadHandle
+                    .offerEntry(EntryImpl.create(0, i, data));
+            log.info("offer result: {}", offerEntryResult);
+        }
+        final LedgerOffloader.OffloadResult offloadResult = offloadHandle.getOffloadResultAsync().get();
+        log.info("Offload reasult: {}", offloadResult);
+    }
+
+    @Test
+    public void testReadAndWrite() throws Exception {
+        LedgerOffloader offloader = getOffloader(new HashMap<String, String>() {{
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+            put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+        }});
+        ManagedLedger ml = createMockManagedLedger();
+        UUID uuid = UUID.randomUUID();
+        long beginLedger = 0;
+        long beginEntry = 0;
+
+        Map<String, String> driverMeta = new HashMap<String, String>() {{
+            put(TieredStorageConfiguration.METADATA_FIELD_BUCKET, BUCKET);
+        }};
+        OffloadHandle offloadHandle = offloader
+                .streamingOffload(ml, uuid, beginLedger, beginEntry, driverMeta).get();
+
+        // Segment should close because the max segment size in bytes is reached
+        final LinkedList<Entry> entries = new LinkedList<>();
+        for (int i = 0; i < 10; i++) {
+            final byte[] data = new byte[100];
+            random.nextBytes(data);
+            final EntryImpl entry = EntryImpl.create(0, i, data);
+            offloadHandle.offerEntry(entry);
+            entries.add(entry);
+        }
+
+        final LedgerOffloader.OffloadResult offloadResult = offloadHandle.getOffloadResultAsync().get();
+        assertEquals(offloadResult.endLedger, 0);
+        assertEquals(offloadResult.endEntry, 9);
+        final OffloadContext.Builder contextBuilder = OffloadContext.newBuilder();
+        contextBuilder.addOffloadSegment(
+                MLDataFormats.OffloadSegment.newBuilder()
+                        .setUidLsb(uuid.getLeastSignificantBits())
+                        .setUidMsb(uuid.getMostSignificantBits())
+                        .setComplete(true).setEndEntryId(9).build());
+
+        final ReadHandle readHandle = offloader.readOffloaded(0, contextBuilder.build(), driverMeta).get();
+        final LedgerEntries ledgerEntries = readHandle.readAsync(0, 9).get();
+
+        for (LedgerEntry ledgerEntry : ledgerEntries) {
+            final EntryImpl storedEntry = (EntryImpl) entries.get((int) ledgerEntry.getEntryId());
+            final byte[] storedData = storedEntry.getData();
+            final byte[] entryBytes = ledgerEntry.getEntryBytes();
+            assertEquals(storedData, entryBytes);
+        }
+    }
+
+    @Test
+    public void testReadAndWriteAcrossLedger() throws Exception {
+        LedgerOffloader offloader = getOffloader(new HashMap<String, String>() {{
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "2000");
+            put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+        }});
+        ManagedLedger ml = createMockManagedLedger();
+        UUID uuid = UUID.randomUUID();
+        long beginLedger = 0;
+        long beginEntry = 0;
+
+        Map<String, String> driverMeta = new HashMap<String, String>() {{
+            put(TieredStorageConfiguration.METADATA_FIELD_BUCKET, BUCKET);
+        }};
+        OffloadHandle offloadHandle = offloader
+                .streamingOffload(ml, uuid, beginLedger, beginEntry, driverMeta).get();
+
+        // Segment should close because the max segment size in bytes is reached
+        final LinkedList<Entry> entries = new LinkedList<>();
+        final LinkedList<Entry> ledger2Entries = new LinkedList<>();
+        for (int i = 0; i < 10; i++) {
+            final byte[] data = new byte[100];
+            random.nextBytes(data);
+            final EntryImpl entry = EntryImpl.create(0, i, data);
+            offloadHandle.offerEntry(entry);
+            entries.add(entry);
+        }
+        for (int i = 0; i < 10; i++) {
+            final byte[] data = new byte[100];
+            random.nextBytes(data);
+            final EntryImpl entry = EntryImpl.create(1, i, data);
+            offloadHandle.offerEntry(entry);
+            ledger2Entries.add(entry);
+        }
+
+        final LedgerOffloader.OffloadResult offloadResult = offloadHandle.getOffloadResultAsync().get();
+        assertEquals(offloadResult.endLedger, 1);
+        assertEquals(offloadResult.endEntry, 9);
+        final OffloadContext.Builder contextBuilder = OffloadContext.newBuilder();
+        contextBuilder.addOffloadSegment(
+                MLDataFormats.OffloadSegment.newBuilder()
+                        .setUidLsb(uuid.getLeastSignificantBits())
+                        .setUidMsb(uuid.getMostSignificantBits())
+                        .setComplete(true).setEndEntryId(9).build());
+
+        final ReadHandle readHandle = offloader.readOffloaded(0, contextBuilder.build(), driverMeta).get();
+        final LedgerEntries ledgerEntries = readHandle.readAsync(0, 9).get();
+
+        for (LedgerEntry ledgerEntry : ledgerEntries) {
+            final EntryImpl storedEntry = (EntryImpl) entries.get((int) ledgerEntry.getEntryId());
+            final byte[] storedData = storedEntry.getData();
+            final byte[] entryBytes = ledgerEntry.getEntryBytes();
+            assertEquals(storedData, entryBytes);
+        }
+
+        final ReadHandle readHandle2 = offloader.readOffloaded(1, contextBuilder.build(), driverMeta).get();
+        final LedgerEntries ledgerEntries2 = readHandle2.readAsync(0, 9).get();
+
+        for (LedgerEntry ledgerEntry : ledgerEntries2) {
+            final EntryImpl storedEntry = (EntryImpl) ledger2Entries.get((int) ledgerEntry.getEntryId());
+            final byte[] storedData = storedEntry.getData();
+            final byte[] entryBytes = ledgerEntry.getEntryBytes();
+            assertEquals(storedData, entryBytes);
+        }
+    }
+
+    @Test
+    public void testReadAndWriteAcrossSegment() throws Exception {
+        LedgerOffloader offloader = getOffloader(new HashMap<String, String>() {{
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+            put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+        }});
+        LedgerOffloader offloader2 = getOffloader(new HashMap<String, String>() {{
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+            put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+        }});
+        ManagedLedger ml = createMockManagedLedger();
+        UUID uuid = UUID.randomUUID();
+        UUID uuid2 = UUID.randomUUID();
+        long beginLedger = 0;
+        long beginEntry = 0;
+
+        Map<String, String> driverMeta = new HashMap<String, String>() {{
+            put(TieredStorageConfiguration.METADATA_FIELD_BUCKET, BUCKET);
+        }};
+        OffloadHandle offloadHandle = offloader
+                .streamingOffload(ml, uuid, beginLedger, beginEntry, driverMeta).get();
+
+        // Segment should close because the max segment size in bytes is reached
+        final LinkedList<Entry> entries = new LinkedList<>();
+        for (int i = 0; i < 10; i++) {
+            final byte[] data = new byte[100];
+            random.nextBytes(data);
+            final EntryImpl entry = EntryImpl.create(0, i, data);
+            offloadHandle.offerEntry(entry);
+            entries.add(entry);
+        }
+
+        final LedgerOffloader.OffloadResult offloadResult = offloadHandle.getOffloadResultAsync().get();
+        assertEquals(offloadResult.endLedger, 0);
+        assertEquals(offloadResult.endEntry, 9);
+
+        // Segment should close because the max segment size in bytes is reached
+        OffloadHandle offloadHandle2 = offloader2
+                .streamingOffload(ml, uuid2, beginLedger, 10, driverMeta).get();
+        for (int i = 0; i < 10; i++) {
+            final byte[] data = new byte[100];
+            random.nextBytes(data);
+            final EntryImpl entry = EntryImpl.create(0, i + 10, data);
+            offloadHandle2.offerEntry(entry);
+            entries.add(entry);
+        }
+        final LedgerOffloader.OffloadResult offloadResult2 = offloadHandle2.getOffloadResultAsync().get();
+        assertEquals(offloadResult2.endLedger, 0);
+        assertEquals(offloadResult2.endEntry, 19);
+
+        final OffloadContext.Builder contextBuilder = OffloadContext.newBuilder();
+        contextBuilder.addOffloadSegment(
+                MLDataFormats.OffloadSegment.newBuilder()
+                        .setUidLsb(uuid.getLeastSignificantBits())
+                        .setUidMsb(uuid.getMostSignificantBits())
+                        .setComplete(true).setEndEntryId(9).build()).addOffloadSegment(
+                MLDataFormats.OffloadSegment.newBuilder()
+                        .setUidLsb(uuid2.getLeastSignificantBits())
+                        .setUidMsb(uuid2.getMostSignificantBits())
+                        .setComplete(true).setEndEntryId(19).build()
+        );
+
+        final ReadHandle readHandle = offloader.readOffloaded(0, contextBuilder.build(), driverMeta).get();
+        final LedgerEntries ledgerEntries = readHandle.readAsync(0, 19).get();
+
+        for (LedgerEntry ledgerEntry : ledgerEntries) {
+            final EntryImpl storedEntry = (EntryImpl) entries.get((int) ledgerEntry.getEntryId());
+            final byte[] storedData = storedEntry.getData();
+            final byte[] entryBytes = ledgerEntry.getEntryBytes();
+            assertEquals(storedData, entryBytes);
+        }
+    }
+
+    @Test
+    public void testRandomRead() throws Exception {
+        LedgerOffloader offloader = getOffloader(new HashMap<String, String>() {{
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+            put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+        }});
+        LedgerOffloader offloader2 = getOffloader(new HashMap<String, String>() {{
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+            put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+        }});
+        ManagedLedger ml = createMockManagedLedger();
+        UUID uuid = UUID.randomUUID();
+        UUID uuid2 = UUID.randomUUID();
+        long beginLedger = 0;
+        long beginEntry = 0;
+
+        Map<String, String> driverMeta = new HashMap<String, String>() {{
+            put(TieredStorageConfiguration.METADATA_FIELD_BUCKET, BUCKET);
+        }};
+        OffloadHandle offloadHandle = offloader
+                .streamingOffload(ml, uuid, beginLedger, beginEntry, driverMeta).get();
+
+        // Segment should close because the max segment size in bytes is reached
+        final LinkedList<Entry> entries = new LinkedList<>();
+        for (int i = 0; i < 10; i++) {
+            final byte[] data = new byte[100];
+            random.nextBytes(data);
+            final EntryImpl entry = EntryImpl.create(0, i, data);
+            offloadHandle.offerEntry(entry);
+            entries.add(entry);
+        }
+
+        final LedgerOffloader.OffloadResult offloadResult = offloadHandle.getOffloadResultAsync().get();
+        assertEquals(offloadResult.endLedger, 0);
+        assertEquals(offloadResult.endEntry, 9);
+
+        // Segment should close because the max segment size in bytes is reached
+        OffloadHandle offloadHandle2 = offloader2
+                .streamingOffload(ml, uuid2, beginLedger, 10, driverMeta).get();
+        for (int i = 0; i < 10; i++) {
+            final byte[] data = new byte[100];
+            random.nextBytes(data);
+            final EntryImpl entry = EntryImpl.create(0, i + 10, data);
+            offloadHandle2.offerEntry(entry);
+            entries.add(entry);
+        }
+        final LedgerOffloader.OffloadResult offloadResult2 = offloadHandle2.getOffloadResultAsync().get();
+        assertEquals(offloadResult2.endLedger, 0);
+        assertEquals(offloadResult2.endEntry, 19);
+
+        final OffloadContext.Builder contextBuilder = OffloadContext.newBuilder();
+        contextBuilder.addOffloadSegment(
+                MLDataFormats.OffloadSegment.newBuilder()
+                        .setUidLsb(uuid.getLeastSignificantBits())
+                        .setUidMsb(uuid.getMostSignificantBits())
+                        .setComplete(true).setEndEntryId(9).build()).addOffloadSegment(
+                MLDataFormats.OffloadSegment.newBuilder()
+                        .setUidLsb(uuid2.getLeastSignificantBits())
+                        .setUidMsb(uuid2.getMostSignificantBits())
+                        .setComplete(true).setEndEntryId(19).build()
+        );
+
+        final ReadHandle readHandle = offloader.readOffloaded(0, contextBuilder.build(), driverMeta).get();
+
+        for (int i = 0; i <= 19; i++) {
+            Random seed = new Random(0);
+            int begin = seed.nextInt(20);
+            int end = seed.nextInt(20);
+            if (begin >= end) {
+                int temp = begin;
+                begin = end;
+                end = temp;
+            }
+            final LedgerEntries ledgerEntries = readHandle.readAsync(begin, end).get();
+            for (LedgerEntry ledgerEntry : ledgerEntries) {
+                final EntryImpl storedEntry = (EntryImpl) entries.get((int) ledgerEntry.getEntryId());
+                final byte[] storedData = storedEntry.getData();
+                final byte[] entryBytes = ledgerEntry.getEntryBytes();
+                assertEquals(storedData, entryBytes);
+            }
+        }
+    }
+
+    @Test
+    public void testInvalidEntryIds() throws Exception {
+        LedgerOffloader offloader = getOffloader(new HashMap<String, String>() {{
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000");
+            put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880");
+            put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC, "600");
+        }});
+        ManagedLedger ml = createMockManagedLedger();
+        UUID uuid = UUID.randomUUID();
+        long beginLedger = 0;
+        long beginEntry = 0;
+
+        Map<String, String> driverMeta = new HashMap<String, String>() {{
+            put(TieredStorageConfiguration.METADATA_FIELD_BUCKET, BUCKET);
+        }};
+        OffloadHandle offloadHandle = offloader
+                .streamingOffload(ml, uuid, beginLedger, beginEntry, driverMeta).get();
+
+        // Segment should close because the max segment size in bytes is reached
+        final LinkedList<Entry> entries = new LinkedList<>();
+        for (int i = 0; i < 10; i++) {
+            final byte[] data = new byte[100];
+            random.nextBytes(data);
+            final EntryImpl entry = EntryImpl.create(0, i, data);
+            offloadHandle.offerEntry(entry);
+            entries.add(entry);
+        }
+
+        final LedgerOffloader.OffloadResult offloadResult = offloadHandle.getOffloadResultAsync().get();
+        assertEquals(offloadResult.endLedger, 0);
+        assertEquals(offloadResult.endEntry, 9);
+        final OffloadContext.Builder contextBuilder = OffloadContext.newBuilder();
+        contextBuilder.addOffloadSegment(
+                MLDataFormats.OffloadSegment.newBuilder()
+                        .setUidLsb(uuid.getLeastSignificantBits())
+                        .setUidMsb(uuid.getMostSignificantBits())
+                        .setComplete(true).setEndEntryId(9).build());
+
+        final ReadHandle readHandle = offloader.readOffloaded(0, contextBuilder.build(), driverMeta).get();
+        try {
+            readHandle.read(-1, -1);
+            Assert.fail("Shouldn't be able to read anything");
+        } catch (Exception e) {
+            // expected: invalid (negative) entry ids must be rejected
+        }
+
+        try {
+            readHandle.read(0, 20);
+            Assert.fail("Shouldn't be able to read anything");
+        } catch (Exception e) {
+            // expected: the requested range extends past the last offloaded entry (9)
+        }
+    }
+}
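
Distilled from the tests above, the streaming cycle is: open a handle, offer entries until the segment rolls over, wait for the result, then read back through an offloaded ReadHandle. A condensed sketch, not a standalone test (ml, uuid, offloader, driverMeta, and contextBuilder are set up exactly as in the tests):

    OffloadHandle handle = offloader
            .streamingOffload(ml, uuid, 0 /* beginLedger */, 0 /* beginEntry */, driverMeta).get();
    handle.offerEntry(EntryImpl.create(0, 0, new byte[100]));   // feed entries one by one
    LedgerOffloader.OffloadResult result =
            handle.getOffloadResultAsync().get();               // completes once the segment closes
    ReadHandle reader = offloader
            .readOffloaded(0, contextBuilder.build(), driverMeta).get();
    LedgerEntries readBack = reader.readAsync(0, 0).get();      // read the offloaded data back
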
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStreamTest.java
new file mode 100644
index 0000000..d0b433b
--- /dev/null
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStreamTest.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.OffloadSegmentInfoImpl;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class BufferedOffloadStreamTest {
+    final Random random = new Random();
+
+    public void testWithPadding(int paddingLen) throws Exception {
+        int blockSize = StreamingDataBlockHeaderImpl.getDataStartOffset();
+        List<Entry> entryBuffer = new LinkedList<>();
+        final UUID uuid = UUID.randomUUID();
+        OffloadSegmentInfoImpl segmentInfo = new OffloadSegmentInfoImpl(uuid, 0, 0, "",
+                new HashMap<>());
+        final int entryCount = 10;
+        List<Entry> entries = new ArrayList<>();
+        for (int i = 0; i < entryCount; i++) {
+            final byte[] bytes = new byte[random.nextInt(10)];
+            final EntryImpl entry = EntryImpl.create(0, i, bytes);
+            entries.add(entry);
+            entry.retain();
+            entryBuffer.add(entry);
+            blockSize += BufferedOffloadStream.ENTRY_HEADER_SIZE + entry.getLength();
+        }
+        segmentInfo.closeSegment(0, 9);
+        blockSize += paddingLen;
+
+        final BufferedOffloadStream inputStream = new BufferedOffloadStream(blockSize, entryBuffer,
+                segmentInfo.beginLedgerId,
+                segmentInfo.beginEntryId);
+        assertEquals(inputStream.getLedgerId(), 0);
+        assertEquals(inputStream.getBeginEntryId(), 0);
+        assertEquals(inputStream.getBlockSize(), blockSize);
+
+        byte headerB[] = new byte[StreamingDataBlockHeaderImpl.getDataStartOffset()];
+        ByteStreams.readFully(inputStream, headerB);
+        StreamingDataBlockHeaderImpl headerRead = StreamingDataBlockHeaderImpl
+                .fromStream(new ByteArrayInputStream(headerB));
+        Assert.assertEquals(headerRead.getBlockLength(), blockSize);
+        Assert.assertEquals(headerRead.getFirstEntryId(), 0);
+
+        int left = blockSize - StreamingDataBlockHeaderImpl.getDataStartOffset();
+        for (int i = 0; i < entryCount; i++) {
+            byte lengthBuf[] = new byte[4];
+            byte entryIdBuf[] = new byte[8];
+            byte content[] = new byte[entries.get(i).getLength()];
+
+            left -= lengthBuf.length + entryIdBuf.length + content.length;
+            inputStream.read(lengthBuf);
+            inputStream.read(entryIdBuf);
+            inputStream.read(content);
+            Assert.assertEquals(entries.get(i).getLength(), Ints.fromByteArray(lengthBuf));
+            Assert.assertEquals(i, Longs.fromByteArray(entryIdBuf));
+            assertArrayEquals(entries.get(i).getData(), content);
+        }
+        assertEquals(left, paddingLen);
+        byte padding[] = new byte[left];
+        inputStream.read(padding);
+
+        ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
+        for (int i = 0; i < paddingBuf.capacity() / 4; i++) {
+            Assert.assertEquals(Integer.toHexString(paddingBuf.readInt()),
+                    Integer.toHexString(0xFEDCDEAD));
+        }
+
+        // reach end of stream.
+        Assert.assertEquals(inputStream.read(), -1);
+        Assert.assertEquals(inputStream.read(), -1);
+        inputStream.close();
+
+    }
+
+    @Test
+    public void testHavePadding() throws Exception {
+        testWithPadding(10);
+    }
+
+    @Test
+    public void testNoPadding() throws Exception {
+        testWithPadding(0);
+    }
+
+    @Ignore("Disable because let offloader to ensure there is no another ledger id")
+    public void shouldEndWhenSegmentChanged() throws IOException {
+        int blockSize = StreamingDataBlockHeaderImpl.getDataStartOffset();
+        int paddingLen = 10;
+        List<Entry> entryBuffer = new LinkedList<>();
+        final UUID uuid = UUID.randomUUID();
+        OffloadSegmentInfoImpl segmentInfo = new OffloadSegmentInfoImpl(uuid, 0, 0, "",
+                new HashMap<>());
+        AtomicLong bufferLength = new AtomicLong();
+        final int entryCount = 10;
+        List<Entry> entries = new ArrayList<>();
+        for (int i = 0; i < entryCount; i++) {
+            final byte[] bytes = new byte[random.nextInt(10)];
+            final EntryImpl entry = EntryImpl.create(0, i, bytes);
+            entries.add(entry);
+            entry.retain();
+            entryBuffer.add(entry);
+            blockSize += BufferedOffloadStream.ENTRY_HEADER_SIZE + entry.getLength();
+        }
+        // add an entry from a new ledger
+        {
+            final byte[] bytes = new byte[random.nextInt(10)];
+            final EntryImpl entry = EntryImpl.create(1, 0, bytes);
+            entries.add(entry);
+            entry.retain();
+            entryBuffer.add(entry);
+        }
+        segmentInfo.closeSegment(1, 0);
+        blockSize += paddingLen;
+
+        final BufferedOffloadStream inputStream = new BufferedOffloadStream(blockSize, entryBuffer,
+                segmentInfo.beginLedgerId,
+                segmentInfo.beginEntryId);
+        assertEquals(inputStream.getLedgerId(), 0);
+        assertEquals(inputStream.getBeginEntryId(), 0);
+        assertEquals(inputStream.getBlockSize(), blockSize);
+
+        byte headerB[] = new byte[StreamingDataBlockHeaderImpl.getDataStartOffset()];
+        ByteStreams.readFully(inputStream, headerB);
+        StreamingDataBlockHeaderImpl headerRead = StreamingDataBlockHeaderImpl
+                .fromStream(new ByteArrayInputStream(headerB));
+        Assert.assertEquals(headerRead.getBlockLength(), blockSize);
+        Assert.assertEquals(headerRead.getFirstEntryId(), 0);
+
+        int left = blockSize - StreamingDataBlockHeaderImpl.getDataStartOffset();
+        for (int i = 0; i < entryCount; i++) {
+            byte lengthBuf[] = new byte[4];
+            byte entryIdBuf[] = new byte[8];
+            byte content[] = new byte[entries.get(i).getLength()];
+
+            left -= lengthBuf.length + entryIdBuf.length + content.length;
+            inputStream.read(lengthBuf);
+            inputStream.read(entryIdBuf);
+            inputStream.read(content);
+            Assert.assertEquals(entries.get(i).getLength(), Ints.fromByteArray(lengthBuf));
+            Assert.assertEquals(i, Longs.fromByteArray(entryIdBuf));
+            assertArrayEquals(entries.get(i).getData(), content);
+        }
+        assertEquals(left, paddingLen);
+        byte padding[] = new byte[left];
+        inputStream.read(padding);
+
+        ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
+        for (int i = 0; i < paddingBuf.capacity() / 4; i++) {
+            Assert.assertEquals(Integer.toHexString(paddingBuf.readInt()),
+                    Integer.toHexString(0xFEDCDEAD));
+        }
+
+        // reach end of stream.
+        Assert.assertEquals(inputStream.read(), -1);
+        Assert.assertEquals(inputStream.read(), -1);
+        inputStream.close();
+
+        assertEquals(entryBuffer.size(), 1);
+    }
+}
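
The framing these tests verify: a block starts with the 128-byte streaming header, each entry follows as a 12-byte entry header (ENTRY_HEADER_SIZE: a 4-byte length plus an 8-byte entry id) and its payload, and the block is padded to blockSize with the 0xFEDCDEAD pattern. A sizing sketch matching the arithmetic in testWithPadding (names are from the classes above):

    int blockSize = StreamingDataBlockHeaderImpl.getDataStartOffset();            // 128-byte header
    for (Entry entry : entries) {
        blockSize += BufferedOffloadStream.ENTRY_HEADER_SIZE + entry.getLength(); // 12 + payload
    }
    blockSize += paddingLen;                                                      // 0xFEDCDEAD padding
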
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderV2Test.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderV2Test.java
new file mode 100644
index 0000000..c881cea
--- /dev/null
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderV2Test.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class DataBlockHeaderV2Test {
+
+    @Test
+    public void dataBlockHeaderImplTest() throws Exception {
+        int blockLength = 1024 * 1024;
+        long firstEntryId = 3333L;
+        long ledgerId = 3L;
+
+        StreamingDataBlockHeaderImpl dataBlockHeader = StreamingDataBlockHeaderImpl.of(blockLength,
+                ledgerId, firstEntryId);
+
+        // verify get methods
+        assertEquals(StreamingDataBlockHeaderImpl.getBlockMagicWord(), StreamingDataBlockHeaderImpl.MAGIC_WORD);
+        assertEquals(dataBlockHeader.getBlockLength(), blockLength);
+        assertEquals(dataBlockHeader.getFirstEntryId(), firstEntryId);
+        assertEquals(dataBlockHeader.getLedgerId(), ledgerId);
+
+        // verify toStream and fromStream
+        InputStream stream = dataBlockHeader.toStream();
+        stream.mark(0);
+        StreamingDataBlockHeaderImpl rebuild = StreamingDataBlockHeaderImpl.fromStream(stream);
+        assertEquals(rebuild.getBlockLength(), blockLength);
+        assertEquals(rebuild.getFirstEntryId(), firstEntryId);
+        assertEquals(rebuild.getLedgerId(), ledgerId);
+        // verify InputStream reach end
+        assertEquals(stream.read(), -1);
+
+        stream.reset();
+        byte streamContent[] = new byte[StreamingDataBlockHeaderImpl.getDataStartOffset()];
+
+        // a stream of all 0s simulates junk data; should throw an exception because the header magic does not match.
+        try (InputStream stream2 = new ByteArrayInputStream(streamContent, 0,
+                StreamingDataBlockHeaderImpl.getDataStartOffset())) {
+            DataBlockHeader rebuild2 = StreamingDataBlockHeaderImpl.fromStream(stream2);
+            fail("Should throw IOException");
+        } catch (Exception e) {
+            assertTrue(e instanceof IOException);
+            assertTrue(e.getMessage().contains("Data block header magic word does not match"));
+        }
+
+        // simulate read header too small, throw EOFException.
+        stream.read(streamContent);
+        try (InputStream stream3 =
+                     new ByteArrayInputStream(streamContent, 0,
+                             StreamingDataBlockHeaderImpl.getDataStartOffset() - 1)) {
+            DataBlockHeader rebuild3 = StreamingDataBlockHeaderImpl.fromStream(stream3);
+            fail("Should throw EOFException");
+        } catch (EOFException e) {
+            // expected
+        }
+
+        stream.close();
+    }
+
+}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
new file mode 100644
index 0000000..fc43376
--- /dev/null
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.base.Predicate;
+import io.netty.buffer.ByteBuf;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
+
+@Slf4j
+public class MockManagedLedger implements ManagedLedger {
+    @Override
+    public String getName() {
+        return null;
+    }
+
+    @Override
+    public Position addEntry(byte[] data) throws InterruptedException, ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public Position addEntry(byte[] data, int numberOfMessages) throws InterruptedException, ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public void asyncAddEntry(byte[] data, AsyncCallbacks.AddEntryCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public Position addEntry(byte[] data, int offset, int length) throws InterruptedException, ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException,
+            ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public void asyncAddEntry(byte[] data, int offset, int length, AsyncCallbacks.AddEntryCallback callback,
+                              Object ctx) {
+
+    }
+
+    @Override
+    public void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length,
+                              AsyncCallbacks.AddEntryCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public void asyncAddEntry(ByteBuf buffer, AsyncCallbacks.AddEntryCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AsyncCallbacks.AddEntryCallback callback,
+                              Object ctx) {
+
+    }
+
+    @Override
+    public ManagedCursor openCursor(String name) throws InterruptedException, ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public ManagedCursor openCursor(String name, CommandSubscribe.InitialPosition initialPosition) throws
+            InterruptedException, ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public ManagedCursor openCursor(String name, CommandSubscribe.InitialPosition initialPosition,
+                                    Map<String, Long> properties) throws InterruptedException, ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws
+            ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName,
+                                             CommandSubscribe.InitialPosition initialPosition) throws
+            ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public void asyncDeleteCursor(String name, AsyncCallbacks.DeleteCursorCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public void deleteCursor(String name) throws InterruptedException, ManagedLedgerException {
+
+    }
+
+    @Override
+    public void asyncOpenCursor(String name, AsyncCallbacks.OpenCursorCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public void asyncOpenCursor(String name, CommandSubscribe.InitialPosition initialPosition,
+                                AsyncCallbacks.OpenCursorCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public void asyncOpenCursor(String name, CommandSubscribe.InitialPosition initialPosition,
+                                Map<String, Long> properties, AsyncCallbacks.OpenCursorCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public Iterable<ManagedCursor> getCursors() {
+        return null;
+    }
+
+    @Override
+    public Iterable<ManagedCursor> getActiveCursors() {
+        return null;
+    }
+
+    @Override
+    public long getNumberOfEntries() {
+        return 0;
+    }
+
+    @Override
+    public long getNumberOfActiveEntries() {
+        return 0;
+    }
+
+    @Override
+    public long getTotalSize() {
+        return 0;
+    }
+
+    @Override
+    public long getEstimatedBacklogSize() {
+        return 0;
+    }
+
+    @Override
+    public long getOffloadedSize() {
+        return 0;
+    }
+
+    @Override
+    public void asyncTerminate(AsyncCallbacks.TerminateCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public Position terminate() throws InterruptedException, ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public void close() throws InterruptedException, ManagedLedgerException {
+
+    }
+
+    @Override
+    public void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public ManagedLedgerMXBean getStats() {
+        return null;
+    }
+
+    @Override
+    public void delete() throws InterruptedException, ManagedLedgerException {
+
+    }
+
+    @Override
+    public void asyncDelete(AsyncCallbacks.DeleteLedgerCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public Position offloadPrefix(Position pos) throws InterruptedException, ManagedLedgerException {
+        return null;
+    }
+
+    @Override
+    public void asyncOffloadPrefix(Position pos, AsyncCallbacks.OffloadCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public ManagedCursor getSlowestConsumer() {
+        return null;
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return false;
+    }
+
+    @Override
+    public ManagedLedgerConfig getConfig() {
+        return null;
+    }
+
+    @Override
+    public void setConfig(ManagedLedgerConfig config) {
+
+    }
+
+    @Override
+    public Position getLastConfirmedEntry() {
+        return null;
+    }
+
+    @Override
+    public void readyToCreateNewLedger() {
+
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return null;
+    }
+
+    @Override
+    public void setProperty(String key, String value) throws InterruptedException, ManagedLedgerException {
+
+    }
+
+    @Override
+    public void asyncSetProperty(String key, String value, AsyncCallbacks.UpdatePropertiesCallback callback,
+                                 Object ctx) {
+
+    }
+
+    @Override
+    public void deleteProperty(String key) throws InterruptedException, ManagedLedgerException {
+
+    }
+
+    @Override
+    public void asyncDeleteProperty(String key, AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public void setProperties(Map<String, String> properties) throws InterruptedException, ManagedLedgerException {
+
+    }
+
+    @Override
+    public void asyncSetProperties(Map<String, String> properties,
+                                   AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx) {
+
+    }
+
+    @Override
+    public void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
+
+    }
+
+    @Override
+    public void rollCurrentLedgerIfFull() {
+
+    }
+
+    @Override
+    public CompletableFuture<Position> asyncFindPosition(Predicate<Entry> predicate) {
+        return null;
+    }
+
+    @Override
+    public ManagedLedgerInterceptor getManagedLedgerInterceptor() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId) {
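+        // a fixed dummy LedgerInfo; the size and entry count are arbitrary test fixtures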
+        final LedgerInfo ledgerInfo = LedgerInfo.newBuilder()
+                .setLedgerId(ledgerId).setSize(100).setEntries(20).build();
+        return CompletableFuture.completedFuture(ledgerInfo);
+    }
+}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
index 4d6f3e7..a5dd8a3 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexTest.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Charsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import io.netty.buffer.ByteBuf;
@@ -39,6 +38,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.testng.annotations.Test;
@@ -82,7 +82,7 @@ public class OffloadIndexTest {
 //        }
 //    }
 
-    private LedgerMetadata createLedgerMetadata(long id) throws Exception {
+    public static LedgerMetadata createLedgerMetadata(long id) throws Exception {
 
         Map<String, byte[]> metadataCustom = Maps.newHashMap();
         metadataCustom.put("key1", "value1".getBytes(UTF_8));
@@ -92,7 +92,7 @@ public class OffloadIndexTest {
         bookies.add(0, new BookieSocketAddress("127.0.0.1:3181").toBookieId());
         bookies.add(1, new BookieSocketAddress("127.0.0.2:3181").toBookieId());
         bookies.add(2, new BookieSocketAddress("127.0.0.3:3181").toBookieId());
-        
+
         return LedgerMetadataBuilder.create().withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
                 .withDigestType(DigestType.CRC32C).withPassword("password".getBytes(UTF_8))
                 .withCustomMetadata(metadataCustom).withClosedState().withLastEntryId(5000).withLength(100)
@@ -100,6 +100,15 @@ public class OffloadIndexTest {
 
     }
 
+    public static LedgerInfo createLedgerInfo(long id) throws Exception {
+        // Unlike createLedgerMetadata above, LedgerInfo carries no custom metadata,
+        // so only the ledger id, entry count and size are populated here.
+        return LedgerInfo.newBuilder().setLedgerId(id).setEntries(5001).setSize(10000).build();
+    }
+
     // prepare metadata, then use builder to build a OffloadIndexBlockImpl
     // verify get methods, readout and fromStream methods.
     @Test
@@ -122,7 +131,7 @@ public class OffloadIndexTest {
         // verify getIndexEntryForEntry
         OffloadIndexEntry entry1 = indexBlock.getIndexEntryForEntry(0);
         assertEquals(entry1.getEntryId(), 0);
-        assertEquals(entry1.getPartId(),2);
+        assertEquals(entry1.getPartId(), 2);
         assertEquals(entry1.getOffset(), 0);
 
         OffloadIndexEntry entry11 = indexBlock.getIndexEntryForEntry(500);
@@ -185,7 +194,7 @@ public class OffloadIndexTest {
         OffloadIndexEntry e3 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
                                                         wrapper.readLong(), dataHeaderLength);
 
-        assertEquals(e1.getEntryId(),entry1.getEntryId());
+        assertEquals(e1.getEntryId(), entry1.getEntryId());
         assertEquals(e1.getPartId(), entry1.getPartId());
         assertEquals(e1.getOffset(), entry1.getOffset());
         assertEquals(e1.getDataOffset(), entry1.getDataOffset());
@@ -203,7 +212,7 @@ public class OffloadIndexTest {
         InputStream out2 = indexBlock.toStream();
         int streamLength = out2.available();
         out2.mark(0);
-        OffloadIndexBlock indexBlock2 = blockBuilder.fromStream(out2);
+        OffloadIndexBlock indexBlock2 = (OffloadIndexBlock) blockBuilder.fromStream(out2);
         // 1. verify metadata that got from inputstream success.
         LedgerMetadata metadata2 = indexBlock2.getLedgerMetadata();
         log.debug("built metadata: {}", metadata2.toString());
@@ -221,7 +230,7 @@ public class OffloadIndexTest {
         byte streamContent[] = new byte[streamLength];
         // stream with all 0, simulate junk data, should throw exception for header magic not match.
         try(InputStream stream3 = new ByteArrayInputStream(streamContent, 0, streamLength)) {
-            OffloadIndexBlock indexBlock3 = blockBuilder.fromStream(stream3);
+            OffloadIndexBlock indexBlock3 = (OffloadIndexBlock) blockBuilder.fromStream(stream3);
             fail("Should throw IOException");
         } catch (Exception e) {
             assertTrue(e instanceof IOException);
@@ -232,7 +241,7 @@ public class OffloadIndexTest {
         out2.read(streamContent);
         try(InputStream stream4 =
                 new ByteArrayInputStream(streamContent, 0, streamLength - 1)) {
-            OffloadIndexBlock indexBlock4 = blockBuilder.fromStream(stream4);
+            OffloadIndexBlock indexBlock4 = (OffloadIndexBlock) blockBuilder.fromStream(stream4);
             fail("Should throw EOFException");
         } catch (Exception e) {
             assertTrue(e instanceof java.io.EOFException);
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexV2Test.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexV2Test.java
new file mode 100644
index 0000000..dd3df41
--- /dev/null
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexV2Test.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
+import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffloadIndexBlockV2Impl.CompatibleMetadata;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class OffloadIndexV2Test {
+
+    // prepare metadata, then use the builder to build an OffloadIndexBlockV2Impl;
+    // verify the get methods, the toStream readout, and the fromStream round-trip.
+    @Test
+    public void streamingOffloadIndexBlockImplTest() throws Exception {
+        OffloadIndexBlockV2Builder blockBuilder = OffloadIndexBlockV2Builder.create();
+        final long ledgerId = 1; // use a dummy ledgerId; from BK 4.12 a ledger id is required
+        LedgerInfo metadata = OffloadIndexTest.createLedgerInfo(ledgerId);
+        log.debug("created metadata: {}", metadata.toString());
+
+        blockBuilder.addLedgerMeta(ledgerId, metadata).withDataObjectLength(1).withDataBlockHeaderLength(23455);
+
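+        // addBlock(ledgerId, beginEntryId, partId, blockSize): each call registers one
+        // uploaded block in the index; as the assertions below confirm, block offsets
+        // accumulate (0, 64 MiB, 128 MiB) and an entry resolves to its enclosing block's partId.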
+        blockBuilder.addBlock(ledgerId, 0, 2, 64 * 1024 * 1024);
+        blockBuilder.addBlock(ledgerId, 1000, 3, 64 * 1024 * 1024);
+        blockBuilder.addBlock(ledgerId, 2000, 4, 64 * 1024 * 1024);
+        OffloadIndexBlockV2 indexBlock = blockBuilder.buildV2();
+
+        // verify getEntryCount and getLedgerMetadata
+        assertEquals(indexBlock.getEntryCount(), 3);
+        assertEquals(indexBlock.getLedgerMetadata(ledgerId), new CompatibleMetadata(metadata));
+
+        // verify getIndexEntryForEntry
+        OffloadIndexEntry entry1 = indexBlock.getIndexEntryForEntry(ledgerId, 0);
+        assertEquals(entry1.getEntryId(), 0);
+        assertEquals(entry1.getPartId(), 2);
+        assertEquals(entry1.getOffset(), 0);
+
+        OffloadIndexEntry entry11 = indexBlock.getIndexEntryForEntry(ledgerId, 500);
+        assertEquals(entry11, entry1);
+
+        OffloadIndexEntry entry2 = indexBlock.getIndexEntryForEntry(ledgerId, 1000);
+        assertEquals(entry2.getEntryId(), 1000);
+        assertEquals(entry2.getPartId(), 3);
+        assertEquals(entry2.getOffset(), 64 * 1024 * 1024);
+
+        OffloadIndexEntry entry22 = indexBlock.getIndexEntryForEntry(ledgerId, 1300);
+        assertEquals(entry22, entry2);
+
+        OffloadIndexEntry entry3 = indexBlock.getIndexEntryForEntry(ledgerId, 2000);
+        assertEquals(entry3.getEntryId(), 2000);
+        assertEquals(entry3.getPartId(), 4);
+        assertEquals(entry3.getOffset(), 2 * 64 * 1024 * 1024);
+
+        OffloadIndexEntry entry33 = indexBlock.getIndexEntryForEntry(ledgerId, 3000);
+        assertEquals(entry33, entry3);
+
+        try {
+            OffloadIndexEntry entry4 = indexBlock.getIndexEntryForEntry(ledgerId, 6000);
+            fail("Should throw IndexOutOfBoundsException.");
+        } catch (Exception e) {
+            assertTrue(e instanceof IndexOutOfBoundsException);
+            assertEquals(e.getMessage(), "Entry index: 6000 beyond lastEntryId: 5000");
+        }
+
+        // verify toStream
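+        // The reads below walk the serialized V2 layout: a fixed header (magic word,
+        // index block length, data object length, data block header length), then one
+        // segment per ledger (ledger id, entry count, segment metadata length,
+        // metadata bytes, and finally the fixed-width index entries).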
+        InputStream out = indexBlock.toStream();
+        byte[] b = new byte[1024];
+        int readoutLen = out.read(b);
+        out.close();
+        ByteBuf wrapper = Unpooled.wrappedBuffer(b);
+        int magic = wrapper.readInt();
+        int indexBlockLength = wrapper.readInt();
+        long dataObjectLength = wrapper.readLong();
+        long dataHeaderLength = wrapper.readLong();
+        assertEquals(ledgerId, wrapper.readLong());
+        int indexEntryCount = wrapper.readInt();
+        int segmentMetadataLength = wrapper.readInt();
+
+        // verify counter
+        assertEquals(magic, OffloadIndexBlockV2Impl.getIndexMagicWord());
+        assertEquals(indexBlockLength, readoutLen);
+        assertEquals(indexEntryCount, 3);
+        assertEquals(dataObjectLength, 1);
+        assertEquals(dataHeaderLength, 23455);
+
+        wrapper.readBytes(segmentMetadataLength);
+        log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: {}",
+                magic, indexBlockLength, segmentMetadataLength, indexEntryCount);
+
+        // verify entry
+        OffloadIndexEntry e1 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
+                wrapper.readLong(), dataHeaderLength);
+        OffloadIndexEntry e2 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
+                wrapper.readLong(), dataHeaderLength);
+        OffloadIndexEntry e3 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
+                wrapper.readLong(), dataHeaderLength);
+
+        assertEquals(e1.getEntryId(), entry1.getEntryId());
+        assertEquals(e1.getPartId(), entry1.getPartId());
+        assertEquals(e1.getOffset(), entry1.getOffset());
+        assertEquals(e1.getDataOffset(), entry1.getDataOffset());
+        assertEquals(e2.getEntryId(), entry2.getEntryId());
+        assertEquals(e2.getPartId(), entry2.getPartId());
+        assertEquals(e2.getOffset(), entry2.getOffset());
+        assertEquals(e2.getDataOffset(), entry2.getDataOffset());
+        assertEquals(e3.getEntryId(), entry3.getEntryId());
+        assertEquals(e3.getPartId(), entry3.getPartId());
+        assertEquals(e3.getOffset(), entry3.getOffset());
+        assertEquals(e3.getDataOffset(), entry3.getDataOffset());
+        wrapper.release();
+
+        // verify building an OffloadIndexBlockV2 from the InputStream
+        InputStream out2 = indexBlock.toStream();
+        int streamLength = out2.available();
+        out2.mark(0);
+        OffloadIndexBlockV2 indexBlock2 = blockBuilder.fromStream(out2);
+        // 1. verify the metadata recovered from the input stream.
+        LedgerMetadata metadata2 = indexBlock2.getLedgerMetadata(ledgerId);
+        log.debug("built metadata: {}", metadata2.toString());
+        assertEquals(metadata.getLedgerId(), metadata2.getLedgerId());
+        assertEquals(metadata.getEntries() - 1, metadata2.getLastEntryId());
+        assertEquals(metadata.getSize(), metadata2.getLength());
+        // 2. verify set all the entries
+        assertEquals(indexBlock2.getEntryCount(), indexBlock.getEntryCount());
+        // 3. verify reach end
+        assertEquals(out2.read(), -1);
+
+        out2.reset();
+        byte[] streamContent = new byte[streamLength];
+        // stream with all 0, simulate junk data, should throw exception for header magic not match.
+        try (InputStream stream3 = new ByteArrayInputStream(streamContent, 0, streamLength)) {
+            OffloadIndexBlockV2 indexBlock3 = blockBuilder.fromStream(stream3);
+            fail("Should throw IOException");
+        } catch (Exception e) {
+            assertTrue(e instanceof IOException);
+            assertTrue(e.getMessage().contains("Invalid MagicWord"));
+        }
+
+        // simulate read header too small, throw EOFException.
+        out2.read(streamContent);
+        try (InputStream stream4 =
+                     new ByteArrayInputStream(streamContent, 0, streamLength - 1)) {
+            OffloadIndexBlockV2 indexBlock4 = blockBuilder.fromStream(stream4);
+            fail("Should throw EOFException");
+        } catch (Exception e) {
+            assertTrue(e instanceof java.io.EOFException);
+        }
+
+        out2.close();
+        indexBlock.close();
+    }
+
+    @Test
+    public void streamingMultiLedgerOffloadIndexBlockImplTest() throws Exception {
+        OffloadIndexBlockV2Builder blockBuilder = OffloadIndexBlockV2Builder.create();
+        final long ledgerId1 = 1; // use a dummy ledgerId; from BK 4.12 a ledger id is required
+        final long ledgerId2 = 2;
+        LedgerInfo metadata1 = OffloadIndexTest.createLedgerInfo(ledgerId1);
+        LedgerInfo metadata2 = OffloadIndexTest.createLedgerInfo(ledgerId2);
+        log.debug("created metadata: {}", metadata1.toString());
+        log.debug("created metadata: {}", metadata2.toString());
+
+        blockBuilder.addLedgerMeta(ledgerId1, metadata1)
+                .addLedgerMeta(ledgerId2, metadata2)
+                .withDataObjectLength(1)
+                .withDataBlockHeaderLength(23455);
+
+        blockBuilder.addBlock(ledgerId1, 1000, 2, 64 * 1024 * 1024);
+        blockBuilder.addBlock(ledgerId2, 0, 3, 64 * 1024 * 1024);
+        blockBuilder.addBlock(ledgerId2, 1000, 4, 64 * 1024 * 1024);
+        OffloadIndexBlockV2 indexBlock = blockBuilder.buildV2();
+
+        // verify getEntryCount and getLedgerMetadata
+        assertEquals(indexBlock.getEntryCount(), 3);
+        assertEquals(indexBlock.getLedgerMetadata(ledgerId1),
+                new CompatibleMetadata(metadata1));
+        assertEquals(indexBlock.getLedgerMetadata(ledgerId2), new CompatibleMetadata(metadata2));
+
+        // verify getIndexEntryForEntry
+        OffloadIndexEntry entry1 = indexBlock.getIndexEntryForEntry(ledgerId1, 1000);
+        assertEquals(entry1.getEntryId(), 1000);
+        assertEquals(entry1.getPartId(), 2);
+        assertEquals(entry1.getOffset(), 0);
+
+        OffloadIndexEntry entry11 = indexBlock.getIndexEntryForEntry(ledgerId1, 1500);
+        assertEquals(entry11, entry1);
+
+        OffloadIndexEntry entry2 = indexBlock.getIndexEntryForEntry(ledgerId2, 0);
+        assertEquals(entry2.getEntryId(), 0);
+        assertEquals(entry2.getPartId(), 3);
+        assertEquals(entry2.getOffset(), 64 * 1024 * 1024);
+
+        OffloadIndexEntry entry22 = indexBlock.getIndexEntryForEntry(ledgerId2, 300);
+        assertEquals(entry22, entry2);
+
+        OffloadIndexEntry entry3 = indexBlock.getIndexEntryForEntry(ledgerId2, 1000);
+        assertEquals(entry3.getEntryId(), 1000);
+        assertEquals(entry3.getPartId(), 4);
+        assertEquals(entry3.getOffset(), 2 * 64 * 1024 * 1024);
+
+        OffloadIndexEntry entry33 = indexBlock.getIndexEntryForEntry(ledgerId2, 2000);
+        assertEquals(entry33, entry3);
+
+        try {
+            OffloadIndexEntry entry4 = indexBlock.getIndexEntryForEntry(ledgerId2, 6000);
+            fail("Should throw IndexOutOfBoundsException.");
+        } catch (Exception e) {
+            assertTrue(e instanceof IndexOutOfBoundsException);
+            assertEquals(e.getMessage(), "Entry index: 6000 beyond lastEntryId: 5000");
+        }
+
+        // verify toStream
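+        // Same layout as the single-ledger case, but with two per-ledger segments:
+        // the reads below consume ledgerId1's segment (one entry) before ledgerId2's (two entries).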
+        InputStream out = indexBlock.toStream();
+        byte[] b = new byte[1024];
+        int readoutLen = out.read(b);
+        out.close();
+        ByteBuf wrapper = Unpooled.wrappedBuffer(b);
+        int magic = wrapper.readInt();
+        int indexBlockLength = wrapper.readInt();
+        long dataObjectLength = wrapper.readLong();
+        long dataHeaderLength = wrapper.readLong();
+        assertEquals(ledgerId1, wrapper.readLong());
+        int indexEntryCount = wrapper.readInt();
+        int segmentMetadataLength = wrapper.readInt();
+
+        // verify counter
+        assertEquals(magic, OffloadIndexBlockV2Impl.getIndexMagicWord());
+        assertEquals(indexBlockLength, readoutLen);
+        assertEquals(indexEntryCount, 1);
+        assertEquals(dataObjectLength, 1);
+        assertEquals(dataHeaderLength, 23455);
+
+        wrapper.readBytes(segmentMetadataLength);
+        log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: {}",
+                magic, indexBlockLength, segmentMetadataLength, indexEntryCount);
+
+        // verify entry
+        OffloadIndexEntry e1 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
+                wrapper.readLong(), dataHeaderLength);
+
+        assertEquals(e1.getEntryId(), entry1.getEntryId());
+        assertEquals(e1.getPartId(), entry1.getPartId());
+        assertEquals(e1.getOffset(), entry1.getOffset());
+        assertEquals(e1.getDataOffset(), entry1.getDataOffset());
+
+        assertEquals(ledgerId2, wrapper.readLong());
+        int indexEntryCount2 = wrapper.readInt();
+        assertEquals(indexEntryCount2, 2);
+        int segmentMetadataLength2 = wrapper.readInt();
+        wrapper.readBytes(segmentMetadataLength2);
+
+        OffloadIndexEntry e2 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
+                wrapper.readLong(), dataHeaderLength);
+        OffloadIndexEntry e3 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
+                wrapper.readLong(), dataHeaderLength);
+
+        assertEquals(e2.getEntryId(), entry2.getEntryId());
+        assertEquals(e2.getPartId(), entry2.getPartId());
+        assertEquals(e2.getOffset(), entry2.getOffset());
+        assertEquals(e2.getDataOffset(), entry2.getDataOffset());
+        assertEquals(e3.getEntryId(), entry3.getEntryId());
+        assertEquals(e3.getPartId(), entry3.getPartId());
+        assertEquals(e3.getOffset(), entry3.getOffset());
+        assertEquals(e3.getDataOffset(), entry3.getDataOffset());
+        wrapper.release();
+
+        // verify building an OffloadIndexBlockV2 from the InputStream
+        InputStream out2 = indexBlock.toStream();
+        int streamLength = out2.available();
+        out2.mark(0);
+        OffloadIndexBlockV2 indexBlock2 = blockBuilder.fromStream(out2);
+        // 1. verify the metadata recovered from the input stream.
+        // TODO: replace the commented-out checks below with meaningful assertions.
+//        LedgerMetadata metadata1back = indexBlock2.getLedgerMetadata(ledgerId1);
+//        log.debug("built metadata: {}", metadata1back.toString());
+//        assertEquals(metadata1back.getAckQuorumSize(), metadata1.getAckQuorumSize());
+//        assertEquals(metadata1back.getEnsembleSize(), metadata1.getEnsembleSize());
+//        assertEquals(metadata1back.getDigestType(), metadata1.getDigestType());
+//        assertEquals(metadata1back.getAllEnsembles().entrySet(), metadata1.getAllEnsembles().entrySet());
+//        LedgerMetadata metadata2back = indexBlock2.getLedgerMetadata(ledgerId2);
+//        log.debug("built metadata: {}", metadata2back.toString());
+//        assertEquals(metadata2back.getAckQuorumSize(), metadata1.getAckQuorumSize());
+//        assertEquals(metadata2back.getEnsembleSize(), metadata1.getEnsembleSize());
+//        assertEquals(metadata2back.getDigestType(), metadata1.getDigestType());
+//        assertEquals(metadata2back.getAllEnsembles().entrySet(), metadata1.getAllEnsembles().entrySet());
+        // 2. verify set all the entries
+        assertEquals(indexBlock2.getEntryCount(), indexBlock.getEntryCount());
+        // 3. verify reach end
+        assertEquals(out2.read(), -1);
+
+        out2.close();
+        indexBlock.close();
+    }
+}
diff --git a/tiered-storage/jcloud/src/test/resources/log4j2-test.yml b/tiered-storage/jcloud/src/test/resources/log4j2-test.yml
new file mode 100644
index 0000000..46311be
--- /dev/null
+++ b/tiered-storage/jcloud/src/test/resources/log4j2-test.yml
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+Configuration:
+  status: warn
+  name: YAMLConfigTest
+  properties:
+    property:
+      name: filename
+      value: target/test-yaml.log
+  thresholdFilter:
+    level: debug
+  appenders:
+    Console:
+      name: STDOUT
+      target: SYSTEM_OUT
+      PatternLayout:
+        Pattern: "%d{HH:mm:ss.SSS} [%t:%C@%L] %-5level %logger{36} - %msg%n"
+    File:
+      name: File
+      fileName: ${filename}
+      PatternLayout:
+        Pattern: "%d %p %C{1.} [%t] %m%n"
+      Filters:
+        ThresholdFilter:
+          level: error
+
+  Loggers:
+    logger:
+      - name: org.apache.bookkeeper.client.PulsarMockReadHandle
+        level: info
+        additivity: false
+        AppenderRef:
+          ref: STDOUT
+      - name: org.apache.logging.log4j.test2
+        level: debug
+        additivity: false
+        AppenderRef:
+          ref: File
+    Root:
+      level: debug
+      AppenderRef:
+        ref: STDOUT