You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/10 01:22:56 UTC

[incubator-pulsar] branch master updated: BlockAwareSegmentInputStream should only return unsigned bytes (#1741)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 114be44  BlockAwareSegmentInputStream should only return unsigned bytes (#1741)
114be44 is described below

commit 114be4457320715d0b34922275f0f37907e1d429
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Thu May 10 03:22:53 2018 +0200

    BlockAwareSegmentInputStream should only return unsigned bytes (#1741)
    
    The contract for InputStream expects that read() returns a
    non-negative int (0-255), except in the case of EOF, when it returns
    -1. In BlockAwareSegmentInputStream, we were returning the padding as
    a signed byte, and reading from the ledger as a signed byte. This
    meant that we were returning negative integers for the padding, and
    possibly negative integers (even -1, depending on the data) when
    reading from the ledger.
    
    This change ensures that BlockAwareSegmentInputStream always returns
    a non-negative int except in the case of EOF.
    
    Master Issue: #1511
---
 .../impl/BlockAwareSegmentInputStreamImpl.java     |  9 +---
 .../impl/BlockAwareSegmentInputStreamTest.java     | 60 +++++++++++++++++-----
 2 files changed, 48 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
index 92d1cc0..624b9d9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
 public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStream {
     private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);
 
-    private static final byte[] BLOCK_END_PADDING = Ints.toByteArray(0xFEDCDEAD);
+    private static final int[] BLOCK_END_PADDING = new int[] { 0xFE, 0xDC, 0xDE, 0xAD };
 
     private final ReadHandle ledger;
     private final long startEntryId;
@@ -93,7 +93,7 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
         if (!entriesByteBuf.isEmpty() && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) {
             // always read from the first ByteBuf in the list, once read all of its content remove it.
             ByteBuf entryByteBuf = entriesByteBuf.get(0);
-            int ret = entryByteBuf.readByte();
+            int ret = entryByteBuf.readUnsignedByte();
             bytesReadOffset++;
 
             if (entryByteBuf.readableBytes() == 0) {
@@ -204,10 +204,5 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
     public int getBlockEntryBytesCount() {
         return dataBlockFullOffset - DataBlockHeaderImpl.getDataStartOffset() - ENTRY_HEADER_SIZE * blockEntryCount;
     }
-
-    public static byte[] getBlockEndPadding() {
-        return BLOCK_END_PADDING;
-    }
-
 }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java
index 8165292..e6f5249 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java
@@ -33,8 +33,10 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
+import java.util.function.Supplier;
 import java.util.stream.IntStream;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -49,23 +51,25 @@ import org.testng.collections.Lists;
 
 @Slf4j
 public class BlockAwareSegmentInputStreamTest {
+    private static final byte DEFAULT_ENTRY_BYTE = 0xB;
+
     @Data
     class MockLedgerEntry implements LedgerEntry {
-        public byte blockPadding = 0xB;
         long ledgerId;
         long entryId;
         long length;
         byte entryBytes[];
         ByteBuf entryBuffer;
 
-        MockLedgerEntry(long ledgerId, long entryId, long length) {
+        MockLedgerEntry(long ledgerId, long entryId, long length,
+                        Supplier<Byte> dataSupplier) {
             this.ledgerId = ledgerId;
             this.entryId = entryId;
             this.length = length;
             this.entryBytes = new byte[(int)length];
             entryBuffer = Unpooled.wrappedBuffer(entryBytes);
             entryBuffer.writerIndex(0);
-            IntStream.range(0, (int)length).forEach(i -> entryBuffer.writeByte(blockPadding));
+            IntStream.range(0, (int)length).forEach(i -> entryBuffer.writeByte(dataSupplier.get()));
         }
 
         @Override
@@ -92,7 +96,7 @@ public class BlockAwareSegmentInputStreamTest {
         int entrySize;
         List<LedgerEntry> entries;
 
-        MockLedgerEntries(int ledgerId, int startEntryId, int count, int entrySize) {
+        MockLedgerEntries(int ledgerId, int startEntryId, int count, int entrySize, Supplier<Byte> dataSupplier) {
             this.ledgerId = ledgerId;
             this.startEntryId = startEntryId;
             this.count = count;
@@ -100,7 +104,7 @@ public class BlockAwareSegmentInputStreamTest {
             this.entries = Lists.newArrayList(count);
 
             IntStream.range(startEntryId, startEntryId + count).forEach(i ->
-                entries.add(new MockLedgerEntry(ledgerId, i, entrySize)));
+                    entries.add(new MockLedgerEntry(ledgerId, i, entrySize, dataSupplier)));
         }
 
         @Override
@@ -127,10 +131,17 @@ public class BlockAwareSegmentInputStreamTest {
         int ledgerId;
         int entrySize;
         int lac;
-        MockReadHandle(int ledgerId, int entrySize, int lac) {
+        Supplier<Byte> dataSupplier;
+
+        MockReadHandle(int ledgerId, int entrySize, int lac, Supplier<Byte> dataSupplier) {
             this.ledgerId = ledgerId;
             this.entrySize = entrySize;
             this.lac = lac;
+            this.dataSupplier = dataSupplier;
+        }
+
+        MockReadHandle(int ledgerId, int entrySize, int lac) {
+            this(ledgerId, entrySize, lac, () -> DEFAULT_ENTRY_BYTE);
         }
 
         @Override
@@ -139,7 +150,7 @@ public class BlockAwareSegmentInputStreamTest {
             LedgerEntries entries = new MockLedgerEntries(ledgerId,
                 (int)firstEntry,
                 (int)(lastEntry - firstEntry + 1),
-                entrySize);
+                    entrySize, dataSupplier);
 
             future.complete(entries);
             return future;
@@ -251,8 +262,7 @@ public class BlockAwareSegmentInputStreamTest {
         ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
         IntStream.range(0, paddingBuf.capacity()/4).forEach(i ->
             assertEquals(Integer.toHexString(paddingBuf.readInt()),
-                Integer.toHexString(Ints.fromByteArray(inputStream.getBlockEndPadding())))
-        );
+                         Integer.toHexString(0xFEDCDEAD)));
 
         // 4. reach end.
         assertEquals(inputStream.read(), -1);
@@ -420,8 +430,7 @@ public class BlockAwareSegmentInputStreamTest {
         ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
         IntStream.range(0, paddingBuf.capacity()/4).forEach(i ->
             assertEquals(Integer.toHexString(paddingBuf.readInt()),
-                Integer.toHexString(Ints.fromByteArray(inputStream.getBlockEndPadding())))
-        );
+                         Integer.toHexString(0xFEDCDEAD)));
 
         // 3. reach end.
         assertEquals(inputStream.read(), -1);
@@ -486,9 +495,8 @@ public class BlockAwareSegmentInputStreamTest {
         inputStream.read(padding);
         ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
         IntStream.range(0, paddingBuf.capacity()/4).forEach(i ->
-            assertEquals(Integer.toHexString(paddingBuf.readInt()),
-                Integer.toHexString(Ints.fromByteArray(inputStream.getBlockEndPadding())))
-        );
+                assertEquals(Integer.toHexString(paddingBuf.readInt()),
+                             Integer.toHexString(0xFEDCDEAD)));
 
         // 3. reach end.
         assertEquals(inputStream.read(), -1);
@@ -500,4 +508,28 @@ public class BlockAwareSegmentInputStreamTest {
         inputStream.close();
     }
 
+    @Test
+    public void testOnlyNegativeOnEOF() throws Exception {
+        int ledgerId = 1;
+        int entrySize = 10000;
+        int lac = 0;
+
+        Random r = new Random(0);
+        ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac, () -> (byte)r.nextInt());
+
+        int blockSize = DataBlockHeaderImpl.getDataStartOffset() + entrySize * 2;
+        BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
+
+        int bytesRead = 0;
+        for (int i = 0; i < blockSize*2; i++) {
+            int ret = inputStream.read();
+            if (ret < 0) { // should only be EOF
+                assertEquals(bytesRead, blockSize);
+                break;
+            } else {
+                bytesRead++;
+            }
+        }
+    }
+
 }

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.