You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/12/16 21:05:25 UTC

[kafka] 06/11: MINOR; Improve high watermark log messages (#12975)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 793c36c55489ece8813e333bed082189da84a6f2
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Mon Dec 12 16:32:16 2022 -0800

    MINOR; Improve high watermark log messages (#12975)
    
    While debugging KRaft and the metadata state machines it is helpful to always log the first time the replica discovers the high watermark. All other updates to the high watermark are logged at trace because they are more frequent and less useful.
    
    Reviewers: Luke Chen <sh...@gmail.com>
---
 .../java/org/apache/kafka/raft/FollowerState.java  | 69 +++++++++++++++++-----
 .../java/org/apache/kafka/raft/LeaderState.java    | 28 +++++++--
 2 files changed, 75 insertions(+), 22 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index e3a30479745..1eb9eb685d9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -105,26 +105,41 @@ public class FollowerState implements EpochState {
         fetchTimer.reset(timeoutMs);
     }
 
-    public boolean updateHighWatermark(OptionalLong highWatermark) {
-        if (!highWatermark.isPresent() && this.highWatermark.isPresent())
-            throw new IllegalArgumentException("Attempt to overwrite current high watermark " + this.highWatermark +
-                " with unknown value");
-
-        if (this.highWatermark.isPresent()) {
-            long previousHighWatermark = this.highWatermark.get().offset;
-            long updatedHighWatermark = highWatermark.getAsLong();
-
-            if (updatedHighWatermark < 0)
-                throw new IllegalArgumentException("Illegal negative high watermark update");
-            if (previousHighWatermark > updatedHighWatermark)
-                throw new IllegalArgumentException("Non-monotonic update of high watermark attempted");
-            if (previousHighWatermark == updatedHighWatermark)
+    public boolean updateHighWatermark(OptionalLong newHighWatermark) {
+        if (!newHighWatermark.isPresent() && highWatermark.isPresent()) {
+            throw new IllegalArgumentException(
+                String.format("Attempt to overwrite current high watermark %s with unknown value", highWatermark)
+            );
+        }
+
+        if (highWatermark.isPresent()) {
+            long previousHighWatermark = highWatermark.get().offset;
+            long updatedHighWatermark = newHighWatermark.getAsLong();
+
+            if (updatedHighWatermark < 0) {
+                throw new IllegalArgumentException(
+                    String.format("Illegal negative (%s) high watermark update", updatedHighWatermark)
+                );
+            } else if (previousHighWatermark > updatedHighWatermark) {
+                throw new IllegalArgumentException(
+                    String.format(
+                        "Non-monotonic update of high watermark from %s to %s",
+                        previousHighWatermark,
+                        updatedHighWatermark
+                    )
+                );
+            } else if (previousHighWatermark == updatedHighWatermark) {
                 return false;
+            }
         }
 
-        this.highWatermark = highWatermark.isPresent() ?
-            Optional.of(new LogOffsetMetadata(highWatermark.getAsLong())) :
+        Optional<LogOffsetMetadata> oldHighWatermark = highWatermark;
+        highWatermark = newHighWatermark.isPresent() ?
+            Optional.of(new LogOffsetMetadata(newHighWatermark.getAsLong())) :
             Optional.empty();
+
+        logHighWatermarkUpdate(oldHighWatermark, highWatermark);
+
         return true;
     }
 
@@ -169,4 +184,26 @@ public class FollowerState implements EpochState {
             fetchingSnapshot.get().close();
         }
     }
+
+    private void logHighWatermarkUpdate(
+        Optional<LogOffsetMetadata> oldHighWatermark,
+        Optional<LogOffsetMetadata> newHighWatermark
+    ) {
+        if (!oldHighWatermark.equals(newHighWatermark)) {
+            if (oldHighWatermark.isPresent()) {
+                log.trace(
+                    "High watermark set to {} from {} for epoch {}",
+                    newHighWatermark,
+                    oldHighWatermark.get(),
+                    epoch
+                );
+            } else {
+                log.info(
+                    "High watermark set to {} for the first time for epoch {}",
+                    newHighWatermark,
+                    epoch
+                );
+            }
+        }
+    }
 }
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 3aa7c0bfe75..ac0ef1260b9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -170,8 +170,10 @@ public class LeaderState<T> implements EpochState {
                     if (highWatermarkUpdateOffset > currentHighWatermarkMetadata.offset
                         || (highWatermarkUpdateOffset == currentHighWatermarkMetadata.offset &&
                             !highWatermarkUpdateMetadata.metadata.equals(currentHighWatermarkMetadata.metadata))) {
+                        Optional<LogOffsetMetadata> oldHighWatermark = highWatermark;
                         highWatermark = highWatermarkUpdateOpt;
                         logHighWatermarkUpdate(
+                            oldHighWatermark,
                             highWatermarkUpdateMetadata,
                             indexOfHw,
                             followersByDescendingFetchOffset
@@ -187,8 +189,10 @@ public class LeaderState<T> implements EpochState {
                         return false;
                     }
                 } else {
+                    Optional<LogOffsetMetadata> oldHighWatermark = highWatermark;
                     highWatermark = highWatermarkUpdateOpt;
                     logHighWatermarkUpdate(
+                        oldHighWatermark,
                         highWatermarkUpdateMetadata,
                         indexOfHw,
                         followersByDescendingFetchOffset
@@ -201,16 +205,28 @@ public class LeaderState<T> implements EpochState {
     }
 
     private void logHighWatermarkUpdate(
+        Optional<LogOffsetMetadata> oldHighWatermark,
         LogOffsetMetadata newHighWatermark,
         int indexOfHw,
         List<ReplicaState> followersByDescendingFetchOffset
     ) {
-        log.trace(
-            "High watermark set to {} based on indexOfHw {} and voters {}",
-            newHighWatermark,
-            indexOfHw,
-            followersByDescendingFetchOffset
-        );
+        if (oldHighWatermark.isPresent()) {
+            log.trace(
+                "High watermark set to {} from {} based on indexOfHw {} and voters {}",
+                newHighWatermark,
+                oldHighWatermark.get(),
+                indexOfHw,
+                followersByDescendingFetchOffset
+            );
+        } else {
+            log.info(
+                "High watermark set to {} for the first time for epoch {} based on indexOfHw {} and voters {}",
+                newHighWatermark,
+                epoch,
+                indexOfHw,
+                followersByDescendingFetchOffset
+            );
+        }
     }
 
     /**