You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/04/03 17:56:47 UTC

[kafka] branch 1.0 updated: KAFKA-6739; Ignore headers when down-converting from V2 to V0/V1 (#4813)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 3738ffe  KAFKA-6739; Ignore headers when down-converting from V2 to V0/V1 (#4813)
3738ffe is described below

commit 3738ffe8989cad25f303303c5043f0c70dd7570f
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Tue Apr 3 10:39:20 2018 -0700

    KAFKA-6739; Ignore headers when down-converting from V2 to V0/V1 (#4813)
    
    Ignore headers when down-converting to V0/V1, since those message formats do not support them. Added a test case to verify that down-conversion behaves correctly in the presence of headers.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
 .../org/apache/kafka/common/record/AbstractRecords.java    |  9 +++++++--
 .../org/apache/kafka/common/record/FileRecordsTest.java    | 14 ++++++++++++--
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 2452798..89a5413 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -130,8 +130,13 @@ public abstract class AbstractRecords implements Records {
 
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(),
                 timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
-        for (Record record : recordBatchAndRecords.records)
-            builder.append(record);
+        for (Record record : recordBatchAndRecords.records) {
+            // Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported
+            if (magic > RecordBatch.MAGIC_VALUE_V1)
+                builder.append(record);
+            else
+                builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value());
+        }
 
         builder.close();
         return builder;
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 6df7c2d..5cb7580 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -40,6 +42,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertArrayEquals;
 
 public class FileRecordsTest {
 
@@ -331,6 +334,11 @@ public class FileRecordsTest {
 
     private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException {
         List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
+
+        Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()),
+                            new RecordHeader("headerKey2", "headerValue2".getBytes()),
+                            new RecordHeader("headerKey3", "headerValue3".getBytes())};
+
         List<SimpleRecord> records = asList(
                 new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
                 new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
@@ -339,9 +347,10 @@ public class FileRecordsTest {
                 new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
                 new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
                 new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
-                new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes()),
+                new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers),
                 new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
-                new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes()));
+                new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers));
+        assertEquals("incorrect test setup", offsets.size(), records.size());
 
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
@@ -425,6 +434,7 @@ public class FileRecordsTest {
                     assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
                     assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
                     assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
+                    assertArrayEquals("Headers should not change", initialRecords.get(i).headers(), record.headers());
                 }
                 i += 1;
             }

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.