You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/03 16:40:41 UTC
kafka git commit: HOTFIX: Set `baseOffset` correctly in `RecordAccumulator`
Repository: kafka
Updated Branches:
refs/heads/trunk 1ba8b40b3 -> f54b61909
HOTFIX: Set `baseOffset` correctly in `RecordAccumulator`
The bug meant that the base offset was set to the batch size instead of 0, so the broker would always recompress batches.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #2794 from ijuma/fix-records-builder-construction
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f54b6190
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f54b6190
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f54b6190
Branch: refs/heads/trunk
Commit: f54b61909d525547d65123c02bbd36d92ccee5da
Parents: 1ba8b40
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Apr 3 09:40:38 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Apr 3 09:40:38 2017 -0700
----------------------------------------------------------------------
.../producer/internals/RecordAccumulator.java | 2 +-
.../kafka/common/record/MemoryRecords.java | 21 +++++++++-----------
.../internals/RecordAccumulatorTest.java | 20 ++++++++++++++++++-
3 files changed, 29 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f54b6190/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index e07d201..299356e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -233,7 +233,7 @@ public final class RecordAccumulator {
throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
"support the required message format (v2). The broker must be version 0.11 or later.");
}
- return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, this.batchSize);
+ return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/f54b6190/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 8c4e771..f810e39 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -281,15 +281,6 @@ public class MemoryRecords extends AbstractRecords {
public static MemoryRecordsBuilder builder(ByteBuffer buffer,
CompressionType compressionType,
TimestampType timestampType,
- int writeLimit) {
- return new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, timestampType, 0L,
- System.currentTimeMillis(), RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false,
- RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, writeLimit);
- }
-
- public static MemoryRecordsBuilder builder(ByteBuffer buffer,
- CompressionType compressionType,
- TimestampType timestampType,
long baseOffset) {
return builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, timestampType, baseOffset);
}
@@ -299,7 +290,10 @@ public class MemoryRecords extends AbstractRecords {
CompressionType compressionType,
TimestampType timestampType,
long baseOffset) {
- return builder(buffer, magic, compressionType, timestampType, baseOffset, System.currentTimeMillis());
+ long logAppendTime = RecordBatch.NO_TIMESTAMP;
+ if (timestampType == TimestampType.LOG_APPEND_TIME)
+ logAppendTime = System.currentTimeMillis();
+ return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime);
}
public static MemoryRecordsBuilder builder(ByteBuffer buffer,
@@ -323,7 +317,7 @@ public class MemoryRecords extends AbstractRecords {
int baseSequence) {
return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset,
logAppendTime, pid, epoch, baseSequence, false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH,
- buffer.capacity());
+ buffer.remaining());
}
public static MemoryRecords withRecords(CompressionType compressionType, SimpleRecord... records) {
@@ -357,8 +351,11 @@ public class MemoryRecords extends AbstractRecords {
return MemoryRecords.EMPTY;
int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records));
ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
+ long logAppendTime = RecordBatch.NO_TIMESTAMP;
+ if (timestampType == TimestampType.LOG_APPEND_TIME)
+ logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = builder(buffer, magic, compressionType, timestampType, initialOffset,
- System.currentTimeMillis(), pid, epoch, baseSequence);
+ logAppendTime, pid, epoch, baseSequence);
for (SimpleRecord record : records)
builder.append(record);
return builder.build();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f54b6190/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 9117e16..d152117 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -32,10 +32,12 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.DefaultRecord;
+import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Test;
@@ -132,10 +134,26 @@ public class RecordAccumulatorTest {
@Test
public void testAppendLarge() throws Exception {
int batchSize = 512;
+ byte[] value = new byte[2 * batchSize];
RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null);
- accum.append(tp1, 0L, key, new byte[2 * batchSize], null, maxBlockTimeMs);
+ accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
+
+ Deque<ProducerBatch> batches = accum.batches().get(tp1);
+ assertEquals(1, batches.size());
+ ProducerBatch producerBatch = batches.peek();
+ List<MutableRecordBatch> recordBatches = TestUtils.toList(producerBatch.records().batches());
+ assertEquals(1, recordBatches.size());
+ MutableRecordBatch recordBatch = recordBatches.get(0);
+ assertEquals(0L, recordBatch.baseOffset());
+ List<Record> records = TestUtils.toList(recordBatch);
+ assertEquals(1, records.size());
+ Record record = records.get(0);
+ assertEquals(0L, record.offset());
+ assertEquals(ByteBuffer.wrap(key), record.key());
+ assertEquals(ByteBuffer.wrap(value), record.value());
+ assertEquals(0L, record.timestamp());
}
@Test