You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/10 01:22:56 UTC
[incubator-pulsar] branch master updated:
BlockAwareSegmentInputStream should only return unsigned bytes (#1741)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 114be44 BlockAwareSegmentInputStream should only return unsigned bytes (#1741)
114be44 is described below
commit 114be4457320715d0b34922275f0f37907e1d429
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Thu May 10 03:22:53 2018 +0200
BlockAwareSegmentInputStream should only return unsigned bytes (#1741)
The contract for InputStream expects that read() returns a non-negative
int in the range 0 to 255, except in the case of EOF, when it returns -1.
In BlockAwareSegmentInputStream, we were returning the padding as a
signed byte, and reading from the ledger as a signed byte. This meant
that for padding, we were returning negative integers, and when reading
from the ledger we could also return negative integers, even -1,
depending on the data.
This change ensures that BlockAwareSegmentInputStream always returns a
non-negative int, except in the case of EOF.
Master Issue: #1511
---
.../impl/BlockAwareSegmentInputStreamImpl.java | 9 +---
.../impl/BlockAwareSegmentInputStreamTest.java | 60 +++++++++++++++++-----
2 files changed, 48 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
index 92d1cc0..624b9d9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStream {
private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);
- private static final byte[] BLOCK_END_PADDING = Ints.toByteArray(0xFEDCDEAD);
+ private static final int[] BLOCK_END_PADDING = new int[] { 0xFE, 0xDC, 0xDE, 0xAD };
private final ReadHandle ledger;
private final long startEntryId;
@@ -93,7 +93,7 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
if (!entriesByteBuf.isEmpty() && bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) {
// always read from the first ByteBuf in the list, once read all of its content remove it.
ByteBuf entryByteBuf = entriesByteBuf.get(0);
- int ret = entryByteBuf.readByte();
+ int ret = entryByteBuf.readUnsignedByte();
bytesReadOffset++;
if (entryByteBuf.readableBytes() == 0) {
@@ -204,10 +204,5 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
public int getBlockEntryBytesCount() {
return dataBlockFullOffset - DataBlockHeaderImpl.getDataStartOffset() - ENTRY_HEADER_SIZE * blockEntryCount;
}
-
- public static byte[] getBlockEndPadding() {
- return BLOCK_END_PADDING;
- }
-
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java
index 8165292..e6f5249 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamTest.java
@@ -33,8 +33,10 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
+import java.util.function.Supplier;
import java.util.stream.IntStream;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -49,23 +51,25 @@ import org.testng.collections.Lists;
@Slf4j
public class BlockAwareSegmentInputStreamTest {
+ private static final byte DEFAULT_ENTRY_BYTE = 0xB;
+
@Data
class MockLedgerEntry implements LedgerEntry {
- public byte blockPadding = 0xB;
long ledgerId;
long entryId;
long length;
byte entryBytes[];
ByteBuf entryBuffer;
- MockLedgerEntry(long ledgerId, long entryId, long length) {
+ MockLedgerEntry(long ledgerId, long entryId, long length,
+ Supplier<Byte> dataSupplier) {
this.ledgerId = ledgerId;
this.entryId = entryId;
this.length = length;
this.entryBytes = new byte[(int)length];
entryBuffer = Unpooled.wrappedBuffer(entryBytes);
entryBuffer.writerIndex(0);
- IntStream.range(0, (int)length).forEach(i -> entryBuffer.writeByte(blockPadding));
+ IntStream.range(0, (int)length).forEach(i -> entryBuffer.writeByte(dataSupplier.get()));
}
@Override
@@ -92,7 +96,7 @@ public class BlockAwareSegmentInputStreamTest {
int entrySize;
List<LedgerEntry> entries;
- MockLedgerEntries(int ledgerId, int startEntryId, int count, int entrySize) {
+ MockLedgerEntries(int ledgerId, int startEntryId, int count, int entrySize, Supplier<Byte> dataSupplier) {
this.ledgerId = ledgerId;
this.startEntryId = startEntryId;
this.count = count;
@@ -100,7 +104,7 @@ public class BlockAwareSegmentInputStreamTest {
this.entries = Lists.newArrayList(count);
IntStream.range(startEntryId, startEntryId + count).forEach(i ->
- entries.add(new MockLedgerEntry(ledgerId, i, entrySize)));
+ entries.add(new MockLedgerEntry(ledgerId, i, entrySize, dataSupplier)));
}
@Override
@@ -127,10 +131,17 @@ public class BlockAwareSegmentInputStreamTest {
int ledgerId;
int entrySize;
int lac;
- MockReadHandle(int ledgerId, int entrySize, int lac) {
+ Supplier<Byte> dataSupplier;
+
+ MockReadHandle(int ledgerId, int entrySize, int lac, Supplier<Byte> dataSupplier) {
this.ledgerId = ledgerId;
this.entrySize = entrySize;
this.lac = lac;
+ this.dataSupplier = dataSupplier;
+ }
+
+ MockReadHandle(int ledgerId, int entrySize, int lac) {
+ this(ledgerId, entrySize, lac, () -> DEFAULT_ENTRY_BYTE);
}
@Override
@@ -139,7 +150,7 @@ public class BlockAwareSegmentInputStreamTest {
LedgerEntries entries = new MockLedgerEntries(ledgerId,
(int)firstEntry,
(int)(lastEntry - firstEntry + 1),
- entrySize);
+ entrySize, dataSupplier);
future.complete(entries);
return future;
@@ -251,8 +262,7 @@ public class BlockAwareSegmentInputStreamTest {
ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
IntStream.range(0, paddingBuf.capacity()/4).forEach(i ->
assertEquals(Integer.toHexString(paddingBuf.readInt()),
- Integer.toHexString(Ints.fromByteArray(inputStream.getBlockEndPadding())))
- );
+ Integer.toHexString(0xFEDCDEAD)));
// 4. reach end.
assertEquals(inputStream.read(), -1);
@@ -420,8 +430,7 @@ public class BlockAwareSegmentInputStreamTest {
ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
IntStream.range(0, paddingBuf.capacity()/4).forEach(i ->
assertEquals(Integer.toHexString(paddingBuf.readInt()),
- Integer.toHexString(Ints.fromByteArray(inputStream.getBlockEndPadding())))
- );
+ Integer.toHexString(0xFEDCDEAD)));
// 3. reach end.
assertEquals(inputStream.read(), -1);
@@ -486,9 +495,8 @@ public class BlockAwareSegmentInputStreamTest {
inputStream.read(padding);
ByteBuf paddingBuf = Unpooled.wrappedBuffer(padding);
IntStream.range(0, paddingBuf.capacity()/4).forEach(i ->
- assertEquals(Integer.toHexString(paddingBuf.readInt()),
- Integer.toHexString(Ints.fromByteArray(inputStream.getBlockEndPadding())))
- );
+ assertEquals(Integer.toHexString(paddingBuf.readInt()),
+ Integer.toHexString(0xFEDCDEAD)));
// 3. reach end.
assertEquals(inputStream.read(), -1);
@@ -500,4 +508,28 @@ public class BlockAwareSegmentInputStreamTest {
inputStream.close();
}
+ @Test
+ public void testOnlyNegativeOnEOF() throws Exception {
+ int ledgerId = 1;
+ int entrySize = 10000;
+ int lac = 0;
+
+ Random r = new Random(0);
+ ReadHandle readHandle = new MockReadHandle(ledgerId, entrySize, lac, () -> (byte)r.nextInt());
+
+ int blockSize = DataBlockHeaderImpl.getDataStartOffset() + entrySize * 2;
+ BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, blockSize);
+
+ int bytesRead = 0;
+ for (int i = 0; i < blockSize*2; i++) {
+ int ret = inputStream.read();
+ if (ret < 0) { // should only be EOF
+ assertEquals(bytesRead, blockSize);
+ break;
+ } else {
+ bytesRead++;
+ }
+ }
+ }
+
}
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.