Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/28 22:59:40 UTC

[GitHub] [kafka] niket-goel opened a new pull request, #12457: WIP -- added some CRC checking

niket-goel opened a new pull request, #12457:
URL: https://github.com/apache/kafka/pull/12457

   1. For the metadata log -- during UnifiedLog::read()
   2. For snapshots -- during RecordsIterator::nextBatch()
   3. Thinking of performing delimiter validation in RecordsSnapshotReader::of()




[GitHub] [kafka] niket-goel commented on a diff in pull request #12457: WIP -- added some CRC checking

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12457:
URL: https://github.com/apache/kafka/pull/12457#discussion_r933489518


##########
raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java:
##########
@@ -106,6 +106,7 @@ public static <T> RecordsSnapshotReader<T> of(
         BufferSupplier bufferSupplier,
         int maxBatchSize
     ) {
+        // TODO: Is this a good place to perform a delimiter check (i.e. existence of header and footer)?

Review Comment:
   That is the downside, yes. We could also seek to the end of the snapshot file and just read the first and last batch. Better still for performance would be to check for the existence of the footer as you iterate through the snapshot. You would still realize that you have an incomplete snapshot, but by then you would have applied part of that incomplete snapshot to memory (not sure whether that poses a correctness risk).
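   A minimal self-contained sketch of that iterate-and-check idea; `Batch`, `apply`, and `isFooterBatch` here are hypothetical stand-ins, not the actual Kafka API:
   ```java
   import java.util.Iterator;

   final class FooterCheckSketch {
       interface Batch {}                                           // stand-in for the real batch type
       static void apply(Batch batch) {}                            // apply batch to in-memory state
       static boolean isFooterBatch(Batch batch) { return false; }  // placeholder footer test

       // Single pass over the snapshot: truncation is only discovered at the
       // end, after earlier batches have already been applied.
       static void readSnapshot(Iterator<Batch> snapshot) {
           boolean footerSeen = false;
           while (snapshot.hasNext()) {
               Batch batch = snapshot.next();
               apply(batch);
               footerSeen = isFooterBatch(batch); // true only if the last batch is the footer
           }
           if (!footerSeen) {
               throw new RuntimeException("snapshot is missing its footer (incomplete file)");
           }
       }
   }
   ```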



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1216,7 +1216,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       case FetchHighWatermark => fetchHighWatermarkMetadata
       case FetchTxnCommitted => fetchLastStableOffsetMetadata
     }
-    localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchTxnCommitted)
+    val fetchDataInfo = localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchTxnCommitted)

Review Comment:
   That is a fair question; I was contemplating that too. The alternative of having a reader verify the bytes places less stress on the log layer in Kafka, but it is not really foolproof.
   E.g., in that scheme the listeners that are reading the batches (for both the controller and the broker) would do the CRC check (assuming the batch header reaches that layer).
   Actually, now that I think about it, does having the check in the `RecordsIterator` protect all readers?
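   For reference, the shape this eventually takes in the PR (see the `RecordsIterator` diffs later in this thread) is a check inside `readBatch`, so every reader that consumes batches through the iterator is covered; fragment only, body elided:
   ```java
   // Fragment from the PR's later diffs: validate each batch as it is read,
   // so any reader going through RecordsIterator is protected.
   private Batch<T> readBatch(DefaultRecordBatch batch) {
       if (doCrcValidation) {
           // Throws CorruptRecordException (unchecked) if the batch-level
           // CRC does not match the batch contents.
           batch.ensureValid();
       }
       // ... decode the records in the batch as before ...
   }
   ```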





[GitHub] [kafka] niket-goel closed pull request #12457: WIP -- added some CRC checking

Posted by GitBox <gi...@apache.org>.
niket-goel closed pull request #12457: WIP -- added some CRC checking
URL: https://github.com/apache/kafka/pull/12457




[GitHub] [kafka] hachikuji merged pull request #12457: KAFKA-14104: Add CRC validation when iterating over Metadata Log Records

Posted by GitBox <gi...@apache.org>.
hachikuji merged PR #12457:
URL: https://github.com/apache/kafka/pull/12457




[GitHub] [kafka] niket-goel commented on pull request #12457: KAFKA-14104: Add CRC validation when iterating over Metadata Log Records

Posted by GitBox <gi...@apache.org>.
niket-goel commented on PR #12457:
URL: https://github.com/apache/kafka/pull/12457#issuecomment-1204185885

   The build failures are a little cryptic. The same tests all pass locally, and the failing step in the build only complains of the script exiting with a non-zero status; there are no actual failing tests. Will just trigger another run.




[GitHub] [kafka] niket-goel commented on a diff in pull request #12457: KAFKA-14104: Add CRC validation when iterating over Metadata Log Records

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12457:
URL: https://github.com/apache/kafka/pull/12457#discussion_r937164272


##########
raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java:
##########
@@ -85,18 +88,41 @@ public void testFileRecords(
         FileRecords fileRecords = FileRecords.open(TestUtils.tempFile());
         fileRecords.append(memRecords);
 
-        testIterator(batches, fileRecords);
+        testIterator(batches, fileRecords, true);
+    }
+
+    @Property
+    public void testCrcValidation(
+            @ForAll CompressionType compressionType,
+            @ForAll long seed
+    ) throws IOException {
+        List<TestBatch<String>> batches = createBatches(seed);
+        MemoryRecords memRecords = buildRecords(compressionType, batches);
+        // Corrupt the record buffer
+        memRecords.buffer().putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, new Random(seed).nextInt());

Review Comment:
   There is a non-zero probability that the test fails because the random int collides with the actual value. Will modify the test to fix this.
   As for the field, it was one of the only two exposed offsets I could choose to corrupt, and I am not happy about that either. I am going to expose `CRC_OFFSET` as a public field and corrupt that instead, which should make for a more reliable test.
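   A sketch of that more deterministic corruption, assuming `DefaultRecordBatch.CRC_OFFSET` is made public as described; writing `actualCrc + 1` instead of a random int guarantees the stored CRC no longer matches:
   ```java
   // Read the CRC currently stored for the first batch.
   ByteBuffer readBuf = memRecords.buffer(); // buffer() returns a duplicate view
   readBuf.position(DefaultRecordBatch.CRC_OFFSET);
   int actualCrc = readBuf.getInt();

   // Overwrite it with a value guaranteed to differ, so the validation
   // failure is deterministic rather than probabilistic.
   memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc + 1);
   ```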





[GitHub] [kafka] jsancio commented on a diff in pull request #12457: KAFKA-14104: Add CRC validation when iterating over Metadata Log Records

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12457:
URL: https://github.com/apache/kafka/pull/12457#discussion_r938007440


##########
raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java:
##########
@@ -110,6 +114,15 @@ public void testCrcValidation(
         // Verify check does not trigger when doCrcValidation is false
         assertDoesNotThrow(() -> testIterator(batches, memRecords, false));
         assertDoesNotThrow(() -> testIterator(batches, fileRecords, false));
+
+        // Fix the corruption
+        memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc);
+
+        // Verify check does not trigger when the corruption is fixed
+        assertDoesNotThrow(() -> testIterator(batches, memRecords, true));
+        FileRecords moreFileRecords = FileRecords.open(TestUtils.tempFile());

Review Comment:
   This also applies to existing tests. The tests should close the `FileRecords` objects when they are done using them.
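   A sketch of what that could look like with try-with-resources, assuming `FileRecords` implements `Closeable` (it wraps a file channel):
   ```java
   try (FileRecords fileRecords = FileRecords.open(TestUtils.tempFile())) {
       fileRecords.append(memRecords);
       testIterator(batches, fileRecords, true);
   } // fileRecords.close() runs even if the assertions throw
   ```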



##########
raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java:
##########
@@ -98,8 +98,12 @@ public void testCrcValidation(
     ) throws IOException {
         List<TestBatch<String>> batches = createBatches(seed);
         MemoryRecords memRecords = buildRecords(compressionType, batches);
-        // Corrupt the record buffer
-        memRecords.buffer().putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, new Random(seed).nextInt());
+        // Read the Batch CRC for the first batch from the buffer
+        ByteBuffer readBuf = memRecords.buffer().asReadOnlyBuffer();
+        readBuf.position(DefaultRecordBatch.CRC_OFFSET);
+        Integer actualCrc = readBuf.getInt();

Review Comment:
   How about this to avoid the boxing and object creation:
   ```java
   int actualCrc = readBuf.getInt();
   ```



##########
raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java:
##########
@@ -98,8 +98,12 @@ public void testCrcValidation(
     ) throws IOException {
         List<TestBatch<String>> batches = createBatches(seed);
         MemoryRecords memRecords = buildRecords(compressionType, batches);
-        // Corrupt the record buffer
-        memRecords.buffer().putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, new Random(seed).nextInt());
+        // Read the Batch CRC for the first batch from the buffer
+        ByteBuffer readBuf = memRecords.buffer().asReadOnlyBuffer();

Review Comment:
   Btw, `asReadOnlyBuffer` is probably not needed since `MemoryRecords#buffer` returns a duplicate view into the buffer.
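   I.e., the read could simply be (assuming `buffer()` already hands back a duplicate with its own position and limit):
   ```java
   // buffer() returns a duplicate: shared bytes, independent position/limit,
   // so repositioning it cannot disturb the underlying records.
   ByteBuffer readBuf = memRecords.buffer();
   readBuf.position(DefaultRecordBatch.CRC_OFFSET);
   int actualCrc = readBuf.getInt();
   ```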





[GitHub] [kafka] jsancio commented on a diff in pull request #12457: KAFKA-14104: Add CRC validation when iterating over Metadata Log Records

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12457:
URL: https://github.com/apache/kafka/pull/12457#discussion_r938103850


##########
raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java:
##########
@@ -179,7 +184,12 @@ private Optional<Batch<T>> nextBatch() {
         return Optional.empty();
     }
 
-    private Batch<T> readBatch(DefaultRecordBatch batch) {
+    private Batch<T> readBatch(DefaultRecordBatch batch) throws CorruptRecordException {

Review Comment:
   `CorruptRecordException` is a runtime exception and doesn't need to be added to the set of checked exceptions.
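   I.e., the clause can simply be dropped; unchecked exceptions propagate without being declared (fragment only, body elided):
   ```java
   // CorruptRecordException is unchecked, so no throws clause is needed:
   private Batch<T> readBatch(DefaultRecordBatch batch) {
       // ... body unchanged; batch.ensureValid() may still throw at runtime ...
   }
   ```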





[GitHub] [kafka] jsancio commented on a diff in pull request #12457: KAFKA-14104: Add CRC validation when iterating over Metadata Log Records

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12457:
URL: https://github.com/apache/kafka/pull/12457#discussion_r936987763


##########
raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java:
##########
@@ -85,18 +88,41 @@ public void testFileRecords(
         FileRecords fileRecords = FileRecords.open(TestUtils.tempFile());
         fileRecords.append(memRecords);
 
-        testIterator(batches, fileRecords);
+        testIterator(batches, fileRecords, true);
+    }
+
+    @Property
+    public void testCrcValidation(
+            @ForAll CompressionType compressionType,
+            @ForAll long seed
+    ) throws IOException {
+        List<TestBatch<String>> batches = createBatches(seed);
+        MemoryRecords memRecords = buildRecords(compressionType, batches);
+        // Corrupt the record buffer
+        memRecords.buffer().putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, new Random(seed).nextInt());

Review Comment:
   Does a random int always corrupt the batch? Meaning, it is possible that this test sometimes fails because it was unlucky enough to pick a random int that matched the existing value and so didn't invalidate the CRC.
   
   Can you explain in a comment why this buffer/memory change specifically?



##########
raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java:
##########
@@ -179,8 +182,13 @@ private Optional<Batch<T>> nextBatch() {
         return Optional.empty();
     }
 
-    private Batch<T> readBatch(DefaultRecordBatch batch) {
+    private Batch<T> readBatch(DefaultRecordBatch batch) throws CorruptRecordException {
         final Batch<T> result;
+        if (doCrcValidation) {
+            // Perform a CRC validity check on this block.

Review Comment:
   typo; did you mean "on this batch."?



##########
raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java:
##########
@@ -179,8 +182,13 @@ private Optional<Batch<T>> nextBatch() {
         return Optional.empty();
     }
 
-    private Batch<T> readBatch(DefaultRecordBatch batch) {
+    private Batch<T> readBatch(DefaultRecordBatch batch) throws CorruptRecordException {
         final Batch<T> result;
+        if (doCrcValidation) {

Review Comment:
   You can move this block before `final Batch<T> result` and keep `result` close to where it is initialized.



##########
raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java:
##########
@@ -49,17 +50,20 @@
     // Number of bytes from records read up to now
     private int bytesRead = 0;
     private boolean isClosed = false;
+    private boolean doCrcValidation = false;

Review Comment:
   This should be `private final boolean doCrcValidation;`. It should be moved with the rest of the `final` fields.
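   A sketch of the suggested shape (constructor signature abbreviated for illustration):
   ```java
   // Grouped with the other final fields, assigned exactly once:
   private final boolean doCrcValidation;

   RecordsIterator(/* other args */ boolean doCrcValidation) {
       this.doCrcValidation = doCrcValidation;
   }
   ```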





[GitHub] [kafka] jsancio commented on a diff in pull request #12457: WIP -- added some CRC checking

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12457:
URL: https://github.com/apache/kafka/pull/12457#discussion_r933320842


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1216,7 +1216,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       case FetchHighWatermark => fetchHighWatermarkMetadata
       case FetchTxnCommitted => fetchLastStableOffsetMetadata
     }
-    localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchTxnCommitted)
+    val fetchDataInfo = localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchTxnCommitted)

Review Comment:
   Does this mean that Kafka needs to read all of the batches to handle a Fetch request? For performance, Kafka doesn't read the batches when handling a Fetch request; it sends the raw bytes. It is up to the reader (Consumer or Follower) to validate the bytes received.
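   A sketch of that reader-side validation, using the batch-level CRC carried in each record batch (`responseBuffer` is an assumed name for the fetched bytes):
   ```java
   // Parse the raw fetch response bytes into batches on the reader and
   // verify each batch's CRC there, instead of in the broker's log layer.
   MemoryRecords fetched = MemoryRecords.readableRecords(responseBuffer);
   for (MutableRecordBatch batch : fetched.batches()) {
       batch.ensureValid(); // throws CorruptRecordException on CRC mismatch
   }
   ```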



##########
raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java:
##########
@@ -106,6 +106,7 @@ public static <T> RecordsSnapshotReader<T> of(
         BufferSupplier bufferSupplier,
         int maxBatchSize
     ) {
+        // TODO: Is this a good place to perform a delimiter check (i.e. existence of header and footer)?

Review Comment:
   If you do this here, that means you have to read the snapshot twice, right?





[GitHub] [kafka] jsancio commented on a diff in pull request #12457: KAFKA-14104: Add CRC validation when iterating over Metadata Log Records

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12457:
URL: https://github.com/apache/kafka/pull/12457#discussion_r938879350


##########
raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java:
##########
@@ -180,6 +184,11 @@ private Optional<Batch<T>> nextBatch() {
     }
 
     private Batch<T> readBatch(DefaultRecordBatch batch) {
+        if (doCrcValidation) {
+            // Perform a CRC validity check on this batch
+            batch.ensureValid();

Review Comment:
   This throws an exception that doesn't include a stack trace. This is a rare event, so we want to pay the cost of generating a stack trace, as it will help us debug the issue.
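   One possible fix (a sketch, not necessarily what the PR landed on): test validity explicitly and throw a fresh exception so a stack trace is captured at the throw site:
   ```java
   if (doCrcValidation && !batch.isValid()) {
       // Construct the exception here so it carries a stack trace
       // (Kafka's ApiException subclasses normally suppress theirs).
       throw new RuntimeException(
           "CRC mismatch in record batch at offset " + batch.baseOffset());
   }
   ```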



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org