You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/07 21:42:12 UTC

[GitHub] [kafka] hachikuji opened a new pull request, #12963: MINOR: More consistent handling of snapshot IDs

hachikuji opened a new pull request, #12963:
URL: https://github.com/apache/kafka/pull/12963

   The ID we use for snapshots consists of the exclusive committed offset and epoch. That is, the offset in the ID is one more than the offset of the last committed record. This is currently handled a bit inconsistently in the codebase. For example, `MetadataImage` internally tracks the inclusive highest offset and epoch of records contained within the image. However, for an empty image, which contains no log records, the offset is incorrectly initialized to 0. 
   
   In this patch, I've tried to make the bookkeeping a little more consistent:
   
   - Use the exclusive offset/epoch in `MetadataImage` as an "imageId" (which then corresponds exactly to a the snapshotId generated from the image)
   - Also use the exclusive highest offset in `MetadataDelta`
   - Modify `RaftClient.createSnapshot` to pass the snapshotId directly instead of the inclusive last committed offset and epochs.
   
   The nice thing about this is that we can define a non-negative offset/epoch to represent each image. For an empty metadata image, we can use 0 as both the offset and epoch. Unfortunately, there are still a few places in the codebase which expect inclusive offsets, so we still have some awkward increments/decrements here and there.
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jsancio commented on a diff in pull request #12963: MINOR: More consistent handling of snapshot IDs

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12963:
URL: https://github.com/apache/kafka/pull/12963#discussion_r1042716470


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##########
@@ -261,7 +261,7 @@ class BrokerMetadataPublisher(
       if (_firstPublish) {
         finishInitializingReplicaManager(newImage)
       }
-      publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset)
+      publishedOffsetAtomic.set(imageId.offset)

Review Comment:
   How about `publishedEndOffset`?



##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala:
##########
@@ -35,42 +34,42 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
 
 import java.util
-import scala.compat.java8.OptionConverters._
 
 class BrokerMetadataSnapshotterTest {
   @Test
   def testCreateAndClose(): Unit = {
     val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None,
-      (_, _, _) => throw new RuntimeException("unimplemented"))
+      (_, _) => throw new RuntimeException("unimplemented"))

