Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/21 16:36:51 UTC

[kafka] branch trunk updated: KAFKA-8570; Grow buffer to hold down converted records if it was insufficiently sized (#6974)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5f8b289  KAFKA-8570; Grow buffer to hold down converted records if it was insufficiently sized (#6974)
5f8b289 is described below

commit 5f8b2898ce512b9d1390857e036bb3a896dca170
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Fri Jun 21 09:36:29 2019 -0700

    KAFKA-8570; Grow buffer to hold down converted records if it was insufficiently sized (#6974)
    
    When the log contains out-of-order message formats (for example, a v2 message followed by a v1 message) and consists of compressed batches typically greater than 1kB in size, it is possible for down-conversion to fail. With compressed batches, we estimate the size of the down-converted batches using:
    
    ```
        private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
            return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
        }
    ```
    
    This almost always underestimates the size of the down-converted records if the batch is between 1kB and 64kB in size. In general, this means we may underestimate the total size required for compressed batches.
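    
    For illustration only, a minimal sketch (not part of this commit; the batch sizes are made up) of what the estimate above returns for a few compressed batch sizes. Once batches pass 2kB, the estimate is exactly half the input, so any batch that does not shrink by more than half when down-converted overflows the allocation:
    
    ```
        // Standalone illustration of the estimate above; the batch sizes are assumed inputs.
        public class EstimateDemo {
            // Same arithmetic as estimateCompressedSizeInBytes for a compressed batch
            private static int estimate(int size) {
                return Math.min(Math.max(size / 2, 1024), 1 << 16);
            }
    
            public static void main(String[] args) {
                for (int size : new int[]{500, 3000, 50000, 200000}) {
                    System.out.println(size + " bytes compressed -> estimated " + estimate(size) + " bytes");
                }
                // 500    -> 1024   (floor)
                // 3000   -> 1500   (half the input: an underestimate unless records shrink by more than half)
                // 50000  -> 25000
                // 200000 -> 65536  (cap)
            }
        }
    ```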
    
    Because of an implicit assumption in the code that messages with a lower message format appear before any with a higher message format, we do not grow the buffer into which we copy the down-converted records when we see a message with format <= the target message format. This assumption becomes incorrect when the log contains out-of-order message formats, for example because of leaders flapping while upgrading the message format.
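    
    The fix grows the destination buffer before copying a batch that is already at or below the target magic. A minimal sketch of that growth step (the actual change below uses Utils.ensureCapacity from org.apache.kafka.common.utils; the helper here is a hypothetical stand-in written out for clarity):
    
    ```
        // Hypothetical stand-in for Utils.ensureCapacity: return a java.nio.ByteBuffer
        // that can hold at least newLength bytes, preserving anything already written.
        private static ByteBuffer ensureCapacity(ByteBuffer existingBuffer, int newLength) {
            if (newLength <= existingBuffer.capacity())
                return existingBuffer;
            ByteBuffer newBuffer = ByteBuffer.allocate(newLength);
            existingBuffer.flip();
            newBuffer.put(existingBuffer);
            return newBuffer;
        }
    ```
    
    The copy path then ensures capacity for buffer.position() + batch.sizeInBytes() before calling writeTo, as shown in the diff below.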
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../apache/kafka/common/record/RecordsUtil.java    |  3 ++
 .../kafka/common/record/FileRecordsTest.java       | 34 ++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java
index 3b0c59a..423d1e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -80,9 +81,11 @@ public class RecordsUtil {
         ByteBuffer buffer = ByteBuffer.allocate(totalSizeEstimate);
         long temporaryMemoryBytes = 0;
         int numRecordsConverted = 0;
+
         for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) {
             temporaryMemoryBytes += recordBatchAndRecords.batch.sizeInBytes();
             if (recordBatchAndRecords.batch.magic() <= toMagic) {
+                buffer = Utils.ensureCapacity(buffer, buffer.position() + recordBatchAndRecords.batch.sizeInBytes());
                 recordBatchAndRecords.batch.writeTo(buffer);
             } else {
                 MemoryRecordsBuilder builder = convertRecordBatch(toMagic, buffer, recordBatchAndRecords);
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 20ecba1..bf79987 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.utf8;
@@ -435,6 +436,39 @@ public class FileRecordsTest {
     }
 
     @Test
+    public void testDownconversionAfterMessageFormatDowngrade() throws IOException {
+        // random bytes
+        Random random = new Random();
+        byte[] bytes = new byte[3000];
+        random.nextBytes(bytes);
+
+        // records
+        CompressionType compressionType = CompressionType.GZIP;
+        List<Long> offsets = asList(0L, 1L);
+        List<Byte> magic = asList(RecordBatch.MAGIC_VALUE_V2, RecordBatch.MAGIC_VALUE_V1);  // downgrade message format from v2 to v1
+        List<SimpleRecord> records = asList(
+                new SimpleRecord(1L, "k1".getBytes(), bytes),
+                new SimpleRecord(2L, "k2".getBytes(), bytes));
+        byte toMagic = 1;
+
+        // create MemoryRecords
+        ByteBuffer buffer = ByteBuffer.allocate(8000);
+        for (int i = 0; i < records.size(); i++) {
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic.get(i), compressionType, TimestampType.CREATE_TIME, 0L);
+            builder.appendWithOffset(offsets.get(i), records.get(i));
+            builder.close();
+        }
+        buffer.flip();
+
+        // create FileRecords, down-convert and verify
+        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+            fileRecords.append(MemoryRecords.readableRecords(buffer));
+            fileRecords.flush();
+            downConvertAndVerifyRecords(records, offsets, fileRecords, compressionType, toMagic, 0L, time);
+        }
+    }
+
+    @Test
     public void testConversion() throws IOException {
         doTestConversion(CompressionType.NONE, RecordBatch.MAGIC_VALUE_V0);
         doTestConversion(CompressionType.GZIP, RecordBatch.MAGIC_VALUE_V0);