You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/08/08 22:12:13 UTC
[kafka] branch 3.3 updated: KAFKA-14104; Add CRC validation when iterating over Metadata Log Records (#12457)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 2dafe45f15e KAFKA-14104; Add CRC validation when iterating over Metadata Log Records (#12457)
2dafe45f15e is described below
commit 2dafe45f15eb730a7a6bad258defd298ec173a08
Author: Niket <ni...@users.noreply.github.com>
AuthorDate: Mon Aug 8 15:03:04 2022 -0700
KAFKA-14104; Add CRC validation when iterating over Metadata Log Records (#12457)
This commit adds a check to ensure the RecordBatch CRC is valid when
iterating over a Batch of Records using the RecordsIterator. The
RecordsIterator is used by both Snapshot reads and Log Records reads in
KRaft. The check can be turned off by a constructor parameter and is on by default.
Reviewers: José Armando García Sancio <js...@users.noreply.github.com>
---
.../kafka/common/record/DefaultRecordBatch.java | 2 +-
.../kafka/server/RaftClusterSnapshotTest.scala | 3 +-
.../kafka/controller/QuorumControllerTest.java | 3 +-
.../org/apache/kafka/metalog/LocalLogManager.java | 3 +-
.../org/apache/kafka/raft/KafkaRaftClient.java | 10 +++-
.../kafka/raft/internals/RecordsBatchReader.java | 5 +-
.../kafka/raft/internals/RecordsIterator.java | 13 ++++-
.../kafka/snapshot/RecordsSnapshotReader.java | 5 +-
.../apache/kafka/raft/RaftEventSimulationTest.java | 2 +-
.../raft/internals/RecordsBatchReaderTest.java | 3 +-
.../kafka/raft/internals/RecordsIteratorTest.java | 61 +++++++++++++++++++---
.../kafka/snapshot/SnapshotWriterReaderTest.java | 5 +-
12 files changed, 92 insertions(+), 23 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index bd80981d84b..bc8f32491c0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -107,7 +107,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
static final int PARTITION_LEADER_EPOCH_LENGTH = 4;
static final int MAGIC_OFFSET = PARTITION_LEADER_EPOCH_OFFSET + PARTITION_LEADER_EPOCH_LENGTH;
static final int MAGIC_LENGTH = 1;
- static final int CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
+ public static final int CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
static final int CRC_LENGTH = 4;
static final int ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH;
static final int ATTRIBUTE_LENGTH = 2;
diff --git a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
index 503ce7d2bee..f8dccd17d0d 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
@@ -78,7 +78,8 @@ class RaftClusterSnapshotTest {
raftManager.replicatedLog.latestSnapshot.get(),
new MetadataRecordSerde(),
BufferSupplier.create(),
- 1
+ 1,
+ true
)
) { snapshot =>
// Check that the snapshot is non-empty
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index a62b1f682f0..2cdec699da2 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -738,7 +738,8 @@ public class QuorumControllerTest {
reader,
new MetadataRecordSerde(),
BufferSupplier.create(),
- Integer.MAX_VALUE
+ Integer.MAX_VALUE,
+ true
);
}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index c8e39ae3289..e24d86bd873 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -496,7 +496,8 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
snapshot.get(),
new MetadataRecordSerde(),
BufferSupplier.create(),
- Integer.MAX_VALUE
+ Integer.MAX_VALUE,
+ true
)
);
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 53372728aab..cac7a8a3cb9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -333,7 +333,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private Optional<SnapshotReader<T>> latestSnapshot() {
return log.latestSnapshot().map(reader ->
- RecordsSnapshotReader.of(reader, serde, BufferSupplier.create(), MAX_BATCH_SIZE_BYTES)
+ RecordsSnapshotReader.of(reader,
+ serde,
+ BufferSupplier.create(),
+ MAX_BATCH_SIZE_BYTES,
+ true /* Validate batch CRC*/
+ )
);
}
@@ -2519,7 +2524,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
serde,
BufferSupplier.create(),
MAX_BATCH_SIZE_BYTES,
- this
+ this,
+ true /* Validate batch CRC*/
)
);
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
index e95206100a3..61819a9dcca 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
@@ -100,11 +100,12 @@ public final class RecordsBatchReader<T> implements BatchReader<T> {
RecordSerde<T> serde,
BufferSupplier bufferSupplier,
int maxBatchSize,
- CloseListener<BatchReader<T>> closeListener
+ CloseListener<BatchReader<T>> closeListener,
+ boolean doCrcValidation
) {
return new RecordsBatchReader<>(
baseOffset,
- new RecordsIterator<>(records, serde, bufferSupplier, maxBatchSize),
+ new RecordsIterator<>(records, serde, bufferSupplier, maxBatchSize, doCrcValidation),
closeListener
);
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
index 866f541fb24..ff415aa72ad 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
@@ -41,6 +41,9 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
private final RecordSerde<T> serde;
private final BufferSupplier bufferSupplier;
private final int batchSize;
+ // Setting to true will make the RecordsIterator perform a CRC Validation
+ // on the batch header when iterating over them
+ private final boolean doCrcValidation;
private Iterator<MutableRecordBatch> nextBatches = Collections.emptyIterator();
private Optional<Batch<T>> nextBatch = Optional.empty();
@@ -54,12 +57,14 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
Records records,
RecordSerde<T> serde,
BufferSupplier bufferSupplier,
- int batchSize
+ int batchSize,
+ boolean doCrcValidation
) {
this.records = records;
this.serde = serde;
this.bufferSupplier = bufferSupplier;
this.batchSize = Math.max(batchSize, Records.HEADER_SIZE_UP_TO_MAGIC);
+ this.doCrcValidation = doCrcValidation;
}
@Override
@@ -163,7 +168,6 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
if (nextBatches.hasNext()) {
MutableRecordBatch nextBatch = nextBatches.next();
-
// Update the buffer position to reflect the read batch
allocatedBuffer.ifPresent(buffer -> buffer.position(buffer.position() + nextBatch.sizeInBytes()));
@@ -180,6 +184,11 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
}
private Batch<T> readBatch(DefaultRecordBatch batch) {
+ if (doCrcValidation) {
+ // Perform a CRC validity check on this batch
+ batch.ensureValid();
+ }
+
final Batch<T> result;
if (batch.isControlBatch()) {
result = Batch.control(
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
index 89ad2632229..92b695146c3 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
@@ -104,11 +104,12 @@ public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
RawSnapshotReader snapshot,
RecordSerde<T> serde,
BufferSupplier bufferSupplier,
- int maxBatchSize
+ int maxBatchSize,
+ boolean doCrcValidation
) {
return new RecordsSnapshotReader<>(
snapshot.snapshotId(),
- new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize)
+ new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize, doCrcValidation)
);
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 4f79dc18cc6..a6117a33ca0 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -1112,7 +1112,7 @@ public class RaftEventSimulationTest {
startOffset.set(snapshotId.offset);
try (SnapshotReader<Integer> snapshot =
- RecordsSnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE)) {
+ RecordsSnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE, true)) {
// Expect only one batch with only one record
assertTrue(snapshot.hasNext());
Batch<Integer> batch = snapshot.next();
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
index 6fe540711c2..ae8b1dfb8e2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
@@ -100,7 +100,8 @@ class RecordsBatchReaderTest {
serde,
bufferSupplier,
MAX_BATCH_BYTES,
- closeListener
+ closeListener,
+ true
);
for (TestBatch<String> batch : expectedBatches) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
index 7d984893120..9dfbfd62fbf 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
@@ -30,7 +30,9 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.jqwik.api.ForAll;
import net.jqwik.api.Property;
+import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
@@ -42,6 +44,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -60,7 +63,7 @@ public final class RecordsIteratorTest {
@ParameterizedTest
@MethodSource("emptyRecords")
void testEmptyRecords(Records records) {
- testIterator(Collections.emptyList(), records);
+ testIterator(Collections.emptyList(), records, true);
}
@Property
@@ -71,7 +74,7 @@ public final class RecordsIteratorTest {
List<TestBatch<String>> batches = createBatches(seed);
MemoryRecords memRecords = buildRecords(compressionType, batches);
- testIterator(batches, memRecords);
+ testIterator(batches, memRecords, true);
}
@Property
@@ -85,18 +88,58 @@ public final class RecordsIteratorTest {
FileRecords fileRecords = FileRecords.open(TestUtils.tempFile());
fileRecords.append(memRecords);
- testIterator(batches, fileRecords);
+ testIterator(batches, fileRecords, true);
+ fileRecords.close();
+ }
+
+ @Property
+ public void testCrcValidation(
+ @ForAll CompressionType compressionType,
+ @ForAll long seed
+ ) throws IOException {
+ List<TestBatch<String>> batches = createBatches(seed);
+ MemoryRecords memRecords = buildRecords(compressionType, batches);
+ // Read the Batch CRC for the first batch from the buffer
+ ByteBuffer readBuf = memRecords.buffer();
+ readBuf.position(DefaultRecordBatch.CRC_OFFSET);
+ int actualCrc = readBuf.getInt();
+ // Corrupt the CRC on the first batch
+ memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc + 1);
+
+ assertThrows(CorruptRecordException.class, () -> testIterator(batches, memRecords, true));
+
+ FileRecords fileRecords = FileRecords.open(TestUtils.tempFile());
+ fileRecords.append(memRecords);
+ assertThrows(CorruptRecordException.class, () -> testIterator(batches, fileRecords, true));
+
+ // Verify check does not trigger when doCrcValidation is false
+ assertDoesNotThrow(() -> testIterator(batches, memRecords, false));
+ assertDoesNotThrow(() -> testIterator(batches, fileRecords, false));
+
+ // Fix the corruption
+ memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc);
+
+ // Verify check does not trigger when the corruption is fixed
+ assertDoesNotThrow(() -> testIterator(batches, memRecords, true));
+ FileRecords moreFileRecords = FileRecords.open(TestUtils.tempFile());
+ moreFileRecords.append(memRecords);
+ assertDoesNotThrow(() -> testIterator(batches, moreFileRecords, true));
+
+ fileRecords.close();
+ moreFileRecords.close();
}
private void testIterator(
List<TestBatch<String>> expectedBatches,
- Records records
+ Records records,
+ boolean validateCrc
) {
Set<ByteBuffer> allocatedBuffers = Collections.newSetFromMap(new IdentityHashMap<>());
RecordsIterator<String> iterator = createIterator(
records,
- mockBufferSupplier(allocatedBuffers)
+ mockBufferSupplier(allocatedBuffers),
+ validateCrc
);
for (TestBatch<String> batch : expectedBatches) {
@@ -111,8 +154,12 @@ public final class RecordsIteratorTest {
assertEquals(Collections.emptySet(), allocatedBuffers);
}
- static RecordsIterator<String> createIterator(Records records, BufferSupplier bufferSupplier) {
- return new RecordsIterator<>(records, STRING_SERDE, bufferSupplier, Records.HEADER_SIZE_UP_TO_MAGIC);
+ static RecordsIterator<String> createIterator(
+ Records records,
+ BufferSupplier bufferSupplier,
+ boolean validateCrc
+ ) {
+ return new RecordsIterator<>(records, STRING_SERDE, bufferSupplier, Records.HEADER_SIZE_UP_TO_MAGIC, validateCrc);
}
static BufferSupplier mockBufferSupplier(Set<ByteBuffer> buffers) {
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
index 05d1929f271..cd86c709ff9 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
@@ -192,7 +192,8 @@ final public class SnapshotWriterReaderTest {
context.log.readSnapshot(snapshotId).get(),
context.serde,
BufferSupplier.create(),
- maxBatchSize
+ maxBatchSize,
+ true
);
}
@@ -246,7 +247,7 @@ final public class SnapshotWriterReaderTest {
public static void assertSnapshot(List<List<String>> batches, RawSnapshotReader reader) {
assertSnapshot(
batches,
- RecordsSnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE)
+ RecordsSnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE, true)
);
}