Posted to commits@kafka.apache.org by jg...@apache.org on 2022/08/08 22:12:13 UTC

[kafka] branch 3.3 updated: KAFKA-14104; Add CRC validation when iterating over Metadata Log Records (#12457)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 2dafe45f15e KAFKA-14104; Add CRC validation when iterating over Metadata Log Records (#12457)
2dafe45f15e is described below

commit 2dafe45f15eb730a7a6bad258defd298ec173a08
Author: Niket <ni...@users.noreply.github.com>
AuthorDate: Mon Aug 8 15:03:04 2022 -0700

    KAFKA-14104; Add CRC validation when iterating over Metadata Log Records (#12457)
    
    This commit adds a check to ensure the RecordBatch CRC is valid when
    iterating over a batch of records with the RecordsIterator. The
    RecordsIterator is used for both snapshot reads and log record reads in
    KRaft. The check is enabled by default and can be disabled via a
    constructor parameter.
    
    Reviewers: José Armando García Sancio <js...@users.noreply.github.com>
---
 .../kafka/common/record/DefaultRecordBatch.java    |  2 +-
 .../kafka/server/RaftClusterSnapshotTest.scala     |  3 +-
 .../kafka/controller/QuorumControllerTest.java     |  3 +-
 .../org/apache/kafka/metalog/LocalLogManager.java  |  3 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 10 +++-
 .../kafka/raft/internals/RecordsBatchReader.java   |  5 +-
 .../kafka/raft/internals/RecordsIterator.java      | 13 ++++-
 .../kafka/snapshot/RecordsSnapshotReader.java      |  5 +-
 .../apache/kafka/raft/RaftEventSimulationTest.java |  2 +-
 .../raft/internals/RecordsBatchReaderTest.java     |  3 +-
 .../kafka/raft/internals/RecordsIteratorTest.java  | 61 +++++++++++++++++++---
 .../kafka/snapshot/SnapshotWriterReaderTest.java   |  5 +-
 12 files changed, 92 insertions(+), 23 deletions(-)
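
The caller-facing change is uniform across the files below: RecordsSnapshotReader.of,
RecordsBatchReader.of, and the RecordsIterator constructor all gain a trailing boolean
that enables CRC validation. A minimal sketch of the updated call site inside
KafkaRaftClient, using the same names that appear in the hunks below:

    // Sketch of the updated factory call; reader, serde, and
    // MAX_BATCH_SIZE_BYTES come from the surrounding KafkaRaftClient code.
    Optional<SnapshotReader<T>> snapshot = log.latestSnapshot().map(reader ->
        RecordsSnapshotReader.of(
            reader,
            serde,
            BufferSupplier.create(),
            MAX_BATCH_SIZE_BYTES,
            true // validate the batch CRC while iterating; false skips the check
        )
    );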

diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index bd80981d84b..bc8f32491c0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -107,7 +107,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     static final int PARTITION_LEADER_EPOCH_LENGTH = 4;
     static final int MAGIC_OFFSET = PARTITION_LEADER_EPOCH_OFFSET + PARTITION_LEADER_EPOCH_LENGTH;
     static final int MAGIC_LENGTH = 1;
-    static final int CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
+    public static final int CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
     static final int CRC_LENGTH = 4;
     static final int ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH;
     static final int ATTRIBUTE_LENGTH = 2;
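
The only change here is widening CRC_OFFSET to public so the new RecordsIteratorTest
can reach the CRC field directly. For reference, these constants resolve to fixed byte
offsets in the v2 record batch header, so reading the stored CRC is a single absolute get:

    // v2 record batch header layout (byte offsets):
    //   0 base offset (8) | 8 batch length (4) | 12 partition leader epoch (4)
    //  16 magic (1)       | 17 CRC (4)         | 21 attributes (2) ...
    int storedCrc = buffer.getInt(DefaultRecordBatch.CRC_OFFSET); // CRC_OFFSET == 17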
diff --git a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
index 503ce7d2bee..f8dccd17d0d 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
@@ -78,7 +78,8 @@ class RaftClusterSnapshotTest {
             raftManager.replicatedLog.latestSnapshot.get(),
             new MetadataRecordSerde(),
             BufferSupplier.create(),
-            1
+            1,
+            true
           )
         ) { snapshot =>
           // Check that the snapshot is non-empty
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index a62b1f682f0..2cdec699da2 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -738,7 +738,8 @@ public class QuorumControllerTest {
             reader,
             new MetadataRecordSerde(),
             BufferSupplier.create(),
-            Integer.MAX_VALUE
+            Integer.MAX_VALUE,
+            true
         );
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index c8e39ae3289..e24d86bd873 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -496,7 +496,8 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
                                         snapshot.get(),
                                         new  MetadataRecordSerde(),
                                         BufferSupplier.create(),
-                                        Integer.MAX_VALUE
+                                        Integer.MAX_VALUE,
+                                        true
                                     )
                                 );
                             }
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 53372728aab..cac7a8a3cb9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -333,7 +333,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
 
     private Optional<SnapshotReader<T>> latestSnapshot() {
         return log.latestSnapshot().map(reader ->
-            RecordsSnapshotReader.of(reader, serde, BufferSupplier.create(), MAX_BATCH_SIZE_BYTES)
+            RecordsSnapshotReader.of(reader,
+                serde,
+                BufferSupplier.create(),
+                MAX_BATCH_SIZE_BYTES,
+                true /* Validate batch CRC */
+            )
         );
     }
 
@@ -2519,7 +2524,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
                     serde,
                     BufferSupplier.create(),
                     MAX_BATCH_SIZE_BYTES,
-                    this
+                    this,
+                    true /* Validate batch CRC */
                 )
             );
         }
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
index e95206100a3..61819a9dcca 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
@@ -100,11 +100,12 @@ public final class RecordsBatchReader<T> implements BatchReader<T> {
         RecordSerde<T> serde,
         BufferSupplier bufferSupplier,
         int maxBatchSize,
-        CloseListener<BatchReader<T>> closeListener
+        CloseListener<BatchReader<T>> closeListener,
+        boolean doCrcValidation
     ) {
         return new RecordsBatchReader<>(
             baseOffset,
-            new RecordsIterator<>(records, serde, bufferSupplier, maxBatchSize),
+            new RecordsIterator<>(records, serde, bufferSupplier, maxBatchSize, doCrcValidation),
             closeListener
         );
     }
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
index 866f541fb24..ff415aa72ad 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
@@ -41,6 +41,9 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
     private final RecordSerde<T> serde;
     private final BufferSupplier bufferSupplier;
     private final int batchSize;
+    // When true, the RecordsIterator performs a CRC validation on the
+    // header of each batch it iterates over
+    private final boolean doCrcValidation;
 
     private Iterator<MutableRecordBatch> nextBatches = Collections.emptyIterator();
     private Optional<Batch<T>> nextBatch = Optional.empty();
@@ -54,12 +57,14 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
         Records records,
         RecordSerde<T> serde,
         BufferSupplier bufferSupplier,
-        int batchSize
+        int batchSize,
+        boolean doCrcValidation
     ) {
         this.records = records;
         this.serde = serde;
         this.bufferSupplier = bufferSupplier;
         this.batchSize = Math.max(batchSize, Records.HEADER_SIZE_UP_TO_MAGIC);
+        this.doCrcValidation = doCrcValidation;
     }
 
     @Override
@@ -163,7 +168,6 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
 
         if (nextBatches.hasNext()) {
             MutableRecordBatch nextBatch = nextBatches.next();
-
             // Update the buffer position to reflect the read batch
             allocatedBuffer.ifPresent(buffer -> buffer.position(buffer.position() + nextBatch.sizeInBytes()));
 
@@ -180,6 +184,11 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
     }
 
     private Batch<T> readBatch(DefaultRecordBatch batch) {
+        if (doCrcValidation) {
+            // Perform a CRC validity check on this batch
+            batch.ensureValid();
+        }
+
         final Batch<T> result;
         if (batch.isControlBatch()) {
             result = Batch.control(
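
The core of the change is the ensureValid() call above, which throws
CorruptRecordException when the stored CRC does not match a CRC-32C computed over the
batch from the attributes field to the end. A simplified standalone sketch of that
check, assuming the v2 header offsets shown earlier (an illustration, not Kafka's
exact implementation):

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32C; // JDK 9+ CRC-32C implementation

    static boolean batchCrcMatches(ByteBuffer batch) {
        final int crcOffset = 17;        // base offset (8) + length (4) + epoch (4) + magic (1)
        final int attributesOffset = 21; // crcOffset + 4
        long storedCrc = batch.getInt(crcOffset) & 0xFFFFFFFFL; // stored as unsigned int
        CRC32C crc = new CRC32C();
        ByteBuffer covered = batch.duplicate();
        covered.position(attributesOffset); // CRC covers attributes through end of batch
        crc.update(covered);
        return storedCrc == crc.getValue();
    }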
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
index 89ad2632229..92b695146c3 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
@@ -104,11 +104,12 @@ public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
         RawSnapshotReader snapshot,
         RecordSerde<T> serde,
         BufferSupplier bufferSupplier,
-        int maxBatchSize
+        int maxBatchSize,
+        boolean doCrcValidation
     ) {
         return new RecordsSnapshotReader<>(
             snapshot.snapshotId(),
-            new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize)
+            new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize, doCrcValidation)
         );
     }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 4f79dc18cc6..a6117a33ca0 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -1112,7 +1112,7 @@ public class RaftEventSimulationTest {
                 startOffset.set(snapshotId.offset);
 
                 try (SnapshotReader<Integer> snapshot =
-                        RecordsSnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE)) {
+                        RecordsSnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE, true)) {
                     // Expect only one batch with only one record
                     assertTrue(snapshot.hasNext());
                     Batch<Integer> batch = snapshot.next();
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
index 6fe540711c2..ae8b1dfb8e2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
@@ -100,7 +100,8 @@ class RecordsBatchReaderTest {
             serde,
             bufferSupplier,
             MAX_BATCH_BYTES,
-            closeListener
+            closeListener,
+            true
         );
 
         for (TestBatch<String> batch : expectedBatches) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
index 7d984893120..9dfbfd62fbf 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
@@ -30,7 +30,9 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import net.jqwik.api.ForAll;
 import net.jqwik.api.Property;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Records;
@@ -42,6 +44,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mockito;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -60,7 +63,7 @@ public final class RecordsIteratorTest {
     @ParameterizedTest
     @MethodSource("emptyRecords")
     void testEmptyRecords(Records records) {
-        testIterator(Collections.emptyList(), records);
+        testIterator(Collections.emptyList(), records, true);
     }
 
     @Property
@@ -71,7 +74,7 @@ public final class RecordsIteratorTest {
         List<TestBatch<String>> batches = createBatches(seed);
 
         MemoryRecords memRecords = buildRecords(compressionType, batches);
-        testIterator(batches, memRecords);
+        testIterator(batches, memRecords, true);
     }
 
     @Property
@@ -85,18 +88,58 @@ public final class RecordsIteratorTest {
         FileRecords fileRecords = FileRecords.open(TestUtils.tempFile());
         fileRecords.append(memRecords);
 
-        testIterator(batches, fileRecords);
+        testIterator(batches, fileRecords, true);
+        fileRecords.close();
+    }
+
+    @Property
+    public void testCrcValidation(
+            @ForAll CompressionType compressionType,
+            @ForAll long seed
+    ) throws IOException {
+        List<TestBatch<String>> batches = createBatches(seed);
+        MemoryRecords memRecords = buildRecords(compressionType, batches);
+        // Read the stored CRC of the first batch from the buffer
+        ByteBuffer readBuf = memRecords.buffer();
+        readBuf.position(DefaultRecordBatch.CRC_OFFSET);
+        int actualCrc = readBuf.getInt();
+        // Corrupt the CRC on the first batch
+        memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc + 1);
+
+        assertThrows(CorruptRecordException.class, () -> testIterator(batches, memRecords, true));
+
+        FileRecords fileRecords = FileRecords.open(TestUtils.tempFile());
+        fileRecords.append(memRecords);
+        assertThrows(CorruptRecordException.class, () -> testIterator(batches, fileRecords, true));
+
+        // Verify check does not trigger when doCrcValidation is false
+        assertDoesNotThrow(() -> testIterator(batches, memRecords, false));
+        assertDoesNotThrow(() -> testIterator(batches, fileRecords, false));
+
+        // Fix the corruption
+        memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc);
+
+        // Verify check does not trigger when the corruption is fixed
+        assertDoesNotThrow(() -> testIterator(batches, memRecords, true));
+        FileRecords moreFileRecords = FileRecords.open(TestUtils.tempFile());
+        moreFileRecords.append(memRecords);
+        assertDoesNotThrow(() -> testIterator(batches, moreFileRecords, true));
+
+        fileRecords.close();
+        moreFileRecords.close();
     }
 
     private void testIterator(
         List<TestBatch<String>> expectedBatches,
-        Records records
+        Records records,
+        boolean validateCrc
     ) {
         Set<ByteBuffer> allocatedBuffers = Collections.newSetFromMap(new IdentityHashMap<>());
 
         RecordsIterator<String> iterator = createIterator(
             records,
-            mockBufferSupplier(allocatedBuffers)
+            mockBufferSupplier(allocatedBuffers),
+            validateCrc
         );
 
         for (TestBatch<String> batch : expectedBatches) {
@@ -111,8 +154,12 @@ public final class RecordsIteratorTest {
         assertEquals(Collections.emptySet(), allocatedBuffers);
     }
 
-    static RecordsIterator<String> createIterator(Records records, BufferSupplier bufferSupplier) {
-        return new RecordsIterator<>(records, STRING_SERDE, bufferSupplier, Records.HEADER_SIZE_UP_TO_MAGIC);
+    static RecordsIterator<String> createIterator(
+        Records records,
+        BufferSupplier bufferSupplier,
+        boolean validateCrc
+    ) {
+        return new RecordsIterator<>(records, STRING_SERDE, bufferSupplier, Records.HEADER_SIZE_UP_TO_MAGIC, validateCrc);
     }
 
     static BufferSupplier mockBufferSupplier(Set<ByteBuffer> buffers) {
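
The new testCrcValidation property test above corrupts the stored CRC (actualCrc + 1
always mismatches, since the field is compared exactly), asserts that iteration with
validation enabled throws CorruptRecordException, and verifies that restoring the
original value or disabling validation suppresses the failure. The same corruption
technique works as a standalone check; a hedged sketch with a made-up single-record
batch:

    // Hypothetical standalone version of the corruption trick used in the test.
    MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
        new SimpleRecord("value".getBytes()));
    ByteBuffer buf = records.buffer();
    int actualCrc = buf.getInt(DefaultRecordBatch.CRC_OFFSET);
    buf.putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc + 1); // corrupt the CRC
    RecordBatch batch = records.batches().iterator().next();
    assertThrows(CorruptRecordException.class, batch::ensureValid);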
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
index 05d1929f271..cd86c709ff9 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
@@ -192,7 +192,8 @@ final public class SnapshotWriterReaderTest {
             context.log.readSnapshot(snapshotId).get(),
             context.serde,
             BufferSupplier.create(),
-            maxBatchSize
+            maxBatchSize,
+            true
         );
     }
 
@@ -246,7 +247,7 @@ final public class SnapshotWriterReaderTest {
     public static void assertSnapshot(List<List<String>> batches, RawSnapshotReader reader) {
         assertSnapshot(
             batches,
-            RecordsSnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE)
+            RecordsSnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE, true)
         );
     }