You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/23 06:57:12 UTC

[GitHub] [kafka] showuon commented on a diff in pull request #12548: KAFKA-14142; Expose kraft high watermark update time in quorum command

showuon commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r952117995


##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -63,20 +76,26 @@ public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         QuorumInfo that = (QuorumInfo) o;
-        return leaderId.equals(that.leaderId)
-            && voters.equals(that.voters)
-            && observers.equals(that.observers);
+        return leaderId == that.leaderId
+            && leaderEpoch == that.leaderEpoch
+            && highWatermark == that.highWatermark
+            && Objects.equals(highWatermarkUpdateTimeMs, that.highWatermarkUpdateTimeMs)
+            && Objects.equals(voters, that.voters)
+            && Objects.equals(observers, that.observers);

Review Comment:
   nice catch!



##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -302,21 +302,112 @@ public void testGetNonLeaderFollowersByFetchOffsetDescending() {
     }
 
     @Test
-    public void testGetVoterStates() {
-        int node1 = 1;
-        int node2 = 2;
+    public void testDescribeQuorumWithSingleVoter() {
+        MockTime time = new MockTime();
         long leaderStartOffset = 10L;
         long leaderEndOffset = 15L;
 
-        LeaderState<?> state = setUpLeaderAndFollowers(node1, node2, leaderStartOffset, leaderEndOffset);
+        LeaderState<?> state = newLeaderState(mkSet(localId), leaderStartOffset);
+
+        // Until we have updated local state, high watermark should be uninitialized
+        assertEquals(Optional.empty(), state.highWatermark());
+        DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds());
+        assertEquals(-1, partitionData.highWatermark());
+        assertEquals(-1, partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+        assertEquals(Collections.emptyList(), partitionData.observers());
+        assertEquals(1, partitionData.currentVoters().size());
+        assertEquals(new DescribeQuorumResponseData.ReplicaState()
+                .setReplicaId(localId)
+                .setLogEndOffset(-1)
+                .setLastFetchTimestamp(time.milliseconds())
+                .setLastCaughtUpTimestamp(time.milliseconds()),
+            partitionData.currentVoters().get(0));
+
+
+        // Now update the high watermark and verify that describe output
+        long highWatermarkUpdateTimeMs = time.milliseconds();
+        assertTrue(state.updateLocalState(highWatermarkUpdateTimeMs, new LogOffsetMetadata(leaderEndOffset)));
+        assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark());
+
+        time.sleep(500);
+
+        partitionData = state.describeQuorum(time.milliseconds());
+        assertEquals(leaderEndOffset, partitionData.highWatermark());
+        assertEquals(highWatermarkUpdateTimeMs, partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+        assertEquals(Collections.emptyList(), partitionData.observers());
+        assertEquals(1, partitionData.currentVoters().size());
+        assertEquals(new DescribeQuorumResponseData.ReplicaState()
+                .setReplicaId(localId)
+                .setLogEndOffset(leaderEndOffset)
+                .setLastFetchTimestamp(time.milliseconds())
+                .setLastCaughtUpTimestamp(time.milliseconds()),
+            partitionData.currentVoters().get(0));
+    }
+
+    @Test
+    public void testDescribeQuorumWithMultipleVoters() {
+        MockTime time = new MockTime();
+        int activeFollowerId = 1;
+        int inactiveFollowerId = 2;
+        long leaderStartOffset = 10L;
+        long leaderEndOffset = 15L;
 
-        assertEquals(mkMap(
-            mkEntry(localId, leaderEndOffset),
-            mkEntry(node1, leaderStartOffset),
-            mkEntry(node2, leaderEndOffset)
-        ), state.quorumResponseVoterStates(0)
-            .stream()
-            .collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, DescribeQuorumResponseData.ReplicaState::logEndOffset)));
+        LeaderState<?> state = newLeaderState(mkSet(localId, activeFollowerId, inactiveFollowerId), leaderStartOffset);
+        assertFalse(state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(leaderEndOffset)));
+        assertEquals(Optional.empty(), state.highWatermark());
+
+        long activeFollowerFetchTimeMs = time.milliseconds();
+        assertTrue(state.updateReplicaState(activeFollowerId, activeFollowerFetchTimeMs, new LogOffsetMetadata(leaderEndOffset)));
+        assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark());
+
+        time.sleep(500);
+
+        DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds());
+        assertEquals(leaderEndOffset, partitionData.highWatermark());
+        assertEquals(activeFollowerFetchTimeMs, partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+        assertEquals(Collections.emptyList(), partitionData.observers());
+
+        List<DescribeQuorumResponseData.ReplicaState> voterStates = partitionData.currentVoters();
+        assertEquals(3, voterStates.size());
+
+        DescribeQuorumResponseData.ReplicaState leaderState = voterStates.stream()
+            .filter(voterState -> voterState.replicaId() == localId)
+            .findFirst()
+            .orElseThrow(() -> new AssertionError(""));

Review Comment:
   It looks like the error message was left empty here and below; could we add a descriptive one?



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -24,32 +24,45 @@
  * This class is used to describe the state of the quorum received in DescribeQuorumResponse.
  */
 public class QuorumInfo {
-    private final Integer leaderId;
-    private final Integer leaderEpoch;
-    private final Long highWatermark;
+    private final int leaderId;
+    private final long leaderEpoch;

Review Comment:
   The `leaderEpoch` field is an int32 in the response schema; is there any reason we changed it to `long` here?
   
   https://github.com/apache/kafka/blob/add4ca6c7f1b289477aa7d6e918f5d22b78088fe/clients/src/main/resources/common/message/DescribeQuorumResponse.json#L37-L38



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -304,7 +311,7 @@ public long epochStartOffset() {
         return epochStartOffset;
     }
 
-    private ReplicaState getReplicaState(int remoteNodeId) {
+    private ReplicaState getOrCreateReplicaState(int remoteNodeId) {

Review Comment:
   nice renaming



##########
core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala:
##########
@@ -158,14 +158,15 @@ object MetadataQuorumCommand {
         -1
       }
     println(
-      s"""|ClusterId:              $clusterId
-          |LeaderId:               ${quorumInfo.leaderId}
-          |LeaderEpoch:            ${quorumInfo.leaderEpoch}
-          |HighWatermark:          ${quorumInfo.highWatermark}
-          |MaxFollowerLag:         $maxFollowerLag
-          |MaxFollowerLagTimeMs:   $maxFollowerLagTimeMs
-          |CurrentVoters:          ${quorumInfo.voters.asScala.map(_.replicaId).mkString("[", ",", "]")}
-          |CurrentObservers:       ${quorumInfo.observers.asScala.map(_.replicaId).mkString("[", ",", "]")}
+      s"""|ClusterId:                  $clusterId
+          |LeaderId:                   ${quorumInfo.leaderId}
+          |LeaderEpoch:                ${quorumInfo.leaderEpoch}
+          |HighWatermark:              ${quorumInfo.highWatermark}
+          |HighWatermarkUpdateTimeMs:  ${quorumInfo.highWatermarkUpdateTimeMs}

Review Comment:
   Since this is script output, I think it'd be better if we did some pre-processing to avoid printing out something like `OptionalLong[12345]` or `OptionalLong.empty`.
   



##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -326,35 +417,57 @@ private LeaderState<?> setUpLeaderAndFollowers(int follower1,
         LeaderState<?> state = newLeaderState(mkSet(localId, follower1, follower2), leaderStartOffset);
         state.updateLocalState(0, new LogOffsetMetadata(leaderEndOffset));
         assertEquals(Optional.empty(), state.highWatermark());
-        state.updateReplicaState(follower1, 0, new LogOffsetMetadata(leaderStartOffset), leaderEndOffset);
-        state.updateReplicaState(follower2, 0, new LogOffsetMetadata(leaderEndOffset), leaderEndOffset);
+        state.updateReplicaState(follower1, 0, new LogOffsetMetadata(leaderStartOffset));
+        state.updateReplicaState(follower2, 0, new LogOffsetMetadata(leaderEndOffset));
         return state;
     }
 
     @Test
-    public void testGetObserverStatesWithObserver() {
+    public void testDescribeQuorumWithObservers() {
+        MockTime time = new MockTime();
         int observerId = 10;
         long epochStartOffset = 10L;
 
         LeaderState<?> state = newLeaderState(mkSet(localId), epochStartOffset);
-        long timestamp = 20L;
-        assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(epochStartOffset), epochStartOffset + 10));
-
-        assertEquals(Collections.singletonMap(observerId, epochStartOffset),
-                state.quorumResponseObserverStates(timestamp)
-                    .stream()
-                    .collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, DescribeQuorumResponseData.ReplicaState::logEndOffset)));
+        long highWatermarkUpdateTime = time.milliseconds();
+        assertTrue(state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(epochStartOffset + 1)));
+        assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark());
+
+        time.sleep(500);
+        long observerFetchTimeMs = time.milliseconds();
+        assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1)));
+
+        time.sleep(500);
+        DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds());
+        assertEquals(epochStartOffset + 1, partitionData.highWatermark());
+        assertEquals(highWatermarkUpdateTime, partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+
+        List<DescribeQuorumResponseData.ReplicaState> observerStates = partitionData.observers();
+        assertEquals(1, observerStates.size());

Review Comment:
   Should we also verify `voters` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org