You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/04/03 17:56:47 UTC
[kafka] branch 1.0 updated: KAFKA-6739; Ignore headers when down-converting from V2 to V0/V1 (#4813)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new 3738ffe KAFKA-6739; Ignore headers when down-converting from V2 to V0/V1 (#4813)
3738ffe is described below
commit 3738ffe8989cad25f303303c5043f0c70dd7570f
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Tue Apr 3 10:39:20 2018 -0700
KAFKA-6739; Ignore headers when down-converting from V2 to V0/V1 (#4813)
Ignore record headers when down-converting to V0/V1, since those message formats do not support headers. Add a test case that verifies down-conversion works correctly in the presence of headers.
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
.../org/apache/kafka/common/record/AbstractRecords.java | 9 +++++++--
.../org/apache/kafka/common/record/FileRecordsTest.java | 14 ++++++++++++--
2 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 2452798..89a5413 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -130,8 +130,13 @@ public abstract class AbstractRecords implements Records {
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(),
timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
- for (Record record : recordBatchAndRecords.records)
- builder.append(record);
+ for (Record record : recordBatchAndRecords.records) {
+ // Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported
+ if (magic > RecordBatch.MAGIC_VALUE_V1)
+ builder.append(record);
+ else
+ builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value());
+ }
builder.close();
return builder;
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 6df7c2d..5cb7580 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,6 +17,8 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -40,6 +42,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertArrayEquals;
public class FileRecordsTest {
@@ -331,6 +334,11 @@ public class FileRecordsTest {
private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException {
List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
+
+ Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()),
+ new RecordHeader("headerKey2", "headerValue2".getBytes()),
+ new RecordHeader("headerKey3", "headerValue3".getBytes())};
+
List<SimpleRecord> records = asList(
new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
@@ -339,9 +347,10 @@ public class FileRecordsTest {
new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
- new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes()),
+ new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers),
new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
- new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes()));
+ new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers));
+ assertEquals("incorrect test setup", offsets.size(), records.size());
ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
@@ -425,6 +434,7 @@ public class FileRecordsTest {
assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+ assertArrayEquals("Headers should not change", initialRecords.get(i).headers(), record.headers());
}
i += 1;
}
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.