Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/08 09:13:18 UTC

[incubator-pulsar] branch master updated: Allow padding at end of last BlockAwareSegmentInputStream (#1739)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f331ff7  Allow padding at end of last BlockAwareSegmentInputStream (#1739)
f331ff7 is described below

commit f331ff756cb1e86a5c0883d78c3cc6ad559c28c0
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Tue May 8 11:13:16 2018 +0200

    Allow padding at end of last BlockAwareSegmentInputStream (#1739)
    
    Prior to this patch, we expected that the last block of an offloaded
    ledger would have its size calculated externally, before writing, and
    that the calculation would be correct to the byte. This is fine, and
    will usually be the case, but if the calculation was off by even one
    byte, the whole stream would break.
    
    This patch makes BlockAwareSegmentInputStreamImpl more forgiving in
    terms of block size. If the block size is exact, it works as before,
    but if the block size is larger than needed, padding is added.
    
    Master Issue: #1511
---
 .../impl/BlockAwareSegmentInputStreamImpl.java     | 23 +++----
 .../impl/BlockAwareSegmentInputStreamTest.java     | 70 +++++++++++++++++++++-
 2 files changed, 82 insertions(+), 11 deletions(-)
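
Before the diff itself, the behaviour described in the commit message can be sketched in isolation: a block is filled with whole entries and then topped up with a repeating padding pattern whenever the externally calculated block size turns out to be a few bytes larger than needed. This is plain illustrative Java, not the actual BlockAwareSegmentInputStreamImpl, and the 4-byte pattern used here is arbitrary rather than the real BLOCK_END_PADDING value.

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Illustrative sketch only: fill a block with whole entries, then pad to blockSize.
    class PaddedBlockSketch {
        // arbitrary 4-byte repeating pattern standing in for BLOCK_END_PADDING
        private static final byte[] PADDING = {0x5B, 0x5B, 0x5D, 0x5D};

        static byte[] buildBlock(Queue<byte[]> entries, int blockSize) {
            byte[] block = new byte[blockSize];
            int offset = 0;
            // copy whole entries while the next one still fits in the remaining space
            while (!entries.isEmpty() && offset + entries.peek().length <= blockSize) {
                byte[] entry = entries.poll();
                System.arraycopy(entry, 0, block, offset, entry.length);
                offset += entry.length;
            }
            // pad the rest; with an exact size calculation this loop does nothing
            for (int i = 0; offset < blockSize; i++, offset++) {
                block[offset] = PADDING[i % PADDING.length];
            }
            return block;
        }

        public static void main(String[] args) {
            Queue<byte[]> entries = new ArrayDeque<>();
            entries.add(new byte[100]);
            entries.add(new byte[100]);
            // block size is 5 bytes larger than the entries need; before this patch an
            // exact match was required, now the extra 5 bytes are emitted as padding
            byte[] block = buildBlock(entries, 205);
            System.out.println("padding bytes at end: " + (block.length - 200));
        }
    }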

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
index f1b07bd..92d1cc0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
@@ -64,7 +64,7 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
     // how many entries want to read from ReadHandle each time.
     private static final int ENTRIES_PER_READ = 100;
     // buf the entry size and entry id.
-    private static final int ENTRY_HEADER_SIZE = 4 /* entry size*/ + 8 /* entry id */;
+    static final int ENTRY_HEADER_SIZE = 4 /* entry size*/ + 8 /* entry id */;
     // Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content.
     private List<ByteBuf> entriesByteBuf = null;
 
@@ -83,18 +83,14 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
         checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
         checkState(bytesReadOffset < blockSize);
 
-        // once reach the end of entry buffer, start a new read.
-        if (bytesReadOffset < dataBlockFullOffset && entriesByteBuf.isEmpty()) {
+        // once reach the end of entry buffer, read more, if there is more
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
             entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
         }
 
-        if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + entriesByteBuf.get(0).readableBytes() > blockSize) {
-            // no space for a new entry, set data block full, return end padding
-            if (dataBlockFullOffset == blockSize) {
-                dataBlockFullOffset = bytesReadOffset;
-            }
-            return BLOCK_END_PADDING[(bytesReadOffset++ - dataBlockFullOffset) % BLOCK_END_PADDING.length];
-        } else  {
+        if (!entriesByteBuf.isEmpty() && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) {
             // always read from the first ByteBuf in the list, once read all of its content remove it.
             ByteBuf entryByteBuf = entriesByteBuf.get(0);
             int ret = entryByteBuf.readByte();
@@ -107,6 +103,13 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
             }
 
             return ret;
+        } else {
+            // no space for a new entry or there are no more entries
+            // set data block full, return end padding
+            if (dataBlockFullOffset == blockSize) {
+                dataBlockFullOffset = bytesReadOffset;
+            }
+            return BLOCK_END_PADDING[(bytesReadOffset++ - dataBlockFullOffset) % BLOCK_END_PADDING.length];
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java
index 4957c53..8165292 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java
@@ -274,7 +274,8 @@ public class BlockAwareSegmentInputStreamTest {
         // set block size equals to (header + entry) size.
         int blockSize = 2148;
         BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
-        int expectedEntryCount = (blockSize - DataBlockHeaderImpl.getDataStartOffset()) / (entrySize + 4 + 8);
+        int expectedEntryCount = (blockSize - DataBlockHeaderImpl.getDataStartOffset())
+            / (entrySize + BlockAwareSegmentInputStreamImpl.ENTRY_HEADER_SIZE);
 
         // verify get methods
         assertEquals(inputStream.getLedger(), readHandle);
@@ -432,4 +433,71 @@ public class BlockAwareSegmentInputStreamTest {
         inputStream.close();
     }
 
+    @Test
+    public void testPaddingOnLastBlock() throws Exception {
+        int ledgerId = 1;
+        int entrySize = 1000;
+        int lac = 0;
+        ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac);
+
+        // set block size not able to hold one entry
+        int blockSize = DataBlockHeaderImpl.getDataStartOffset() + entrySize * 2;
+        BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
+        int expectedEntryCount = 1;
+
+        // verify get methods
+        assertEquals(inputStream.getLedger(), readHandle);
+        assertEquals(inputStream.getStartEntryId(), 0);
+        assertEquals(inputStream.getBlockSize(), blockSize);
+
+        // verify read inputStream
+        // 1. read header. 128
+        byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
+        ByteStreams.readFully(inputStream, headerB);
+        DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
+        assertEquals(headerRead.getBlockLength(), blockSize);
+        assertEquals(headerRead.getFirstEntryId(), 0);
+
+        // 2. There should be a single entry
+        byte[] entryData = new byte[entrySize];
+        Arrays.fill(entryData, (byte)0xB); // 0xB is MockLedgerEntry.blockPadding
+
+        IntStream.range(0, expectedEntryCount).forEach(i -> {
+            try {
+                byte lengthBuf[] = new byte[4];
+                byte entryIdBuf[] = new byte[8];
+                byte content[] = new byte[entrySize];
+                inputStream.read(lengthBuf);
+                inputStream.read(entryIdBuf);
+                inputStream.read(content);
+
+                assertEquals(entrySize, Ints.fromByteArray(lengthBuf));
+                assertEquals(i, Longs.fromByteArray(entryIdBuf));
+                assertArrayEquals(entryData, content);
+            } catch (Exception e) {
+                fail("meet exception", e);
+            }
+        });
+
+        // 3. Then padding
+        int consumedBytes = DataBlockHeaderImpl.getDataStartOffset()
+            + expectedEntryCount * (entrySize + BlockAwareSegmentInputStreamImpl.ENTRY_HEADER_SIZE);
+        byte padding[] = new byte[blockSize - consumedBytes];
+        inputStream.read(padding);
+        ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
+        IntStream.range(0, paddingBuf.capacity()/4).forEach(i ->
+            assertEquals(Integer.toHexString(paddingBuf.readInt()),
+                Integer.toHexString(Ints.fromByteArray(inputStream.getBlockEndPadding())))
+        );
+
+        // 4. reach end.
+        assertEquals(inputStream.read(), -1);
+
+        assertEquals(inputStream.getBlockEntryCount(), 1);
+        assertEquals(inputStream.getBlockEntryBytesCount(), entrySize);
+        assertEquals(inputStream.getEndEntryId(), 0);
+
+        inputStream.close();
+    }
+
 }
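
For reference, the layout exercised by the new testPaddingOnLastBlock works out roughly as follows, assuming DataBlockHeaderImpl.getDataStartOffset() is the 128 bytes suggested by the test comments:

    // Rough worked example of the last-block layout in testPaddingOnLastBlock.
    public class LastBlockLayoutExample {
        public static void main(String[] args) {
            int dataStartOffset = 128;                        // assumed data block header size
            int entrySize = 1000;
            int entryHeaderSize = 4 /* size */ + 8 /* id */;  // ENTRY_HEADER_SIZE
            int blockSize = dataStartOffset + entrySize * 2;  // 2128, as in the test

            int entriesThatFit = (blockSize - dataStartOffset) / (entrySize + entryHeaderSize); // 1
            int consumedBytes = dataStartOffset + entriesThatFit * (entrySize + entryHeaderSize); // 1140
            int paddingBytes = blockSize - consumedBytes;                                         // 988

            System.out.println("entries=" + entriesThatFit + ", padding=" + paddingBytes);
        }
    }

Only one framed entry (1000 bytes of content plus the 12-byte entry header) fits, so the stream reports a single entry and fills the remaining bytes with the end padding, which is what the assertions in the test check.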