Review Comment:
   Minor but maybe we can use Java's `UnsupportedOperationException`.



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##########
@@ -69,25 +72,24 @@ class BrokerMetadataSnapshotter(
     snapshotReasons: Set[SnapshotReason]
   ): Boolean = synchronized {
     if (_currentSnapshotOffset != -1) {
-      info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch} because " +
+      info(s"Declining to create a new snapshot at ${image.imageId} because " +
         s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")
       false
     } else {
       val writer = writerBuilder.build(
-        image.highestOffsetAndEpoch().offset,
-        image.highestOffsetAndEpoch().epoch,
+        image.imageId,
         lastContainedLogTime
       )
       if (writer.nonEmpty) {
-        _currentSnapshotOffset = image.highestOffsetAndEpoch.offset
+        _currentSnapshotOffset = image.imageId.offset

Review Comment:
   How about `_currentSnapshotEndOffset`?



##########
raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java:
##########
@@ -133,6 +133,28 @@ public static <T> Optional<SnapshotWriter<T>> createWithHeader(
         });
     }
 
+    public static <T> SnapshotWriter<T> createWithHeader(

Review Comment:
   I think we can remove some code duplication if the function `public static <T> Optional<SnapshotWriter<T>> createWithHeader(...)` calls this implementation.



##########
metadata/src/main/java/org/apache/kafka/image/MetadataImage.java:
##########
@@ -159,7 +167,7 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        return "MetadataImage(highestOffsetAndEpoch=" + highestOffsetAndEpoch +
+        return "MetadataImage(id=" + imageId +

Review Comment:
   Should this be this instead so that the string literal matches the field name.
   ```java
           return "MetadataImage(imageId=" + imageId +
   ```



##########
metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java:
##########
@@ -366,7 +367,7 @@ public MetadataImage apply() {
     @Override
     public String toString() {
         return "MetadataDelta(" +
-            "highestOffset=" + highestOffset +
+            "highestOffset=" + highestOffsetExclusive +

Review Comment:
   Let's change the string literal `"highestOffset="` to match the internal field of this type. E.g. `"highestOffsetExclusive="`.



##########
metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java:
##########
@@ -51,7 +51,7 @@
 public final class MetadataDelta {
     private final MetadataImage image;
 
-    private long highestOffset;
+    private long highestOffsetExclusive;

Review Comment:
   How about `highestEndOffset`? I am suggesting the `EndOffset` suffix because I am planning of adding a type called `SnapshotId` which at high-level it will have two fields: `endOffset()` and `epoch()`.



##########
core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala:
##########
@@ -32,7 +32,7 @@ trait MetadataPublisher {
   def publish(delta: MetadataDelta, newImage: MetadataImage): Unit
 
   /**
-   * The highest offset of metadata topic which has been published
+   * The highest offset (exclusive) of metadata topic which has been published
    */
   def publishedOffset: Long

Review Comment:
   How about `publishedEndOffset`?



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##########
@@ -69,25 +72,24 @@ class BrokerMetadataSnapshotter(
     snapshotReasons: Set[SnapshotReason]
   ): Boolean = synchronized {
     if (_currentSnapshotOffset != -1) {
-      info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch} because " +
+      info(s"Declining to create a new snapshot at ${image.imageId} because " +
         s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")
       false
     } else {
       val writer = writerBuilder.build(
-        image.highestOffsetAndEpoch().offset,
-        image.highestOffsetAndEpoch().epoch,
+        image.imageId,
         lastContainedLogTime
       )
       if (writer.nonEmpty) {
-        _currentSnapshotOffset = image.highestOffsetAndEpoch.offset
+        _currentSnapshotOffset = image.imageId.offset
 
         val snapshotReasonsMessage = SnapshotReason.stringFromReasons(snapshotReasons.asJava)
-        info(s"Creating a new snapshot at ${image.highestOffsetAndEpoch} because: $snapshotReasonsMessage")
+        info(s"Creating a new snapshot at ${image.imageId} because: $snapshotReasonsMessage")
         eventQueue.append(new CreateSnapshotEvent(image, writer.get))
         true
       } else {
-        info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +
-          s"there is already a snapshot at offset ${image.highestOffsetAndEpoch().offset}")
+        info(s"Declining to create a new snapshot at ${image.imageId} because " +
+          s"there is already a snapshot at this offset")

Review Comment:
   There is no string interpolation so `s""` is not needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jsancio commented on a diff in pull request #12963: MINOR: More consistent handling of snapshot IDs

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12963:
URL: https://github.com/apache/kafka/pull/12963#discussion_r1042809968


##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##########
@@ -166,15 +172,15 @@ class BrokerMetadataListenerTest {
       image = newImage
     }
 
-    override def publishedOffset: Long = -1
+    override def publishedEndOffset: Long = -1
   }
 
   private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
   private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg")
 
   private def generateManyRecords(listener: BrokerMetadataListener,
                                   endOffset: Long): Unit = {
-    (0 to 10000).foreach { _ =>
+    (0 until 10000).foreach { _ =>
       listener.handleCommit(
         RecordTestUtils.mockBatchReader(
           endOffset,

Review Comment:
   This is probably beyond the scope of this PR but it is unfortunate that this test is creating a sequence of batches that are impossible with KRaft. I assume that we are generating this many record to trigger size-based snapshots.



##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##########
@@ -248,28 +254,28 @@ class BrokerMetadataListenerTest {
       assertEquals(200L, listener.highestMetadataOffset)
 
       // Check that we generate at least one snapshot once we see enough records.
-      assertEquals(-1L, snapshotter.prevCommittedOffset)
-      generateManyRecords(listener, 1000L)
-      assertEquals(1000L, snapshotter.prevCommittedOffset)
-      assertEquals(1000L, snapshotter.activeSnapshotOffset)
-      snapshotter.activeSnapshotOffset = -1L
+      assertNull(snapshotter.prevSnapshotId)
+      generateManyRecords(listener, endOffset = 1000L)
+      assertEquals(new OffsetAndEpoch(1000L, 0), snapshotter.prevSnapshotId)
+      assertEquals(new OffsetAndEpoch(1000L, 0), snapshotter.activeSnapshotId)
+      snapshotter.activeSnapshotId = null
 
       // Test creating a new snapshot after publishing it.
       val publisher = new MockMetadataPublisher()
       listener.startPublishing(publisher).get()
-      generateManyRecords(listener, 2000L)
+      generateManyRecords(listener, endOffset = 2000L)
       listener.getImageRecords().get()
-      assertEquals(2000L, snapshotter.activeSnapshotOffset)
-      assertEquals(2000L, snapshotter.prevCommittedOffset)
+      assertEquals(new OffsetAndEpoch(2000L, 0), snapshotter.prevSnapshotId)
+      assertEquals(new OffsetAndEpoch(2000L, 0), snapshotter.activeSnapshotId)
 
       // Test how we handle the snapshotter returning false.
-      generateManyRecords(listener, 3000L)
-      assertEquals(2000L, snapshotter.activeSnapshotOffset)
-      generateManyRecords(listener, 4000L)
-      assertEquals(2000L, snapshotter.activeSnapshotOffset)
-      snapshotter.activeSnapshotOffset = -1L
-      generateManyRecords(listener, 5000L)
-      assertEquals(5000L, snapshotter.activeSnapshotOffset)
+      generateManyRecords(listener, endOffset = 3000L)
+      assertEquals(new OffsetAndEpoch(2000L, 0), snapshotter.activeSnapshotId)
+      generateManyRecords(listener, endOffset = 4000L)
+      assertEquals(new OffsetAndEpoch(2000L, 0), snapshotter.activeSnapshotId)
+      snapshotter.activeSnapshotId = null
+      generateManyRecords(listener, endOffset = 5000L)
+      assertEquals(new OffsetAndEpoch(5000L, 0), snapshotter.activeSnapshotId)

Review Comment:
   Given my comments regarding `mockBatchReader`. I would expect these snapshot id comparisons to fail, no?



##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##########
@@ -166,15 +172,15 @@ class BrokerMetadataListenerTest {
       image = newImage
     }
 
-    override def publishedOffset: Long = -1
+    override def publishedEndOffset: Long = -1
   }
 
   private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
   private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg")
 
   private def generateManyRecords(listener: BrokerMetadataListener,
                                   endOffset: Long): Unit = {

Review Comment:
   I think there is some inconsistency here. This field is called `endOffset` and it is passed to `mockBatchReader`. `mockBatchReader` calls this parameter `lastOffset`. Kafka tends to make "endOffsets" exclusive while "lastOffsets" are inclusive.
   
   Looking at the implementation of `mockBatchReader`, `lastOffset` is treated as the inclusive last offset of the batch.



##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -1230,7 +1230,7 @@ object TestUtils extends Logging {
     TestUtils.waitUntilTrue(
       () => {
         brokers.forall { broker =>
-          val metadataOffset = broker.asInstanceOf[BrokerServer].metadataPublisher.publishedOffset
+          val metadataOffset = broker.asInstanceOf[BrokerServer].metadataPublisher.publishedEndOffset
           metadataOffset >= controllerOffset

Review Comment:
   `controllerOffset` is the last offset in the controller, inclusive. metadataOffset is exclusive. Maybe the best change is to have this line instead in line 1229:
   ```scala
   val controllerEndOffset = controllerServer.raftManager.replicatedLog.endOffset().offset
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jsancio commented on a diff in pull request #12963: MINOR: More consistent handling of snapshot IDs

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12963:
URL: https://github.com/apache/kafka/pull/12963#discussion_r1044748464


##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##########
@@ -166,15 +172,15 @@ class BrokerMetadataListenerTest {
       image = newImage
     }
 
-    override def publishedOffset: Long = -1
+    override def publishedEndOffset: Long = -1
   }
 
   private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
   private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg")
 
   private def generateManyRecords(listener: BrokerMetadataListener,
                                   endOffset: Long): Unit = {

Review Comment:
   Got it. The part that I don't understand is that we send that value (`endOffset`) to `mockBatchReader` in the first parameter (`lastOffset`). Looking at the implementation of `mockBatchReader`, it looks to me like that code assumes that the offset is inclusive.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji closed pull request #12963: MINOR: More consistent handling of snapshot IDs

Posted by GitBox <gi...@apache.org>.
hachikuji closed pull request #12963: MINOR: More consistent handling of snapshot IDs
URL: https://github.com/apache/kafka/pull/12963


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on pull request #12963: MINOR: More consistent handling of snapshot IDs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on PR #12963:
URL: https://github.com/apache/kafka/pull/12963#issuecomment-1347439806

   Closing this patch since the issue with MetadataImage.EMPTY was addressed in https://github.com/apache/kafka/commit/b2dea17041157ceee741041d23783ff993b88ef1. I've opened a separate patch for the refactor of `RaftClient.createSnapshot`: https://github.com/apache/kafka/pull/12981.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jsancio commented on a diff in pull request #12963: MINOR: More consistent handling of snapshot IDs

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12963:
URL: https://github.com/apache/kafka/pull/12963#discussion_r1042828301


##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -1230,7 +1230,7 @@ object TestUtils extends Logging {
     TestUtils.waitUntilTrue(
       () => {
         brokers.forall { broker =>
-          val metadataOffset = broker.asInstanceOf[BrokerServer].metadataPublisher.publishedOffset
+          val metadataOffset = broker.asInstanceOf[BrokerServer].metadataPublisher.publishedEndOffset
           metadataOffset >= controllerOffset

Review Comment:
   We could also maybe rename `metadataOffset` to `metadataEndOffset`.
   
   I think we discussed this in the past but I wonder we should use types like `Offset` and `EndOffset` instead of `long` and implement comparison logic for them. Java's future primitive classes would be ideal for this: https://openjdk.org/jeps/401



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jsancio commented on a diff in pull request #12963: MINOR: More consistent handling of snapshot IDs

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12963:
URL: https://github.com/apache/kafka/pull/12963#discussion_r1044743802


##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##########
@@ -166,15 +172,15 @@ class BrokerMetadataListenerTest {
       image = newImage
     }
 
-    override def publishedOffset: Long = -1
+    override def publishedEndOffset: Long = -1
   }
 
   private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
   private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg")
 
   private def generateManyRecords(listener: BrokerMetadataListener,
                                   endOffset: Long): Unit = {
-    (0 to 10000).foreach { _ =>
+    (0 until 10000).foreach { _ =>
       listener.handleCommit(
         RecordTestUtils.mockBatchReader(
           endOffset,

Review Comment:
   This code generates 10k batches and sends them to the state machine but they all have the same base offset and end offset, no? You could imagine a listener implementation that ignores batches with offset that it already applied.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12963: MINOR: More consistent handling of snapshot IDs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12963:
URL: https://github.com/apache/kafka/pull/12963#discussion_r1043680891


##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##########
@@ -166,15 +172,15 @@ class BrokerMetadataListenerTest {
       image = newImage
     }
 
-    override def publishedOffset: Long = -1
+    override def publishedEndOffset: Long = -1
   }
 
   private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
   private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg")
 
   private def generateManyRecords(listener: BrokerMetadataListener,
                                   endOffset: Long): Unit = {
-    (0 to 10000).foreach { _ =>
+    (0 until 10000).foreach { _ =>
       listener.handleCommit(
         RecordTestUtils.mockBatchReader(
           endOffset,

Review Comment:
   Are you referring to the removing replicas changes here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12963: MINOR: More consistent handling of snapshot IDs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12963:
URL: https://github.com/apache/kafka/pull/12963#discussion_r1043679688


##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##########
@@ -166,15 +172,15 @@ class BrokerMetadataListenerTest {
       image = newImage
     }
 
-    override def publishedOffset: Long = -1
+    override def publishedEndOffset: Long = -1
   }
 
   private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
   private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg")
 
   private def generateManyRecords(listener: BrokerMetadataListener,
                                   endOffset: Long): Unit = {

Review Comment:
   I changed the method here to make `endOffset` exclusive, so it seems ok now? Or am I missing something?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org