You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/08 09:13:18 UTC

[GitHub] sijie closed pull request #1739: Allow padding at end of last BlockAwareSegmentInputStream

sijie closed pull request #1739: Allow padding at end of last BlockAwareSegmentInputStream
URL: https://github.com/apache/incubator-pulsar/pull/1739
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub would not otherwise display it:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
index f1b07bd89a..92d1cc0d29 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
@@ -64,7 +64,7 @@
     // how many entries want to read from ReadHandle each time.
     private static final int ENTRIES_PER_READ = 100;
     // buf the entry size and entry id.
-    private static final int ENTRY_HEADER_SIZE = 4 /* entry size*/ + 8 /* entry id */;
+    static final int ENTRY_HEADER_SIZE = 4 /* entry size*/ + 8 /* entry id */;
     // Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content.
     private List<ByteBuf> entriesByteBuf = null;
 
@@ -83,18 +83,14 @@ private int readEntries() throws IOException {
         checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
         checkState(bytesReadOffset < blockSize);
 
-        // once reach the end of entry buffer, start a new read.
-        if (bytesReadOffset < dataBlockFullOffset && entriesByteBuf.isEmpty()) {
+        // once reach the end of entry buffer, read more, if there is more
+        if (bytesReadOffset < dataBlockFullOffset
+            && entriesByteBuf.isEmpty()
+            && startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
             entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
         }
 
-        if ((!entriesByteBuf.isEmpty()) && bytesReadOffset + entriesByteBuf.get(0).readableBytes() > blockSize) {
-            // no space for a new entry, set data block full, return end padding
-            if (dataBlockFullOffset == blockSize) {
-                dataBlockFullOffset = bytesReadOffset;
-            }
-            return BLOCK_END_PADDING[(bytesReadOffset++ - dataBlockFullOffset) % BLOCK_END_PADDING.length];
-        } else  {
+        if (!entriesByteBuf.isEmpty() && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) {
             // always read from the first ByteBuf in the list, once read all of its content remove it.
             ByteBuf entryByteBuf = entriesByteBuf.get(0);
             int ret = entryByteBuf.readByte();
@@ -107,6 +103,13 @@ private int readEntries() throws IOException {
             }
 
             return ret;
+        } else {
+            // no space for a new entry or there are no more entries
+            // set data block full, return end padding
+            if (dataBlockFullOffset == blockSize) {
+                dataBlockFullOffset = bytesReadOffset;
+            }
+            return BLOCK_END_PADDING[(bytesReadOffset++ - dataBlockFullOffset) % BLOCK_END_PADDING.length];
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java
index 4957c53e08..8165292d57 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java
@@ -274,7 +274,8 @@ public void testNoEndPadding() throws Exception {
         // set block size equals to (header + entry) size.
         int blockSize = 2148;
         BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
-        int expectedEntryCount = (blockSize - DataBlockHeaderImpl.getDataStartOffset()) / (entrySize + 4 + 8);
+        int expectedEntryCount = (blockSize - DataBlockHeaderImpl.getDataStartOffset())
+            / (entrySize + BlockAwareSegmentInputStreamImpl.ENTRY_HEADER_SIZE);
 
         // verify get methods
         assertEquals(inputStream.getLedger(), readHandle);
@@ -432,4 +433,71 @@ public void testNoEntryPutIn() throws Exception {
         inputStream.close();
     }
 
+    @Test
+    public void testPaddingOnLastBlock() throws Exception {
+        int ledgerId = 1;
+        int entrySize = 1000;
+        int lac = 0;
+        ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac);
+        // lac = 0: entry 0 is the last confirmed entry, so entries run out before the block fills
+        // block size has room for one entry (entrySize + ENTRY_HEADER_SIZE) but not for two
+        int blockSize = DataBlockHeaderImpl.getDataStartOffset() + entrySize * 2;
+        BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
+        int expectedEntryCount = 1;
+
+        // verify get methods
+        assertEquals(inputStream.getLedger(), readHandle);
+        assertEquals(inputStream.getStartEntryId(), 0);
+        assertEquals(inputStream.getBlockSize(), blockSize);
+
+        // verify read inputStream. NOTE(review): read(byte[]) results below are unchecked; readFully would be stricter
+        // 1. read the block header (getDataStartOffset() bytes)
+        byte headerB[] = new byte[DataBlockHeaderImpl.getDataStartOffset()];
+        ByteStreams.readFully(inputStream, headerB);
+        DataBlockHeader headerRead = DataBlockHeaderImpl.fromStream(new ByteArrayInputStream(headerB));
+        assertEquals(headerRead.getBlockLength(), blockSize);
+        assertEquals(headerRead.getFirstEntryId(), 0);
+
+        // 2. a single entry: 4-byte length, 8-byte entry id, then entrySize bytes of content
+        byte[] entryData = new byte[entrySize];
+        Arrays.fill(entryData, (byte)0xB); // 0xB is MockLedgerEntry.blockPadding
+
+        IntStream.range(0, expectedEntryCount).forEach(i -> {
+            try {
+                byte lengthBuf[] = new byte[4];
+                byte entryIdBuf[] = new byte[8];
+                byte content[] = new byte[entrySize];
+                inputStream.read(lengthBuf);
+                inputStream.read(entryIdBuf);
+                inputStream.read(content);
+
+                assertEquals(entrySize, Ints.fromByteArray(lengthBuf));
+                assertEquals(i, Longs.fromByteArray(entryIdBuf));
+                assertArrayEquals(entryData, content);
+            } catch (Exception e) {
+                fail("meet exception", e);
+            }
+        });
+
+        // 3. then end padding fills the rest of the block, because there are no more entries to read
+        int consumedBytes = DataBlockHeaderImpl.getDataStartOffset()
+            + expectedEntryCount * (entrySize + BlockAwareSegmentInputStreamImpl.ENTRY_HEADER_SIZE);
+        byte padding[] = new byte[blockSize - consumedBytes];
+        inputStream.read(padding);
+        ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
+        IntStream.range(0, paddingBuf.capacity()/4).forEach(i ->
+            assertEquals(Integer.toHexString(paddingBuf.readInt()),
+                Integer.toHexString(Ints.fromByteArray(inputStream.getBlockEndPadding())))
+        );
+
+        // 4. reach end.
+        assertEquals(inputStream.read(), -1);
+
+        assertEquals(inputStream.getBlockEntryCount(), 1);
+        assertEquals(inputStream.getBlockEntryBytesCount(), entrySize);
+        assertEquals(inputStream.getEndEntryId(), 0);
+
+        inputStream.close();
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services