You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/29 18:07:22 UTC

[incubator-pulsar] branch master updated: Store data block header length in index and block header (#1854)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9594553  Store data block header length in index and block header (#1854)
9594553 is described below

commit 9594553e86b35d3f5243cf3bd3519f0122c7a01c
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Tue May 29 20:07:20 2018 +0200

    Store data block header length in index and block header (#1854)
    
    In the case of the index, if we store the block header length in the
    index, we can change the length in the future without having to break
    compatibility.
    
    In the case of the block header itself, storing the header length in
    the block allows reading of the data object without having to make
    assumptions about the size of the header.
    
    Master Issue: #1511
---
 .../pulsar/broker/s3offload/DataBlockHeader.java   |  4 +-
 .../pulsar/broker/s3offload/OffloadIndexBlock.java |  7 ++-
 .../broker/s3offload/OffloadIndexBlockBuilder.java |  6 +++
 .../broker/s3offload/S3ManagedLedgerOffloader.java |  3 +-
 .../impl/BlockAwareSegmentInputStreamImpl.java     |  4 ++
 .../broker/s3offload/impl/DataBlockHeaderImpl.java | 53 +++++++++++-----------
 .../impl/OffloadIndexBlockBuilderImpl.java         | 14 +++++-
 .../s3offload/impl/OffloadIndexBlockImpl.java      | 15 +++++-
 .../s3offload/impl/OffloadIndexEntryImpl.java      | 12 ++---
 .../broker/s3offload/impl/DataBlockHeaderTest.java | 13 +++---
 .../broker/s3offload/impl/OffloadIndexTest.java    | 22 ++++++---
 11 files changed, 102 insertions(+), 51 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java
index cae653b..f49b6a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java
@@ -36,7 +36,7 @@ public interface DataBlockHeader {
     /**
      * Get the length of the block in bytes, including the header.
      */
-    int getBlockLength();
+    long getBlockLength();
 
     /**
      * Get the message entry Id for the first message that stored in this data block.
@@ -46,7 +46,7 @@ public interface DataBlockHeader {
     /**
      * Get the size of this DataBlockHeader.
      */
-    int getHeaderSize();
+    long getHeaderLength();
 
     /**
      * Get the content of the data block header as InputStream.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
index c7e71c7..2bb59b4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
@@ -60,12 +60,17 @@ public interface OffloadIndexBlock extends Closeable {
      */
     LedgerMetadata getLedgerMetadata();
 
-    /*
+    /**
      * Get the total size of the data object.
      */
     long getDataObjectLength();
 
     /**
+     * Get the length of the header in the blocks in the data object.
+     */
+    long getDataBlockHeaderLength();
+
+    /**
      * An input stream which knows the size of the stream upfront.
      */
     public static class IndexInputStream extends FilterInputStream {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
index c60ce88..126b4b9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
@@ -58,6 +58,12 @@ public interface OffloadIndexBlockBuilder {
     OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength);
 
     /**
+     * Specify the length of the block headers in the data object.
+     * @param dataHeaderLength the length of the headers
+     */
+    OffloadIndexBlockBuilder withDataBlockHeaderLength(long dataHeaderLength);
+
+    /**
      * Finalize the immutable OffloadIndexBlock
      */
     OffloadIndexBlock build();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index 5e46350..65ffd37 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -115,7 +115,8 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.chooseThread(readHandle.getId()).submit(() -> {
             OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create()
-                .withLedgerMetadata(readHandle.getLedgerMetadata());
+                .withLedgerMetadata(readHandle.getLedgerMetadata())
+                .withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize());
             String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid);
             String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid);
             InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey, new ObjectMetadata());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
index 03ae702..78d2380 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
@@ -205,6 +205,10 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
         return dataBlockFullOffset - DataBlockHeaderImpl.getDataStartOffset() - ENTRY_HEADER_SIZE * blockEntryCount;
     }
 
+    public static long getHeaderSize() {
+        return DataBlockHeaderImpl.getDataStartOffset();
+    }
+
     // Calculate the block size after uploaded `entryBytesAlreadyWritten` bytes
     public static int calculateBlockSize(int maxBlockSize, ReadHandle readHandle,
                                          long firstEntryToWrite, long entryBytesAlreadyWritten) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java
index 6ea1f29..19a644a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.s3offload.impl;
 
+import com.google.common.io.CountingInputStream;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -39,37 +41,38 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
     // This is bigger than header size. Leaving some place for alignment and future enhancement.
     // Payload use this as the start offset.
     private static final int HEADER_MAX_SIZE = 128;
-    // The size of this header.
-    private static final int HEADER_SIZE = 4 /* magic word */
-        + 4 /* index block length */
-        + 8 /* first entry id */;
-    private static final byte[] PADDING = new byte[HEADER_MAX_SIZE - HEADER_SIZE];
-
+    private static final int HEADER_BYTES_USED = 4 /* magic */
+                                               + 8 /* header len */
+                                               + 8 /* block len */
+                                               + 8 /* first entry id */;
+    private static final byte[] PADDING = new byte[HEADER_MAX_SIZE - HEADER_BYTES_USED];
 
     public static DataBlockHeaderImpl of(int blockLength, long firstEntryId) {
-        return new DataBlockHeaderImpl(blockLength, firstEntryId);
+        return new DataBlockHeaderImpl(HEADER_MAX_SIZE, blockLength, firstEntryId);
     }
 
     // Construct DataBlockHeader from InputStream, which contains `HEADER_MAX_SIZE` bytes readable.
     public static DataBlockHeader fromStream(InputStream stream) throws IOException {
-        DataInputStream dis = new DataInputStream(stream);
+        CountingInputStream countingStream = new CountingInputStream(stream);
+        DataInputStream dis = new DataInputStream(countingStream);
         int magic = dis.readInt();
         if (magic != MAGIC_WORD) {
             throw new IOException("Data block header magic word not match. read: " + magic + " expected: " + MAGIC_WORD);
         }
 
-        int blockLen = dis.readInt();
+        long headerLen = dis.readLong();
+        long blockLen = dis.readLong();
         long firstEntryId = dis.readLong();
-
-        // padding part
-        if (PADDING.length != dis.skipBytes(PADDING.length)) {
-            throw new EOFException("Data block header magic word not match.");
+        long toSkip = headerLen - countingStream.getCount();
+        if (dis.skip(toSkip) != toSkip) {
+            throw new EOFException("Header was too small");
         }
 
-        return new DataBlockHeaderImpl(blockLen, firstEntryId);
+        return new DataBlockHeaderImpl(headerLen, blockLen, firstEntryId);
     }
 
-    private final int blockLength;
+    private final long headerLength;
+    private final long blockLength;
     private final long firstEntryId;
 
     static public int getBlockMagicWord() {
@@ -81,21 +84,22 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
     }
 
     @Override
-    public int getBlockLength() {
+    public long getBlockLength() {
         return this.blockLength;
     }
 
     @Override
-    public long getFirstEntryId() {
-        return this.firstEntryId;
+    public long getHeaderLength() {
+        return this.headerLength;
     }
 
     @Override
-    public int getHeaderSize() {
-        return HEADER_MAX_SIZE;
+    public long getFirstEntryId() {
+        return this.firstEntryId;
     }
 
-    public DataBlockHeaderImpl(int blockLength, long firstEntryId) {
+    public DataBlockHeaderImpl(long headerLength, long blockLength, long firstEntryId) {
+        this.headerLength = headerLength;
         this.blockLength = blockLength;
         this.firstEntryId = firstEntryId;
     }
@@ -107,13 +111,10 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
      */
     @Override
     public InputStream toStream() {
-        int headerSize = 4 /* magic word */
-            + 4 /* index block length */
-            + 8 /* first entry id */;
-
         ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
         out.writeInt(MAGIC_WORD)
-            .writeInt(blockLength)
+            .writeLong(headerLength)
+            .writeLong(blockLength)
             .writeLong(firstEntryId)
             .writeBytes(PADDING);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
index 3b0f899..1b03f00 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
@@ -35,6 +35,7 @@ public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
 
     private LedgerMetadata ledgerMetadata;
     private long dataObjectLength;
+    private long dataHeaderLength;
     private List<OffloadIndexEntryImpl> entries;
     private int lastBlockSize;
 
@@ -49,6 +50,12 @@ public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
     }
 
     @Override
+    public OffloadIndexBlockBuilder withDataBlockHeaderLength(long dataHeaderLength) {
+        this.dataHeaderLength = dataHeaderLength;
+        return this;
+    }
+
+    @Override
     public OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata metadata) {
         this.ledgerMetadata = metadata;
         return this;
@@ -56,6 +63,8 @@ public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
 
     @Override
     public OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize) {
+        checkState(dataHeaderLength > 0);
+
         // we should added one by one.
         long offset;
         if (firstEntryId == 0) {
@@ -67,7 +76,7 @@ public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
         }
         lastBlockSize = blockSize;
 
-        this.entries.add(OffloadIndexEntryImpl.of(firstEntryId, partId, offset));
+        this.entries.add(OffloadIndexEntryImpl.of(firstEntryId, partId, offset, dataHeaderLength));
         return this;
     }
 
@@ -81,7 +90,8 @@ public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
         checkState(ledgerMetadata != null);
         checkState(!entries.isEmpty());
         checkState(dataObjectLength > 0);
-        return OffloadIndexBlockImpl.get(ledgerMetadata, dataObjectLength, entries);
+        checkState(dataHeaderLength > 0);
+        return OffloadIndexBlockImpl.get(ledgerMetadata, dataObjectLength, dataHeaderLength, entries);
     }
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
index db758d0..d0e7239 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
@@ -59,6 +59,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
 
     private LedgerMetadata segmentMetadata;
     private long dataObjectLength;
+    private long dataHeaderLength;
     private TreeMap<Long, OffloadIndexEntryImpl> indexEntries;
 
     private final Handle<OffloadIndexBlockImpl> recyclerHandle;
@@ -75,6 +76,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
     }
 
     public static OffloadIndexBlockImpl get(LedgerMetadata metadata, long dataObjectLength,
+                                            long dataHeaderLength,
                                             List<OffloadIndexEntryImpl> entries) {
         OffloadIndexBlockImpl block = RECYCLER.get();
         block.indexEntries = Maps.newTreeMap();
@@ -82,6 +84,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
         checkState(entries.size() == block.indexEntries.size());
         block.segmentMetadata = metadata;
         block.dataObjectLength = dataObjectLength;
+        block.dataHeaderLength = dataHeaderLength;
         return block;
     }
 
@@ -94,6 +97,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
 
     public void recycle() {
         dataObjectLength = -1;
+        dataHeaderLength = -1;
         segmentMetadata = null;
         indexEntries.clear();
         indexEntries = null;
@@ -129,6 +133,11 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
         return this.dataObjectLength;
     }
 
+    @Override
+    public long getDataBlockHeaderLength() {
+        return this.dataHeaderLength;
+    }
+
     private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) {
         LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
         builder.setQuorumSize(metadata.getWriteQuorumSize())
@@ -170,6 +179,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
         int indexBlockLength = 4 /* magic header */
             + 4 /* index block length */
             + 8 /* data object length */
+            + 8 /* data header length */
             + 4 /* segment metadata length */
             + 4 /* index entry count */
             + segmentMetadataLength
@@ -180,6 +190,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
         out.writeInt(INDEX_MAGIC_WORD)
             .writeInt(indexBlockLength)
             .writeLong(dataObjectLength)
+            .writeLong(dataHeaderLength)
             .writeInt(segmentMetadataLength)
             .writeInt(indexEntryCount);
 
@@ -319,6 +330,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
         }
         int indexBlockLength = dis.readInt();
         this.dataObjectLength = dis.readLong();
+        this.dataHeaderLength = dis.readLong();
         int segmentMetadataLength = dis.readInt();
         int indexEntryCount = dis.readInt();
 
@@ -332,7 +344,8 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
 
         for (int i = 0; i < indexEntryCount; i ++) {
             long entryId = dis.readLong();
-            this.indexEntries.putIfAbsent(entryId, OffloadIndexEntryImpl.of(entryId, dis.readInt(), dis.readLong()));
+            this.indexEntries.putIfAbsent(entryId, OffloadIndexEntryImpl.of(entryId, dis.readInt(),
+                                                                            dis.readLong(), dataHeaderLength));
         }
 
         return this;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
index d8d2267..b83de85 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
@@ -26,15 +26,14 @@ import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
  *
  */
 public class OffloadIndexEntryImpl implements OffloadIndexEntry {
-    public static OffloadIndexEntryImpl of(long entryId, int partId, long offset) {
-        return new OffloadIndexEntryImpl(entryId, partId, offset);
+    public static OffloadIndexEntryImpl of(long entryId, int partId, long offset, long blockHeaderSize) {
+        return new OffloadIndexEntryImpl(entryId, partId, offset, blockHeaderSize);
     }
 
     private final long entryId;
-
     private final int partId;
-
     private final long offset;
+    private final long blockHeaderSize;
 
     @Override
     public long getEntryId() {
@@ -50,13 +49,14 @@ public class OffloadIndexEntryImpl implements OffloadIndexEntry {
     }
     @Override
     public long getDataOffset() {
-        return offset + DataBlockHeaderImpl.getDataStartOffset();
+        return offset + blockHeaderSize;
     }
 
-    public OffloadIndexEntryImpl(long entryId, int partId, long offset) {
+    private OffloadIndexEntryImpl(long entryId, int partId, long offset, long blockHeaderSize) {
         this.entryId = entryId;
         this.partId = partId;
         this.offset = offset;
+        this.blockHeaderSize = blockHeaderSize;
     }
 }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderTest.java
index f3515ed..a658032 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.io.ByteArrayInputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import lombok.extern.slf4j.Slf4j;
@@ -34,22 +35,22 @@ public class DataBlockHeaderTest {
 
     @Test
     public void dataBlockHeaderImplTest() throws Exception {
-        int headerLength = 1024 * 1024;
+        int blockLength = 1024 * 1024;
         long firstEntryId = 3333L;
 
-        DataBlockHeaderImpl dataBlockHeader = DataBlockHeaderImpl.of(headerLength,
+        DataBlockHeaderImpl dataBlockHeader = DataBlockHeaderImpl.of(blockLength,
             firstEntryId);
 
         // verify get methods
         assertEquals(dataBlockHeader.getBlockMagicWord(), DataBlockHeaderImpl.MAGIC_WORD);
-        assertEquals(dataBlockHeader.getBlockLength(), headerLength);
+        assertEquals(dataBlockHeader.getBlockLength(), blockLength);
         assertEquals(dataBlockHeader.getFirstEntryId(), firstEntryId);
 
         // verify toStream and fromStream
         InputStream stream = dataBlockHeader.toStream();
         stream.mark(0);
         DataBlockHeader rebuild = DataBlockHeaderImpl.fromStream(stream);
-        assertEquals(rebuild.getBlockLength(), headerLength);
+        assertEquals(rebuild.getBlockLength(), blockLength);
         assertEquals(rebuild.getFirstEntryId(), firstEntryId);
         // verify InputStream reach end
         assertEquals(stream.read(), -1);
@@ -72,8 +73,8 @@ public class DataBlockHeaderTest {
                 new ByteArrayInputStream(streamContent, 0, DataBlockHeaderImpl.getDataStartOffset() - 1)) {
             DataBlockHeader rebuild3 = DataBlockHeaderImpl.fromStream(stream3);
             fail("Should throw EOFException");
-        } catch (Exception e) {
-            assertTrue(e instanceof java.io.EOFException);
+        } catch (EOFException e) {
+            // expected
         }
 
         stream.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
index af9d6e4..cff4045 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
@@ -47,17 +47,19 @@ public class OffloadIndexTest {
     @Test
     public void offloadIndexEntryImplTest() {
         // verify OffloadIndexEntryImpl builder
-        OffloadIndexEntryImpl entry1 = OffloadIndexEntryImpl.of(0, 2, 0);
-        OffloadIndexEntryImpl entry2 = OffloadIndexEntryImpl.of(100, 3, 1234);
+        OffloadIndexEntryImpl entry1 = OffloadIndexEntryImpl.of(0, 2, 0, 20);
+        OffloadIndexEntryImpl entry2 = OffloadIndexEntryImpl.of(100, 3, 1234, 20);
 
         // verify OffloadIndexEntryImpl get
         assertEquals(entry1.getEntryId(), 0L);
         assertEquals(entry1.getPartId(), 2);
         assertEquals(entry1.getOffset(), 0L);
+        assertEquals(entry1.getDataOffset(), 20L);
 
         assertEquals(entry2.getEntryId(), 100L);
         assertEquals(entry2.getPartId(), 3);
         assertEquals(entry2.getOffset(), 1234L);
+        assertEquals(entry2.getDataOffset(), 1254L);
     }
 
 
@@ -108,7 +110,7 @@ public class OffloadIndexTest {
         LedgerMetadata metadata = createLedgerMetadata();
         log.debug("created metadata: {}", metadata.toString());
 
-        blockBuilder.withLedgerMetadata(metadata).withDataObjectLength(1);
+        blockBuilder.withLedgerMetadata(metadata).withDataObjectLength(1).withDataBlockHeaderLength(23455);
 
         blockBuilder.addBlock(0, 2, 64 * 1024 * 1024);
         blockBuilder.addBlock(1000, 3, 64 * 1024 * 1024);
@@ -162,6 +164,7 @@ public class OffloadIndexTest {
         int magic = wrapper.readInt();
         int indexBlockLength = wrapper.readInt();
         long dataObjectLength = wrapper.readLong();
+        long dataHeaderLength = wrapper.readLong();
         int segmentMetadataLength = wrapper.readInt();
         int indexEntryCount = wrapper.readInt();
 
@@ -170,25 +173,32 @@ public class OffloadIndexTest {
         assertEquals(indexBlockLength, readoutLen);
         assertEquals(indexEntryCount, 3);
         assertEquals(dataObjectLength, 1);
+        assertEquals(dataHeaderLength, 23455);
 
         wrapper.readBytes(segmentMetadataLength);
         log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: {}",
             magic, indexBlockLength, segmentMetadataLength, indexEntryCount);
 
         // verify entry
-        OffloadIndexEntry e1 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(), wrapper.readLong());
-        OffloadIndexEntry e2 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(), wrapper.readLong());
-        OffloadIndexEntry e3 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(), wrapper.readLong());;
+        OffloadIndexEntry e1 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
+                                                        wrapper.readLong(), dataHeaderLength);
+        OffloadIndexEntry e2 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
+                                                        wrapper.readLong(), dataHeaderLength);
+        OffloadIndexEntry e3 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(),
+                                                        wrapper.readLong(), dataHeaderLength);;
 
         assertEquals(e1.getEntryId(),entry1.getEntryId());
         assertEquals(e1.getPartId(), entry1.getPartId());
         assertEquals(e1.getOffset(), entry1.getOffset());
+        assertEquals(e1.getDataOffset(), entry1.getDataOffset());
         assertEquals(e2.getEntryId(), entry2.getEntryId());
         assertEquals(e2.getPartId(), entry2.getPartId());
         assertEquals(e2.getOffset(), entry2.getOffset());
+        assertEquals(e2.getDataOffset(), entry2.getDataOffset());
         assertEquals(e3.getEntryId(), entry3.getEntryId());
         assertEquals(e3.getPartId(), entry3.getPartId());
         assertEquals(e3.getOffset(), entry3.getOffset());
+        assertEquals(e3.getDataOffset(), entry3.getDataOffset());
         wrapper.release();
 
         // verify build OffloadIndexBlock from InputStream

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.