You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/10 20:25:27 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #10276: Kafka 12253: Add tests that cover all of the cases for ReplicatedLog-validateOffsetAndEpoch

jsancio commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r591824754



##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -340,6 +340,124 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())

Review comment:
       How about changing the name of the method `type` so that it doesn't conflict with a Scala keyword?

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -340,6 +340,124 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetLessThanOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetEqualToOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochUnknown(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 10)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetGreatThanEndOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords + 1, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateValidEpochAndOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch)

Review comment:
       Okay. I think we should change this so that the offset is less than the LEO (log end offset).

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -340,6 +340,124 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetLessThanOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetEqualToOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochUnknown(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 10)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetGreatThanEndOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords + 1, epoch)

Review comment:
       Sounds good. Let's also add a case where the offset is less than the LEO. E.g.
   
   1. leader epoch = (epoch: 1, start offset: 0), (epoch: 2, start offset: 10)
   
   ```
        val resultOffsetAndEpoch = log.validateOffsetAndEpoch(11, 1)
   ```
   
   

##########
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##########
@@ -340,6 +340,124 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetLessThanOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.SNAPSHOT, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateOffsetEqualToOldestSnapshotOffset(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val offset = 2
+    val epoch = 1
+
+    append(log, offset, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(offset))
+
+    val snapshotId = new OffsetAndEpoch(offset, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
+    assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.`type`())
+    assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochUnknown(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 10)

Review comment:
       This is the same as:
   > epoch > last known epoch =>  return diverging(log.endOffset, log.lastFetchedEpoch)
   
   I updated the Jira, but for this case we want the following scenario:
   > epoch unknown, less than the last known epoch and greater than the oldest snapshot epoch => return diverging(endOffset/epoch)
   
   Examples are:
   Case 1:
   1. oldest snapshot = (end offset: 10, epoch: 1)
   2. leader epoch = (epoch: 1, start offset: 10), (epoch: 2, start offset: 15), (epoch: 4, start offset: 20)
   Notice how the log is missing epoch 3. In the log this state means that:
   1. epoch 1 spans from offset 10 inclusive to 15 exclusive.
   2. epoch 2 spans from offset 15 inclusive to 20 exclusive.
   3. epoch 4 spans from offset 20 inclusive to LEO exclusive.
   
   For this case I think these checks should pass:
   ```
        val resultOffsetAndEpoch = log.validateOffsetAndEpoch(..., 3);
        assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
        assertEquals(new OffsetAndEpoch(20, 2), resultOffsetAndEpoch.offsetAndEpoch())
   ```
   
   Case 2:
   1. oldest snapshot = (end offset: 10, epoch: 1)
   2. leader epoch = (epoch: 3, start offset: 10)
   
   
   For this case I think these checks should pass:
   ```
        val resultOffsetAndEpoch = log.validateOffsetAndEpoch(..., 2);
        assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.`type`())
        assertEquals(new OffsetAndEpoch(10, 1), resultOffsetAndEpoch.offsetAndEpoch())
   ```

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##########
@@ -594,6 +594,102 @@ public void testDoesntTruncateFully() throws IOException {
         assertFalse(log.truncateToLatestSnapshot());
     }
 
+    @Test

Review comment:
       If you update `KafkaMetadataLogTest` based on my comments, let's update these tests too.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org