You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/07 22:21:31 UTC

[GitHub] [kafka] jsancio commented on a diff in pull request #12963: MINOR: More consistent handling of snapshot IDs

jsancio commented on code in PR #12963:
URL: https://github.com/apache/kafka/pull/12963#discussion_r1042716470


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##########
@@ -261,7 +261,7 @@ class BrokerMetadataPublisher(
       if (_firstPublish) {
         finishInitializingReplicaManager(newImage)
       }
-      publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset)
+      publishedOffsetAtomic.set(imageId.offset)

Review Comment:
   How about `publishedEndOffset`?



##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala:
##########
@@ -35,42 +34,42 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
 
 import java.util
-import scala.compat.java8.OptionConverters._
 
 class BrokerMetadataSnapshotterTest {
   @Test
   def testCreateAndClose(): Unit = {
     val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None,
-      (_, _, _) => throw new RuntimeException("unimplemented"))
+      (_, _) => throw new RuntimeException("unimplemented"))

Review Comment:
   Minor but maybe we can use Java's `UnsupportedOperationException`.



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##########
@@ -69,25 +72,24 @@ class BrokerMetadataSnapshotter(
     snapshotReasons: Set[SnapshotReason]
   ): Boolean = synchronized {
     if (_currentSnapshotOffset != -1) {
-      info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch} because " +
+      info(s"Declining to create a new snapshot at ${image.imageId} because " +
         s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")
       false
     } else {
       val writer = writerBuilder.build(
-        image.highestOffsetAndEpoch().offset,
-        image.highestOffsetAndEpoch().epoch,
+        image.imageId,
         lastContainedLogTime
       )
       if (writer.nonEmpty) {
-        _currentSnapshotOffset = image.highestOffsetAndEpoch.offset
+        _currentSnapshotOffset = image.imageId.offset

Review Comment:
   How about `_currentSnapshotEndOffset`?



##########
raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java:
##########
@@ -133,6 +133,28 @@ public static <T> Optional<SnapshotWriter<T>> createWithHeader(
         });
     }
 
+    public static <T> SnapshotWriter<T> createWithHeader(

Review Comment:
   I think we can remove some code duplication if the function `public static <T> Optional<SnapshotWriter<T>> createWithHeader(...)` calls this implementation.



##########
metadata/src/main/java/org/apache/kafka/image/MetadataImage.java:
##########
@@ -159,7 +167,7 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        return "MetadataImage(highestOffsetAndEpoch=" + highestOffsetAndEpoch +
+        return "MetadataImage(id=" + imageId +

Review Comment:
   Should this be the following instead, so that the string literal matches the field name?
   ```java
           return "MetadataImage(imageId=" + imageId +
   ```



##########
metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java:
##########
@@ -366,7 +367,7 @@ public MetadataImage apply() {
     @Override
     public String toString() {
         return "MetadataDelta(" +
-            "highestOffset=" + highestOffset +
+            "highestOffset=" + highestOffsetExclusive +

Review Comment:
   Let's change the string literal `"highestOffset="` to match the internal field of this type. E.g. `"highestOffsetExclusive="`.



##########
metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java:
##########
@@ -51,7 +51,7 @@
 public final class MetadataDelta {
     private final MetadataImage image;
 
-    private long highestOffset;
+    private long highestOffsetExclusive;

Review Comment:
   How about `highestEndOffset`? I am suggesting the `EndOffset` suffix because I am planning to add a type called `SnapshotId` which, at a high level, will have two fields: `endOffset()` and `epoch()`.



##########
core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala:
##########
@@ -32,7 +32,7 @@ trait MetadataPublisher {
   def publish(delta: MetadataDelta, newImage: MetadataImage): Unit
 
   /**
-   * The highest offset of metadata topic which has been published
+   * The highest offset (exclusive) of metadata topic which has been published
    */
   def publishedOffset: Long

Review Comment:
   How about `publishedEndOffset`?



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##########
@@ -69,25 +72,24 @@ class BrokerMetadataSnapshotter(
     snapshotReasons: Set[SnapshotReason]
   ): Boolean = synchronized {
     if (_currentSnapshotOffset != -1) {
-      info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch} because " +
+      info(s"Declining to create a new snapshot at ${image.imageId} because " +
         s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")
       false
     } else {
       val writer = writerBuilder.build(
-        image.highestOffsetAndEpoch().offset,
-        image.highestOffsetAndEpoch().epoch,
+        image.imageId,
         lastContainedLogTime
       )
       if (writer.nonEmpty) {
-        _currentSnapshotOffset = image.highestOffsetAndEpoch.offset
+        _currentSnapshotOffset = image.imageId.offset
 
         val snapshotReasonsMessage = SnapshotReason.stringFromReasons(snapshotReasons.asJava)
-        info(s"Creating a new snapshot at ${image.highestOffsetAndEpoch} because: $snapshotReasonsMessage")
+        info(s"Creating a new snapshot at ${image.imageId} because: $snapshotReasonsMessage")
         eventQueue.append(new CreateSnapshotEvent(image, writer.get))
         true
       } else {
-        info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +
-          s"there is already a snapshot at offset ${image.highestOffsetAndEpoch().offset}")
+        info(s"Declining to create a new snapshot at ${image.imageId} because " +
+          s"there is already a snapshot at this offset")

Review Comment:
   There is no string interpolation in this second string literal, so the `s` prefix (`s""`) is not needed on it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org