You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/02 08:44:22 UTC
kafka git commit: KAFKA-5365; Fix regression in compressed message iteration affecting magic v0 and v1
Repository: kafka
Updated Branches:
refs/heads/trunk 1188db565 -> cd507f36a
KAFKA-5365; Fix regression in compressed message iteration affecting magic v0 and v1
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3203 from hachikuji/KAFKA-5365
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cd507f36
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cd507f36
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cd507f36
Branch: refs/heads/trunk
Commit: cd507f36a0ee23a15c2088cbd4efd45d2d82051d
Parents: 1188db5
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Jun 2 09:29:29 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Jun 2 09:29:29 2017 +0100
----------------------------------------------------------------------
.../record/AbstractLegacyRecordBatch.java | 62 ++++++++++++--------
.../kafka/common/record/DefaultRecordBatch.java | 1 -
.../record/AbstractLegacyRecordBatchTest.java | 39 ++++++++++++
3 files changed, 76 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd507f36/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 9b74d06..eaf691e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -31,6 +31,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
+import java.util.Iterator;
import java.util.NoSuchElementException;
import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
@@ -222,7 +223,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
* @return An iterator over the records contained within this batch
*/
@Override
- public CloseableIterator<Record> iterator() {
+ public Iterator<Record> iterator() {
return iterator(BufferSupplier.NO_CACHING);
}
@@ -307,14 +308,18 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
}
private static class DeepRecordsIterator extends AbstractIterator<Record> implements CloseableIterator<Record> {
- private final ArrayDeque<AbstractLegacyRecordBatch> batches;
+ private final ArrayDeque<AbstractLegacyRecordBatch> innerEntries;
private final long absoluteBaseOffset;
private final byte wrapperMagic;
- private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, boolean ensureMatchingMagic,
- int maxMessageSize, BufferSupplier bufferSupplier) {
+ private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry,
+ boolean ensureMatchingMagic,
+ int maxMessageSize,
+ BufferSupplier bufferSupplier) {
LegacyRecord wrapperRecord = wrapperEntry.outerRecord();
this.wrapperMagic = wrapperRecord.magic();
+ if (wrapperMagic != RecordBatch.MAGIC_VALUE_V0 && wrapperMagic != RecordBatch.MAGIC_VALUE_V1)
+ throw new InvalidRecordException("Invalid wrapper magic found in legacy deep record iterator " + wrapperMagic);
CompressionType compressionType = wrapperRecord.compressionType();
ByteBuffer wrapperValue = wrapperRecord.value();
@@ -325,47 +330,54 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
InputStream stream = compressionType.wrapForInput(wrapperValue, wrapperRecord.magic(), bufferSupplier);
LogInputStream<AbstractLegacyRecordBatch> logStream = new DataLogInputStream(stream, maxMessageSize);
- long wrapperRecordOffset = wrapperEntry.lastOffset();
- long wrapperRecordTimestamp = wrapperRecord.timestamp();
- this.batches = new ArrayDeque<>();
+ long lastOffsetFromWrapper = wrapperEntry.lastOffset();
+ long timestampFromWrapper = wrapperRecord.timestamp();
+ this.innerEntries = new ArrayDeque<>();
// If relative offset is used, we need to decompress the entire message first to compute
// the absolute offset. For simplicity and because it's a format that is on its way out, we
// do the same for message format version 0
try {
while (true) {
- AbstractLegacyRecordBatch batch = logStream.nextBatch();
- if (batch == null)
+ AbstractLegacyRecordBatch innerEntry = logStream.nextBatch();
+ if (innerEntry == null)
break;
- LegacyRecord record = batch.outerRecord();
+ LegacyRecord record = innerEntry.outerRecord();
byte magic = record.magic();
if (ensureMatchingMagic && magic != wrapperMagic)
throw new InvalidRecordException("Compressed message magic " + magic +
" does not match wrapper magic " + wrapperMagic);
- if (magic > RecordBatch.MAGIC_VALUE_V0) {
+ if (magic == RecordBatch.MAGIC_VALUE_V1) {
LegacyRecord recordWithTimestamp = new LegacyRecord(
record.buffer(),
- wrapperRecordTimestamp,
+ timestampFromWrapper,
wrapperRecord.timestampType());
- batch = new BasicLegacyRecordBatch(batch.lastOffset(), recordWithTimestamp);
+ innerEntry = new BasicLegacyRecordBatch(innerEntry.lastOffset(), recordWithTimestamp);
}
- batches.addLast(batch);
- // break early if we reach the last offset in the batch
- if (batch.offset() == wrapperRecordOffset)
- break;
+ innerEntries.addLast(innerEntry);
}
- if (batches.isEmpty())
+ if (innerEntries.isEmpty())
throw new InvalidRecordException("Found invalid compressed record set with no inner records");
- if (wrapperMagic > RecordBatch.MAGIC_VALUE_V0)
- this.absoluteBaseOffset = wrapperRecordOffset - batches.getLast().lastOffset();
- else
+ if (wrapperMagic == RecordBatch.MAGIC_VALUE_V1) {
+ if (lastOffsetFromWrapper == 0) {
+ // The outer offset may be 0 if this is produce data from an older client.
+ this.absoluteBaseOffset = 0;
+ } else {
+ long lastInnerOffset = innerEntries.getLast().offset();
+ if (lastOffsetFromWrapper < lastInnerOffset)
+ throw new InvalidRecordException("Found invalid wrapper offset in compressed v1 message set: "
+ + lastOffsetFromWrapper);
+ this.absoluteBaseOffset = lastOffsetFromWrapper - lastInnerOffset;
+ }
+ } else {
this.absoluteBaseOffset = -1;
+ }
} catch (IOException e) {
throw new KafkaException(e);
} finally {
@@ -375,14 +387,14 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
@Override
protected Record makeNext() {
- if (batches.isEmpty())
+ if (innerEntries.isEmpty())
return allDone();
- AbstractLegacyRecordBatch entry = batches.remove();
+ AbstractLegacyRecordBatch entry = innerEntries.remove();
// Convert offset to absolute offset if needed.
- if (absoluteBaseOffset >= 0) {
- long absoluteOffset = absoluteBaseOffset + entry.lastOffset();
+ if (wrapperMagic == RecordBatch.MAGIC_VALUE_V1) {
+ long absoluteOffset = absoluteBaseOffset + entry.offset();
entry = new BasicLegacyRecordBatch(absoluteOffset, entry.outerRecord());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd507f36/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index c05cab8..ab0813a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -290,7 +290,6 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
}
}
-
@Override
public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
if (isCompressed())
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd507f36/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
index 0f01f2a..df6a16b 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.record.AbstractLegacyRecordBatch.ByteBufferLegacy
import org.apache.kafka.common.utils.Utils;
import org.junit.Test;
+import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -56,6 +57,44 @@ public class AbstractLegacyRecordBatchTest {
assertEquals(offset++, record.offset());
}
+ @Test
+ public void testIterateCompressedRecordWithWrapperOffsetZero() {
+ for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1)) {
+ SimpleRecord[] simpleRecords = new SimpleRecord[] {
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
+ };
+
+ MemoryRecords records = MemoryRecords.withRecords(magic, 0L,
+ CompressionType.GZIP, TimestampType.CREATE_TIME, simpleRecords);
+
+ ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+ batch.setLastOffset(0L);
+
+ long offset = 0L;
+ for (Record record : batch)
+ assertEquals(offset++, record.offset());
+ }
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void testInvalidWrapperOffsetV1() {
+ SimpleRecord[] simpleRecords = new SimpleRecord[] {
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
+ };
+
+ MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,
+ CompressionType.GZIP, TimestampType.CREATE_TIME, simpleRecords);
+
+ ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+ batch.setLastOffset(1L);
+
+ batch.iterator();
+ }
+
@Test(expected = IllegalArgumentException.class)
public void testSetNoTimestampTypeNotAllowed() {
MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,