Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/22 23:44:55 UTC

[GitHub] [kafka] hachikuji opened a new pull request, #12548: KAFKA-14142; Expose kraft high watermark update time in quorum command

hachikuji opened a new pull request, #12548:
URL: https://github.com/apache/kafka/pull/12548

   For debugging, it is useful to know not only the current Raft high watermark but also when it was last updated. This gives operators an easy way to judge quorum health by checking whether commits are making progress. This patch adds a new tagged field `HighWatermarkUpdateTimeMs` to the `DescribeQuorum` response. It is intended as a late addition to https://cwiki.apache.org/confluence/display/KAFKA/KIP-836%3A+Addition+of+Information+in+DescribeQuorumResponse+about+Voter+Lag.
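As a rough illustration of the proposed field, the sketch below tracks the high watermark together with the time it last advanced, so that an unchanged high watermark keeps its original update time. `HighWatermarkTracker` and its methods are hypothetical names for illustration only; this is not Kafka's `LeaderState` code, and -1 is assumed to mean "unknown".

```java
import java.util.OptionalLong;

/**
 * Hypothetical sketch: remember the high watermark and the time at which it
 * last advanced. Not Kafka's LeaderState; -1 is assumed to mean "unknown".
 */
final class HighWatermarkTracker {
    private long highWatermark = -1L;
    private long highWatermarkUpdateTimeMs = -1L;

    /** Returns true only if the high watermark moved forward. */
    boolean maybeAdvance(long newHighWatermark, long currentTimeMs) {
        if (newHighWatermark > highWatermark) {
            highWatermark = newHighWatermark;
            // Record the time only on an actual advance, so a stalled quorum
            // shows an increasingly old update time.
            highWatermarkUpdateTimeMs = currentTimeMs;
            return true;
        }
        return false;
    }

    long highWatermark() {
        return highWatermark;
    }

    long highWatermarkUpdateTimeMs() {
        return highWatermarkUpdateTimeMs;
    }

    /** Milliseconds since the last advance, or empty if it never advanced. */
    OptionalLong timeSinceLastAdvanceMs(long currentTimeMs) {
        return highWatermarkUpdateTimeMs < 0
            ? OptionalLong.empty()
            : OptionalLong.of(currentTimeMs - highWatermarkUpdateTimeMs);
    }
}
```

An operator-facing check could then flag the quorum when `timeSinceLastAdvanceMs` grows beyond some threshold while writes are expected.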
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12548: MINOR: A few cleanups for DescribeQuorum APIs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r953178449


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -204,50 +198,80 @@ private boolean updateHighWatermark() {
         return false;
     }
 
+    private void setHighWatermark(
+        Optional<LogOffsetMetadata> newHighWatermark,

Review Comment:
   Yeah, I think I can just revert this change. It made more sense when the patch was also tracking the high watermark update time.





[GitHub] [kafka] showuon commented on a diff in pull request #12548: MINOR: A few cleanups for DescribeQuorum APIs

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r953262169


##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -302,21 +302,112 @@ public void testGetNonLeaderFollowersByFetchOffsetDescending() {
     }
 
     @Test
-    public void testGetVoterStates() {
-        int node1 = 1;
-        int node2 = 2;
+    public void testDescribeQuorumWithSingleVoter() {
+        MockTime time = new MockTime();
         long leaderStartOffset = 10L;
         long leaderEndOffset = 15L;
 
-        LeaderState<?> state = setUpLeaderAndFollowers(node1, node2, leaderStartOffset, leaderEndOffset);
+        LeaderState<?> state = newLeaderState(mkSet(localId), leaderStartOffset);
+
+        // Until we have updated local state, high watermark should be uninitialized
+        assertEquals(Optional.empty(), state.highWatermark());
+        DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds());
+        assertEquals(-1, partitionData.highWatermark());
+        assertEquals(-1, partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+        assertEquals(Collections.emptyList(), partitionData.observers());
+        assertEquals(1, partitionData.currentVoters().size());
+        assertEquals(new DescribeQuorumResponseData.ReplicaState()
+                .setReplicaId(localId)
+                .setLogEndOffset(-1)
+                .setLastFetchTimestamp(time.milliseconds())
+                .setLastCaughtUpTimestamp(time.milliseconds()),
+            partitionData.currentVoters().get(0));
+
+
+        // Now update the high watermark and verify that describe output
+        long highWatermarkUpdateTimeMs = time.milliseconds();
+        assertTrue(state.updateLocalState(highWatermarkUpdateTimeMs, new LogOffsetMetadata(leaderEndOffset)));
+        assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark());
+
+        time.sleep(500);
+
+        partitionData = state.describeQuorum(time.milliseconds());
+        assertEquals(leaderEndOffset, partitionData.highWatermark());
+        assertEquals(highWatermarkUpdateTimeMs, partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+        assertEquals(Collections.emptyList(), partitionData.observers());
+        assertEquals(1, partitionData.currentVoters().size());
+        assertEquals(new DescribeQuorumResponseData.ReplicaState()
+                .setReplicaId(localId)
+                .setLogEndOffset(leaderEndOffset)
+                .setLastFetchTimestamp(time.milliseconds())
+                .setLastCaughtUpTimestamp(time.milliseconds()),
+            partitionData.currentVoters().get(0));
+    }
+
+    @Test
+    public void testDescribeQuorumWithMultipleVoters() {
+        MockTime time = new MockTime();
+        int activeFollowerId = 1;
+        int inactiveFollowerId = 2;
+        long leaderStartOffset = 10L;
+        long leaderEndOffset = 15L;
 
-        assertEquals(mkMap(
-            mkEntry(localId, leaderEndOffset),
-            mkEntry(node1, leaderStartOffset),
-            mkEntry(node2, leaderEndOffset)
-        ), state.quorumResponseVoterStates(0)
-            .stream()
-            .collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, DescribeQuorumResponseData.ReplicaState::logEndOffset)));
+        LeaderState<?> state = newLeaderState(mkSet(localId, activeFollowerId, inactiveFollowerId), leaderStartOffset);
+        assertFalse(state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(leaderEndOffset)));
+        assertEquals(Optional.empty(), state.highWatermark());
+
+        long activeFollowerFetchTimeMs = time.milliseconds();
+        assertTrue(state.updateReplicaState(activeFollowerId, activeFollowerFetchTimeMs, new LogOffsetMetadata(leaderEndOffset)));
+        assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark());
+
+        time.sleep(500);
+
+        DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds());
+        assertEquals(leaderEndOffset, partitionData.highWatermark());
+        assertEquals(activeFollowerFetchTimeMs, partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+        assertEquals(Collections.emptyList(), partitionData.observers());
+
+        List<DescribeQuorumResponseData.ReplicaState> voterStates = partitionData.currentVoters();
+        assertEquals(3, voterStates.size());
+
+        DescribeQuorumResponseData.ReplicaState leaderState = voterStates.stream()
+            .filter(voterState -> voterState.replicaId() == localId)
+            .findFirst()
+            .orElseThrow(() -> new AssertionError(""));

Review Comment:
   Please add an error message here and in the similar assertions below. Thanks.



##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -326,35 +417,57 @@ private LeaderState<?> setUpLeaderAndFollowers(int follower1,
         LeaderState<?> state = newLeaderState(mkSet(localId, follower1, follower2), leaderStartOffset);
         state.updateLocalState(0, new LogOffsetMetadata(leaderEndOffset));
         assertEquals(Optional.empty(), state.highWatermark());
-        state.updateReplicaState(follower1, 0, new LogOffsetMetadata(leaderStartOffset), leaderEndOffset);
-        state.updateReplicaState(follower2, 0, new LogOffsetMetadata(leaderEndOffset), leaderEndOffset);
+        state.updateReplicaState(follower1, 0, new LogOffsetMetadata(leaderStartOffset));
+        state.updateReplicaState(follower2, 0, new LogOffsetMetadata(leaderEndOffset));
         return state;
     }
 
     @Test
-    public void testGetObserverStatesWithObserver() {
+    public void testDescribeQuorumWithObservers() {
+        MockTime time = new MockTime();
         int observerId = 10;
         long epochStartOffset = 10L;
 
         LeaderState<?> state = newLeaderState(mkSet(localId), epochStartOffset);
-        long timestamp = 20L;
-        assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(epochStartOffset), epochStartOffset + 10));
-
-        assertEquals(Collections.singletonMap(observerId, epochStartOffset),
-                state.quorumResponseObserverStates(timestamp)
-                    .stream()
-                    .collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, DescribeQuorumResponseData.ReplicaState::logEndOffset)));
+        long highWatermarkUpdateTime = time.milliseconds();
+        assertTrue(state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(epochStartOffset + 1)));
+        assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark());
+
+        time.sleep(500);
+        long observerFetchTimeMs = time.milliseconds();
+        assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1)));
+
+        time.sleep(500);
+        DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds());
+        assertEquals(epochStartOffset + 1, partitionData.highWatermark());
+        assertEquals(highWatermarkUpdateTime, partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+
+        List<DescribeQuorumResponseData.ReplicaState> observerStates = partitionData.observers();
+        assertEquals(1, observerStates.size());

Review Comment:
   Any comment here?





[GitHub] [kafka] dengziming commented on pull request #12548: KAFKA-14142; Expose kraft high watermark update time in quorum command

Posted by GitBox <gi...@apache.org>.
dengziming commented on PR #12548:
URL: https://github.com/apache/kafka/pull/12548#issuecomment-1223860360

   > Also, some `MetadataQuorumCommandTest` tests failed due to this change. FYI.
   
   This is a simple problem: we changed the tool's output, so the assertion in the test case failed. We only need to adjust the test case.




[GitHub] [kafka] hachikuji merged pull request #12548: MINOR: A few cleanups for DescribeQuorum APIs

Posted by GitBox <gi...@apache.org>.
hachikuji merged PR #12548:
URL: https://github.com/apache/kafka/pull/12548




[GitHub] [kafka] showuon commented on pull request #12548: KAFKA-14142; Expose kraft high watermark update time in quorum command

Posted by GitBox <gi...@apache.org>.
showuon commented on PR #12548:
URL: https://github.com/apache/kafka/pull/12548#issuecomment-1223638274

   Also, some `MetadataQuorumCommandTest` tests failed due to this change. FYI. 
   cc @dengziming, who just updated the metadata quorum command.




[GitHub] [kafka] hachikuji commented on a diff in pull request #12548: MINOR: A few cleanups for DescribeQuorum APIs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r953176589


##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -125,15 +135,15 @@ public long logEndOffset() {
          * @return The value of the lastFetchTime if known, empty otherwise
          */
         public OptionalLong lastFetchTimeMs() {

Review Comment:
   Fixed in the latest commit.



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -125,15 +135,15 @@ public long logEndOffset() {
          * @return The value of the lastFetchTime if known, empty otherwise
          */
         public OptionalLong lastFetchTimeMs() {
-            return lastFetchTimeMs;
+            return lastFetchTimestamp;
         }
 
         /**
          * Return the lastCaughtUpTime in milliseconds for this replica.
          * @return The value of the lastCaughtUpTime if known, empty otherwise
          */
         public OptionalLong lastCaughtUpTimeMs() {

Review Comment:
   Fixed in the latest commit.
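The `QuorumInfo` accessors return an `OptionalLong` that is "empty if unknown", while the response data (as in the tests quoted in this thread) uses -1 for values that are not yet known. A minimal sketch of that conversion, assuming -1 (or any negative value) is the "unknown" sentinel; `SentinelConversions` is a hypothetical helper, not part of the Kafka client API.

```java
import java.util.OptionalLong;

/** Hypothetical helper mapping a wire-level -1 sentinel to an OptionalLong. */
final class SentinelConversions {
    private SentinelConversions() {}

    static OptionalLong toOptionalLong(long value) {
        // Assumption: timestamps and offsets are never negative, so any
        // negative value (normally -1) is treated as "unknown".
        return value < 0 ? OptionalLong.empty() : OptionalLong.of(value);
    }
}
```

Keeping plain `long` fields with a sentinel internally and converting only at the public API boundary avoids allocating an `OptionalLong` on every update.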





[GitHub] [kafka] hachikuji commented on a diff in pull request #12548: MINOR: A few cleanups for DescribeQuorum APIs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r953179732


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -359,31 +385,46 @@ private boolean isVoter(int remoteNodeId) {
     private static class ReplicaState implements Comparable<ReplicaState> {
         final int nodeId;
         Optional<LogOffsetMetadata> endOffset;
-        OptionalLong lastFetchTimestamp;
-        OptionalLong lastFetchLeaderLogEndOffset;
-        OptionalLong lastCaughtUpTimestamp;
+        long lastFetchTimestamp;
+        long lastFetchLeaderLogEndOffset;
+        long lastCaughtUpTimestamp;
         boolean hasAcknowledgedLeader;
 
         public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) {
             this.nodeId = nodeId;
             this.endOffset = Optional.empty();
-            this.lastFetchTimestamp = OptionalLong.empty();
-            this.lastFetchLeaderLogEndOffset = OptionalLong.empty();
-            this.lastCaughtUpTimestamp = OptionalLong.empty();
+            this.lastFetchTimestamp = -1;
+            this.lastFetchLeaderLogEndOffset = -1;
+            this.lastCaughtUpTimestamp = -1;
             this.hasAcknowledgedLeader = hasAcknowledgedLeader;
         }
 
-        void updateFetchTimestamp(long currentFetchTimeMs, long leaderLogEndOffset) {
-            // To be resilient to system time shifts we do not strictly
-            // require the timestamp be monotonically increasing.
-            lastFetchTimestamp = OptionalLong.of(Math.max(lastFetchTimestamp.orElse(-1L), currentFetchTimeMs));
-            lastFetchLeaderLogEndOffset = OptionalLong.of(leaderLogEndOffset);
+        void updateLeaderState(
+            LogOffsetMetadata endOffsetMetadata
+        ) {
+            // For the leader, we only update the end offset. The remaining fields
+            // (such as the caught up time) are determined implicitly.
+            this.endOffset = Optional.of(endOffsetMetadata);
         }
 
-        void updateLastCaughtUpTimestamp(long lastCaughtUpTime) {
-            // This value relies on the fetch timestamp which does not
-            // require monotonicity
-            lastCaughtUpTimestamp = OptionalLong.of(Math.max(lastCaughtUpTimestamp.orElse(-1L), lastCaughtUpTime));
+        void updateFollowerState(
+            long currentTimeMs,
+            LogOffsetMetadata fetchOffsetMetadata,
+            Optional<LogOffsetMetadata> leaderEndOffsetOpt
+        ) {
+            leaderEndOffsetOpt.ifPresent(leaderEndOffset -> {
+                if (fetchOffsetMetadata.offset >= leaderEndOffset.offset) {
+                    lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, currentTimeMs);
+                } else if (lastFetchLeaderLogEndOffset > 0
+                    && fetchOffsetMetadata.offset >= lastFetchLeaderLogEndOffset) {
+                    lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, lastFetchTimestamp);
+                }
+                lastFetchLeaderLogEndOffset = leaderEndOffset.offset;

Review Comment:
   Yeah, good idea.
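The new `updateFollowerState` decides when a follower counts as caught up: if the fetch offset reaches the leader's current end offset, the follower is caught up now; otherwise, if it reaches the leader's end offset as of the previous fetch, it was caught up as of that previous fetch time. The standalone sketch below mirrors those branches with plain `long` offsets instead of `LogOffsetMetadata`. The class name is hypothetical, and the final step that records the fetch time is an assumption, because the diff excerpt is cut off before that point.

```java
import java.util.Optional;

/** Hypothetical standalone mirror of the caught-up bookkeeping; -1 means "unknown". */
final class FollowerCatchUpTracker {
    long lastFetchTimestamp = -1;
    long lastFetchLeaderLogEndOffset = -1;
    long lastCaughtUpTimestamp = -1;

    void updateFollowerState(long currentTimeMs, long fetchOffset, Optional<Long> leaderEndOffsetOpt) {
        leaderEndOffsetOpt.ifPresent(leaderEndOffset -> {
            if (fetchOffset >= leaderEndOffset) {
                // Fetching at the leader's current end offset: caught up right now.
                lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, currentTimeMs);
            } else if (lastFetchLeaderLogEndOffset > 0
                && fetchOffset >= lastFetchLeaderLogEndOffset) {
                // Reached the leader's end offset as of the previous fetch, so the
                // follower was caught up as of that previous fetch time.
                lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, lastFetchTimestamp);
            }
            lastFetchLeaderLogEndOffset = leaderEndOffset;
        });
        // Assumption: the fetch time is recorded last (the branch above needs the
        // previous value) and never moves backwards if the clock shifts.
        lastFetchTimestamp = Math.max(lastFetchTimestamp, currentTimeMs);
    }
}
```

For example, a follower that fetches offset 10 at t=100 while the leader ends at 15, then fetches offset 15 at t=200 while the leader has moved to 20, is credited as caught up at t=100.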





[GitHub] [kafka] showuon commented on pull request #12548: MINOR: A few cleanups for DescribeQuorum APIs

Posted by GitBox <gi...@apache.org>.
showuon commented on PR #12548:
URL: https://github.com/apache/kafka/pull/12548#issuecomment-1226684539

   Thanks @hachikuji , LGTM! Nice refactor!




[GitHub] [kafka] showuon commented on a diff in pull request #12548: KAFKA-14142; Expose kraft high watermark update time in quorum command

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r952117995


##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -63,20 +76,26 @@ public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         QuorumInfo that = (QuorumInfo) o;
-        return leaderId.equals(that.leaderId)
-            && voters.equals(that.voters)
-            && observers.equals(that.observers);
+        return leaderId == that.leaderId
+            && leaderEpoch == that.leaderEpoch
+            && highWatermark == that.highWatermark
+            && Objects.equals(highWatermarkUpdateTimeMs, that.highWatermarkUpdateTimeMs)
+            && Objects.equals(voters, that.voters)
+            && Objects.equals(observers, that.observers);

Review Comment:
   Nice catch!
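The old `equals` ignored `leaderEpoch` and `highWatermark`; the new one compares primitives with `==` and objects with the null-safe `Objects.equals`. Java also requires `hashCode` to agree with `equals`, so it should cover the same fields. A sketch on a hypothetical, trimmed-down stand-in (`QuorumInfoSketch`, with voters and observers simplified to lists of IDs); it is not the real `QuorumInfo` class.

```java
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;

/** Hypothetical, simplified stand-in for QuorumInfo (not the real class). */
final class QuorumInfoSketch {
    private final int leaderId;
    private final int leaderEpoch;
    private final long highWatermark;
    private final OptionalLong highWatermarkUpdateTimeMs;
    private final List<Integer> voters;
    private final List<Integer> observers;

    QuorumInfoSketch(int leaderId, int leaderEpoch, long highWatermark,
                     OptionalLong highWatermarkUpdateTimeMs,
                     List<Integer> voters, List<Integer> observers) {
        this.leaderId = leaderId;
        this.leaderEpoch = leaderEpoch;
        this.highWatermark = highWatermark;
        this.highWatermarkUpdateTimeMs = highWatermarkUpdateTimeMs;
        this.voters = voters;
        this.observers = observers;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        QuorumInfoSketch that = (QuorumInfoSketch) o;
        // Primitives compare with ==; object fields use null-safe Objects.equals.
        return leaderId == that.leaderId
            && leaderEpoch == that.leaderEpoch
            && highWatermark == that.highWatermark
            && Objects.equals(highWatermarkUpdateTimeMs, that.highWatermarkUpdateTimeMs)
            && Objects.equals(voters, that.voters)
            && Objects.equals(observers, that.observers);
    }

    @Override
    public int hashCode() {
        // Must use exactly the fields that equals() compares.
        return Objects.hash(leaderId, leaderEpoch, highWatermark,
            highWatermarkUpdateTimeMs, voters, observers);
    }
}
```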



##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -302,21 +302,112 @@ public void testGetNonLeaderFollowersByFetchOffsetDescending() {
     }
 
     @Test
-    public void testGetVoterStates() {
-        int node1 = 1;
-        int node2 = 2;
+    public void testDescribeQuorumWithSingleVoter() {
+        MockTime time = new MockTime();
         long leaderStartOffset = 10L;
         long leaderEndOffset = 15L;
 
-        LeaderState<?> state = setUpLeaderAndFollowers(node1, node2, leaderStartOffset, leaderEndOffset);
+        LeaderState<?> state = newLeaderState(mkSet(localId), leaderStartOffset);
+
+        // Until we have updated local state, high watermark should be uninitialized
+        assertEquals(Optional.empty(), state.highWatermark());
+        DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds());
+        assertEquals(-1, partitionData.highWatermark());
+        assertEquals(-1, partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+        assertEquals(Collections.emptyList(), partitionData.observers());
+        assertEquals(1, partitionData.currentVoters().size());
+        assertEquals(new DescribeQuorumResponseData.ReplicaState()
+                .setReplicaId(localId)
+                .setLogEndOffset(-1)
+                .setLastFetchTimestamp(time.milliseconds())
+                .setLastCaughtUpTimestamp(time.milliseconds()),
+            partitionData.currentVoters().get(0));
+
+
+        // Now update the high watermark and verify that describe output
+        long highWatermarkUpdateTimeMs = time.milliseconds();
+        assertTrue(state.updateLocalState(highWatermarkUpdateTimeMs, new LogOffsetMetadata(leaderEndOffset)));
+        assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark());
+
+        time.sleep(500);
+
+        partitionData = state.describeQuorum(time.milliseconds());
+        assertEquals(leaderEndOffset, partitionData.highWatermark());
+        assertEquals(highWatermarkUpdateTimeMs, partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+        assertEquals(Collections.emptyList(), partitionData.observers());
+        assertEquals(1, partitionData.currentVoters().size());
+        assertEquals(new DescribeQuorumResponseData.ReplicaState()
+                .setReplicaId(localId)
+                .setLogEndOffset(leaderEndOffset)
+                .setLastFetchTimestamp(time.milliseconds())
+                .setLastCaughtUpTimestamp(time.milliseconds()),
+            partitionData.currentVoters().get(0));
+    }
+
+    @Test
+    public void testDescribeQuorumWithMultipleVoters() {
+        MockTime time = new MockTime();
+        int activeFollowerId = 1;
+        int inactiveFollowerId = 2;
+        long leaderStartOffset = 10L;
+        long leaderEndOffset = 15L;
 
-        assertEquals(mkMap(
-            mkEntry(localId, leaderEndOffset),
-            mkEntry(node1, leaderStartOffset),
-            mkEntry(node2, leaderEndOffset)
-        ), state.quorumResponseVoterStates(0)
-            .stream()
-            .collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, DescribeQuorumResponseData.ReplicaState::logEndOffset)));
+        LeaderState<?> state = newLeaderState(mkSet(localId, activeFollowerId, inactiveFollowerId), leaderStartOffset);
+        assertFalse(state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(leaderEndOffset)));
+        assertEquals(Optional.empty(), state.highWatermark());
+
+        long activeFollowerFetchTimeMs = time.milliseconds();
+        assertTrue(state.updateReplicaState(activeFollowerId, activeFollowerFetchTimeMs, new LogOffsetMetadata(leaderEndOffset)));
+        assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark());
+
+        time.sleep(500);
+
+        DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds());
+        assertEquals(leaderEndOffset, partitionData.highWatermark());
+        assertEquals(activeFollowerFetchTimeMs, partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+        assertEquals(Collections.emptyList(), partitionData.observers());
+
+        List<DescribeQuorumResponseData.ReplicaState> voterStates = partitionData.currentVoters();
+        assertEquals(3, voterStates.size());
+
+        DescribeQuorumResponseData.ReplicaState leaderState = voterStates.stream()
+            .filter(voterState -> voterState.replicaId() == localId)
+            .findFirst()
+            .orElseThrow(() -> new AssertionError(""));

Review Comment:
   Forgot to add the error message here and below.



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -24,32 +24,45 @@
  * This class is used to describe the state of the quorum received in DescribeQuorumResponse.
  */
 public class QuorumInfo {
-    private final Integer leaderId;
-    private final Integer leaderEpoch;
-    private final Long highWatermark;
+    private final int leaderId;
+    private final long leaderEpoch;

Review Comment:
   The `leaderEpoch` field is an int32 in the schema; is there any reason we changed it to `long` here?
   
   https://github.com/apache/kafka/blob/add4ca6c7f1b289477aa7d6e918f5d22b78088fe/clients/src/main/resources/common/message/DescribeQuorumResponse.json#L37-L38



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -304,7 +311,7 @@ public long epochStartOffset() {
         return epochStartOffset;
     }
 
-    private ReplicaState getReplicaState(int remoteNodeId) {
+    private ReplicaState getOrCreateReplicaState(int remoteNodeId) {

Review Comment:
   Nice renaming.
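The new name says that the lookup may create state. A minimal sketch of that get-or-create pattern, assuming voters are registered up front and any unknown node is lazily registered as an observer; `ReplicaRegistry` and its nested `ReplicaState` are hypothetical and much simpler than the real `LeaderState`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a get-or-create lookup; not Kafka's LeaderState. */
final class ReplicaRegistry {
    static final class ReplicaState {
        final int nodeId;

        ReplicaState(int nodeId) {
            this.nodeId = nodeId;
        }
    }

    private final Map<Integer, ReplicaState> voterStates = new HashMap<>();
    private final Map<Integer, ReplicaState> observerStates = new HashMap<>();

    ReplicaRegistry(int... voterIds) {
        // Assumption: the voter set is known when the leader starts its epoch.
        for (int id : voterIds) {
            voterStates.put(id, new ReplicaState(id));
        }
    }

    /** Returns the voter's state, or creates an observer entry on first use. */
    ReplicaState getOrCreateReplicaState(int remoteNodeId) {
        ReplicaState state = voterStates.get(remoteNodeId);
        if (state != null) {
            return state;
        }
        return observerStates.computeIfAbsent(remoteNodeId, id -> new ReplicaState(id));
    }

    int observerCount() {
        return observerStates.size();
    }
}
```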



##########
core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala:
##########
@@ -158,14 +158,15 @@ object MetadataQuorumCommand {
         -1
       }
     println(
-      s"""|ClusterId:              $clusterId
-          |LeaderId:               ${quorumInfo.leaderId}
-          |LeaderEpoch:            ${quorumInfo.leaderEpoch}
-          |HighWatermark:          ${quorumInfo.highWatermark}
-          |MaxFollowerLag:         $maxFollowerLag
-          |MaxFollowerLagTimeMs:   $maxFollowerLagTimeMs
-          |CurrentVoters:          ${quorumInfo.voters.asScala.map(_.replicaId).mkString("[", ",", "]")}
-          |CurrentObservers:       ${quorumInfo.observers.asScala.map(_.replicaId).mkString("[", ",", "]")}
+      s"""|ClusterId:                  $clusterId
+          |LeaderId:                   ${quorumInfo.leaderId}
+          |LeaderEpoch:                ${quorumInfo.leaderEpoch}
+          |HighWatermark:              ${quorumInfo.highWatermark}
+          |HighWatermarkUpdateTimeMs:  ${quorumInfo.highWatermarkUpdateTimeMs}

Review Comment:
   Since this is script output, I think it would be better to do some pre-processing to avoid printing something like `OptionalLong[12345]` or `OptionalLong.empty`.
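Interpolating an `OptionalLong` directly uses its `toString()`, which yields exactly the `OptionalLong[12345]` / `OptionalLong.empty` forms mentioned above. A minimal Java sketch of the suggested pre-processing; printing `-1` for an unknown value is an assumption (chosen to match the -1 sentinel used elsewhere), and `QuorumOutputFormat` is a hypothetical name. The same idea carries over to the Scala command.

```java
import java.util.OptionalLong;

/** Hypothetical formatting helper for command-line output. */
final class QuorumOutputFormat {
    private QuorumOutputFormat() {}

    static String formatOptionalLong(OptionalLong value) {
        // Assumption: show -1 when the value is unknown instead of "OptionalLong.empty".
        return value.isPresent() ? Long.toString(value.getAsLong()) : "-1";
    }
}
```

Usage: `formatOptionalLong(OptionalLong.of(12345L))` gives `12345`, whereas `OptionalLong.of(12345L).toString()` gives `OptionalLong[12345]`.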
   



##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -326,35 +417,57 @@ private LeaderState<?> setUpLeaderAndFollowers(int follower1,
         LeaderState<?> state = newLeaderState(mkSet(localId, follower1, follower2), leaderStartOffset);
         state.updateLocalState(0, new LogOffsetMetadata(leaderEndOffset));
         assertEquals(Optional.empty(), state.highWatermark());
-        state.updateReplicaState(follower1, 0, new LogOffsetMetadata(leaderStartOffset), leaderEndOffset);
-        state.updateReplicaState(follower2, 0, new LogOffsetMetadata(leaderEndOffset), leaderEndOffset);
+        state.updateReplicaState(follower1, 0, new LogOffsetMetadata(leaderStartOffset));
+        state.updateReplicaState(follower2, 0, new LogOffsetMetadata(leaderEndOffset));
         return state;
     }
 
     @Test
-    public void testGetObserverStatesWithObserver() {
+    public void testDescribeQuorumWithObservers() {
+        MockTime time = new MockTime();
         int observerId = 10;
         long epochStartOffset = 10L;
 
         LeaderState<?> state = newLeaderState(mkSet(localId), epochStartOffset);
-        long timestamp = 20L;
-        assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(epochStartOffset), epochStartOffset + 10));
-
-        assertEquals(Collections.singletonMap(observerId, epochStartOffset),
-                state.quorumResponseObserverStates(timestamp)
-                    .stream()
-                    .collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, DescribeQuorumResponseData.ReplicaState::logEndOffset)));
+        long highWatermarkUpdateTime = time.milliseconds();
+        assertTrue(state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(epochStartOffset + 1)));
+        assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark());
+
+        time.sleep(500);
+        long observerFetchTimeMs = time.milliseconds();
+        assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1)));
+
+        time.sleep(500);
+        DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds());
+        assertEquals(epochStartOffset + 1, partitionData.highWatermark());
+        assertEquals(highWatermarkUpdateTime, partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+
+        List<DescribeQuorumResponseData.ReplicaState> observerStates = partitionData.observers();
+        assertEquals(1, observerStates.size());

Review Comment:
   Should we also verify `voters` here?





[GitHub] [kafka] hachikuji commented on pull request #12548: KAFKA-14142; Expose kraft high watermark update time in quorum command

Posted by GitBox <gi...@apache.org>.
hachikuji commented on PR #12548:
URL: https://github.com/apache/kafka/pull/12548#issuecomment-1224954292

   Just a general note: I'm repurposing this patch as a general cleanup of the new DescribeQuorum APIs. I changed my mind about `HighWatermarkUpdateTimeMs`. I think it can be inferred from the existing state, even if imperfectly, and I don't think the exact value adds much.




[GitHub] [kafka] hachikuji commented on pull request #12548: MINOR: A few cleanups for DescribeQuorum APIs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on PR #12548:
URL: https://github.com/apache/kafka/pull/12548#issuecomment-1226006637

   @showuon Apologies for missing your comments. I pushed an update to address them. Thanks for reviewing!




[GitHub] [kafka] jsancio commented on a diff in pull request #12548: MINOR: A few cleanups for DescribeQuorum APIs

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r953171207


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -313,43 +312,70 @@ private ReplicaState getReplicaState(int remoteNodeId) {
         return state;
     }
 
-    List<DescribeQuorumResponseData.ReplicaState> quorumResponseVoterStates(long currentTimeMs) {
-        return quorumResponseReplicaStates(voterStates.values(), localId, currentTimeMs);
+    public DescribeQuorumResponseData.PartitionData describeQuorum(long currentTimeMs) {
+        clearInactiveObservers(currentTimeMs);
+
+        return new DescribeQuorumResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code())
+            .setLeaderId(localId)
+            .setLeaderEpoch(epoch)
+            .setHighWatermark(highWatermark().map(offsetMetadata -> offsetMetadata.offset).orElse(-1L))
+            .setCurrentVoters(describeReplicaStates(voterStates, currentTimeMs))
+            .setObservers(describeReplicaStates(observerStates, currentTimeMs));
     }
 
-    List<DescribeQuorumResponseData.ReplicaState> quorumResponseObserverStates(long currentTimeMs) {
-        clearInactiveObservers(currentTimeMs);
-        return quorumResponseReplicaStates(observerStates.values(), localId, currentTimeMs);
+    // Visible for testing
+    DescribeQuorumResponseData.ReplicaState describeVoterState(

Review Comment:
   Hmm. Okay. Since we are adding this only for testing, I am wondering if we should have tests call `describeQuorum(long)` and use a test utility that filters the result and returns the `ReplicaState` for a given voter or observer.
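   A minimal sketch of such a test utility, assuming simplified stand-in types. `ReplicaStateView`, `PartitionView`, and `QuorumTestUtils` are hypothetical names used only for illustration; they are not the generated `DescribeQuorumResponseData` classes, which carry more fields.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for DescribeQuorumResponseData.ReplicaState.
final class ReplicaStateView {
    final int replicaId;
    final long logEndOffset;

    ReplicaStateView(int replicaId, long logEndOffset) {
        this.replicaId = replicaId;
        this.logEndOffset = logEndOffset;
    }
}

// Hypothetical stand-in for DescribeQuorumResponseData.PartitionData.
final class PartitionView {
    final List<ReplicaStateView> currentVoters;
    final List<ReplicaStateView> observers;

    PartitionView(List<ReplicaStateView> currentVoters, List<ReplicaStateView> observers) {
        this.currentVoters = currentVoters;
        this.observers = observers;
    }
}

// Hypothetical test utility: tests call describeQuorum(currentTimeMs) and then
// look up one replica in the result, instead of relying on a
// "visible for testing" accessor on LeaderState.
final class QuorumTestUtils {
    private QuorumTestUtils() {}

    static Optional<ReplicaStateView> findVoter(PartitionView partition, int nodeId) {
        return findReplica(partition.currentVoters, nodeId);
    }

    static Optional<ReplicaStateView> findObserver(PartitionView partition, int nodeId) {
        return findReplica(partition.observers, nodeId);
    }

    private static Optional<ReplicaStateView> findReplica(List<ReplicaStateView> replicas, int nodeId) {
        return replicas.stream()
            .filter(replica -> replica.replicaId == nodeId)
            .findFirst();
    }
}
```

   Returning `Optional` lets a test assert both that a replica is present and that it is absent from the wrong list (for example, an observer should not appear among the voters).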



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -204,50 +198,80 @@ private boolean updateHighWatermark() {
         return false;
     }
 
+    private void setHighWatermark(
+        Optional<LogOffsetMetadata> newHighWatermark,

Review Comment:
   Looks like, based on the call sites, `newHighWatermark.isPresent()` is always `true`. Should we document that this method takes an `Optional<LogOffsetMetadata>` so that we don't allocate a new object? Should this method "assert" that `newHighWatermark.isPresent()` is always `true`?
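   One way to document and enforce that precondition, sketched with a hypothetical `OffsetMetadata` stand-in for `LogOffsetMetadata` and a hypothetical `HighWatermarkHolder` class (neither is the real `LeaderState` code). The check is explicit rather than a Java `assert` statement, because `assert` is disabled unless the JVM runs with `-ea`.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for org.apache.kafka.raft.LogOffsetMetadata.
final class OffsetMetadata {
    final long offset;

    OffsetMetadata(long offset) {
        this.offset = offset;
    }
}

// Hypothetical holder illustrating the documented precondition.
final class HighWatermarkHolder {
    private Optional<OffsetMetadata> highWatermark = Optional.empty();

    /**
     * Sets the high watermark.
     *
     * @param newHighWatermark must be non-empty; it is typed as an Optional only so
     *                         callers can pass an existing Optional without allocating
     *                         a new one
     * @throws IllegalArgumentException if newHighWatermark is empty
     */
    void setHighWatermark(Optional<OffsetMetadata> newHighWatermark) {
        Objects.requireNonNull(newHighWatermark, "newHighWatermark");
        if (!newHighWatermark.isPresent()) {
            // Fail fast instead of silently clearing the high watermark.
            throw new IllegalArgumentException("newHighWatermark must be present");
        }
        highWatermark = newHighWatermark;
    }

    Optional<OffsetMetadata> highWatermark() {
        return highWatermark;
    }
}
```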



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -125,15 +135,15 @@ public long logEndOffset() {
          * @return The value of the lastFetchTime if known, empty otherwise
          */
         public OptionalLong lastFetchTimeMs() {

Review Comment:
   Let's fix this to `lastFetchTimestamp`.



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -125,15 +135,15 @@ public long logEndOffset() {
          * @return The value of the lastFetchTime if known, empty otherwise
          */
         public OptionalLong lastFetchTimeMs() {
-            return lastFetchTimeMs;
+            return lastFetchTimestamp;
         }
 
         /**
          * Return the lastCaughtUpTime in milliseconds for this replica.
          * @return The value of the lastCaughtUpTime if known, empty otherwise
          */
         public OptionalLong lastCaughtUpTimeMs() {

Review Comment:
   Let's fix this to `lastCaughtUpTimestamp`.
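   For illustration, the renamed accessors could look like the sketch below. `ReplicaInfo` is a hypothetical, simplified class, not the real `QuorumInfo` nested type. It assumes that a negative value (the leader-side `ReplicaState` in this patch initializes these fields to `-1`) means "unknown" and maps it to an empty `OptionalLong`.

```java
import java.util.OptionalLong;

// Hypothetical, simplified replica description using the accessor names
// suggested in the review.
final class ReplicaInfo {
    private final int replicaId;
    private final long logEndOffset;
    private final OptionalLong lastFetchTimestamp;
    private final OptionalLong lastCaughtUpTimestamp;

    ReplicaInfo(int replicaId, long logEndOffset, long lastFetchTimestamp, long lastCaughtUpTimestamp) {
        this.replicaId = replicaId;
        this.logEndOffset = logEndOffset;
        this.lastFetchTimestamp = fromSentinel(lastFetchTimestamp);
        this.lastCaughtUpTimestamp = fromSentinel(lastCaughtUpTimestamp);
    }

    // Assumption: a negative wall-clock value means "not known".
    private static OptionalLong fromSentinel(long value) {
        return value < 0 ? OptionalLong.empty() : OptionalLong.of(value);
    }

    int replicaId() {
        return replicaId;
    }

    long logEndOffset() {
        return logEndOffset;
    }

    /** @return the last fetch timestamp in milliseconds if known, empty otherwise */
    OptionalLong lastFetchTimestamp() {
        return lastFetchTimestamp;
    }

    /** @return the last caught-up timestamp in milliseconds if known, empty otherwise */
    OptionalLong lastCaughtUpTimestamp() {
        return lastCaughtUpTimestamp;
    }
}
```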



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -359,31 +385,46 @@ private boolean isVoter(int remoteNodeId) {
     private static class ReplicaState implements Comparable<ReplicaState> {
         final int nodeId;
         Optional<LogOffsetMetadata> endOffset;
-        OptionalLong lastFetchTimestamp;
-        OptionalLong lastFetchLeaderLogEndOffset;
-        OptionalLong lastCaughtUpTimestamp;
+        long lastFetchTimestamp;
+        long lastFetchLeaderLogEndOffset;
+        long lastCaughtUpTimestamp;
         boolean hasAcknowledgedLeader;
 
         public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) {
             this.nodeId = nodeId;
             this.endOffset = Optional.empty();
-            this.lastFetchTimestamp = OptionalLong.empty();
-            this.lastFetchLeaderLogEndOffset = OptionalLong.empty();
-            this.lastCaughtUpTimestamp = OptionalLong.empty();
+            this.lastFetchTimestamp = -1;
+            this.lastFetchLeaderLogEndOffset = -1;
+            this.lastCaughtUpTimestamp = -1;
             this.hasAcknowledgedLeader = hasAcknowledgedLeader;
         }
 
-        void updateFetchTimestamp(long currentFetchTimeMs, long leaderLogEndOffset) {
-            // To be resilient to system time shifts we do not strictly
-            // require the timestamp be monotonically increasing.
-            lastFetchTimestamp = OptionalLong.of(Math.max(lastFetchTimestamp.orElse(-1L), currentFetchTimeMs));
-            lastFetchLeaderLogEndOffset = OptionalLong.of(leaderLogEndOffset);
+        void updateLeaderState(
+            LogOffsetMetadata endOffsetMetadata
+        ) {
+            // For the leader, we only update the end offset. The remaining fields
+            // (such as the caught up time) are determined implicitly.
+            this.endOffset = Optional.of(endOffsetMetadata);
         }
 
-        void updateLastCaughtUpTimestamp(long lastCaughtUpTime) {
-            // This value relies on the fetch timestamp which does not
-            // require monotonicity
-            lastCaughtUpTimestamp = OptionalLong.of(Math.max(lastCaughtUpTimestamp.orElse(-1L), lastCaughtUpTime));
+        void updateFollowerState(
+            long currentTimeMs,
+            LogOffsetMetadata fetchOffsetMetadata,
+            Optional<LogOffsetMetadata> leaderEndOffsetOpt
+        ) {
+            leaderEndOffsetOpt.ifPresent(leaderEndOffset -> {
+                if (fetchOffsetMetadata.offset >= leaderEndOffset.offset) {
+                    lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, currentTimeMs);
+                } else if (lastFetchLeaderLogEndOffset > 0
+                    && fetchOffsetMetadata.offset >= lastFetchLeaderLogEndOffset) {
+                    lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, lastFetchTimestamp);
+                }
+                lastFetchLeaderLogEndOffset = leaderEndOffset.offset;

Review Comment:
   What do you think about adding a comment explaining that the order of these two statements is important? For example, if `lastFetchLeaderLogEndOffset = leaderEndOffset.offset` is moved before the `if`, this algorithm doesn't work.
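   A minimal sketch of the caught-up tracking with such a comment, using plain `long` offsets and a hypothetical `FollowerProgress` class in place of `ReplicaState`. The diff above is cut off after the `lastFetchLeaderLogEndOffset` assignment, so updating `lastFetchTimestamp` afterwards with `Math.max` (as the removed `updateFetchTimestamp` did) is an assumption.

```java
// Hypothetical, simplified follower-progress tracker modeled on the
// updateFollowerState diff above; -1 means "unknown".
final class FollowerProgress {
    long lastFetchTimestamp = -1;
    long lastFetchLeaderLogEndOffset = -1;
    long lastCaughtUpTimestamp = -1;

    void onFetch(long currentTimeMs, long fetchOffset, long leaderEndOffset) {
        if (fetchOffset >= leaderEndOffset) {
            // The follower has everything the leader has right now.
            lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, currentTimeMs);
        } else if (lastFetchLeaderLogEndOffset > 0 && fetchOffset >= lastFetchLeaderLogEndOffset) {
            // The follower has reached the leader's end offset as of the previous
            // fetch, so it was caught up at least as of that fetch's time.
            lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, lastFetchTimestamp);
        }
        // ORDER MATTERS: the branch above must read the values recorded by the
        // previous fetch, so both fields are overwritten only after it runs.
        lastFetchLeaderLogEndOffset = leaderEndOffset;
        // Assumption: keep the timestamp non-decreasing to tolerate clock shifts.
        lastFetchTimestamp = Math.max(lastFetchTimestamp, currentTimeMs);
    }
}
```

   Walking through three fetches, (t=100, offset 0, leader end 10), (t=200, offset 10, leader end 15), and (t=300, offset 15, leader end 15), yields a caught-up time of -1, then 100, then 300. If the `lastFetchLeaderLogEndOffset` assignment were moved above the `if`, the second fetch would compare 10 against 15 instead of 10 and leave the caught-up time at -1.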





[GitHub] [kafka] hachikuji commented on a diff in pull request #12548: MINOR: A few cleanups for DescribeQuorum APIs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r953181785


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -313,43 +312,70 @@ private ReplicaState getReplicaState(int remoteNodeId) {
         return state;
     }
 
-    List<DescribeQuorumResponseData.ReplicaState> quorumResponseVoterStates(long currentTimeMs) {
-        return quorumResponseReplicaStates(voterStates.values(), localId, currentTimeMs);
+    public DescribeQuorumResponseData.PartitionData describeQuorum(long currentTimeMs) {
+        clearInactiveObservers(currentTimeMs);
+
+        return new DescribeQuorumResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code())
+            .setLeaderId(localId)
+            .setLeaderEpoch(epoch)
+            .setHighWatermark(highWatermark().map(offsetMetadata -> offsetMetadata.offset).orElse(-1L))
+            .setCurrentVoters(describeReplicaStates(voterStates, currentTimeMs))
+            .setObservers(describeReplicaStates(observerStates, currentTimeMs));
     }
 
-    List<DescribeQuorumResponseData.ReplicaState> quorumResponseObserverStates(long currentTimeMs) {
-        clearInactiveObservers(currentTimeMs);
-        return quorumResponseReplicaStates(observerStates.values(), localId, currentTimeMs);
+    // Visible for testing
+    DescribeQuorumResponseData.ReplicaState describeVoterState(

Review Comment:
   Makes sense.





[GitHub] [kafka] jsancio commented on a diff in pull request #12548: KAFKA-14142; Expose kraft high watermark update time in quorum command

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r952058593


##########
clients/src/main/resources/common/message/DescribeQuorumResponse.json:
##########
@@ -38,7 +38,9 @@
           "about": "The latest known leader epoch"},
         { "name": "HighWatermark", "type": "int64", "versions": "0+"},

Review Comment:
   Does this return `-1` if the high watermark offset is not known? If so, how about adding an `about` that explains this?



##########
clients/src/main/resources/common/message/DescribeQuorumResponse.json:
##########
@@ -38,7 +38,9 @@
           "about": "The latest known leader epoch"},
         { "name": "HighWatermark", "type": "int64", "versions": "0+"},
         { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
-        { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
+        { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" },
+        { "name": "HighWatermarkUpdateTimeMs", "type": "int64", "default": -1, "tag": 0,
+          "taggedVersions": "1+", "ignorable": true, "about": "The wall clock time that the high watermark was last updated on the leader" }

Review Comment:
   This comment applies to all of the places where you added the `Ms` suffix.
   
   Should it be `HighWatermarkUpdateTimestamp`? I was under the impression that in Kafka the suffix `Timestamp` means wall clock time while the suffix `Ms` means a duration or time span.
   
   We should document that `-1` means that the high watermark update timestamp is not known.





[GitHub] [kafka] jsancio commented on a diff in pull request #12548: MINOR: A few cleanups for DescribeQuorum APIs

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r953190262


##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -125,15 +135,15 @@ public long logEndOffset() {
          * @return The value of the lastFetchTime if known, empty otherwise
          */
         public OptionalLong lastFetchTimeMs() {

Review Comment:
   Hmm. Please double check. It looks like you fixed the `lastCaughtUpTimestamp()` method but not this one.





[GitHub] [kafka] hachikuji commented on a diff in pull request #12548: MINOR: A few cleanups for DescribeQuorum APIs

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r953193355


##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -125,15 +135,15 @@ public long logEndOffset() {
          * @return The value of the lastFetchTime if known, empty otherwise
          */
         public OptionalLong lastFetchTimeMs() {

Review Comment:
   Yeah, you're right. I updated the comment, but forgot to change the method name.


