You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/15 20:16:58 UTC

[GitHub] [kafka] hachikuji commented on a diff in pull request #12508: KAFKA-13888: Addition of Information in DescribeQuorumResponse

hachikuji commented on code in PR #12508:
URL: https://github.com/apache/kafka/pull/12508#discussion_r946081969


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -323,19 +372,30 @@ private static class ReplicaState implements Comparable<ReplicaState> {
         final int nodeId;
         Optional<LogOffsetMetadata> endOffset;
         OptionalLong lastFetchTimestamp;
+        OptionalLong lastfetchLeaderLogEndOffset;

Review Comment:
   nit: should be `lastFetchLeaderLogEndOffset`



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -778,4 +779,52 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+  def createAdminClient(cluster: KafkaClusterTestKit, useController: Boolean): Admin = {
+    var props: Properties = null
+    props = cluster.clientProperties()
+    props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+    Admin.create(props)
+  }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val admin = createAdminClient(cluster, false)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo.get()
+
+        assertEquals(0, quorumInfo.observers.size)

Review Comment:
   This probably fails now that brokers send their replicaId.



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -778,4 +779,52 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+  def createAdminClient(cluster: KafkaClusterTestKit, useController: Boolean): Admin = {
+    var props: Properties = null
+    props = cluster.clientProperties()
+    props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+    Admin.create(props)
+  }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val admin = createAdminClient(cluster, false)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo.get()
+
+        assertEquals(0, quorumInfo.observers.size)
+        assertEquals(3, quorumInfo.voters.size)

Review Comment:
   A more straightforward way to assert the voter/observer sets is in `DescribeQuorumRequestTest`:
   ```scala
         val voterData = partitionData.currentVoters.asScala
         assertEquals(cluster.controllerIds().asScala, voterData.map(_.replicaId).toSet);
   
         val observerData = partitionData.observers.asScala
         assertEquals(cluster.brokerIds().asScala, observerData.map(_.replicaId).toSet);
   ```



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1179,8 +1179,12 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest(
             leaderState.localId(),
             leaderState.epoch(),
             leaderState.highWatermark().isPresent() ? leaderState.highWatermark().get().offset : -1,
-            convertToReplicaStates(leaderState.getVoterEndOffsets()),
-            convertToReplicaStates(leaderState.getObserverStates(currentTimeMs))
+            convertToReplicaStates(leaderState.getVoterEndOffsets(),
+                                    leaderState.getVoterLastFetchTimes(),

Review Comment:
   I think it would be a bit simpler to push creation of the `ReplicaState` object into `LeaderState`:
   
   ```java
   class LeaderState {
     ...
   
     List<ReplicaState> voterStates(long currentTimeMs);
     List<ReplicaState> observerStates(long currentTimeMs);
   }
   ```
   Then we wouldn't need all the boilerplate methods to create all the different Map variations. Also, then the sanity checks in `convertToReplicaStates` become unnecessary.



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -217,19 +218,30 @@ public boolean updateLocalState(long fetchTimestamp, LogOffsetMetadata logOffset
      * @param replicaId replica id
      * @param fetchTimestamp fetch timestamp
      * @param logOffsetMetadata new log offset and metadata
+     * @param leaderLogEndOffset current log end offset of the leader
      * @return true if the high watermark is updated too
      */
-    public boolean updateReplicaState(int replicaId,
-                                      long fetchTimestamp,
-                                      LogOffsetMetadata logOffsetMetadata) {
+    public boolean updateReplicaState(
+            int replicaId,
+            long fetchTimestamp,
+            LogOffsetMetadata logOffsetMetadata,
+            Long leaderLogEndOffset
+    ) {
         // Ignore fetches from negative replica id, as it indicates
         // the fetch is from non-replica. For example, a consumer.
         if (replicaId < 0) {
             return false;
         }
 
         ReplicaState state = getReplicaState(replicaId);
-        state.updateFetchTimestamp(fetchTimestamp);
+        // Update the Last CaughtUp Time
+        if (logOffsetMetadata.offset >= leaderLogEndOffset) {
+            state.updateLastCaughtUpTimestamp(fetchTimestamp);

Review Comment:
   We do these updates before the call to `updateEndOffset`, which may raise an exception (same thing for `updateFetchTimestamp` which is an existing issue). I think it would be better to update state only after we have confirmed the update is valid.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org