Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/08 04:58:59 UTC

[GitHub] [kafka] rohitrmd opened a new pull request #10276: Kafka 12253: Add tests that cover all of the cases for ReplicatedLog::validateOffsetAndEpoch

rohitrmd opened a new pull request #10276:
URL: https://github.com/apache/kafka/pull/10276


   jira:  https://issues.apache.org/jira/browse/KAFKA-12253 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #10276: Kafka 12253: Add tests that cover all of the cases for ReplicatedLog-validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#issuecomment-793276622


   > @jsancio
   
   Thanks. I'll take a look this week.





[GitHub] [kafka] hachikuji merged pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10276:
URL: https://github.com/apache/kafka/pull/10276


   





[GitHub] [kafka] rohitrmd commented on pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
rohitrmd commented on pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#issuecomment-798856850


   @jsancio made changes as per review.





[GitHub] [kafka] hachikuji commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r597260954



##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -485,6 +485,170 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+    assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())

Review comment:
       What I mean is just using the variable that we already have defined.
   ```scala
   val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
   ...
   assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch())
   ```








[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
rohitrmd commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r593859477



##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -340,6 +340,124 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetLessThanOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetEqualToOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochUnknown(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 10)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetGreatThanEndOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords + 1, epoch)

Review comment:
       Added testValidateOffsetLessThanLEO test.

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##########
@@ -594,6 +594,102 @@ public void testDoesntTruncateFully() throws IOException {
         assertFalse(log.truncateToLatestSnapshot());
     }
 
+    @Test

Review comment:
       done.







[GitHub] [kafka] rohitrmd commented on pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
rohitrmd commented on pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#issuecomment-801440974


   Thanks @jsancio. Will you be merging the PR?





[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
rohitrmd commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r597144327



##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -485,6 +485,170 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+    assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())

Review comment:
       @hachikuji can you explain what is meant by using `snapshotId`? Do you mean changing `assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())` to `assertEquals(new OffsetAndEpoch(snapshotId.offset, snapshotId.epoch), resultOffsetAndEpoch.offsetAndEpoch())` instead of using the variables?







[GitHub] [kafka] hachikuji commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r596481018



##########
File path: raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
##########
@@ -16,40 +16,56 @@
  */
 package org.apache.kafka.raft;
 
+import java.util.Objects;
+
 public final class ValidOffsetAndEpoch {
-    final private Type type;
+    final private Kind kind;

Review comment:
       It's always been a little odd to me that `ValidOffsetAndEpoch` had different types (or kinds), including "VALID." If this were Scala, we would probably use case classes to represent this. The nice thing about that is that we can use different field names for each case instead of having to call it `offsetAndEpoch` in all cases. Even though we don't have anything as convenient in Java, I do wonder if it might be clearer to approximate that pattern with an interface and separate implementations for each case. For example:
   
   ```java
   interface ValidationResult {}
   
   class OutOfRange implements ValidationResult {
     OffsetAndEpoch latestSnapshotId;
   }
   
   class DivergingEpoch implements ValidationResult {
     OffsetAndEpoch divergingEpoch;
   }
   
   class ValidOffsetAndEpoch implements ValidationResult {
   }
   ```
   
   The benefit is that we make the expectation about the `OffsetAndEpoch` in the result explicit. The downside is that we have to use `instanceof` checks for each case and we don't get any guarantees from the type system that we have handled them all. Kind of painful either way.
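
To make the trade-off above concrete, here is a minimal Java sketch of the interface-per-case idea together with the `instanceof` dispatch it requires. Everything in it is an assumption for illustration: the outer `ValidationResultSketch` class, the stand-in `OffsetAndEpoch`, the `describe` helper and its messages are hypothetical and are not part of the Kafka code base or of this pull request.

```java
final class ValidationResultSketch {
    // Hypothetical stand-in for Kafka's OffsetAndEpoch, kept minimal for the sketch.
    static final class OffsetAndEpoch {
        final long offset;
        final int epoch;

        OffsetAndEpoch(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }

        @Override
        public String toString() {
            return "(offset=" + offset + ", epoch=" + epoch + ")";
        }
    }

    // Marker interface; each case below names its own field.
    interface ValidationResult { }

    static final class OutOfRange implements ValidationResult {
        final OffsetAndEpoch latestSnapshotId;

        OutOfRange(OffsetAndEpoch latestSnapshotId) {
            this.latestSnapshotId = latestSnapshotId;
        }
    }

    static final class DivergingEpoch implements ValidationResult {
        final OffsetAndEpoch divergingEpoch;

        DivergingEpoch(OffsetAndEpoch divergingEpoch) {
            this.divergingEpoch = divergingEpoch;
        }
    }

    static final class ValidOffsetAndEpoch implements ValidationResult { }

    // Callers dispatch with instanceof; the compiler cannot verify that
    // every case is handled, hence the explicit failure at the end.
    static String describe(ValidationResult result) {
        if (result instanceof OutOfRange) {
            return "fetch snapshot " + ((OutOfRange) result).latestSnapshotId;
        }
        if (result instanceof DivergingEpoch) {
            return "truncate to " + ((DivergingEpoch) result).divergingEpoch;
        }
        if (result instanceof ValidOffsetAndEpoch) {
            return "valid";
        }
        throw new IllegalArgumentException("Unhandled result: " + result);
    }

    public static void main(String[] args) {
        System.out.println(describe(new OutOfRange(new OffsetAndEpoch(10L, 1))));
        System.out.println(describe(new DivergingEpoch(new OffsetAndEpoch(20L, 2))));
        System.out.println(describe(new ValidOffsetAndEpoch()));
    }
}
```

The final `throw` is the cost mentioned above: adding a fourth implementation compiles cleanly and only fails at runtime if `describe` is not updated.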
   

##########
File path: raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
##########
@@ -16,40 +16,56 @@
  */
 package org.apache.kafka.raft;
 
+import java.util.Objects;
+
 public final class ValidOffsetAndEpoch {
-    final private Type type;
+    final private Kind kind;
     final private OffsetAndEpoch offsetAndEpoch;
 
-    ValidOffsetAndEpoch(Type type, OffsetAndEpoch offsetAndEpoch) {
-        this.type = type;
+    ValidOffsetAndEpoch(Kind kind, OffsetAndEpoch offsetAndEpoch) {
+        this.kind = kind;
         this.offsetAndEpoch = offsetAndEpoch;
     }
 
-    public Type type() {
-        return type;
+    public Kind kind() {
+        return kind;
     }
 
     public OffsetAndEpoch offsetAndEpoch() {
         return offsetAndEpoch;
     }
 
-    public static enum Type {
+    public static enum Kind {

Review comment:
       nit: `static` is redundant

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -485,6 +485,170 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+    assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())

Review comment:
       nit: use `snapshotId`? There are a few similar cases below and in `MockLogTest`.







[GitHub] [kafka] rohitrmd commented on pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
rohitrmd commented on pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#issuecomment-799971318


   @jsancio thanks for the review. Made changes as per review comments.





[GitHub] [kafka] jsancio commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r594509680



##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##########
@@ -732,13 +760,17 @@ public int hashCode() {
     }
 
     private void appendAsLeader(Collection<SimpleRecord> records, int epoch) {
+        appendAsLeader(records, epoch, log.endOffset().offset);
+    }
+
+    private void appendAsLeader(Collection<SimpleRecord> records, int epoch, long initialOffset) {
         log.appendAsLeader(
-            MemoryRecords.withRecords(
-                log.endOffset().offset,
-                CompressionType.NONE,
-                records.toArray(new SimpleRecord[records.size()])
-            ),
-            epoch
+                MemoryRecords.withRecords(
+                        initialOffset,
+                        CompressionType.NONE,
+                        records.toArray(new SimpleRecord[records.size()])
+                ),
+                epoch

Review comment:
       Indentation looks off. We indent 4 spaces.

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -413,22 +413,53 @@ final class KafkaMetadataLogTest {
     assertTrue(log.deleteBeforeSnapshot(snapshotId))
 
     val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
-    assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.getType())
+    assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.getType)
     assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
   }
 
   @Test
-  def testValidateEpochUnknown(): Unit = {
+  def testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot(): Unit = {
+    val offset = 10
+    val numOfRecords = 5
+
     val log = buildMetadataLog(tempDir, mockTime)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+    val snapshotId = new OffsetAndEpoch(offset, 1)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    log.truncateToLatestSnapshot()
 
-    val numberOfRecords = 1
-    val epoch = 1
 
-    append(log, numberOfRecords, epoch)
+    append(log, numOfRecords, epoch = 1, initialOffset = 10)
+    append(log, numOfRecords, epoch = 2, initialOffset = 15)
+    append(log, numOfRecords, epoch = 4, initialOffset = 20)
 
-    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 10)
-    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.getType())
-    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+    // offset is not equal to oldest snapshot's offset
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 3)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.getType)
+    assertEquals(new OffsetAndEpoch(20, 2), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateUnknownEpochLessThanLeaderGreaterThanOldestSnapshot(): Unit = {

Review comment:
       How about `testValidateEpochLessThanFirstEpochInLog`? If you agree, let's change it in `MockLogTest` also.

##########
File path: raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
##########
@@ -25,7 +27,7 @@
         this.offsetAndEpoch = offsetAndEpoch;
     }
 
-    public Type type() {
+    public Type getType() {

Review comment:
       By the way we can also just change the name of the type and field to something like `public Kind kind()`
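
For context, `type` is a reserved word in Scala, which is why the Scala tests had to write ``resultOffsetAndEpoch.`type`()``; with an accessor named `kind()` they can write `resultOffsetAndEpoch.kind`. The following Java sketch shows the renamed shape under stated assumptions: the `KindAccessorSketch` and `Result` names, the flattened `offset`/`epoch` fields, and the `equals`/`hashCode` pair are hypothetical and only approximate the real `ValidOffsetAndEpoch` class.

```java
import java.util.Objects;

final class KindAccessorSketch {
    // Nested enums are implicitly static, so no `static` modifier is needed.
    enum Kind { DIVERGING, SNAPSHOT, VALID }

    // Hypothetical value class standing in for ValidOffsetAndEpoch.
    static final class Result {
        private final Kind kind;
        private final long offset;
        private final int epoch;

        Result(Kind kind, long offset, int epoch) {
            this.kind = Objects.requireNonNull(kind);
            this.offset = offset;
            this.epoch = epoch;
        }

        // Named `kind` rather than `type`, so Scala callers need no backticks.
        Kind kind() {
            return kind;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Result)) return false;
            Result that = (Result) o;
            return kind == that.kind && offset == that.offset && epoch == that.epoch;
        }

        @Override
        public int hashCode() {
            return Objects.hash(kind, offset, epoch);
        }
    }

    public static void main(String[] args) {
        Result result = new Result(Kind.DIVERGING, 20L, 2);
        System.out.println(result.kind());
        System.out.println(result.equals(new Result(Kind.DIVERGING, 20L, 2)));
    }
}
```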

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##########
@@ -750,4 +782,13 @@ private void appendBatch(int numRecords, int epoch) {
 
         appendAsLeader(records, epoch);
     }
+
+    private void appendBatch(int numRecords, int epoch, long initialOffset) {

Review comment:
       How about `private void appendBatch(int numberOfRecords, int epoch)` and always use the LEO like `appendAsLeader(Collection<SimpleRecord> records, int epoch)`?

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -413,22 +413,53 @@ final class KafkaMetadataLogTest {
     assertTrue(log.deleteBeforeSnapshot(snapshotId))
 
     val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
-    assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.getType())
+    assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.getType)
     assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
   }
 
   @Test
-  def testValidateEpochUnknown(): Unit = {
+  def testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot(): Unit = {
+    val offset = 10
+    val numOfRecords = 5
+
     val log = buildMetadataLog(tempDir, mockTime)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+    val snapshotId = new OffsetAndEpoch(offset, 1)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    log.truncateToLatestSnapshot()
 
-    val numberOfRecords = 1
-    val epoch = 1
 
-    append(log, numberOfRecords, epoch)
+    append(log, numOfRecords, epoch = 1, initialOffset = 10)

Review comment:
       I think we should just change the signature to `def append(log: ReplicatedLog, numberOfRecords: Int, epoch: Int): LogAppendInfo`. The implementation of `append` should just use the value in `log.endOffset.offset` to set the batch's base offset.
   
   For a little background: the semantics of `ReplicatedLog::appendAsLeader` were recently changed from setting the correct base offset to expecting the batch to already carry the correct base offset.
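
A minimal sketch of the suggested helper shape, written in Java against a toy log. `TinyLog` and its methods are hypothetical stand-ins, not Kafka's `ReplicatedLog`; the only point carried over from the comment is that the helper reads the current end offset itself, so callers never pass an `initialOffset`.

```java
import java.util.ArrayList;
import java.util.List;

final class AppendAtEndOffsetSketch {
    // Hypothetical in-memory log that stores one epoch per record.
    static final class TinyLog {
        private final List<Integer> epochs = new ArrayList<>();

        long endOffset() {
            return epochs.size();
        }

        // Mirrors the "expects the right base offset" semantics:
        // a batch whose base offset is not the end offset is rejected.
        void appendAsLeader(long baseOffset, int numberOfRecords, int epoch) {
            if (baseOffset != endOffset()) {
                throw new IllegalArgumentException(
                    "Base offset " + baseOffset + " does not match end offset " + endOffset());
            }
            for (int i = 0; i < numberOfRecords; i++) {
                epochs.add(epoch);
            }
        }
    }

    // Test helper: always appends at the log's current end offset
    // and returns the new end offset.
    static long append(TinyLog log, int numberOfRecords, int epoch) {
        log.appendAsLeader(log.endOffset(), numberOfRecords, epoch);
        return log.endOffset();
    }

    public static void main(String[] args) {
        TinyLog log = new TinyLog();
        append(log, 5, 1);
        append(log, 5, 2);
        append(log, 5, 4);
        System.out.println(log.endOffset()); // prints 15
    }
}
```

With this shape, a test that needs batches at several epochs simply calls `append` repeatedly and cannot accidentally supply a base offset that disagrees with the log.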







[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
rohitrmd commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r593859421



##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -340,6 +340,124 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetLessThanOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetEqualToOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochUnknown(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 10)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetGreatThanEndOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords + 1, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateValidEpochAndOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch)

Review comment:
       done.







[GitHub] [kafka] jsancio commented on a change in pull request #10276: Kafka 12253: Add tests that cover all of the cases for ReplicatedLog-validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r591824754



##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -340,6 +340,124 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())

Review comment:
       How about changing the name of the method `type` so that it doesn't conflict with a Scala keyword?

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -340,6 +340,124 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetLessThanOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetEqualToOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochUnknown(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 10)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetGreatThanEndOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords + 1, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateValidEpochAndOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch)

Review comment:
       Okay. I think we should change this so that the offset is less than the LEO.

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -340,6 +340,124 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetLessThanOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetEqualToOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochUnknown(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 10)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetGreatThanEndOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords + 1, epoch)

Review comment:
       Sounds good. Let's also add a case where the offset is less than the LEO. E.g.
   
   1. leader epoch = (epoch: 1, start offset: 0), (epoch: 2, start offset: 10)
   
   ```
        val resultOffsetAndEpoch = log.validateOffsetAndEpoch(11, 1)
   ```
   
   

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -340,6 +340,124 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetLessThanOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetEqualToOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochUnknown(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 10)

Review comment:
       This is the same as:
   > epoch > last known epoch =>  return diverging(log.endOffset, log.lastFetchedEpoch)
   
   I updated the Jira, but for this case we want the following scenario:
   > epoch unknown, less than the last known epoch and greater than the oldest snapshot epoch => return diverging(endOffset/epoch)
   
   Examples are:
   Case 1:
   1. oldest snapshot = (end offset: 10, epoch: 1)
   2. leader epoch = (epoch: 1, start offset: 10), (epoch: 2, start offset: 15), (epoch: 4, start offset: 20)
   Notice how the log is missing epoch 3. In the log this state means that:
   1. epoch 1 spans from offset 10 inclusive to 15 exclusive.
   2. epoch 2 spans from offset 15 inclusive to 20 exclusive.
   3. epoch 4 spans from offset 20 inclusive to LEO exclusive.
   
   For this case I think these checks should pass:
   ```
        val resultOffsetAndEpoch = log.validateOffsetAndEpoch(..., 3);
        assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
        assertEquals(new OffsetAndEpoch(20, 2), resultOffsetAndEpoch.offsetAndEpoch())
   ```
   
   Case 2:
   1. oldest snapshot = (end offset: 10, epoch: 1)
   2. leader epoch = (epoch: 3, start offset: 10)
   
   
   For this case I think these checks should pass:
   ```
        val resultOffsetAndEpoch = log.validateOffsetAndEpoch(..., 2);
        assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
        assertEquals(new OffsetAndEpoch(10, 1), resultOffsetAndEpoch.offsetAndEpoch())
   ```
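
The two cases above can be checked against a small model. The following is a minimal sketch, not the Kafka implementation: `EpochValidationSketch`, its fields, and `addEpoch` are hypothetical names, the log end offsets (25 and 15) and the fetch offsets (22 and 12) are assumed values because the review elides them, and a log without a snapshot is modelled as a snapshot at (offset 0, epoch 0). The rule it illustrates: find the largest known epoch that is not greater than the requested one; that epoch ends where the next epoch starts, or at the LEO.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of offset/epoch validation (not Kafka's code).
final class EpochValidationSketch {
    enum Kind { VALID, DIVERGING, SNAPSHOT }

    static final class Result {
        final Kind kind;
        final long offset;
        final int epoch;

        Result(Kind kind, long offset, int epoch) {
            this.kind = kind;
            this.offset = offset;
            this.epoch = epoch;
        }

        @Override
        public String toString() {
            return kind + "(offset=" + offset + ", epoch=" + epoch + ")";
        }
    }

    // epoch -> start offset of that epoch, kept sorted by epoch
    private final TreeMap<Integer, Long> epochStartOffsets = new TreeMap<>();
    private final long snapshotEndOffset; // assumption: (0, 0) means "no snapshot"
    private final int snapshotEpoch;
    private final long logEndOffset;

    EpochValidationSketch(long snapshotEndOffset, int snapshotEpoch, long logEndOffset) {
        this.snapshotEndOffset = snapshotEndOffset;
        this.snapshotEpoch = snapshotEpoch;
        this.logEndOffset = logEndOffset;
    }

    EpochValidationSketch addEpoch(int epoch, long startOffset) {
        epochStartOffsets.put(epoch, startOffset);
        return this;
    }

    Result validate(long offset, int epoch) {
        // Anything older than the oldest snapshot can only be served by the snapshot.
        if (offset < snapshotEndOffset || epoch < snapshotEpoch) {
            return new Result(Kind.SNAPSHOT, snapshotEndOffset, snapshotEpoch);
        }
        // Largest known epoch that is <= the requested epoch.
        Map.Entry<Integer, Long> floor = epochStartOffsets.floorEntry(epoch);
        if (floor == null) {
            // Every epoch in the log is newer: diverge at the snapshot boundary.
            return new Result(Kind.DIVERGING, snapshotEndOffset, snapshotEpoch);
        }
        // That epoch ends where the next epoch starts, or at the LEO.
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(floor.getKey());
        long endOffset = (next == null) ? logEndOffset : next.getValue();
        if (floor.getKey() != epoch || endOffset < offset) {
            return new Result(Kind.DIVERGING, endOffset, floor.getKey());
        }
        return new Result(Kind.VALID, offset, epoch);
    }

    public static void main(String[] args) {
        // Case 1: epoch 3 is missing between epochs 2 and 4.
        EpochValidationSketch case1 = new EpochValidationSketch(10L, 1, 25L)
            .addEpoch(1, 10L).addEpoch(2, 15L).addEpoch(4, 20L);
        System.out.println(case1.validate(22L, 3)); // DIVERGING(offset=20, epoch=2)

        // Case 2: the only epoch in the log is newer than the requested one.
        EpochValidationSketch case2 = new EpochValidationSketch(10L, 1, 15L)
            .addEpoch(3, 10L);
        System.out.println(case2.validate(12L, 2)); // DIVERGING(offset=10, epoch=1)
    }
}
```

Under the same assumptions, a log with epochs (epoch: 1, start offset: 0), (epoch: 2, start offset: 10) and no snapshot yields `DIVERGING(offset=10, epoch=1)` for `validate(11, 1)`, because epoch 1 ends at offset 10.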

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##########
@@ -594,6 +594,102 @@ public void testDoesntTruncateFully() throws IOException {
         assertFalse(log.truncateToLatestSnapshot());
     }
 
+    @Test

Review comment:
       If you update `KafkaMetadataLogTest` based on my comments, let's update these tests too.







[GitHub] [kafka] rohitrmd commented on pull request #10276: Kafka 12253: Add tests that cover all of all of the cases for ReplicatedLog-validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
rohitrmd commented on pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#issuecomment-793011934


   @jsancio 





[GitHub] [kafka] jsancio commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r596994002



##########
File path: raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
##########
@@ -16,40 +16,56 @@
  */
 package org.apache.kafka.raft;
 
+import java.util.Objects;
+
 public final class ValidOffsetAndEpoch {
-    final private Type type;
+    final private Kind kind;

Review comment:
       Yeah. What makes the suggested design pattern nice in Scala is that the companion objects for `case class` have an `unapply` method for pattern matching.
   
   For what it is worth, I thought about this design when implementing `ValidOffsetAndEpoch` but found the current design more user-friendly in Java.
   
   I am all for changing the name of `ValidOffsetAndEpoch`. I was never happy with the name.
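
For readers comparing the two designs, here is a minimal sketch of the "single final class with a kind field" shape that the diff above points at. It is not the actual `ValidOffsetAndEpoch` source: `ValidationResultSketch`, its factory methods, and `describe` are hypothetical names chosen for illustration. Callers branch on the kind with a plain `switch`, which is what makes this shape convenient from Java, where there is no `unapply`-style pattern matching.

```java
import java.util.Objects;

// Hypothetical illustration of a result type tagged with a kind enum.
final class ValidationResultSketch {
    enum Kind { DIVERGING, SNAPSHOT, VALID }

    private final Kind kind;
    private final long offset;
    private final int epoch;

    // Private constructor: instances are created only through the factories below.
    private ValidationResultSketch(Kind kind, long offset, int epoch) {
        this.kind = Objects.requireNonNull(kind);
        this.offset = offset;
        this.epoch = epoch;
    }

    static ValidationResultSketch diverging(long offset, int epoch) {
        return new ValidationResultSketch(Kind.DIVERGING, offset, epoch);
    }

    static ValidationResultSketch snapshot(long offset, int epoch) {
        return new ValidationResultSketch(Kind.SNAPSHOT, offset, epoch);
    }

    static ValidationResultSketch valid(long offset, int epoch) {
        return new ValidationResultSketch(Kind.VALID, offset, epoch);
    }

    Kind kind() { return kind; }
    long offset() { return offset; }
    int epoch() { return epoch; }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof ValidationResultSketch)) return false;
        ValidationResultSketch that = (ValidationResultSketch) other;
        return kind == that.kind && offset == that.offset && epoch == that.epoch;
    }

    @Override
    public int hashCode() {
        return Objects.hash(kind, offset, epoch);
    }

    // Example caller: one switch covers every kind.
    static String describe(ValidationResultSketch result) {
        switch (result.kind()) {
            case DIVERGING:
                return "truncate to offset " + result.offset();
            case SNAPSHOT:
                return "fetch snapshot ending at offset " + result.offset();
            case VALID:
                return "continue fetching from offset " + result.offset();
            default:
                throw new IllegalStateException("Unknown kind: " + result.kind());
        }
    }
}
```

The trade-off is that every kind carries the same fields, so the compiler cannot stop a caller from reading a field that is meaningless for a given kind.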







[GitHub] [kafka] jsancio commented on pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#issuecomment-801508474


   > thanks @jsancio. will you be merging the pr?
   
   You need a green checkmark from a committer. I am not a committer. :smile: 
   
   cc @mumrah @hachikuji 





[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
rohitrmd commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r597158051



##########
File path: raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
##########
@@ -16,40 +16,56 @@
  */
 package org.apache.kafka.raft;
 
+import java.util.Objects;
+
 public final class ValidOffsetAndEpoch {
-    final private Type type;
+    final private Kind kind;

Review comment:
       I agree with the existing design and the limitations of interfaces compared to case classes. Java 15 has sealed classes (as a preview feature), which would be a good alternative to interfaces because we would be limiting the extension: https://openjdk.java.net/jeps/360
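
As a rough illustration of the sealed-type alternative mentioned here, the sketch below assumes Java 17 or later, where sealed types are a final feature (they were a preview in Java 15). `ValidationOutcome` and its nested records are hypothetical names, not part of the Kafka code base.

```java
// Hypothetical sketch of a sealed hierarchy (requires Java 17+).
sealed interface ValidationOutcome {
    long offset();
    int epoch();

    // All permitted implementations live in this compilation unit,
    // so no explicit `permits` clause is needed.
    record Valid(long offset, int epoch) implements ValidationOutcome {}
    record Diverging(long offset, int epoch) implements ValidationOutcome {}
    record Snapshot(long offset, int epoch) implements ValidationOutcome {}

    // Callers dispatch on the concrete type instead of a kind enum.
    static String describe(ValidationOutcome outcome) {
        if (outcome instanceof Diverging d) {
            return "truncate to offset " + d.offset() + " (epoch " + d.epoch() + ")";
        } else if (outcome instanceof Snapshot s) {
            return "fetch snapshot ending at offset " + s.offset() + " (epoch " + s.epoch() + ")";
        } else {
            return "continue fetching from offset " + outcome.offset() + " (epoch " + outcome.epoch() + ")";
        }
    }
}
```

Because the interface is sealed, no code outside this file can add a fourth outcome, which is the "limiting the extension" property referred to above.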







[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

Posted by GitBox <gi...@apache.org>.
rohitrmd commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r597309028



##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -485,6 +485,170 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+    assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())

Review comment:
       @hachikuji I made changes to reuse the `snapshotId` variable.



