You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/24 16:57:06 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #10899: KAFKA-12952 Adding Delimiters to Metadata Snapshot

jsancio commented on a change in pull request #10899:
URL: https://github.com/apache/kafka/pull/10899#discussion_r658125257



##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##########
@@ -37,6 +43,52 @@
     private final int localId = 0;
     private final Set<Integer> voters = Collections.singleton(localId);
 
+    @Test
+    public void testSnapshotDelimiters() throws Exception {
+        // TODO ngoel
+        // Is this the right way to setup the offset for
+        // an empty snapshot?
+        int recordsPerBatch = 1;
+        int batches = 0;
+        OffsetAndEpoch id = new OffsetAndEpoch(recordsPerBatch * batches, 3);
+
+        RaftClientTestContext.Builder contextBuilder = new RaftClientTestContext.Builder(localId, voters);
+        RaftClientTestContext context = contextBuilder.build();
+
+        context.pollUntil(() -> context.currentLeader().equals(OptionalInt.of(localId)));
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+        // Create an empty snapshot and freeze it immediately
+        try (SnapshotWriter<String> snapshot = context.client.createSnapshot(id.offset - 1, id.epoch).get()) {
+            assertEquals(id, snapshot.snapshotId());
+            snapshot.freeze();
+        }
+
+        // Verify that an empty snapshot has only the Header and Footer
+        try (SnapshotReader<String> reader = readSnapshot(context, id, Integer.MAX_VALUE)) {
+            RawSnapshotReader snapshot = context.log.readSnapshot(id).get();
+            // TODO ngoel
+            // Is there a way to make this a function of the actual
+            // Control Batch Record size.
+            assertEquals(150, snapshot.sizeInBytes());
+            int countBatches = 0;
+            Iterator<RecordBatch> recordBatches = Utils.covariantCast(snapshot.records().batchIterator());
+            while (recordBatches.hasNext()) {
+                int countRecords = 0;
+                RecordBatch batch = recordBatches.next();
+                countBatches += 1;
+
+                Iterator<Record> records = batch.streamingIterator(new GrowableBufferSupplier());
+                while (records.hasNext()) {
+                    Record record = records.next();

Review comment:
       Are you planning to add a check that the record is a control record and that you can decode the key and the value?

##########
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -541,6 +541,8 @@ public void resign(int epoch) {
     @Override
     public Optional<SnapshotWriter<ApiMessageAndVersion>> createSnapshot(long committedOffset, int committedEpoch) {
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(committedOffset + 1, committedEpoch);
+        return SnapshotWriter.createWithHeader(
+                () -> createNewSnapshot(snapshotId),

Review comment:
       There are two returns in a row. The second return is never reached.

##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##########
@@ -37,6 +43,52 @@
     private final int localId = 0;
     private final Set<Integer> voters = Collections.singleton(localId);
 
+    @Test
+    public void testSnapshotDelimiters() throws Exception {
+        // TODO ngoel
+        // Is this the right way to setup the offset for
+        // an empty snapshot?
+        int recordsPerBatch = 1;
+        int batches = 0;
+        OffsetAndEpoch id = new OffsetAndEpoch(recordsPerBatch * batches, 3);
+
+        RaftClientTestContext.Builder contextBuilder = new RaftClientTestContext.Builder(localId, voters);
+        RaftClientTestContext context = contextBuilder.build();
+
+        context.pollUntil(() -> context.currentLeader().equals(OptionalInt.of(localId)));
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+        // Create an empty snapshot and freeze it immediately
+        try (SnapshotWriter<String> snapshot = context.client.createSnapshot(id.offset - 1, id.epoch).get()) {
+            assertEquals(id, snapshot.snapshotId());
+            snapshot.freeze();
+        }
+
+        // Verify that an empty snapshot has only the Header and Footer
+        try (SnapshotReader<String> reader = readSnapshot(context, id, Integer.MAX_VALUE)) {
+            RawSnapshotReader snapshot = context.log.readSnapshot(id).get();
+            // TODO ngoel
+            // Is there a way to make this a function of the actual
+            // Control Batch Record size.
+            assertEquals(150, snapshot.sizeInBytes());

Review comment:
       Yeah. This is probably a brittle check. How about checking that it is non-empty and that you can decode the control batches/records?

##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -84,44 +74,36 @@ private SnapshotWriter(
     }
 
     /**
-     * Adds a {@link MetadataSnapshotHeaderRecord} to snapshot
+     * Adds a {@link SnapshotHeaderRecord} to snapshot
      *
      * @throws IllegalStateException if the snapshot is not empty
      */
     private void initializeSnapshotWithHeader() {
         if (snapshot.sizeInBytes() != 0) {
-            throw new IllegalStateException("Initializing new snapshot (ID: "
-                    + snapshot.snapshotId().epoch + ", " + snapshot.snapshotId().offset
-                    + ") with a non-empty file");
+            throw new IllegalStateException("Initializing with a non-empty snapshot (%s)"
+                    + snapshot.snapshotId());

Review comment:
       `+` performs string concatenation. You need to use `String.format` if you want to perform string formatting.

##########
File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##########
@@ -37,6 +43,52 @@
     private final int localId = 0;
     private final Set<Integer> voters = Collections.singleton(localId);
 
+    @Test
+    public void testSnapshotDelimiters() throws Exception {
+        // TODO ngoel
+        // Is this the right way to setup the offset for
+        // an empty snapshot?
+        int recordsPerBatch = 1;
+        int batches = 0;
+        OffsetAndEpoch id = new OffsetAndEpoch(recordsPerBatch * batches, 3);
+
+        RaftClientTestContext.Builder contextBuilder = new RaftClientTestContext.Builder(localId, voters);
+        RaftClientTestContext context = contextBuilder.build();
+
+        context.pollUntil(() -> context.currentLeader().equals(OptionalInt.of(localId)));
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+        // Create an empty snapshot and freeze it immediately
+        try (SnapshotWriter<String> snapshot = context.client.createSnapshot(id.offset - 1, id.epoch).get()) {
+            assertEquals(id, snapshot.snapshotId());
+            snapshot.freeze();
+        }
+
+        // Verify that an empty snapshot has only the Header and Footer
+        try (SnapshotReader<String> reader = readSnapshot(context, id, Integer.MAX_VALUE)) {
+            RawSnapshotReader snapshot = context.log.readSnapshot(id).get();
+            // TODO ngoel
+            // Is there a way to make this a function of the actual
+            // Control Batch Record size.
+            assertEquals(150, snapshot.sizeInBytes());
+            int countBatches = 0;
+            Iterator<RecordBatch> recordBatches = Utils.covariantCast(snapshot.records().batchIterator());
+            while (recordBatches.hasNext()) {
+                int countRecords = 0;
+                RecordBatch batch = recordBatches.next();

Review comment:
       We should check that this is a control batch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org