You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/02 08:44:22 UTC

kafka git commit: KAFKA-5365; Fix regression in compressed message iteration affecting magic v0 and v1

Repository: kafka
Updated Branches:
  refs/heads/trunk 1188db565 -> cd507f36a


KAFKA-5365; Fix regression in compressed message iteration affecting magic v0 and v1

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3203 from hachikuji/KAFKA-5365


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cd507f36
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cd507f36
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cd507f36

Branch: refs/heads/trunk
Commit: cd507f36a0ee23a15c2088cbd4efd45d2d82051d
Parents: 1188db5
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Jun 2 09:29:29 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Jun 2 09:29:29 2017 +0100

----------------------------------------------------------------------
 .../record/AbstractLegacyRecordBatch.java       | 62 ++++++++++++--------
 .../kafka/common/record/DefaultRecordBatch.java |  1 -
 .../record/AbstractLegacyRecordBatchTest.java   | 39 ++++++++++++
 3 files changed, 76 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cd507f36/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 9b74d06..eaf691e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -31,6 +31,7 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayDeque;
+import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
@@ -222,7 +223,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
      * @return An iterator over the records contained within this batch
      */
     @Override
-    public CloseableIterator<Record> iterator() {
+    public Iterator<Record> iterator() {
         return iterator(BufferSupplier.NO_CACHING);
     }
 
@@ -307,14 +308,18 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
     }
 
     private static class DeepRecordsIterator extends AbstractIterator<Record> implements CloseableIterator<Record> {
-        private final ArrayDeque<AbstractLegacyRecordBatch> batches;
+        private final ArrayDeque<AbstractLegacyRecordBatch> innerEntries;
         private final long absoluteBaseOffset;
         private final byte wrapperMagic;
 
-        private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry, boolean ensureMatchingMagic,
-                                    int maxMessageSize, BufferSupplier bufferSupplier) {
+        private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry,
+                                    boolean ensureMatchingMagic,
+                                    int maxMessageSize,
+                                    BufferSupplier bufferSupplier) {
             LegacyRecord wrapperRecord = wrapperEntry.outerRecord();
             this.wrapperMagic = wrapperRecord.magic();
+            if (wrapperMagic != RecordBatch.MAGIC_VALUE_V0 && wrapperMagic != RecordBatch.MAGIC_VALUE_V1)
+                throw new InvalidRecordException("Invalid wrapper magic found in legacy deep record iterator " + wrapperMagic);
 
             CompressionType compressionType = wrapperRecord.compressionType();
             ByteBuffer wrapperValue = wrapperRecord.value();
@@ -325,47 +330,54 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
             InputStream stream = compressionType.wrapForInput(wrapperValue, wrapperRecord.magic(), bufferSupplier);
             LogInputStream<AbstractLegacyRecordBatch> logStream = new DataLogInputStream(stream, maxMessageSize);
 
-            long wrapperRecordOffset = wrapperEntry.lastOffset();
-            long wrapperRecordTimestamp = wrapperRecord.timestamp();
-            this.batches = new ArrayDeque<>();
+            long lastOffsetFromWrapper = wrapperEntry.lastOffset();
+            long timestampFromWrapper = wrapperRecord.timestamp();
+            this.innerEntries = new ArrayDeque<>();
 
             // If relative offset is used, we need to decompress the entire message first to compute
             // the absolute offset. For simplicity and because it's a format that is on its way out, we
             // do the same for message format version 0
             try {
                 while (true) {
-                    AbstractLegacyRecordBatch batch = logStream.nextBatch();
-                    if (batch == null)
+                    AbstractLegacyRecordBatch innerEntry = logStream.nextBatch();
+                    if (innerEntry == null)
                         break;
 
-                    LegacyRecord record = batch.outerRecord();
+                    LegacyRecord record = innerEntry.outerRecord();
                     byte magic = record.magic();
 
                     if (ensureMatchingMagic && magic != wrapperMagic)
                         throw new InvalidRecordException("Compressed message magic " + magic +
                                 " does not match wrapper magic " + wrapperMagic);
 
-                    if (magic > RecordBatch.MAGIC_VALUE_V0) {
+                    if (magic == RecordBatch.MAGIC_VALUE_V1) {
                         LegacyRecord recordWithTimestamp = new LegacyRecord(
                                 record.buffer(),
-                                wrapperRecordTimestamp,
+                                timestampFromWrapper,
                                 wrapperRecord.timestampType());
-                        batch = new BasicLegacyRecordBatch(batch.lastOffset(), recordWithTimestamp);
+                        innerEntry = new BasicLegacyRecordBatch(innerEntry.lastOffset(), recordWithTimestamp);
                     }
-                    batches.addLast(batch);
 
-                    // break early if we reach the last offset in the batch
-                    if (batch.offset() == wrapperRecordOffset)
-                        break;
+                    innerEntries.addLast(innerEntry);
                 }
 
-                if (batches.isEmpty())
+                if (innerEntries.isEmpty())
                     throw new InvalidRecordException("Found invalid compressed record set with no inner records");
 
-                if (wrapperMagic > RecordBatch.MAGIC_VALUE_V0)
-                    this.absoluteBaseOffset = wrapperRecordOffset - batches.getLast().lastOffset();
-                else
+                if (wrapperMagic == RecordBatch.MAGIC_VALUE_V1) {
+                    if (lastOffsetFromWrapper == 0) {
+                        // The outer offset may be 0 if this is produce data from an older client.
+                        this.absoluteBaseOffset = 0;
+                    } else {
+                        long lastInnerOffset = innerEntries.getLast().offset();
+                        if (lastOffsetFromWrapper < lastInnerOffset)
+                            throw new InvalidRecordException("Found invalid wrapper offset in compressed v1 message set: "
+                                    + lastOffsetFromWrapper);
+                        this.absoluteBaseOffset = lastOffsetFromWrapper - lastInnerOffset;
+                    }
+                } else {
                     this.absoluteBaseOffset = -1;
+                }
             } catch (IOException e) {
                 throw new KafkaException(e);
             } finally {
@@ -375,14 +387,14 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
 
         @Override
         protected Record makeNext() {
-            if (batches.isEmpty())
+            if (innerEntries.isEmpty())
                 return allDone();
 
-            AbstractLegacyRecordBatch entry = batches.remove();
+            AbstractLegacyRecordBatch entry = innerEntries.remove();
 
             // Convert offset to absolute offset if needed.
-            if (absoluteBaseOffset >= 0) {
-                long absoluteOffset = absoluteBaseOffset + entry.lastOffset();
+            if (wrapperMagic == RecordBatch.MAGIC_VALUE_V1) {
+                long absoluteOffset = absoluteBaseOffset + entry.offset();
                 entry = new BasicLegacyRecordBatch(absoluteOffset, entry.outerRecord());
             }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd507f36/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index c05cab8..ab0813a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -290,7 +290,6 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
         }
     }
 
-
     @Override
     public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
         if (isCompressed())

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd507f36/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
index 0f01f2a..df6a16b 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.record.AbstractLegacyRecordBatch.ByteBufferLegacy
 import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -56,6 +57,44 @@ public class AbstractLegacyRecordBatchTest {
             assertEquals(offset++, record.offset());
     }
 
+    @Test
+    public void testIterateCompressedRecordWithWrapperOffsetZero() {
+        for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1)) {
+            SimpleRecord[] simpleRecords = new SimpleRecord[] {
+                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+                new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
+            };
+
+            MemoryRecords records = MemoryRecords.withRecords(magic, 0L,
+                    CompressionType.GZIP, TimestampType.CREATE_TIME, simpleRecords);
+
+            ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+            batch.setLastOffset(0L);
+
+            long offset = 0L;
+            for (Record record : batch)
+                assertEquals(offset++, record.offset());
+        }
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testInvalidWrapperOffsetV1() {
+        SimpleRecord[] simpleRecords = new SimpleRecord[] {
+            new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+            new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
+        };
+
+        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,
+                CompressionType.GZIP, TimestampType.CREATE_TIME, simpleRecords);
+
+        ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
+        batch.setLastOffset(1L);
+
+        batch.iterator();
+    }
+
     @Test(expected = IllegalArgumentException.class)
     public void testSetNoTimestampTypeNotAllowed() {
         MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,