Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/23 23:02:23 UTC

[GitHub] [kafka] jsancio commented on a diff in pull request #12548: MINOR: A few cleanups for DescribeQuorum APIs

jsancio commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r953171207


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -313,43 +312,70 @@ private ReplicaState getReplicaState(int remoteNodeId) {
         return state;
     }
 
-    List<DescribeQuorumResponseData.ReplicaState> quorumResponseVoterStates(long currentTimeMs) {
-        return quorumResponseReplicaStates(voterStates.values(), localId, currentTimeMs);
+    public DescribeQuorumResponseData.PartitionData describeQuorum(long currentTimeMs) {
+        clearInactiveObservers(currentTimeMs);
+
+        return new DescribeQuorumResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code())
+            .setLeaderId(localId)
+            .setLeaderEpoch(epoch)
+            .setHighWatermark(highWatermark().map(offsetMetadata -> offsetMetadata.offset).orElse(-1L))
+            .setCurrentVoters(describeReplicaStates(voterStates, currentTimeMs))
+            .setObservers(describeReplicaStates(observerStates, currentTimeMs));
     }
 
-    List<DescribeQuorumResponseData.ReplicaState> quorumResponseObserverStates(long currentTimeMs) {
-        clearInactiveObservers(currentTimeMs);
-        return quorumResponseReplicaStates(observerStates.values(), localId, currentTimeMs);
+    // Visible for testing
+    DescribeQuorumResponseData.ReplicaState describeVoterState(

Review Comment:
   Hmm. Okay. Since we are adding this for testing only, I am wondering if we should have tests call `describeQuorum(long)` and have a test utility that filters and returns the `ReplicaState` for a given voter or observer.
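
   A minimal sketch of what such a test utility might look like. The class name `DescribeQuorumTestUtil`, the method `findReplicaState`, and the nested `ReplicaState` and `PartitionData` classes are hypothetical, simplified stand-ins so the example compiles on its own; a real helper would operate on the generated `DescribeQuorumResponseData` types instead.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical test helper. The nested classes are simplified stand-ins
// for the generated DescribeQuorumResponseData types, not the real ones.
final class DescribeQuorumTestUtil {
    static final class ReplicaState {
        final int replicaId;
        final long logEndOffset;

        ReplicaState(int replicaId, long logEndOffset) {
            this.replicaId = replicaId;
            this.logEndOffset = logEndOffset;
        }
    }

    static final class PartitionData {
        final List<ReplicaState> currentVoters;
        final List<ReplicaState> observers;

        PartitionData(List<ReplicaState> currentVoters, List<ReplicaState> observers) {
            this.currentVoters = currentVoters;
            this.observers = observers;
        }
    }

    // Searches the voters first, then the observers, and returns the first
    // state whose replica id matches, or empty if the id is unknown.
    static Optional<ReplicaState> findReplicaState(PartitionData data, int replicaId) {
        return Stream.concat(data.currentVoters.stream(), data.observers.stream())
            .filter(state -> state.replicaId == replicaId)
            .findFirst();
    }
}
```

   With something along these lines, tests would exercise only `describeQuorum(long)` and the extra "visible for testing" method would not be needed.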



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -204,50 +198,80 @@ private boolean updateHighWatermark() {
         return false;
     }
 
+    private void setHighWatermark(
+        Optional<LogOffsetMetadata> newHighWatermark,

Review Comment:
   Based on the call sites, it looks like `newHighWatermark.isPresent()` is always `true`. Should we document that this method takes an `Optional<LogOffsetMetadata>` so that we don't allocate a new object? Should this method also "assert" that `newHighWatermark.isPresent()` is `true`?
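
   A rough sketch of the kind of check I have in mind. `HighWatermarkHolder` is a hypothetical class, and `Long` stands in for `LogOffsetMetadata` so the example is self-contained; the exception type and message are assumptions, not what the PR has to use.

```java
import java.util.Optional;

// Hypothetical holder; Long stands in for LogOffsetMetadata.
final class HighWatermarkHolder {
    private Optional<Long> highWatermark = Optional.empty();

    /**
     * Takes an Optional (rather than the bare value) so the caller's
     * instance can be stored as-is without allocating a new wrapper.
     * The argument must always be present.
     */
    void setHighWatermark(Optional<Long> newHighWatermark) {
        if (!newHighWatermark.isPresent()) {
            throw new IllegalArgumentException("newHighWatermark must be present");
        }
        // Reuse the caller's Optional instead of wrapping the value again.
        highWatermark = newHighWatermark;
    }

    Optional<Long> highWatermark() {
        return highWatermark;
    }
}
```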



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -125,15 +135,15 @@ public long logEndOffset() {
          * @return The value of the lastFetchTime if known, empty otherwise
          */
         public OptionalLong lastFetchTimeMs() {

Review Comment:
   Let's fix this to `lastFetchTimestamp`.



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -125,15 +135,15 @@ public long logEndOffset() {
          * @return The value of the lastFetchTime if known, empty otherwise
          */
         public OptionalLong lastFetchTimeMs() {
-            return lastFetchTimeMs;
+            return lastFetchTimestamp;
         }
 
         /**
          * Return the lastCaughtUpTime in milliseconds for this replica.
          * @return The value of the lastCaughtUpTime if known, empty otherwise
          */
         public OptionalLong lastCaughtUpTimeMs() {

Review Comment:
   Let's fix this to `lastCaughtUpTimestamp`.



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -359,31 +385,46 @@ private boolean isVoter(int remoteNodeId) {
     private static class ReplicaState implements Comparable<ReplicaState> {
         final int nodeId;
         Optional<LogOffsetMetadata> endOffset;
-        OptionalLong lastFetchTimestamp;
-        OptionalLong lastFetchLeaderLogEndOffset;
-        OptionalLong lastCaughtUpTimestamp;
+        long lastFetchTimestamp;
+        long lastFetchLeaderLogEndOffset;
+        long lastCaughtUpTimestamp;
         boolean hasAcknowledgedLeader;
 
         public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) {
             this.nodeId = nodeId;
             this.endOffset = Optional.empty();
-            this.lastFetchTimestamp = OptionalLong.empty();
-            this.lastFetchLeaderLogEndOffset = OptionalLong.empty();
-            this.lastCaughtUpTimestamp = OptionalLong.empty();
+            this.lastFetchTimestamp = -1;
+            this.lastFetchLeaderLogEndOffset = -1;
+            this.lastCaughtUpTimestamp = -1;
             this.hasAcknowledgedLeader = hasAcknowledgedLeader;
         }
 
-        void updateFetchTimestamp(long currentFetchTimeMs, long leaderLogEndOffset) {
-            // To be resilient to system time shifts we do not strictly
-            // require the timestamp be monotonically increasing.
-            lastFetchTimestamp = OptionalLong.of(Math.max(lastFetchTimestamp.orElse(-1L), currentFetchTimeMs));
-            lastFetchLeaderLogEndOffset = OptionalLong.of(leaderLogEndOffset);
+        void updateLeaderState(
+            LogOffsetMetadata endOffsetMetadata
+        ) {
+            // For the leader, we only update the end offset. The remaining fields
+            // (such as the caught up time) are determined implicitly.
+            this.endOffset = Optional.of(endOffsetMetadata);
         }
 
-        void updateLastCaughtUpTimestamp(long lastCaughtUpTime) {
-            // This value relies on the fetch timestamp which does not
-            // require monotonicity
-            lastCaughtUpTimestamp = OptionalLong.of(Math.max(lastCaughtUpTimestamp.orElse(-1L), lastCaughtUpTime));
+        void updateFollowerState(
+            long currentTimeMs,
+            LogOffsetMetadata fetchOffsetMetadata,
+            Optional<LogOffsetMetadata> leaderEndOffsetOpt
+        ) {
+            leaderEndOffsetOpt.ifPresent(leaderEndOffset -> {
+                if (fetchOffsetMetadata.offset >= leaderEndOffset.offset) {
+                    lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, currentTimeMs);
+                } else if (lastFetchLeaderLogEndOffset > 0
+                    && fetchOffsetMetadata.offset >= lastFetchLeaderLogEndOffset) {
+                    lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, lastFetchTimestamp);
+                }
+                lastFetchLeaderLogEndOffset = leaderEndOffset.offset;

Review Comment:
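   What do you think about writing a comment explaining that the order of these two statements is important? For example, if `lastFetchLeaderLogEndOffset = leaderEndOffset.offset` is moved before the `if`, this algorithm doesn't work, because the `else if` branch would then compare against the new leader end offset rather than the one recorded at the previous fetch.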
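
   To illustrate why the order matters, here is a small standalone simulation. `CaughtUpOrderDemo` and its `overwriteFirst` flag are hypothetical, plain `long` offsets replace `LogOffsetMetadata`, and the `lastFetchTimestamp` update at the end of `update` is an assumption about the part of the method not shown in this hunk.

```java
// Hypothetical simulation of the caught-up bookkeeping, using plain longs.
final class CaughtUpOrderDemo {
    long lastFetchTimestamp = -1;
    long lastFetchLeaderLogEndOffset = -1;
    long lastCaughtUpTimestamp = -1;

    // When true, the leader end offset is overwritten BEFORE the comparison,
    // which is the reordering the comment should warn against.
    private final boolean overwriteFirst;

    CaughtUpOrderDemo(boolean overwriteFirst) {
        this.overwriteFirst = overwriteFirst;
    }

    void update(long currentTimeMs, long fetchOffset, long leaderEndOffset) {
        if (overwriteFirst) {
            lastFetchLeaderLogEndOffset = leaderEndOffset;
        }
        if (fetchOffset >= leaderEndOffset) {
            // The follower has reached the leader's current end offset.
            lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, currentTimeMs);
        } else if (lastFetchLeaderLogEndOffset > 0
            && fetchOffset >= lastFetchLeaderLogEndOffset) {
            // The follower has reached the end offset the leader had at the
            // previous fetch, so it was caught up as of that fetch time.
            lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, lastFetchTimestamp);
        }
        if (!overwriteFirst) {
            lastFetchLeaderLogEndOffset = leaderEndOffset;
        }
        // Assumed: the fetch time is recorded after the comparison as well.
        lastFetchTimestamp = Math.max(lastFetchTimestamp, currentTimeMs);
    }

    // Replays two fetches: (t=100, fetch 5, leader end 10), then
    // (t=200, fetch 10, leader end 15), and returns the caught-up time.
    static long replay(boolean overwriteFirst) {
        CaughtUpOrderDemo state = new CaughtUpOrderDemo(overwriteFirst);
        state.update(100L, 5L, 10L);
        state.update(200L, 10L, 15L);
        return state.lastCaughtUpTimestamp;
    }
}
```

   With the order from the diff, `replay(false)` yields 100: the second fetch shows the follower reached the offset the leader had at time 100. With the assignment moved first, `replay(true)` stays at -1, since the `else if` condition becomes the same comparison that just failed in the `if`.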


