Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/03 16:40:41 UTC

kafka git commit: HOTFIX: Set `baseOffset` correctly in `RecordAccumulator`

Repository: kafka
Updated Branches:
  refs/heads/trunk 1ba8b40b3 -> f54b61909


HOTFIX: Set `baseOffset` correctly in `RecordAccumulator`

The bug meant that the base offset was set to the batch size instead of 0, so the broker would always recompress batches (a sketch of the corrected call appears below).

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #2794 from ijuma/fix-records-builder-construction
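For context, a minimal sketch of the corrected call pattern. The buffer size and compression type below are illustrative only, not values taken from the patch:

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.TimestampType;

    // Illustrative batch buffer; the real size comes from the producer's
    // batch.size / buffer pool configuration.
    ByteBuffer buffer = ByteBuffer.allocate(16384);

    // The final argument is the batch's baseOffset. The accumulator was
    // passing this.batchSize here, so every batch claimed to start at
    // offset batchSize rather than 0, and the broker had to rewrite (and
    // hence recompress) each batch to correct the offsets.
    MemoryRecordsBuilder builder = MemoryRecords.builder(
            buffer, CompressionType.GZIP, TimestampType.CREATE_TIME, 0L);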


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f54b6190
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f54b6190
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f54b6190

Branch: refs/heads/trunk
Commit: f54b61909d525547d65123c02bbd36d92ccee5da
Parents: 1ba8b40
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Apr 3 09:40:38 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Apr 3 09:40:38 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/RecordAccumulator.java   |  2 +-
 .../kafka/common/record/MemoryRecords.java      | 21 +++++++++-----------
 .../internals/RecordAccumulatorTest.java        | 20 ++++++++++++++++++-
 3 files changed, 29 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f54b6190/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index e07d201..299356e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -233,7 +233,7 @@ public final class RecordAccumulator {
             throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
                     "support the required message format (v2). The broker must be version 0.11 or later.");
         }
-        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, this.batchSize);
+        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/f54b6190/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 8c4e771..f810e39 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -281,15 +281,6 @@ public class MemoryRecords extends AbstractRecords {
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
                                                CompressionType compressionType,
                                                TimestampType timestampType,
-                                               int writeLimit) {
-        return new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, timestampType, 0L,
-                System.currentTimeMillis(), RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
-                RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, writeLimit);
-    }
-
-    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
-                                               CompressionType compressionType,
-                                               TimestampType timestampType,
                                                long baseOffset) {
         return builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, timestampType, baseOffset);
     }
@@ -299,7 +290,10 @@ public class MemoryRecords extends AbstractRecords {
                                                CompressionType compressionType,
                                                TimestampType timestampType,
                                                long baseOffset) {
-        return builder(buffer, magic, compressionType, timestampType, baseOffset, System.currentTimeMillis());
+        long logAppendTime = RecordBatch.NO_TIMESTAMP;
+        if (timestampType == TimestampType.LOG_APPEND_TIME)
+            logAppendTime = System.currentTimeMillis();
+        return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime);
     }
 
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
@@ -323,7 +317,7 @@ public class MemoryRecords extends AbstractRecords {
                                                int baseSequence) {
         return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset,
                 logAppendTime, pid, epoch, baseSequence, false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH,
-                buffer.capacity());
+                buffer.remaining());
     }
 
     public static MemoryRecords withRecords(CompressionType compressionType, SimpleRecord... records) {
@@ -357,8 +351,11 @@ public class MemoryRecords extends AbstractRecords {
             return MemoryRecords.EMPTY;
         int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records));
         ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
+        long logAppendTime = RecordBatch.NO_TIMESTAMP;
+        if (timestampType == TimestampType.LOG_APPEND_TIME)
+            logAppendTime = System.currentTimeMillis();
         MemoryRecordsBuilder builder = builder(buffer, magic, compressionType, timestampType, initialOffset,
-                System.currentTimeMillis(), pid, epoch, baseSequence);
+                logAppendTime, pid, epoch, baseSequence);
         for (SimpleRecord record : records)
             builder.append(record);
         return builder.build();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f54b6190/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 9117e16..d152117 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -32,10 +32,12 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.DefaultRecord;
+import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Test;
 
@@ -132,10 +134,26 @@ public class RecordAccumulatorTest {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
+        byte[] value = new byte[2 * batchSize];
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
                 CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null);
-        accum.append(tp1, 0L, key, new byte[2 * batchSize], null, maxBlockTimeMs);
+        accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
+
+        Deque<ProducerBatch> batches = accum.batches().get(tp1);
+        assertEquals(1, batches.size());
+        ProducerBatch producerBatch = batches.peek();
+        List<MutableRecordBatch> recordBatches = TestUtils.toList(producerBatch.records().batches());
+        assertEquals(1, recordBatches.size());
+        MutableRecordBatch recordBatch = recordBatches.get(0);
+        assertEquals(0L, recordBatch.baseOffset());
+        List<Record> records = TestUtils.toList(recordBatch);
+        assertEquals(1, records.size());
+        Record record = records.get(0);
+        assertEquals(0L, record.offset());
+        assertEquals(ByteBuffer.wrap(key), record.key());
+        assertEquals(ByteBuffer.wrap(value), record.value());
+        assertEquals(0L, record.timestamp());
     }
 
     @Test