Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/24 21:44:56 UTC

[GitHub] [kafka] niket-goel opened a new pull request, #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

niket-goel opened a new pull request, #12206:
URL: https://github.com/apache/kafka/pull/12206

   
   This commit adds an Admin API handler for the DescribeQuorum request and also
   adds two new fields, LastFetchTimestamp and LastCaughtUpTimestamp, to
   the DescribeQuorumResponse, as described in KIP-836.
   
   This commit does not populate the newly added fields; that will be done in a
   subsequent commit.
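   
   As a hedged usage sketch (method and accessor names follow the code reviewed
   later in this thread and may differ in the final API), the new information can
   be read through the admin client roughly like this:
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.QuorumInfo;
   
   public class DescribeMetadataQuorumExample {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               // Fetch the per-partition quorum state for the metadata log.
               QuorumInfo info = admin.describeMetadataQuorum().quorumInfo().get();
               System.out.println("leader = " + info.leaderId());
               info.voters().forEach(v -> System.out.println("voter " + v.replicaId()
                   + " logEndOffset=" + v.logEndOffset()
                   + " lastFetchTimeMs=" + v.lastFetchTimeMs()
                   + " lastCaughtUpTimeMs=" + v.lastCaughtUpTimeMs()));
           }
       }
   }
   ```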
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r891768433


##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.

Review Comment:
   Yeah, I agree with this. Maybe we can just say that this class contains useful debugging state for KRaft replication.
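   
   For instance, one possible wording based on this suggestion (illustrative only,
   not the final Javadoc):
   
   ```java
   /**
    * This class contains useful debugging state for KRaft replication, returned by
    * the DescribeQuorum API for the cluster metadata partition.
    */
   public class QuorumInfo {
       // ...
   }
   ```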





[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r891613523


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,94 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                                v.logEndOffset(),
+                                OptionalLong.of(v.lastFetchTimestamp()),
+                                OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                                o.logEndOffset(),
+                                OptionalLong.of(o.lastFetchTimestamp()),
+                                OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                try {
+                    if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(quorumResponse.data().errorCode()).exception();
+                    }
+                    if (quorumResponse.data().topics().size() > 1) {
+                        String msg = String.format("DescribeMetadataQuorum received %d topics when 1 was expected",
+                                quorumResponse.data().topics().size());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0);
+                    if (!topic.topicName().equals(METADATA_TOPIC_NAME)) {
+                        String msg = String.format("DescribeMetadataQuorum received a topic with name %s when %s was expected",
+                                topic.topicName(), METADATA_TOPIC_NAME);
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    if (topic.partitions().size() > 1) {
+                        String msg = String.format("DescribeMetadataQuorum received a topic %s with %d partitions when 1 was expected",
+                                topic.topicName(), topic.partitions().size());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0);
+                    if (partition.partitionIndex() != METADATA_TOPIC_PARTITION.partition()) {
+                        String msg = String.format("DescribeMetadataQuorum received a single partition with index %d when %d was expected",
+                                partition.partitionIndex(), METADATA_TOPIC_PARTITION.partition());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    if (partition.errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(partition.errorCode()).exception();
+                    }
+                    future.complete(createQuorumResult(partition));
+                } catch (RuntimeException e) {
+                    throw e;
+                } catch (Exception e) {

Review Comment:
   `UnknownServerException` extends `KafkaException`, which extends `RuntimeException`. So I think all the errors that we are raising above get re-thrown in the previous `catch`. 
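   
   For reference, a minimal standalone sketch of that catch ordering (assuming the
   kafka-clients jar is on the classpath; the class name and message are made up):
   
   ```java
   import org.apache.kafka.common.errors.UnknownServerException;
   
   public class CatchOrderDemo {
       public static void main(String[] args) {
           try {
               // UnknownServerException -> ApiException -> KafkaException -> RuntimeException
               throw new UnknownServerException("simulated server error");
           } catch (RuntimeException e) {
               // All of the exceptions raised in handleResponse land here first.
               System.out.println("Caught by the RuntimeException clause: " + e);
           } catch (Exception e) {
               // Only checked exceptions thrown in the try block would reach this clause.
               System.out.println("Caught by the Exception clause: " + e);
           }
       }
   }
   ```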





[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r881020616


##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -1382,6 +1382,23 @@ default DescribeFeaturesResult describeFeatures() {
         return describeFeatures(new DescribeFeaturesOptions());
     }
 
+    /**
+     * Describe the state of the raft quorum
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+     * the returned {@code DescribeQuorumResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+     *   If the authenticated user didn't have {@code DESCRIBE} access to the cluster.</li>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the controller could list the cluster links.</li>
+     * </ul>
+     *
+     * @param options The options to use when describing the quorum.
+     * @return The DescribeQuorumResult.
+     */
+    DescribeQuorumResult describeQuorum(DescribeQuorumOptions options);

Review Comment:
   By the way, these APIs usually have a default overload which leaves out the `XOptions` class. For example, see `describeFeatures()` above.
   
   Also nit: can we not split up the `describeFeatures` APIs?
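   
   For reference, the convention being pointed to is the pattern already used by
   `describeFeatures()` in `Admin` (shown here as a fragment of that interface):
   
   ```java
   // No-arg convenience overload that hides the XOptions class from callers.
   default DescribeFeaturesResult describeFeatures() {
       return describeFeatures(new DescribeFeaturesOptions());
   }
   
   // Full overload taking explicit options.
   DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
   ```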





[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r890986175


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4337,25 +4337,22 @@ public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuoru
         final Call call = new Call(
                 "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
 
-            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
-                Integer partition = 0;
-                String topicName = response.getTopicNameByIndex(0);
-                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
                 List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
                 List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
-                response.getVoterInfo(topicName, partition).forEach(v -> {
+                partition.currentVoters().forEach(v -> {
                     voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
                                 v.logEndOffset(),
                                 OptionalLong.of(v.lastFetchTimestamp()),
                                 OptionalLong.of(v.lastCaughtUpTimestamp())));

Review Comment:
   When `lastFetchTimestamp` or `lastCaughtUpTimestamp` are not provided (equal to -1), don't we want to return an empty option instead of an option containing -1?
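   
   A minimal sketch of the suggested mapping (the helper name is hypothetical):
   
   ```java
   // Map the wire-protocol sentinel (-1 = not provided) to an absent java.util.OptionalLong.
   private static OptionalLong toOptionalTimestamp(long timestampMs) {
       return timestampMs == -1 ? OptionalLong.empty() : OptionalLong.of(timestampMs);
   }
   ```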





[GitHub] [kafka] niket-goel commented on pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on PR #12206:
URL: https://github.com/apache/kafka/pull/12206#issuecomment-1154209076

   Thanks for the in-depth review @dajac, @hachikuji, and @dengziming. I have updated the PR with the final nits from @dajac fixed. Are we good to merge with 1 LGTM, or do we need 1 more?
   
   > Also in response to _"Is it the expected behavior to have zero observers?"_
   
   @dengziming also raised the same question in a previous comment, and we decided to tackle that in a separate PR. I will cut a JIRA for this.




[GitHub] [kafka] dengziming commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
dengziming commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r889423970


##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,

Review Comment:
   We can't use `NodeApiVersions.create()` here due to a bug, which I'm trying to fix in https://github.com/apache/kafka/pull/11784





[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r882249160


##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -1446,6 +1446,35 @@ default DescribeFeaturesResult describeFeatures() {
      */
     UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
 
+    /**
+     * Describes the state of the metadata quorum.
+     * <p>
+     * This is a convenience method for {@link #describeMetadataQuorum(DescribeMetadataQuorumOptions)}  with default options.
+     * See the overload for more details.
+     *
+     * @return the {@link DescribeMetadataQuorumResult} containing the result
+     */
+    default DescribeMetadataQuorumResult describeMetadataQuorum() {

Review Comment:
   I will update the KIP and the thread once this PR is approved to reflect the changes here.
   





[GitHub] [kafka] dengziming commented on pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
dengziming commented on PR #12206:
URL: https://github.com/apache/kafka/pull/12206#issuecomment-1138359588

   @niket-goel Thanks for the reply, I left 2 minor comments. I also noticed that the class name is `DescribeQuorumIntegrationTest` while the file name is `DescribeQuorumTest.scala`; you should make them the same. I ran your newly added tests locally and some of them failed, so there may still be some bugs to fix.
   
   For the last problem, I mean that we can also get the quorum info directly from the controller. Here is an example:
   
   For a single-node KRaft cluster, we can get quorum info from the BrokerServer:
   ```
   Properties properties = new Properties();
   properties.put("bootstrap.servers", "localhost:9092"); // connect to the BrokerServer on port 9092
   AdminClient client = AdminClient.create(properties);
   client.describeMetadataQuorum();
   ```
   
   We can also connect to the ControllerServer:
   ```
   Properties properties = new Properties();
   properties.put("bootstrap.servers", "localhost:9093"); // connect to the ControllerServer on port 9093
   AdminClient client = AdminClient.create(properties);
   client.describeMetadataQuorum();
   ```
   
   We can leave this problem to a separate PR since it's unrelated here.
   
   
   




[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r891766251


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,73 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(0);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                response.getVoterInfo(topicName, partition).forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                                v.logEndOffset(),
+                                OptionalLong.of(v.lastFetchTimestamp()),
+                                OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                response.getObserverInfo(topicName, partition).forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                                o.logEndOffset(),
+                                OptionalLong.of(o.lastFetchTimestamp()),
+                                OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {

Review Comment:
   Nevermind, this is an override.





[GitHub] [kafka] niket-goel commented on pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on PR #12206:
URL: https://github.com/apache/kafka/pull/12206#issuecomment-1155361127

   I agree with you guys. I have pushed a commit to remove the flaky test for now. How about we merge this PR as-is (without the failing test) and then follow up on KAFKA-13940 to re-enable it?




[GitHub] [kafka] dengziming commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
dengziming commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r882483263


##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -725,4 +725,34 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() = {

Review Comment:
   nit: add `: Unit` after the method signature



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -725,4 +725,34 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(KafkaRaftServer.MetadataTopic, quorumInfo.topic())
+        assertEquals(3, quorumInfo.voters.size())

Review Comment:
   we can also check observers here.





[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r881043900


##########
clients/src/main/java/org/apache/kafka/common/utils/QuorumInfo.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.List;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+  private final String topic;
+  private final Integer leaderId;
+  private final List<ReplicaState> voters;
+  private final List<ReplicaState> observers;
+
+  public QuorumInfo(String topic, Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+    this.topic = topic;
+    this.leaderId = leaderId;
+    this.voters = voters;
+    this.observers = observers;
+  }
+
+  public String topic() {
+    return topic;
+  }
+
+  public Integer leaderId() {
+    return leaderId;
+  }
+
+  public List<ReplicaState> voters() {
+    return voters;
+  }
+
+  public List<ReplicaState> observers() {
+    return observers;
+  }
+
+  public static class ReplicaState {
+    private final int replicaId;
+    private final long logEndOffset;
+    private final long lastFetchTimeMs;
+    private final long lastCaughtUpTimeMs;
+
+    public ReplicaState(int replicaId, long logEndOffset) {

Review Comment:
   Sorry, I did not get what you meant. We need access to it in `KafkaAdminClient.java`. Are you suggesting it be protected? Or that we have a builder instead of exposing the constructor? Or that it be built through `QuorumInfo` itself?



##########
clients/src/main/java/org/apache/kafka/common/utils/QuorumInfo.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.List;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+  private final String topic;
+  private final Integer leaderId;
+  private final List<ReplicaState> voters;
+  private final List<ReplicaState> observers;
+
+  public QuorumInfo(String topic, Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+    this.topic = topic;
+    this.leaderId = leaderId;
+    this.voters = voters;
+    this.observers = observers;
+  }
+
+  public String topic() {
+    return topic;
+  }
+
+  public Integer leaderId() {
+    return leaderId;
+  }
+
+  public List<ReplicaState> voters() {
+    return voters;
+  }
+
+  public List<ReplicaState> observers() {
+    return observers;
+  }
+
+  public static class ReplicaState {
+    private final int replicaId;
+    private final long logEndOffset;
+    private final long lastFetchTimeMs;
+    private final long lastCaughtUpTimeMs;
+
+    public ReplicaState(int replicaId, long logEndOffset) {
+      this.replicaId = replicaId;
+      this.logEndOffset = logEndOffset;
+      this.lastFetchTimeMs = -1;
+      this.lastCaughtUpTimeMs = -1;
+    }
+
+    public ReplicaState(int replicaId, long logEndOffset,
+        long lastFetchTimeMs, long lastCaughtUpTimeMs) {
+      this.replicaId = replicaId;
+      this.logEndOffset = logEndOffset;
+      this.lastFetchTimeMs = lastFetchTimeMs;
+      this.lastCaughtUpTimeMs = lastCaughtUpTimeMs;
+    }
+
+    public int replicaId() {
+      return replicaId;
+    }
+
+    public long logEndOffset() {
+      return logEndOffset;
+    }
+
+    public long lastFetchTimeMs() {

Review Comment:
   `OptionalLong` makes sense. Let me make that change.





[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r881002654


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -318,6 +325,11 @@ public class KafkaAdminClient extends AdminClient {
     private final Logger log;
     private final LogContext logContext;
 
+    /**
+     * The name of the internal raft metadata topic
+     */
+    private static final String METADATA_TOPIC_NAME = "__cluster_metadata";

Review Comment:
   Hmm, I hadn't really been thinking about the fact that we would have to expose this to the client. I guess that is the consequence of having such a general `DescribeQuorum` API. This makes me wonder if we ought to be more forward-looking with the naming here. Suppose that we ultimately decide to use Raft for partition replication as well. Then we might want to use `DescribeQuorum` for user partitions too, but we haven't given ourselves much room for extension in the `describeQuorum` API. Would it make sense to make the new API more specific to the metadata quorum?
   ```java
   public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options)
   ```
   It is more verbose, but it is also clearer.
   
   We should also move this constant to `org.apache.kafka.common.internals.Topic`.
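   
   As a sketch, that move could look roughly like this (constant names are
   illustrative, not necessarily what the PR ends up using):
   
   ```java
   // In org.apache.kafka.common.internals.Topic
   public static final String CLUSTER_METADATA_TOPIC_NAME = "__cluster_metadata";
   public static final TopicPartition CLUSTER_METADATA_TOPIC_PARTITION =
       new TopicPartition(CLUSTER_METADATA_TOPIC_NAME, 0);
   ```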





[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r882104924


##########
core/src/test/scala/unit/kafka/server/DescribeQuorumIntegrationTest.scala:
##########
@@ -17,25 +17,30 @@
 package kafka.server
 
 import java.io.IOException
-
 import kafka.test.ClusterInstance
 import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
 import kafka.test.junit.ClusterTestExtensions
-import kafka.utils.NotNothing
+import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.utils.{NotNothing, TestUtils}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeMetadataQuorumOptions}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
+import org.apache.kafka.metadata.BrokerState
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
+import org.slf4j.LoggerFactory
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
+@Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT)
 @Tag("integration")
-class DescribeQuorumRequestTest(cluster: ClusterInstance) {
+class DescribeQuorumIntegrationTest(cluster: ClusterInstance) {
+  val log = LoggerFactory.getLogger(classOf[DescribeQuorumIntegrationTest])
 
   @ClusterTest(clusterType = Type.ZK)
   def testDescribeQuorumNotSupportedByZkBrokers(): Unit = {

Review Comment:
   Another issue is that the new fields default to `-1` when not set, but are also `-1` when the value for a particular `voter` or `observer` is unknown. I'm thinking about whether there is a way to test this reliably.





[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r882205666


##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final String topic;
+    private final Integer leaderId;
+    private final List<ReplicaState> voters;
+    private final List<ReplicaState> observers;
+
+    public QuorumInfo(String topic, Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+        this.topic = topic;
+        this.leaderId = leaderId;
+        this.voters = voters;
+        this.observers = observers;
+    }
+
+    public String topic() {
+        return topic;
+    }
+
+    public Integer leaderId() {
+        return leaderId;
+    }
+
+    public List<ReplicaState> voters() {
+        return voters;
+    }
+
+    public List<ReplicaState> observers() {
+        return observers;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        QuorumInfo that = (QuorumInfo) o;
+        return topic.equals(that.topic)
+            && leaderId.equals(that.leaderId)
+            && voters.equals(that.voters)
+            && observers.equals(that.observers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topic, leaderId, voters, observers);
+    }
+
+    @Override
+    public String toString() {
+        return "QuorumInfo{" +
+            "topic='" + topic + '\'' +
+            ", leaderId=" + leaderId +
+            ", voters=" + voters.toString() +
+            ", observers=" + observers.toString() +
+            '}';
+    }
+
+    public static class ReplicaState {
+        private final int replicaId;
+        private final long logEndOffset;
+        private final OptionalLong lastFetchTimeMs;
+        private final OptionalLong lastCaughtUpTimeMs;
+
+        public ReplicaState() {

Review Comment:
   ~~I need access to the constructor in `DescribeQuorumResponse.java`. I can sort of work around that by returning the fields raw or encapsulated in a different object, but I thought this was simpler. If we want to prioritize hiding the constructor more, I can change it so that `getVoterInfo` and `getObserverInfo` return raw fields. Do you have any thoughts on this?~~



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########

Review Comment:
   Never mind. I think I was just being a little stupid :) 





[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r891518561


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,94 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                                v.logEndOffset(),
+                                OptionalLong.of(v.lastFetchTimestamp()),
+                                OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                                o.logEndOffset(),
+                                OptionalLong.of(o.lastFetchTimestamp()),
+                                OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                try {
+                    if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(quorumResponse.data().errorCode()).exception();
+                    }
+                    if (quorumResponse.data().topics().size() > 1) {
+                        String msg = String.format("DescribeMetadataQuorum received %d topics when 1 was expected",
+                                quorumResponse.data().topics().size());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0);
+                    if (!topic.topicName().equals(METADATA_TOPIC_NAME)) {
+                        String msg = String.format("DescribeMetadataQuorum received a topic with name %s when %s was expected",
+                                topic.topicName(), METADATA_TOPIC_NAME);
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    if (topic.partitions().size() > 1) {
+                        String msg = String.format("DescribeMetadataQuorum received a topic %s with %d partitions when 1 was expected",
+                                topic.topicName(), topic.partitions().size());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0);
+                    if (partition.partitionIndex() != METADATA_TOPIC_PARTITION.partition()) {
+                        String msg = String.format("DescribeMetadataQuorum received a single partition with index %d when %d was expected",
+                                partition.partitionIndex(), METADATA_TOPIC_PARTITION.partition());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    if (partition.errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(partition.errorCode()).exception();
+                    }
+                    future.complete(createQuorumResult(partition));
+                } catch (RuntimeException e) {
+                    throw e;

Review Comment:
   The reason I added this block is that I noticed a Gradle warning which suggested that, with the addition of the general `Exception` catch block, some runtime exceptions might get hidden. A little reading suggested that a best practice is to catch and re-throw runtime exceptions. I take your comment to mean that it is best to handle runtime exceptions the same way as the other exceptions and complete the future with an error. Am I understanding that correctly?





[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r892884885


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                            v.logEndOffset(),
+                            v.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()),
+                            v.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                            o.logEndOffset(),
+                            o.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()),
+                            o.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp())));

Review Comment:
   Do we really think this is worth another method?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                            v.logEndOffset(),
+                            v.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()),
+                            v.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                            o.logEndOffset(),
+                            o.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()),
+                            o.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                    throw Errors.forCode(quorumResponse.data().errorCode()).exception();

Review Comment:
   That is the correct understanding. The intent (based on some discussion with Jason) is to let the default handler manage exceptions returned by the server. The `UnknownServerException` that we raise here will end up calling `handleFailure` and completing the future exceptionally.
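   
   Illustratively, the failure path looks roughly like the pattern used by the
   other admin calls (treat this as a sketch of the `Call` override, not the exact
   code in this PR):
   
   ```java
   @Override
   void handleFailure(Throwable throwable) {
       // Exceptions thrown from handleResponse (including the UnknownServerException
       // raised above) are routed here by the Call machinery and surface to the
       // caller through the returned future.
       future.completeExceptionally(throwable);
   }
   ```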



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -778,4 +778,35 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(0, quorumInfo.leaderId())

Review Comment:
   Done!



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,46 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.toShort)
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+      val voterData = partitionData.currentVoters().asScala
+      val observerData = partitionData.observers().asScala
+      if (version == 0) {

Review Comment:
   I think I missed replying to this comment earlier. Apologies for that. I am not sure we can check anything for versions greater than zero at the moment: as of this PR we are not setting the fields in the response, so we cannot verify them. Was there something you had in mind that we should verify for versions > 0?



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,46 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.toShort)
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+      val voterData = partitionData.currentVoters().asScala
+      val observerData = partitionData.observers().asScala
+      if (version == 0) {
+        voterData.foreach { state =>
+          assertTrue(0 < state.replicaId)

Review Comment:
   yes.





[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r893659548


##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java:
##########
@@ -93,4 +93,5 @@ public static DescribeQuorumResponseData singletonResponse(TopicPartition topicP
     public static DescribeQuorumResponse parse(ByteBuffer buffer, short version) {
         return new DescribeQuorumResponse(new DescribeQuorumResponseData(new ByteBufferAccessor(buffer), version));
     }
+

Review Comment:
   nit: This empty line could be removed.



##########
clients/src/main/resources/common/message/DescribeQuorumResponse.json:
##########
@@ -17,7 +17,8 @@
   "apiKey": 55,
   "type": "response",
   "name": "DescribeQuorumResponse",
-  "validVersions": "0",
+  // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState (KIP-836)

Review Comment:
   nit: Can we add a dot at the end of the sentence?



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.

Review Comment:
   This comment may have been missed.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                            v.logEndOffset(),
+                            v.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()),
+                            v.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                            o.logEndOffset(),
+                            o.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()),
+                            o.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                    throw Errors.forCode(quorumResponse.data().errorCode()).exception();

Review Comment:
   Great, thanks.



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final Integer leaderId;
+    private final List<ReplicaState> voters;
+    private final List<ReplicaState> observers;
+
+    QuorumInfo(Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+        this.leaderId = leaderId;
+        this.voters = voters;
+        this.observers = observers;
+    }
+
+    public Integer leaderId() {
+        return leaderId;
+    }
+
+    public List<ReplicaState> voters() {
+        return voters;
+    }
+
+    public List<ReplicaState> observers() {
+        return observers;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        QuorumInfo that = (QuorumInfo) o;
+        return leaderId.equals(that.leaderId)
+            && voters.equals(that.voters)
+            && observers.equals(that.observers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(leaderId, voters, observers);
+    }
+
+    @Override
+    public String toString() {
+        return "QuorumInfo(" +
+            "leaderId=" + leaderId +
+            ", voters=" + voters.toString() +
+            ", observers=" + observers.toString() +
+            ')';
+    }
+
+    public static class ReplicaState {
+        private final int replicaId;
+        private final long logEndOffset;
+        private final OptionalLong lastFetchTimeMs;
+        private final OptionalLong lastCaughtUpTimeMs;
+
+        ReplicaState() {
+            this(0, 0, OptionalLong.empty(), OptionalLong.empty());
+        }
+
+        ReplicaState(int replicaId, long logEndOffset,
+                OptionalLong lastFetchTimeMs, OptionalLong lastCaughtUpTimeMs) {

Review Comment:
   nit: The code formatting seems a bit off here. I think we would format it like this:
   
   ```
   ReplicaState(
       int replicaId,
       long logEndOffset,
       OptionalLong lastFetchTimeMs,
       OptionalLong lastCaughtUpTimeMs
   ) {
   ```



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -778,4 +778,43 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(3000, quorumInfo.leaderId())

Review Comment:
   nit: We usually omit parentheses for getters. There are a few other cases in this file.



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -725,4 +725,35 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(KafkaRaftServer.MetadataTopic, quorumInfo.topic())
+        assertEquals(3, quorumInfo.voters.size())
+        assertEquals(0, quorumInfo.observers.size())

Review Comment:
   This comment has not been addressed.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                            v.logEndOffset(),
+                            v.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()),
+                            v.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                            o.logEndOffset(),
+                            o.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()),
+                            o.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp())));

Review Comment:
   Yes. We should avoid code duplication. Btw, you could also use the Stream API here: `partition.observers().stream().map(function).collect(Collectors.toList())`. I leave this one up to you.
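
   For example, the per-replica conversion could be pulled into a small helper and applied with streams — a rough sketch under the current field names, not the final implementation (`toReplicaState` is an illustrative name, and it assumes `java.util.stream.Collectors` is imported):

   ```java
   // Maps one protocol-level replica state to the admin-facing type, treating -1 as "unknown".
   private static QuorumInfo.ReplicaState toReplicaState(DescribeQuorumResponseData.ReplicaState state) {
       return new QuorumInfo.ReplicaState(
           state.replicaId(),
           state.logEndOffset(),
           state.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(state.lastFetchTimestamp()),
           state.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(state.lastCaughtUpTimestamp()));
   }

   // Then createQuorumResult only needs:
   List<QuorumInfo.ReplicaState> voters = partition.currentVoters().stream()
       .map(KafkaAdminClient::toReplicaState)
       .collect(Collectors.toList());
   List<QuorumInfo.ReplicaState> observers = partition.observers().stream()
       .map(KafkaAdminClient::toReplicaState)
       .collect(Collectors.toList());
   ```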



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,46 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.toShort)
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+      val voterData = partitionData.currentVoters().asScala
+      val observerData = partitionData.observers().asScala
+      if (version == 0) {

Review Comment:
   In this case, should we just remove that version check and verify all versions? My understanding is that they should all be the same at the moment. We can update the test when we implement the server side. What do you think?



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,48 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.toShort)
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+      val voterData = partitionData.currentVoters().asScala
+      val observerData = partitionData.observers().asScala

Review Comment:
   nit: Should we assert the number of voters/observers as well?



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -778,4 +778,43 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(3000, quorumInfo.leaderId())
+        assertEquals(0, quorumInfo.observers.size())
+        assertEquals(3, quorumInfo.voters.size())
+        quorumInfo.voters().forEach( voter => {

Review Comment:
   nit: We usually format blocks as follows: `quorumInfo.voters.forEach { voter =>`.





[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r885096122


##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +56,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.oldestVersion to ApiKeys.DESCRIBE_QUORUM.latestVersion) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+        assertTrue(leaderState.logEndOffset > 0)

Review Comment:
   nit: these are all misaligned



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +56,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.oldestVersion to ApiKeys.DESCRIBE_QUORUM.latestVersion) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+        assertTrue(leaderState.logEndOffset > 0)
+
+        val voterData = partitionData.currentVoters().asScala
+        val observerData = partitionData.observers().asScala
+        if (version == 0) {
+          voterData.foreach( state => {

Review Comment:
   nit: the idiomatic way to write this is 
   ```scala
   voterData.foreach { state =>
     ...
   }
   ```



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4846,6 +4878,35 @@ public void testDescribeFeaturesFailure() {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
+                    ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
+                    ApiKeys.DESCRIBE_QUORUM.latestVersion()));
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE));
+            final KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            final QuorumInfo quorumInfo = future.get();
+            assertEquals(defaultQuorumInfo(), quorumInfo);
+        }
+    }
+
+    @Test
+    public void testDescribeMetadataQuorumFailure() {

Review Comment:
   Could we have a test case with a partition-level error?
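
   One shape such a test could take — a sketch only, assuming the response can be built directly from `DescribeQuorumResponseData` (the `"__cluster_metadata"` literal, the chosen `INVALID_REQUEST` error, and the assertion style are illustrative):

   ```java
   @Test
   public void testDescribeMetadataQuorumPartitionLevelError() throws Exception {
       try (final AdminClientUnitTestEnv env = mockClientEnv()) {
           env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
                   ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
                   ApiKeys.DESCRIBE_QUORUM.latestVersion()));
           // Top-level error is NONE, but the single metadata partition carries an error code.
           DescribeQuorumResponseData data = new DescribeQuorumResponseData()
                   .setErrorCode(Errors.NONE.code())
                   .setTopics(singletonList(new DescribeQuorumResponseData.TopicData()
                           .setTopicName("__cluster_metadata")
                           .setPartitions(singletonList(new DescribeQuorumResponseData.PartitionData()
                                   .setPartitionIndex(0)
                                   .setErrorCode(Errors.INVALID_REQUEST.code())))));
           env.kafkaClient().prepareResponse(
                   body -> body instanceof DescribeQuorumRequest,
                   new DescribeQuorumResponse(data));
           final KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
           // The partition-level error should surface as a failed future.
           assertThrows(ExecutionException.class, future::get);
       }
   }
   ```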



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -26,16 +25,19 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
+import org.slf4j.LoggerFactory
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
+@Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT)
 @Tag("integration")
 class DescribeQuorumRequestTest(cluster: ClusterInstance) {
+  val log = LoggerFactory.getLogger(classOf[DescribeQuorumRequestTest])

Review Comment:
   Seems unused?



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +56,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.oldestVersion to ApiKeys.DESCRIBE_QUORUM.latestVersion) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+        assertTrue(leaderState.logEndOffset > 0)
+
+        val voterData = partitionData.currentVoters().asScala
+        val observerData = partitionData.observers().asScala
+        if (version == 0) {
+          voterData.foreach( state => {
+            assertTrue(state.lastFetchTimestamp() == -1)

Review Comment:
   nit: we should use `assertEquals`. The advantage is that we can see what the actual value was in the failure message, which is sometimes useful for understanding the failure.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,73 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(0);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                response.getVoterInfo(topicName, partition).forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                                v.logEndOffset(),
+                                OptionalLong.of(v.lastFetchTimestamp()),
+                                OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                response.getObserverInfo(topicName, partition).forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                                o.logEndOffset(),
+                                OptionalLong.of(o.lastFetchTimestamp()),
+                                OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {

Review Comment:
   nit: it doesn't look like `timeoutMs` is used? Also, could we use `DescribeQuorumRequest.singletonRequest`?
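
   Something along these lines, for example (just a sketch; it reuses the metadata topic/partition constants already referenced in this class):

   ```java
   @Override
   DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
       // Let the existing helper build the single-topic, single-partition request
       // for the metadata partition instead of assembling the request data by hand.
       return new DescribeQuorumRequest.Builder(DescribeQuorumRequest.singletonRequest(
               new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
   }
   ```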



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +56,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.oldestVersion to ApiKeys.DESCRIBE_QUORUM.latestVersion) {

Review Comment:
   How about this?
   ```scala
   for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);

Review Comment:
   @niket-goel any comment here? I think this part still reads awkwardly. Converting to an intermediate map is conventional. An alternative would be to do a quick validation of the response. We can structure the checks like this:
   
   1. Check top-level error code
   2. Verify only one topic in the response which matches metadata topic
   3. Verify only one partition in the response with id 0
   4. Check partition-level error code.
   
   This is similar to how we handle the request in `KafkaRaftClient.handleDescribeQuorumRequest`.
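
   For illustration, a rough sketch of that validation flow in `handleResponse` (the exception choices and messages are placeholders, and it assumes `createQuorumResult` takes the validated partition data — not a prescribed implementation):

   ```java
   @Override
   void handleResponse(AbstractResponse response) {
       final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
       // 1. Top-level error code.
       if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
           future.completeExceptionally(Errors.forCode(quorumResponse.data().errorCode()).exception());
           return;
       }
       // 2. Exactly one topic, and it must be the metadata topic.
       if (quorumResponse.data().topics().size() != 1
               || !quorumResponse.data().topics().get(0).topicName().equals(METADATA_TOPIC_NAME)) {
           future.completeExceptionally(new UnknownServerException(
               "DescribeMetadataQuorum received an unexpected topic in the response"));
           return;
       }
       // 3. Exactly one partition, with index 0.
       DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0);
       if (topic.partitions().size() != 1 || topic.partitions().get(0).partitionIndex() != 0) {
           future.completeExceptionally(new UnknownServerException(
               "DescribeMetadataQuorum received an unexpected partition in the response"));
           return;
       }
       // 4. Partition-level error code.
       DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0);
       if (partition.errorCode() != Errors.NONE.code()) {
           future.completeExceptionally(Errors.forCode(partition.errorCode()).exception());
           return;
       }
       future.complete(createQuorumResult(partition));
   }
   ```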





[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r882153710


##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final String topic;
+    private final Integer leaderId;
+    private final List<ReplicaState> voters;
+    private final List<ReplicaState> observers;
+
+    public QuorumInfo(String topic, Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+        this.topic = topic;
+        this.leaderId = leaderId;
+        this.voters = voters;
+        this.observers = observers;
+    }
+
+    public String topic() {
+        return topic;
+    }
+
+    public Integer leaderId() {
+        return leaderId;
+    }
+
+    public List<ReplicaState> voters() {
+        return voters;
+    }
+
+    public List<ReplicaState> observers() {
+        return observers;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        QuorumInfo that = (QuorumInfo) o;
+        return topic.equals(that.topic)
+            && leaderId.equals(that.leaderId)
+            && voters.equals(that.voters)
+            && observers.equals(that.observers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topic, leaderId, voters, observers);
+    }
+
+    @Override
+    public String toString() {
+        return "QuorumInfo{" +

Review Comment:
   nit: conventionally (if perhaps not always consistently), we prefer parentheses for `toString` implementations



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4327,58 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                response.getVoterInfo(topicName, partition).forEach(voters::add);
+                response.getObserverInfo(topicName, partition).forEach(observers::add);
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                DescribeQuorumRequestData data = new DescribeQuorumRequestData()
+                        .setTopics(singletonList(new DescribeQuorumRequestData.TopicData()
+                                .setPartitions(singletonList(new DescribeQuorumRequestData.PartitionData()
+                                        .setPartitionIndex(0)))
+                                .setTopicName(METADATA_TOPIC_NAME)));
+                return new Builder(data);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                if (quorumResponse.data().errorCode() == Errors.NONE.code()) {

Review Comment:
   Hmm.. We're checking the top-level error code, but the response also has a partition-level error to check.



##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -1446,6 +1446,35 @@ default DescribeFeaturesResult describeFeatures() {
      */
     UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
 
+    /**
+     * Describes the state of the metadata quorum.
+     * <p>
+     * This is a convenience method for {@link #describeMetadataQuorum(DescribeMetadataQuorumOptions)}  with default options.
+     * See the overload for more details.
+     *
+     * @return the {@link DescribeMetadataQuorumResult} containing the result
+     */
+    default DescribeMetadataQuorumResult describeMetadataQuorum() {

Review Comment:
   It is very common for API details to change during implementation. Once we're satisfied and ready to merge, we should update the KIP. Typically we would also send a message to the vote thread in case there are any concerns.



##########
clients/src/main/resources/common/message/DescribeQuorumRequest.json:
##########
@@ -18,7 +18,7 @@
   "type": "request",
   "listeners": ["broker", "controller"],
   "name": "DescribeQuorumRequest",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
   nit: maybe we can add something like this:
   ```
     // Version 1 adds additional fields in the response. The request is unchanged.
   ```



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumIntegrationTest.scala:
##########
@@ -26,16 +25,17 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
+@Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT)
 @Tag("integration")
-class DescribeQuorumRequestTest(cluster: ClusterInstance) {
+class DescribeQuorumTest(cluster: ClusterInstance) {

Review Comment:
   I'm a little confused about what happened here, but it looks like we should get rid of this class and rename `DescribeQuorumTest` to `DescribeQuorumIntegrationTest`. Also, did we lose the admin integration test you added?



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final String topic;
+    private final Integer leaderId;
+    private final List<ReplicaState> voters;
+    private final List<ReplicaState> observers;
+
+    public QuorumInfo(String topic, Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+        this.topic = topic;
+        this.leaderId = leaderId;
+        this.voters = voters;
+        this.observers = observers;
+    }
+
+    public String topic() {
+        return topic;
+    }
+
+    public Integer leaderId() {
+        return leaderId;
+    }
+
+    public List<ReplicaState> voters() {
+        return voters;
+    }
+
+    public List<ReplicaState> observers() {
+        return observers;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        QuorumInfo that = (QuorumInfo) o;
+        return topic.equals(that.topic)
+            && leaderId.equals(that.leaderId)
+            && voters.equals(that.voters)
+            && observers.equals(that.observers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topic, leaderId, voters, observers);
+    }
+
+    @Override
+    public String toString() {
+        return "QuorumInfo{" +
+            "topic='" + topic + '\'' +
+            ", leaderId=" + leaderId +
+            ", voters=" + voters.toString() +
+            ", observers=" + observers.toString() +
+            '}';
+    }
+
+    public static class ReplicaState {
+        private final int replicaId;
+        private final long logEndOffset;
+        private final OptionalLong lastFetchTimeMs;
+        private final OptionalLong lastCaughtUpTimeMs;
+
+        public ReplicaState() {

Review Comment:
   nit: do we need to expose this? Usually we only expose the minimum necessary constructors since we're often stuck with what we expose for a long time.



##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java:
##########
@@ -93,4 +97,57 @@ public static DescribeQuorumResponseData singletonResponse(TopicPartition topicP
     public static DescribeQuorumResponse parse(ByteBuffer buffer, short version) {
         return new DescribeQuorumResponse(new DescribeQuorumResponseData(new ByteBufferAccessor(buffer), version));
     }
+
+    public String getTopicNameByIndex(Integer index) {
+        return data.topics().get(index).topicName();
+    }
+
+    public Integer getPartitionLeaderId(String topicName, Integer partition) {
+        Integer leaderId = -1;
+        TopicData topic = data.topics().stream()
+            .filter(t -> t.topicName().equals(topicName))
+            .findFirst()
+            .orElse(null);
+        if (topic != null) {
+            leaderId = Integer.valueOf(topic.partitions().get(partition).leaderId());
+        }
+        return leaderId;
+    }
+
+    public List<QuorumInfo.ReplicaState> getVoterInfo(String topicName, Integer partition) {
+        List<QuorumInfo.ReplicaState> voterInfo = new ArrayList<>();
+        TopicData topic = data.topics().stream()
+            .filter(t -> t.topicName().equals(topicName))
+            .findFirst()
+            .orElse(null);
+        if (topic != null) {
+            topic.partitions().get(partition).currentVoters().forEach(
+                v -> {
+                    voterInfo.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                            v.logEndOffset(),
+                            OptionalLong.of(v.lastFetchTimestamp()),
+                            OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+        }
+        return voterInfo;
+    }
+
+    public List<QuorumInfo.ReplicaState> getObserverInfo(String topicName, Integer partition) {

Review Comment:
   nit: `getVoterInfo` and `getObserverInfo` seem basically the same. Feels like we can factor out the common logic.
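
   One way to share the logic — a sketch only (the helper name and the selector-function approach are illustrative, and it assumes `java.util.function.Function` and `java.util.stream.Collectors` are imported):

   ```java
   private List<QuorumInfo.ReplicaState> replicaStates(
           String topicName,
           Integer partition,
           Function<DescribeQuorumResponseData.PartitionData, List<DescribeQuorumResponseData.ReplicaState>> selector) {
       // Find the topic, pick the partition, then convert whichever replica list the caller selected.
       return data.topics().stream()
           .filter(t -> t.topicName().equals(topicName))
           .findFirst()
           .map(topic -> selector.apply(topic.partitions().get(partition)).stream()
               .map(s -> new QuorumInfo.ReplicaState(
                   s.replicaId(),
                   s.logEndOffset(),
                   OptionalLong.of(s.lastFetchTimestamp()),
                   OptionalLong.of(s.lastCaughtUpTimestamp())))
               .collect(Collectors.toList()))
           .orElse(new ArrayList<>());
   }

   public List<QuorumInfo.ReplicaState> getVoterInfo(String topicName, Integer partition) {
       return replicaStates(topicName, partition, DescribeQuorumResponseData.PartitionData::currentVoters);
   }

   public List<QuorumInfo.ReplicaState> getObserverInfo(String topicName, Integer partition) {
       return replicaStates(topicName, partition, DescribeQuorumResponseData.PartitionData::observers);
   }
   ```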





[GitHub] [kafka] dengziming commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
dengziming commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r882261419


##########
core/src/test/scala/unit/kafka/server/DescribeQuorumIntegrationTest.scala:
##########
@@ -26,16 +25,17 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
+@Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT)
 @Tag("integration")
-class DescribeQuorumRequestTest(cluster: ClusterInstance) {
+class DescribeQuorumTest(cluster: ClusterInstance) {

Review Comment:
   The class name and file name are inconsistent.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4327,58 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);

Review Comment:
   `response.getTopicNameByIndex(partition)` is a little confusing here: how can we get a topic by partition? I think we should rename `partition` to `topicIndex` or use `response.getTopicNameByIndex(0)` directly.



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumTest.scala:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.io.IOException
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.utils.NotNothing
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT)
+@Tag("integration")
+class DescribeQuorumIntegrationTest(cluster: ClusterInstance) {

Review Comment:
   Ditto, the class name and file name are inconsistent.



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4846,6 +4878,31 @@ public void testDescribeFeaturesFailure() {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create((short) 55, (short) 0, (short) 1));

Review Comment:
   `ApiKeys.DESCRIBE_QUORUM.id`, `ApiKeys.DESCRIBE_QUORUM.oldestVersion()`, and `ApiKeys.DESCRIBE_QUORUM.latestVersion()` may be better here.



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumTest.scala:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.io.IOException
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.utils.NotNothing
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT)
+@Tag("integration")
+class DescribeQuorumIntegrationTest(cluster: ClusterInstance) {
+  val log = LoggerFactory.getLogger(classOf[DescribeQuorumIntegrationTest])
+
+  @ClusterTest(clusterType = Type.ZK)
+  def testDescribeQuorumNotSupportedByZkBrokers(): Unit = {
+    val apiRequest = new ApiVersionsRequest.Builder().build()
+    val apiResponse =  connectAndReceive[ApiVersionsResponse](apiRequest)
+    assertNull(apiResponse.apiVersion(ApiKeys.DESCRIBE_QUORUM.id))
+
+    val describeQuorumRequest = new DescribeQuorumRequest.Builder(
+      singletonRequest(KafkaRaftServer.MetadataPartition)
+    ).build()
+
+    assertThrows(classOf[IOException], () => {
+      connectAndReceive[DescribeQuorumResponse](describeQuorumRequest)
+    })
+  }
+
+  @ClusterTest
+  def testDescribeQuorum(): Unit = {
+    for (version <- ApiKeys.DESCRIBE_QUORUM.oldestVersion to ApiKeys.DESCRIBE_QUORUM.latestVersion) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+        assertTrue(leaderState.logEndOffset > 0)
+
+        val voterData = partitionData.currentVoters().asScala
+        if (version == 0) {
+          voterData.foreach( state => {
+            assertTrue(state.lastFetchTimestamp() == -1)
+            assertTrue(state.lastCaughtUpTimestamp() == -1)
+          })
+        }
+    }
+  }
+
+  private def connectAndReceive[T <: AbstractResponse](
+    request: AbstractRequest
+  )(
+    implicit classTag: ClassTag[T], nn: NotNothing[T]
+  ): T = {
+    IntegrationTestUtils.connectAndReceive(
+      request,
+      cluster.brokerSocketServers().asScala.head,

Review Comment:
   I wonder if we can send it to the controllerSocketServer directly? For example, if we set bootstrap.servers=localhost:9093, we should prevent a client from doing this.



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumTest.scala:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.io.IOException
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.utils.NotNothing
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT)
+@Tag("integration")
+class DescribeQuorumIntegrationTest(cluster: ClusterInstance) {
+  val log = LoggerFactory.getLogger(classOf[DescribeQuorumIntegrationTest])
+
+  @ClusterTest(clusterType = Type.ZK)
+  def testDescribeQuorumNotSupportedByZkBrokers(): Unit = {
+    val apiRequest = new ApiVersionsRequest.Builder().build()
+    val apiResponse =  connectAndReceive[ApiVersionsResponse](apiRequest)
+    assertNull(apiResponse.apiVersion(ApiKeys.DESCRIBE_QUORUM.id))
+
+    val describeQuorumRequest = new DescribeQuorumRequest.Builder(
+      singletonRequest(KafkaRaftServer.MetadataPartition)
+    ).build()
+
+    assertThrows(classOf[IOException], () => {
+      connectAndReceive[DescribeQuorumResponse](describeQuorumRequest)
+    })
+  }
+
+  @ClusterTest
+  def testDescribeQuorum(): Unit = {
+    for (version <- ApiKeys.DESCRIBE_QUORUM.oldestVersion to ApiKeys.DESCRIBE_QUORUM.latestVersion) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+        assertTrue(leaderState.logEndOffset > 0)
+
+        val voterData = partitionData.currentVoters().asScala

Review Comment:
   It's worth checking `partitionData.observers()` here too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r890689748


##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java:
##########
@@ -93,4 +94,25 @@ public static DescribeQuorumResponseData singletonResponse(TopicPartition topicP
     public static DescribeQuorumResponse parse(ByteBuffer buffer, short version) {
         return new DescribeQuorumResponse(new DescribeQuorumResponseData(new ByteBufferAccessor(buffer), version));
     }
+
+    /**
+     * Get the replica info for the given topic name and partition.
+     * @param topicName Name of the topic to fetch
+     * @param partition Index of the parition to fetch
+     * @param getVoterInfo Return the voter information if true, return observers otherwise
+     * @return List of {@link ReplicaState}
+     */
+    private List<ReplicaState> getReplicaInfo(String topicName, Integer partition, boolean getVoterInfo) {

Review Comment:
   nit: are we using this anymore?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,94 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                                v.logEndOffset(),
+                                OptionalLong.of(v.lastFetchTimestamp()),
+                                OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                                o.logEndOffset(),
+                                OptionalLong.of(o.lastFetchTimestamp()),
+                                OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                try {
+                    if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(quorumResponse.data().errorCode()).exception();
+                    }
+                    if (quorumResponse.data().topics().size() > 1) {
+                        String msg = String.format("DescribeMetadataQuorum received {} topics when 1 was expected",
+                                quorumResponse.data().topics().size());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0);
+                    if (!topic.topicName().equals(METADATA_TOPIC_NAME)) {
+                        String msg = String.format("DescribeMetadataQuorum received a topic with name {} when {} was expected",
+                                topic.topicName(), METADATA_TOPIC_NAME);
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    if (topic.partitions().size() > 1) {
+                        String msg = String.format("DescribeMetadataQuorum received a topic {} with {} partitions when 1 was expected",
+                                topic.topicName(), topic.partitions().size());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0);
+                    if (partition.partitionIndex() != METADATA_TOPIC_PARTITION.partition()) {
+                        String msg = String.format("DescribeMetadataQuorum received a single partition with index {} when {} was expected",
+                                partition.partitionIndex(), METADATA_TOPIC_PARTITION.partition());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    if (partition.errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(partition.errorCode()).exception();
+                    }
+                    future.complete(createQuorumResult(partition));
+                } catch (RuntimeException e) {
+                    throw e;
+                } catch (Exception e) {

Review Comment:
   Which exceptions are we catching here?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,94 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                                v.logEndOffset(),
+                                OptionalLong.of(v.lastFetchTimestamp()),
+                                OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                                o.logEndOffset(),
+                                OptionalLong.of(o.lastFetchTimestamp()),
+                                OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                try {
+                    if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(quorumResponse.data().errorCode()).exception();
+                    }
+                    if (quorumResponse.data().topics().size() > 1) {
+                        String msg = String.format("DescribeMetadataQuorum received {} topics when 1 was expected",
+                                quorumResponse.data().topics().size());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0);
+                    if (!topic.topicName().equals(METADATA_TOPIC_NAME)) {
+                        String msg = String.format("DescribeMetadataQuorum received a topic with name {} when {} was expected",
+                                topic.topicName(), METADATA_TOPIC_NAME);
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    if (topic.partitions().size() > 1) {
+                        String msg = String.format("DescribeMetadataQuorum received a topic {} with {} partitions when 1 was expected",
+                                topic.topicName(), topic.partitions().size());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0);
+                    if (partition.partitionIndex() != METADATA_TOPIC_PARTITION.partition()) {
+                        String msg = String.format("DescribeMetadataQuorum received a single partition with index {} when {} was expected",
+                                partition.partitionIndex(), METADATA_TOPIC_PARTITION.partition());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    if (partition.errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(partition.errorCode()).exception();
+                    }
+                    future.complete(createQuorumResult(partition));
+                } catch (RuntimeException e) {
+                    throw e;

Review Comment:
   Is the intent to retry here? For some errors, such as auth failures, we probably would rather fail fast.
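   A hedged sketch of the fail-fast alternative (the error-handling shape is an assumption, not the PR's final code): complete the future with the error and return, so a fatal error such as an authorization failure surfaces to the caller immediately instead of going through the rethrow path.

   ```java
   Errors error = Errors.forCode(quorumResponse.data().errorCode());
   if (error != Errors.NONE) {
       // Fail fast: surface e.g. an authorization failure to the caller directly.
       future.completeExceptionally(error.exception());
       return;
   }
   ```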



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,73 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(0);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                response.getVoterInfo(topicName, partition).forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                                v.logEndOffset(),
+                                OptionalLong.of(v.lastFetchTimestamp()),
+                                OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                response.getObserverInfo(topicName, partition).forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                                o.logEndOffset(),
+                                OptionalLong.of(o.lastFetchTimestamp()),
+                                OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {

Review Comment:
   You may have missed the comment here.



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -778,4 +778,35 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(0, quorumInfo.leaderId())
+        assertEquals(3, quorumInfo.voters.size())
+        assertEquals(0, quorumInfo.observers.size())
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close

Review Comment:
   nit: we usually add parentheses for mutators



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dengziming commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
dengziming commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r886641824


##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -725,4 +725,35 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(KafkaRaftServer.MetadataTopic, quorumInfo.topic())
+        assertEquals(3, quorumInfo.voters.size())
+        assertEquals(0, quorumInfo.observers.size())

Review Comment:
   I found the cause here: we set `nodeId = -1` if it's a broker, so observers.size == 0
   https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/core/src/main/scala/kafka/raft/RaftManager.scala#L185-L189
   
   I changed it to `val nodeId = OptionalInt.of(config.nodeId)`, and then observers.size == 4



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] niket-goel commented on pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on PR #12206:
URL: https://github.com/apache/kafka/pull/12206#issuecomment-1136674585

   Published another revision addressing @hachikuji's comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] niket-goel commented on pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on PR #12206:
URL: https://github.com/apache/kafka/pull/12206#issuecomment-1138153211

   Thanks for review @dengziming .
   
   A few comments:
   > Firstly, we can add a test in PlaintextAdminIntegrationTest for this change
   
   I looked at the file, and the tests there look similar to what was added to `KRaftClusterTest`. Could you check whether the intent was to run an integration test like that? We can always increase the checks that test does.
   
   > the second one is less important, it's recommended to use AdminApiHandler instead of using runnable.call directly, you can refer to FenceProducersHandler.java as a simple example.
   
   I looked at the class, and it looks like a wrapper around the runnable call. Do you mind if we make that improvement in a subsequent change? (Just trying to keep this PR under control :) )
   
   > I wonder if we can send it to the controllerSocketServer directly? For example, if we set bootstrap.servers=localhost:9093, we should prevent a client from doing this.
   
   A couple of things here: this code existed before this change (I just moved it). I would love to improve it, though. I just did not understand exactly what your comment meant. Could you please elaborate?
   
   Apart from the above questions, I think the latest revision addresses all the other concerns everyone raised.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dengziming commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
dengziming commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r889424223


##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final String topic;

Review Comment:
   I think it's better to add the partition here as well, because we are making way for multi-raft.
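   A rough sketch of what carrying the partition alongside the topic could look like (field and accessor names are illustrative only; voters and observers omitted for brevity):

   ```java
   public class QuorumInfo {
       private final String topic;
       private final int partition;   // paves the way for multi-raft quorums
       private final int leaderId;

       QuorumInfo(String topic, int partition, int leaderId) {
           this.topic = topic;
           this.partition = partition;
           this.leaderId = leaderId;
       }

       public String topic() { return topic; }
       public int partition() { return partition; }
       public int leaderId() { return leaderId; }
   }
   ```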



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r889238615


##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,

Review Comment:
   Not sure about this comment. I think I would need to add the `DESCRIBE_QUORUM` API to the list of known APIs even if we used the default constructor, right? If I do not do that, the test fails with an error saying it does not know about the `DESCRIBE_QUORUM` API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r889220152


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,91 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(0);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                response.getVoterInfo(topicName, partition).forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                                v.logEndOffset(),
+                                OptionalLong.of(v.lastFetchTimestamp()),
+                                OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                response.getObserverInfo(topicName, partition).forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                                o.logEndOffset(),
+                                OptionalLong.of(o.lastFetchTimestamp()),
+                                OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                try {
+                    if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(quorumResponse.data().errorCode()).exception();
+                    }
+                    if (quorumResponse.data().topics().size() > 1) {
+                        log.error("DescribeMetadataQuorum received {} topics when 1 was expected",
+                                quorumResponse.data().topics().size());
+                        throw new UnknownServerException();

Review Comment:
   Good point about adding the message to the exception being returned.
   
   The behavior of the code block is equivalent to completing the future and returning; I just didn't want to repeat the future-completion code. Is there a well-known convention for handling code blocks like this in Java?
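   One common pattern (purely illustrative, not the PR's code) is a small helper that logs and builds the exception, so each validation site still completes the future and returns without repeating the completion code:

   ```java
   private UnknownServerException unexpectedResponse(String msg) {
       log.debug(msg);
       return new UnknownServerException(msg);
   }

   // at a validation site:
   if (quorumResponse.data().topics().size() > 1) {
       future.completeExceptionally(unexpectedResponse(String.format(
           "DescribeMetadataQuorum received %d topics when 1 was expected",
           quorumResponse.data().topics().size())));
       return;
   }
   ```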



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r891518982


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,94 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                                v.logEndOffset(),
+                                OptionalLong.of(v.lastFetchTimestamp()),
+                                OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                                o.logEndOffset(),
+                                OptionalLong.of(o.lastFetchTimestamp()),
+                                OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                try {
+                    if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(quorumResponse.data().errorCode()).exception();
+                    }
+                    if (quorumResponse.data().topics().size() > 1) {
+                        String msg = String.format("DescribeMetadataQuorum received {} topics when 1 was expected",
+                                quorumResponse.data().topics().size());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0);
+                    if (!topic.topicName().equals(METADATA_TOPIC_NAME)) {
+                        String msg = String.format("DescribeMetadataQuorum received a topic with name {} when {} was expected",
+                                topic.topicName(), METADATA_TOPIC_NAME);
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    if (topic.partitions().size() > 1) {
+                        String msg = String.format("DescribeMetadataQuorum received a topic {} with {} partitions when 1 was expected",
+                                topic.topicName(), topic.partitions().size());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0);
+                    if (partition.partitionIndex() != METADATA_TOPIC_PARTITION.partition()) {
+                        String msg = String.format("DescribeMetadataQuorum received a single partition with index {} when {} was expected",
+                                partition.partitionIndex(), METADATA_TOPIC_PARTITION.partition());
+                        log.debug(msg);
+                        throw new UnknownServerException(msg);
+                    }
+                    if (partition.errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(partition.errorCode()).exception();
+                    }
+                    future.complete(createQuorumResult(partition));
+                } catch (RuntimeException e) {
+                    throw e;
+                } catch (Exception e) {

Review Comment:
   This block is my attempt to have a single `future.completeExceptionally()` call. We are catching `UnknownServerException` as well as any exception the server might return in its response. The generic catch handler could be avoided by not throwing the exceptions returned by the server and instead completing exceptionally within the block above; I just find it more maintainable to have a single completion call. If this is causing other issues in the code, I can change it.



##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java:
##########
@@ -93,4 +94,25 @@ public static DescribeQuorumResponseData singletonResponse(TopicPartition topicP
     public static DescribeQuorumResponse parse(ByteBuffer buffer, short version) {
         return new DescribeQuorumResponse(new DescribeQuorumResponseData(new ByteBufferAccessor(buffer), version));
     }
+
+    /**
+     * Get the replica info for the given topic name and partition.
+     * @param topicName Name of the topic to fetch
+     * @param partition Index of the parition to fetch
+     * @param getVoterInfo Return the voter information if true, return observers otherwise
+     * @return List of {@link ReplicaState}
+     */
+    private List<ReplicaState> getReplicaInfo(String topicName, Integer partition, boolean getVoterInfo) {

Review Comment:
   No, we are not. Thanks for the catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r891554323


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4337,25 +4337,22 @@ public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuoru
         final Call call = new Call(
                 "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
 
-            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
-                Integer partition = 0;
-                String topicName = response.getTopicNameByIndex(0);
-                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
                 List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
                 List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
-                response.getVoterInfo(topicName, partition).forEach(v -> {
+                partition.currentVoters().forEach(v -> {
                     voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
                                 v.logEndOffset(),
                                 OptionalLong.of(v.lastFetchTimestamp()),
                                 OptionalLong.of(v.lastCaughtUpTimestamp())));

Review Comment:
   I went back and forth on that and ended up returning an optional wrapping -1 here. I now remember that the original intention was to return an empty optional. Will address this.
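   A small sketch of that mapping, assuming -1 on the wire means the timestamp is unavailable (as it is for version 0 responses):

   ```java
   static OptionalLong optionalTimestamp(long timestampMs) {
       // -1 encodes "not available" (e.g. a v0 response); expose it as an
       // empty OptionalLong rather than OptionalLong.of(-1).
       return timestampMs == -1 ? OptionalLong.empty() : OptionalLong.of(timestampMs);
   }
   ```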



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r888913198


##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -1446,6 +1446,35 @@ default DescribeFeaturesResult describeFeatures() {
      */
     UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
 
+    /**
+     * Describes the state of the metadata quorum.
+     * <p>
+     * This is a convenience method for {@link #describeMetadataQuorum(DescribeMetadataQuorumOptions)}  with default options.

Review Comment:
   nit: There is an extra space before `with`.



##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumResult.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of {@link Admin#describeMetadataQuorum(DescribeMetadataQuorumOptions)}
+ *
+ */
+public class DescribeMetadataQuorumResult {
+
+    private final KafkaFuture<QuorumInfo> quorumInfo;
+
+    DescribeMetadataQuorumResult(KafkaFuture<QuorumInfo> quorumInfo) {
+        this.quorumInfo = quorumInfo;
+    }
+
+    /**
+     * Returns a future QuorumInfo

Review Comment:
   nit: Should we say `Returns a future containing the quorum info.`?



##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumOptions.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+/**
+ * Options for {@link ConfluentAdmin#describeQuorum(DescribeMetadataQuorumOptions)}.
+ *

Review Comment:
   nit: We could remove this empty line.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,91 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(0);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                response.getVoterInfo(topicName, partition).forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                                v.logEndOffset(),
+                                OptionalLong.of(v.lastFetchTimestamp()),
+                                OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                response.getObserverInfo(topicName, partition).forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                                o.logEndOffset(),
+                                OptionalLong.of(o.lastFetchTimestamp()),
+                                OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                try {
+                    if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(quorumResponse.data().errorCode()).exception();
+                    }
+                    if (quorumResponse.data().topics().size() > 1) {
+                        log.error("DescribeMetadataQuorum received {} topics when 1 was expected",
+                                quorumResponse.data().topics().size());
+                        throw new UnknownServerException();

Review Comment:
   * We should add a message in every exception provided back to the user. If we do this, we can log as debug instead of error.
   * Instead of throwing the exception, we usually complete the future and return.



##########
clients/src/main/resources/common/message/DescribeQuorumResponse.json:
##########
@@ -17,7 +17,8 @@
   "apiKey": 55,
   "type": "response",
   "name": "DescribeQuorumResponse",
-  "validVersions": "0",
+  // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState

Review Comment:
   nit: Could we add the KIP reference?



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,

Review Comment:
   nit: You could use `NodeApiVersions.create()`.



##########
clients/src/main/resources/common/message/DescribeQuorumRequest.json:
##########
@@ -18,7 +18,8 @@
   "type": "request",
   "listeners": ["broker", "controller"],
   "name": "DescribeQuorumRequest",
-  "validVersions": "0",
+  // Version 1 adds additional fields in the response. The request is unchanged.

Review Comment:
   nit: Could we add the KIP reference?



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
+                    ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
+                    ApiKeys.DESCRIBE_QUORUM.latestVersion()));
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, false, false, false));
+            final KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            final QuorumInfo quorumInfo = future.get();
+            assertEquals(defaultQuorumInfo(), quorumInfo);
+        }
+    }
+
+    @Test
+    public void testDescribeMetadataQuorumFailure() {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
+                        ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
+                        ApiKeys.DESCRIBE_QUORUM.latestVersion()));
+
+            // Test top level error
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.INVALID_REQUEST, Errors.NONE, false, false, false, false));
+            KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, Errors.INVALID_REQUEST.exception().getClass());

Review Comment:
   nit: `InvalidRequestException.class`?
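   That is, roughly (assuming `InvalidRequestException` is imported):

   ```java
   TestUtils.assertFutureThrows(future, InvalidRequestException.class);
   ```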



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {

Review Comment:
   I wonder if we could use `@ApiKeyVersionsSource(apiKey = ApiKeys.DESCRIBE_QUORUM)`. I don't know if that works with `@ClusterTest` though.



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
+                    ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
+                    ApiKeys.DESCRIBE_QUORUM.latestVersion()));
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, false, false, false));
+            final KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            final QuorumInfo quorumInfo = future.get();
+            assertEquals(defaultQuorumInfo(), quorumInfo);
+        }
+    }
+
+    @Test
+    public void testDescribeMetadataQuorumFailure() {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
+                        ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
+                        ApiKeys.DESCRIBE_QUORUM.latestVersion()));
+
+            // Test top level error
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.INVALID_REQUEST, Errors.NONE, false, false, false, false));
+            KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, Errors.INVALID_REQUEST.exception().getClass());
+            future = null;
+
+            // Test incorrect topic count
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, true, false, false, false));
+            future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, UnknownServerException.class);
+            future = null;
+
+            // Test incorrect topic name
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, true, false, false));
+            future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, UnknownServerException.class);
+            future = null;
+
+            // Test incorrect partition count
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, false, true, false));
+            future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, UnknownServerException.class);
+            future = null;
+
+            // Test incorrect partition index
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, false, false, true));
+            future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, UnknownServerException.class);
+            future = null;
+
+            // Test partition level error
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.INVALID_REQUEST, false, false, false, false));
+            future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, Errors.INVALID_REQUEST.exception().getClass());
+
+            // Test all incorrect and no errors
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, true, true, true, true));
+            future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, UnknownServerException.class);
+            future = null;
+
+            // Test all incorrect and both errors
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.INVALID_REQUEST, Errors.INVALID_REQUEST, true, true, true, true));
+            future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, Errors.INVALID_REQUEST.exception().getClass());
+        }
+    }
+

Review Comment:
   Could we add a test case where `lastFetchTimestamp` and `lastCaughtUpTimestamp` are not provided in the response? I suppose that should result in empty `OptionalLong` values, right?
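   A rough sketch of what that case might assert; the helper name `prepareDescribeQuorumResponseWithoutTimestamps` is hypothetical, and the getter names are assumed to follow the `ReplicaState` field names:

   ```java
   // Hypothetical helper that leaves LastFetchTimestamp/LastCaughtUpTimestamp at
   // their default of -1; the admin client would then be expected to surface them
   // as empty OptionalLongs.
   env.kafkaClient().prepareResponse(
           body -> body instanceof DescribeQuorumRequest,
           prepareDescribeQuorumResponseWithoutTimestamps());
   QuorumInfo quorumInfo = env.adminClient().describeMetadataQuorum().quorumInfo().get();
   quorumInfo.voters().forEach(v -> {
       assertEquals(OptionalLong.empty(), v.lastFetchTimeMs());
       assertEquals(OptionalLong.empty(), v.lastCaughtUpTimeMs());
   });
   ```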



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])

Review Comment:
   nit: We could use `.toShort`.



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -725,4 +725,35 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(KafkaRaftServer.MetadataTopic, quorumInfo.topic())
+        assertEquals(3, quorumInfo.voters.size())
+        assertEquals(0, quorumInfo.observers.size())

Review Comment:
   Do we need to do something about this in this PR?



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.

Review Comment:
   The reference to DescribeQuorumResponse is a bit weird here for our users. Could we better describe what QuorumInfo represents?
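   For example, something along these lines (illustrative wording only, not committed text):

   ```java
   /**
    * Describes the state of the KRaft metadata quorum: the current leader and the
    * replication progress (log end offset and fetch timestamps) of each voter and
    * observer. Primarily useful for debugging KRaft replication.
    */
   ```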



##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumResult.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of {@link Admin#describeMetadataQuorum(DescribeMetadataQuorumOptions)}
+ *

Review Comment:
   nit: We could remove this empty line.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,91 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(0);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                response.getVoterInfo(topicName, partition).forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                                v.logEndOffset(),
+                                OptionalLong.of(v.lastFetchTimestamp()),
+                                OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                response.getObserverInfo(topicName, partition).forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                                o.logEndOffset(),
+                                OptionalLong.of(o.lastFetchTimestamp()),
+                                OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                try {
+                    if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(quorumResponse.data().errorCode()).exception();
+                    }
+                    if (quorumResponse.data().topics().size() > 1) {
+                        log.error("DescribeMetadataQuorum received {} topics when 1 was expected",
+                                quorumResponse.data().topics().size());
+                        throw new UnknownServerException();
+                    }
+                    DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0);
+                    if (!topic.topicName().equals(METADATA_TOPIC_NAME)) {
+                        log.error("DescribeMetadataQuorum received a topic with name {} when {} was expected",
+                                topic.topicName(), METADATA_TOPIC_NAME);
+                        throw new UnknownServerException();
+                    }
+                    if (topic.partitions().size() > 1) {
+                        log.error("DescribeMetadataQuorum received a topic {} with {} partitions when 1 was expected",
+                                topic.topicName(), topic.partitions().size());
+                        throw new UnknownServerException();
+                    }
+                    DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0);
+                    if (partition.partitionIndex() != METADATA_TOPIC_PARTITION.partition()) {
+                        log.error("DescribeMetadataQuorum received a single partition with index {} when {} was expected",
+                                partition.partitionIndex(), METADATA_TOPIC_PARTITION.partition());
+                        throw new UnknownServerException();
+                    }
+                    if (partition.errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(partition.errorCode()).exception();
+                    }
+                    future.complete(createQuorumResult(quorumResponse));

Review Comment:
   At this point, we have the `partition` that we are interested in. Couldn't we directly use it to populate `QuorumInfo` instead of looking it up again in `createQuorumResult`?



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final String topic;
+    private final Integer leaderId;
+    private final List<ReplicaState> voters;
+    private final List<ReplicaState> observers;
+
+    public QuorumInfo(String topic, Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+        this.topic = topic;
+        this.leaderId = leaderId;
+        this.voters = voters;
+        this.observers = observers;
+    }
+
+    public String topic() {
+        return topic;
+    }
+
+    public Integer leaderId() {
+        return leaderId;
+    }
+
+    public List<ReplicaState> voters() {
+        return voters;
+    }
+
+    public List<ReplicaState> observers() {
+        return observers;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        QuorumInfo that = (QuorumInfo) o;
+        return topic.equals(that.topic)
+            && leaderId.equals(that.leaderId)
+            && voters.equals(that.voters)
+            && observers.equals(that.observers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topic, leaderId, voters, observers);
+    }
+
+    @Override
+    public String toString() {
+        return "QuorumInfo(" +
+            "topic='" + topic + '\'' +
+            ", leaderId=" + leaderId +
+            ", voters=" + voters.toString() +
+            ", observers=" + observers.toString() +
+            ')';
+    }
+
+    public static class ReplicaState {
+        private final int replicaId;
+        private final long logEndOffset;
+        private final OptionalLong lastFetchTimeMs;
+        private final OptionalLong lastCaughtUpTimeMs;
+
+        ReplicaState() {
+            this(0, 0, OptionalLong.empty(), OptionalLong.empty());
+        }
+
+        ReplicaState(int replicaId, long logEndOffset,
+                OptionalLong lastFetchTimeMs, OptionalLong lastCaughtUpTimeMs) {
+            this.replicaId = replicaId;
+            this.logEndOffset = logEndOffset;
+            this.lastFetchTimeMs = lastFetchTimeMs;
+            this.lastCaughtUpTimeMs = lastCaughtUpTimeMs;
+        }
+
+        public int replicaId() {

Review Comment:
   nit: Should we add javadoc to all the getters to be consistent? It is a bit weird to have it for some but not for all.
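   For instance (the wording is illustrative, not committed javadoc):

   ```java
   /**
    * @return The ID of the replica reporting this state.
    */
   public int replicaId() {
       return replicaId;
   }

   /**
    * @return The last known log end offset of this replica.
    */
   public long logEndOffset() {
       return logEndOffset;
   }
   ```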



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final String topic;
+    private final Integer leaderId;
+    private final List<ReplicaState> voters;
+    private final List<ReplicaState> observers;
+
+    public QuorumInfo(String topic, Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {

Review Comment:
   nit: Could we keep it package private?



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+        val voterData = partitionData.currentVoters().asScala

Review Comment:
   nit: Indentation is off here.



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+        val voterData = partitionData.currentVoters().asScala
+        val observerData = partitionData.observers().asScala
+        if (version == 0) {

Review Comment:
   Could we verify anything if version is > 0?



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final String topic;

Review Comment:
   It is curious that we have the topic but not the partition. Is it on purpose, perhaps because we assume only one partition anyway?



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -778,4 +778,35 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(KafkaRaftServer.MetadataTopic, quorumInfo.topic())

Review Comment:
   Should we verify all the fields to ensure that the server gives us the info we expect?



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)

Review Comment:
   Should we assert all the fields of all the replicas? I would also verify that we have the correct voters and observers, etc.





[GitHub] [kafka] hachikuji commented on pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on PR #12206:
URL: https://github.com/apache/kafka/pull/12206#issuecomment-1154495677

   @dajac I think this is the reason there are no observers: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/RaftManager.scala#L188. We should just use `config.nodeId` consistently regardless of `process.roles`. @niket-goel @dengziming Do either of you have time to fix this?




[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r895790891


##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This class is used to describe the state of the quorum received in DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final Integer leaderId;
+    private final List<ReplicaState> voters;
+    private final List<ReplicaState> observers;
+
+    QuorumInfo(Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+        this.leaderId = leaderId;
+        this.voters = voters;
+        this.observers = observers;
+    }
+
+    public Integer leaderId() {
+        return leaderId;
+    }
+
+    public List<ReplicaState> voters() {
+        return voters;
+    }
+
+    public List<ReplicaState> observers() {
+        return observers;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        QuorumInfo that = (QuorumInfo) o;
+        return leaderId.equals(that.leaderId)
+            && voters.equals(that.voters)
+            && observers.equals(that.observers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(leaderId, voters, observers);
+    }
+
+    @Override
+    public String toString() {
+        return "QuorumInfo(" +
+            "leaderId=" + leaderId +
+            ", voters=" + voters.toString() +
+            ", observers=" + observers.toString() +
+            ')';
+    }
+
+    public static class ReplicaState {
+        private final int replicaId;
+        private final long logEndOffset;
+        private final OptionalLong lastFetchTimeMs;
+        private final OptionalLong lastCaughtUpTimeMs;
+
+        ReplicaState() {
+            this(0, 0, OptionalLong.empty(), OptionalLong.empty());
+        }
+
+        ReplicaState(
+                int replicaId,
+                long logEndOffset,
+                OptionalLong lastFetchTimeMs,
+                OptionalLong lastCaughtUpTimeMs

Review Comment:
   nit: We usually use 4-space indentation here.



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -778,4 +778,44 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo.get()
+
+        assertEquals(0, quorumInfo.observers.size())
+        assertEquals(3, quorumInfo.voters.size())

Review Comment:
   nit: We could omit the parentheses after `size`. The same for the previous line.



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,48 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.toShort)
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+      val voterData = partitionData.currentVoters().asScala
+      val observerData = partitionData.observers().asScala
+      assertEquals(1, voterData.size)
+      assertEquals(0, observerData.size)
+      voterData.foreach { state =>
+        assertTrue(0 < state.replicaId)
+        assertTrue(0 < state.logEndOffset())
+        assertEquals(-1, state.lastFetchTimestamp())
+        assertEquals(-1, state.lastCaughtUpTimestamp())

Review Comment:
   nit: We can also omit the parentheses for these.



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This class is used to describe the state of the quorum received in DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final Integer leaderId;
+    private final List<ReplicaState> voters;
+    private final List<ReplicaState> observers;
+
+    QuorumInfo(Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+        this.leaderId = leaderId;
+        this.voters = voters;
+        this.observers = observers;
+    }
+
+    public Integer leaderId() {
+        return leaderId;
+    }
+
+    public List<ReplicaState> voters() {
+        return voters;
+    }
+
+    public List<ReplicaState> observers() {
+        return observers;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        QuorumInfo that = (QuorumInfo) o;
+        return leaderId.equals(that.leaderId)
+            && voters.equals(that.voters)
+            && observers.equals(that.observers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(leaderId, voters, observers);
+    }
+
+    @Override
+    public String toString() {
+        return "QuorumInfo(" +
+            "leaderId=" + leaderId +
+            ", voters=" + voters.toString() +
+            ", observers=" + observers.toString() +

Review Comment:
   nit: I suppose that we could remove the `toString()` calls, as they are implicit.
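   A minimal sketch of the simplified version (string concatenation already invokes `toString()` on its operands):

   ```java
   @Override
   public String toString() {
       return "QuorumInfo(" +
           "leaderId=" + leaderId +
           ", voters=" + voters +
           ", observers=" + observers +
           ')';
   }
   ```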



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -725,4 +725,35 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(KafkaRaftServer.MetadataTopic, quorumInfo.topic())
+        assertEquals(3, quorumInfo.voters.size())
+        assertEquals(0, quorumInfo.observers.size())

Review Comment:
   @hachikuji Is it the expected behavior to have zero observers?



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,48 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.toShort)
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+      val voterData = partitionData.currentVoters().asScala

Review Comment:
   nit: We could omit the parentheses after `currentVoters`. The same for the next line.



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,48 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.toShort)
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+      val voterData = partitionData.currentVoters().asScala
+      val observerData = partitionData.observers().asScala
+      assertEquals(1, voterData.size)
+      assertEquals(0, observerData.size)
+      voterData.foreach { state =>
+        assertTrue(0 < state.replicaId)
+        assertTrue(0 < state.logEndOffset())
+        assertEquals(-1, state.lastFetchTimestamp())
+        assertEquals(-1, state.lastCaughtUpTimestamp())
+      }
+      observerData.foreach { state =>
+        assertTrue(0 < state.replicaId)
+        assertTrue(0 < state.logEndOffset())
+        assertEquals(-1, state.lastFetchTimestamp())
+        assertEquals(-1, state.lastCaughtUpTimestamp())
+      }

Review Comment:
   nit: Should we just remove this block, since `assertEquals(0, observerData.size)` already ensures there are no observers to iterate?





[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r890414480


##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final String topic;

Review Comment:
   The intention was to keep it simple for now (given we are not using the system as multi-raft yet). We could add the partition information here as well, but then how far do we want to go? Do we want this structure to mirror the `TopicData` and `PartitionData` types in the `DescribeQuorumResponseData`, or do we just want to do something simple (like adding a partition ID) for now?





[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r890415293


##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final String topic;

Review Comment:
   @niket-goel Maybe we had better leave topic/partition out of this object? I don't think there's any reason to expose `__cluster_metadata` to users of the admin client.





[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r890984307


##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,

Review Comment:
   Ack. I was not aware of this bug.





[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r891769739


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                            v.logEndOffset(),
+                            v.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()),
+                            v.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                            o.logEndOffset(),
+                            o.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()),
+                            o.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                    throw Errors.forCode(quorumResponse.data().errorCode()).exception();
+                }
+                if (quorumResponse.data().topics().size() > 1) {

Review Comment:
   Maybe we should check if size is not equal to 1 here and below. I guess an empty list is also possible.
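   A sketch of the stricter check, mirroring the existing logging/exception pattern; without it an empty `topics()` list would reach `topics().get(0)` and fail with an `IndexOutOfBoundsException` instead of a clear error:

   ```java
   if (quorumResponse.data().topics().size() != 1) {
       log.error("DescribeMetadataQuorum received {} topics when 1 was expected",
               quorumResponse.data().topics().size());
       throw new UnknownServerException();
   }
   ```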





[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r880984176


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4181,6 +4193,61 @@ private static byte[] getSaltedPasword(ScramMechanism publicScramMechanism, byte
                 .hi(password, salt, iterations);
     }
 
+    @Override
+    public DescribeQuorumResult describeQuorum(DescribeQuorumOptions options) {
+      NodeProvider provider = new LeastLoadedNodeProvider();
+
+      final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+      final long now = time.milliseconds();
+        final Call call = new Call(
+            "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getVoterOffsets(topicName, partition).entrySet()) {
+                    voters.add(new ReplicaState(entry.getKey(), entry.getValue()));
+                }
+                List<ReplicaState> observers = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getObserverOffsets(topicName, partition).entrySet()) {
+                    observers.add(new ReplicaState(entry.getKey(), entry.getValue()));
+                }
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
+               return info;

Review Comment:
   nit: this looks misaligned



##########
clients/src/main/java/org/apache/kafka/common/utils/QuorumInfo.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.List;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {

Review Comment:
   Would it make sense to implement `equals` and `hashCode`? That is often useful for testing. Also it would be nice to have a good `toString` implementation.
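   For illustration, a rough sketch of what that could look like for `ReplicaState` (field names taken from the diff above; `java.util.Objects` would need to be imported):
   ```java
   @Override
   public boolean equals(Object o) {
       if (this == o) return true;
       if (o == null || getClass() != o.getClass()) return false;
       ReplicaState that = (ReplicaState) o;
       return replicaId == that.replicaId
           && logEndOffset == that.logEndOffset
           && lastFetchTimeMs == that.lastFetchTimeMs
           && lastCaughtUpTimeMs == that.lastCaughtUpTimeMs;
   }

   @Override
   public int hashCode() {
       return Objects.hash(replicaId, logEndOffset, lastFetchTimeMs, lastCaughtUpTimeMs);
   }

   @Override
   public String toString() {
       return "ReplicaState(replicaId=" + replicaId
           + ", logEndOffset=" + logEndOffset
           + ", lastFetchTimeMs=" + lastFetchTimeMs
           + ", lastCaughtUpTimeMs=" + lastCaughtUpTimeMs + ")";
   }
   ```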



##########
raft/src/main/java/org/apache/kafka/raft/RaftConfig.java:
##########
@@ -256,6 +256,21 @@ public static List<Node> voterConnectionsToNodes(Map<Integer, RaftConfig.Address
             .collect(Collectors.toList());
     }
 
+    public static String nodesToVoterConnections(List<Node> nodes) {

Review Comment:
   Seems like we're not using this anywhere. Included by mistake perhaps?



##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -1382,6 +1382,23 @@ default DescribeFeaturesResult describeFeatures() {
         return describeFeatures(new DescribeFeaturesOptions());
     }
 
+    /**
+     * Describe the state of the raft quorum

Review Comment:
   How about "metadata quorum" or "kraft quorum"? Also nit: missing period.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -318,6 +325,11 @@ public class KafkaAdminClient extends AdminClient {
     private final Logger log;
     private final LogContext logContext;
 
+    /**
+     * The name of the internal raft metadata topic
+     */
+    private static final String METADATA_TOPIC_NAME = "__cluster_metadata";

Review Comment:
   Hmm, I hadn't really been thinking about the fact that we would have to expose this to the client. I guess that is the consequence of having such a general `DescribeQuorum` API. This makes me wonder if we ought to be more forward-looking with the naming here. Suppose that we ultimately decide to use raft for partition replication as well. Then we might want to be able to use `DescribeQuorum` for user partitions as well, but we haven't given ourselves a lot of room for extension in the `describeQuorum` API. Would it make sense to use something like the following?
   ```java
   public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options)
   ```
   It is more verbose, but it is also clearer.
   
   We should also move this constant to `org.apache.kafka.common.internals.Topic`.
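   For example (a sketch; the constant name is illustrative, and `Topic` here is the existing `org.apache.kafka.common.internals.Topic` class):
   ```java
   // In org.apache.kafka.common.internals.Topic (sketch):
   public static final String CLUSTER_METADATA_TOPIC_NAME = "__cluster_metadata";
   public static final TopicPartition CLUSTER_METADATA_TOPIC_PARTITION =
       new TopicPartition(CLUSTER_METADATA_TOPIC_NAME, 0);
   ```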



##########
clients/src/main/java/org/apache/kafka/common/utils/QuorumInfo.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.List;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+  private final String topic;
+  private final Integer leaderId;
+  private final List<ReplicaState> voters;
+  private final List<ReplicaState> observers;
+
+  public QuorumInfo(String topic, Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+    this.topic = topic;
+    this.leaderId = leaderId;
+    this.voters = voters;
+    this.observers = observers;
+  }
+
+  public String topic() {
+    return topic;
+  }
+
+  public Integer leaderId() {
+    return leaderId;
+  }
+
+  public List<ReplicaState> voters() {
+    return voters;
+  }
+
+  public List<ReplicaState> observers() {
+    return observers;
+  }
+
+  public static class ReplicaState {
+    private final int replicaId;
+    private final long logEndOffset;
+    private final long lastFetchTimeMs;
+    private final long lastCaughtUpTimeMs;
+
+    public ReplicaState(int replicaId, long logEndOffset) {
+      this.replicaId = replicaId;
+      this.logEndOffset = logEndOffset;
+      this.lastFetchTimeMs = -1;
+      this.lastCaughtUpTimeMs = -1;
+    }
+
+    public ReplicaState(int replicaId, long logEndOffset,
+        long lastFetchTimeMs, long lastCaughtUpTimeMs) {
+      this.replicaId = replicaId;
+      this.logEndOffset = logEndOffset;
+      this.lastFetchTimeMs = lastFetchTimeMs;
+      this.lastCaughtUpTimeMs = lastCaughtUpTimeMs;
+    }
+
+    public int replicaId() {
+      return replicaId;
+    }
+
+    public long logEndOffset() {
+      return logEndOffset;
+    }
+
+    public long lastFetchTimeMs() {

Review Comment:
   Can we document the result for older versions of `DescribeQuorum`? I am tempted to suggest that we change the type to `OptionalLong` to make it clear that the value may be absent for older versions.
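   For illustration, one possible shape (a sketch; it maps the `-1` default used by older response versions to an empty value, and assumes a `java.util.OptionalLong` import):
   ```java
   /**
    * The last time this replica fetched from the leader, or empty if the
    * response came from a broker that does not report it (pre-v1 DescribeQuorum).
    */
   public OptionalLong lastFetchTimeMs() {
       return lastFetchTimeMs < 0 ? OptionalLong.empty() : OptionalLong.of(lastFetchTimeMs);
   }
   ```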



##########
clients/src/main/resources/common/message/DescribeQuorumResponse.json:
##########
@@ -17,7 +17,7 @@
   "apiKey": 55,
   "type": "response",
   "name": "DescribeQuorumResponse",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
   Usually we add some comments to describe what has changed in each version. You can see the other request specs for examples.
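   For example, a comment near the top of the spec along the lines of `// Version 1 adds LastFetchTimestamp and LastCaughtUpTimestamp (KIP-836).` (wording illustrative).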



##########
core/src/test/scala/integration/kafka/admin/DescribeQuorumTest.scala:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.admin
+
+import kafka.server.KafkaRaftServer
+import kafka.testkit.KafkaClusterTestKit
+import kafka.testkit.TestKitNodes
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeQuorumOptions}
+import org.apache.kafka.metadata.BrokerState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{Tag, Test, Timeout}
+import org.slf4j.LoggerFactory
+
+
+@Timeout(120)
+@Tag("integration")
+class DescribeQuorumTest {

Review Comment:
   We have another class `DescribeQuorumRequestTest`. Would it make sense to move this test there? Perhaps we could generalize it to `DescribeQuorumIntegrationTest` or something like that.



##########
clients/src/main/java/org/apache/kafka/common/utils/QuorumInfo.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.List;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+  private final String topic;
+  private final Integer leaderId;
+  private final List<ReplicaState> voters;
+  private final List<ReplicaState> observers;
+
+  public QuorumInfo(String topic, Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+    this.topic = topic;
+    this.leaderId = leaderId;
+    this.voters = voters;
+    this.observers = observers;
+  }
+
+  public String topic() {
+    return topic;
+  }
+
+  public Integer leaderId() {
+    return leaderId;
+  }
+
+  public List<ReplicaState> voters() {
+    return voters;
+  }
+
+  public List<ReplicaState> observers() {
+    return observers;
+  }
+
+  public static class ReplicaState {
+    private final int replicaId;
+    private final long logEndOffset;
+    private final long lastFetchTimeMs;
+    private final long lastCaughtUpTimeMs;
+
+    public ReplicaState(int replicaId, long logEndOffset) {

Review Comment:
   Do we need to expose this?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4181,6 +4193,61 @@ private static byte[] getSaltedPasword(ScramMechanism publicScramMechanism, byte
                 .hi(password, salt, iterations);
     }
 
+    @Override
+    public DescribeQuorumResult describeQuorum(DescribeQuorumOptions options) {
+      NodeProvider provider = new LeastLoadedNodeProvider();
+
+      final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+      final long now = time.milliseconds();
+        final Call call = new Call(
+            "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getVoterOffsets(topicName, partition).entrySet()) {
+                    voters.add(new ReplicaState(entry.getKey(), entry.getValue()));
+                }
+                List<ReplicaState> observers = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getObserverOffsets(topicName, partition).entrySet()) {
+                    observers.add(new ReplicaState(entry.getKey(), entry.getValue()));
+                }
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
+               return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                DescribeQuorumRequestData data = new DescribeQuorumRequestData()
+                    .setTopics(singletonList(new DescribeQuorumRequestData.TopicData()
+                        .setPartitions(singletonList(new DescribeQuorumRequestData.PartitionData()
+                            .setPartitionIndex(0)))
+                        .setTopicName(METADATA_TOPIC_NAME)));
+                return new Builder(data);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                if (quorumResponse.data().errorCode() == Errors.NONE.code()) {
+                    future.complete(createQuorumResult(quorumResponse));
+                } else {
+                    future.completeExceptionally(Errors.forCode(quorumResponse.data().errorCode()).exception());

Review Comment:
   It would be helpful to have a couple of unit tests in `KafkaAdminClientTest` covering failure and success cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r882012798


##########
core/src/test/scala/unit/kafka/server/DescribeQuorumIntegrationTest.scala:
##########
@@ -80,6 +85,36 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
     assertTrue(leaderState.logEndOffset > 0)
   }
 
+  @ClusterTest
+  def testDescribeQuorumRequestToBrokers() = {

Review Comment:
   I actually had this test in a separate suite akin to `KRaftClusterTest` before, but moved it here because Jason pointed out that a test file for this API already existed. I think I am going to move the test to where you suggested. It does make more sense to me there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on PR #12206:
URL: https://github.com/apache/kafka/pull/12206#issuecomment-1154541770

   @niket-goel It looks like we are running into https://issues.apache.org/jira/browse/KAFKA-13940, which is causing the new integration test to be flaky. I think our options are either to handle this error code in the client or to change the raft implementation to throw a retriable error (probably either `NotController` or `NotLeader`).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12206:
URL: https://github.com/apache/kafka/pull/12206#issuecomment-1154853297

   > @niket-goel It looks like we are running into https://issues.apache.org/jira/browse/KAFKA-13940, which is causing the new integration test to be flaky. I think our options are either to handle this error code in the client or to change the raft implementation to throw a retriable error (probably either `NotController` or `NotLeader`).
   
   I lean towards the server-side change, to be consistent with other APIs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r892495891


##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -778,4 +778,35 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(0, quorumInfo.leaderId())

Review Comment:
   Should we check all the data in quorum info in this integration test? Just checking the leader and the number of voters/observers seems weak to me.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                            v.logEndOffset(),
+                            v.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()),
+                            v.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                            o.logEndOffset(),
+                            o.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()),
+                            o.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp())));

Review Comment:
   Could we extract this block into a helper method and share it with the voters part?
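   For illustration, a sketch of the kind of helper being suggested (the method name is made up; `OptionalLong`/`Collectors` imports elided):
   ```java
   private QuorumInfo.ReplicaState translateReplicaState(DescribeQuorumResponseData.ReplicaState replica) {
       return new QuorumInfo.ReplicaState(
           replica.replicaId(),
           replica.logEndOffset(),
           replica.lastFetchTimestamp() == -1
               ? OptionalLong.empty() : OptionalLong.of(replica.lastFetchTimestamp()),
           replica.lastCaughtUpTimestamp() == -1
               ? OptionalLong.empty() : OptionalLong.of(replica.lastCaughtUpTimestamp()));
   }

   // Then both lists become one-liners:
   List<QuorumInfo.ReplicaState> voters = partition.currentVoters().stream()
       .map(this::translateReplicaState).collect(Collectors.toList());
   List<QuorumInfo.ReplicaState> observers = partition.observers().stream()
       .map(this::translateReplicaState).collect(Collectors.toList());
   ```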



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,46 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.toShort)
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+      val voterData = partitionData.currentVoters().asScala
+      val observerData = partitionData.observers().asScala
+      if (version == 0) {

Review Comment:
   What about version > 0?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                partition.currentVoters().forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                            v.logEndOffset(),
+                            v.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()),
+                            v.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                partition.observers().forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                            o.logEndOffset(),
+                            o.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()),
+                            o.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                    throw Errors.forCode(quorumResponse.data().errorCode()).exception();

Review Comment:
   For my understanding, is the idea to retry the `Retriable` ones here? I also suppose that the `UnknownServerException` thrown will be caught and result in calling `handleFailure`. Am I right?



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,46 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.toShort)
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+      val voterData = partitionData.currentVoters().asScala
+      val observerData = partitionData.observers().asScala
+      if (version == 0) {
+        voterData.foreach { state =>
+          assertTrue(0 < state.replicaId)

Review Comment:
   Should we also assert `logEndOffset`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r883057402


##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java:
##########
@@ -93,4 +94,57 @@ public static DescribeQuorumResponseData singletonResponse(TopicPartition topicP
     public static DescribeQuorumResponse parse(ByteBuffer buffer, short version) {
         return new DescribeQuorumResponse(new DescribeQuorumResponseData(new ByteBufferAccessor(buffer), version));
     }
+
+    public String getTopicNameByIndex(Integer index) {
+        return data.topics().get(index).topicName();
+    }
+
+    public Integer getPartitionLeaderId(String topicName, Integer partition) {
+        Integer leaderId = -1;
+        TopicData topic = data.topics().stream()
+            .filter(t -> t.topicName().equals(topicName))
+            .findFirst()
+            .orElse(null);
+        if (topic != null) {
+            leaderId = Integer.valueOf(topic.partitions().get(partition).leaderId());
+        }
+        return leaderId;
+    }
+
+    /**
+     * Get the replica info for the given topic name and partition.
+     * @param topicName Name of the topic to fetch
+     * @param partition Index of the parition to fetch
+     * @param getVoterInfo Return the voter information if true, return observers otherwise
+     * @return List of {@link ReplicaState}
+     */
+    private List<ReplicaState> getReplicaInfo(String topicName, Integer partition, boolean getVoterInfo) {
+        //List<QuorumInfo.ReplicaState> replicaInfo = new ArrayList<>();
+        TopicData topic = data.topics().stream()
+                .filter(t -> t.topicName().equals(topicName))
+                .findFirst()
+                .orElse(null);
+        if (topic != null) {
+            List<ReplicaState> replicaStates = getVoterInfo ? topic.partitions().get(partition).currentVoters()
+                    : topic.partitions().get(partition).observers();
+            //replicaStates.forEach(

Review Comment:
   Left in by mistake?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r882011850


##########
core/src/test/scala/unit/kafka/server/DescribeQuorumIntegrationTest.scala:
##########
@@ -17,25 +17,30 @@
 package kafka.server
 
 import java.io.IOException
-
 import kafka.test.ClusterInstance
 import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
 import kafka.test.junit.ClusterTestExtensions
-import kafka.utils.NotNothing
+import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.utils.{NotNothing, TestUtils}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeMetadataQuorumOptions}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
+import org.apache.kafka.metadata.BrokerState
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
+import org.slf4j.LoggerFactory
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
+@Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT)
 @Tag("integration")
-class DescribeQuorumRequestTest(cluster: ClusterInstance) {
+class DescribeQuorumIntegrationTest(cluster: ClusterInstance) {
+  val log = LoggerFactory.getLogger(classOf[DescribeQuorumIntegrationTest])
 
   @ClusterTest(clusterType = Type.ZK)
   def testDescribeQuorumNotSupportedByZkBrokers(): Unit = {

Review Comment:
   Thanks for the detailed review @dajac. Some of the misses were me working with multiple versions of the code. Appreciate you catching those. I will update the next version with all of your comments addressed. One quick question about this comment though:
   Is there an example of this somewhere that I can look at? Not sure how to change and verify the version of the request.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r882150774


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);

Review Comment:
   I agree it is a little awkward. A common way of structuring responses which include topic partitions is to first build something like `Map<TopicPartition, QuorumInfo>`. Then we can pluck out the partitions we need. For example, see `logDirDescriptions` in this class.
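   A rough sketch of that shape, for illustration (the helper name is made up; only the generated response accessors are real):
   ```java
   private static Map<TopicPartition, DescribeQuorumResponseData.PartitionData> partitionsByTopicPartition(
           DescribeQuorumResponseData data) {
       Map<TopicPartition, DescribeQuorumResponseData.PartitionData> result = new HashMap<>();
       for (DescribeQuorumResponseData.TopicData topic : data.topics()) {
           for (DescribeQuorumResponseData.PartitionData partition : topic.partitions()) {
               result.put(new TopicPartition(topic.topicName(), partition.partitionIndex()), partition);
           }
       }
       return result;
   }
   ```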



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r882205666


##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final String topic;
+    private final Integer leaderId;
+    private final List<ReplicaState> voters;
+    private final List<ReplicaState> observers;
+
+    public QuorumInfo(String topic, Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+        this.topic = topic;
+        this.leaderId = leaderId;
+        this.voters = voters;
+        this.observers = observers;
+    }
+
+    public String topic() {
+        return topic;
+    }
+
+    public Integer leaderId() {
+        return leaderId;
+    }
+
+    public List<ReplicaState> voters() {
+        return voters;
+    }
+
+    public List<ReplicaState> observers() {
+        return observers;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        QuorumInfo that = (QuorumInfo) o;
+        return topic.equals(that.topic)
+            && leaderId.equals(that.leaderId)
+            && voters.equals(that.voters)
+            && observers.equals(that.observers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topic, leaderId, voters, observers);
+    }
+
+    @Override
+    public String toString() {
+        return "QuorumInfo{" +
+            "topic='" + topic + '\'' +
+            ", leaderId=" + leaderId +
+            ", voters=" + voters.toString() +
+            ", observers=" + observers.toString() +
+            '}';
+    }
+
+    public static class ReplicaState {
+        private final int replicaId;
+        private final long logEndOffset;
+        private final OptionalLong lastFetchTimeMs;
+        private final OptionalLong lastCaughtUpTimeMs;
+
+        public ReplicaState() {

Review Comment:
   I need the constructor access in `DescribeQuorumResponse.java`. I can sort of work around that by returning the fields raw or encapsulated in a different object, but I thought this was simpler. If we want to prioritize hiding the constructor more, I can change it so that the `getVoterInfo` and `getObserverInfo` return raw fields. Do you have any thoughts on this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dengziming commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
dengziming commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r885406422


##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -725,4 +725,35 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(KafkaRaftServer.MetadataTopic, quorumInfo.topic())
+        assertEquals(3, quorumInfo.voters.size())
+        assertEquals(0, quorumInfo.observers.size())

Review Comment:
   I wonder why there isn't an observer; we should have 4 observers since we have 4 brokers. This may also not be related to this PR, so I will spend some time investigating.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r890446439


##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {

Review Comment:
   I had considered giving the same comment, but I thought it might be better--or at least faster--to use the same integration test setup.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] niket-goel commented on pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on PR #12206:
URL: https://github.com/apache/kafka/pull/12206#issuecomment-1154525501

   Thanks @hachikuji. I have created https://issues.apache.org/jira/browse/KAFKA-13986 for the observer count issue and will raise a PR once this one is merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r881044720


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -318,6 +325,11 @@ public class KafkaAdminClient extends AdminClient {
     private final Logger log;
     private final LogContext logContext;
 
+    /**
+     * The name of the internal raft metadata topic
+     */
+    private static final String METADATA_TOPIC_NAME = "__cluster_metadata";

Review Comment:
   This is a great point! I think it makes sense to give ourselves wiggle room and call this `MetadataQuorum`.
   Re: moving the constant -- I was actually looking for a common place to put it so that both the clients and the server code can use the same value. Is there any class that they share? Right now the name for the topic on the server side is defined in `KafkaRaftServer`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r881044944


##########
raft/src/main/java/org/apache/kafka/raft/RaftConfig.java:
##########
@@ -256,6 +256,21 @@ public static List<Node> voterConnectionsToNodes(Map<Integer, RaftConfig.Address
             .collect(Collectors.toList());
     }
 
+    public static String nodesToVoterConnections(List<Node> nodes) {

Review Comment:
   It was included by mistake, thanks for pointing it out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji commented on PR #12206:
URL: https://github.com/apache/kafka/pull/12206#issuecomment-1136500940

   cc @dengziming for reviews


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r881292400


##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumResult.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.QuorumInfo;
+
+/**
+ * The result of {@link Admin#describeMetadataQuorum(DescribeMetadataQuorumOptions)}
+ *
+ */
+public class DescribeMetadataQuorumResult {
+
+    private final KafkaFuture<QuorumInfo> quorumInfo;
+
+    public DescribeMetadataQuorumResult(KafkaFuture<QuorumInfo> quorumInfo) {

Review Comment:
   nit: Could we keep it package private?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getVoterOffsets(topicName, partition).entrySet()) {

Review Comment:
   nit: We could use `response.getVoterOffsets(topicName, partition).forEach((replicaId, logEndOffset) ->`. That makes the code a bit more readable in my opinion.
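   i.e. something like (sketch only):
   ```java
   response.getVoterOffsets(topicName, partition).forEach((replicaId, logEndOffset) ->
       voters.add(new ReplicaState(replicaId, logEndOffset, OptionalLong.empty(), OptionalLong.empty())));
   ```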



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getVoterOffsets(topicName, partition).entrySet()) {
+                    voters.add(new ReplicaState(entry.getKey(), entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));

Review Comment:
   Hmm, `OptionalLong.empty()` seems wrong here. Don't you need to put the last fetch time and the last caught-up time?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getVoterOffsets(topicName, partition).entrySet()) {
+                    voters.add(new ReplicaState(entry.getKey(), entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));
+                }
+                List<ReplicaState> observers = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getObserverOffsets(topicName, partition).entrySet()) {
+                    observers.add(new ReplicaState(entry.getKey(), entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));

Review Comment:
   Same question here.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getVoterOffsets(topicName, partition).entrySet()) {
+                    voters.add(new ReplicaState(entry.getKey(), entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));
+                }
+                List<ReplicaState> observers = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getObserverOffsets(topicName, partition).entrySet()) {
+                    observers.add(new ReplicaState(entry.getKey(), entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));
+                }
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                DescribeQuorumRequestData data = new DescribeQuorumRequestData()
+                        .setTopics(singletonList(new DescribeQuorumRequestData.TopicData()
+                                .setPartitions(singletonList(new DescribeQuorumRequestData.PartitionData()
+                                        .setPartitionIndex(0)))
+                                .setTopicName(METADATA_TOPIC_NAME)));
+                return new Builder(data);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                if (quorumResponse.data().errorCode() == Errors.NONE.code()) {
+                    future.complete(createQuorumResult(quorumResponse));
+                } else {
+                    future.completeExceptionally(Errors.forCode(quorumResponse.data().errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), throwable);

Review Comment:
   nit: Why not call `future.completeExceptionally` directly?
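   i.e. something like:

   ```java
   @Override
   void handleFailure(Throwable throwable) {
       future.completeExceptionally(throwable);
   }
   ```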



##########
clients/src/main/java/org/apache/kafka/common/utils/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;

Review Comment:
   Is `org.apache.kafka.common` the right package here? It seems preferable to put this class in the `org.apache.kafka.clients.admin` package.
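   For illustration, the class would then live under `clients/src/main/java/org/apache/kafka/clients/admin/` and declare:

   ```java
   package org.apache.kafka.clients.admin;
   ```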



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -592,6 +597,26 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures
                 .setErrorCode(error.code()));
     }
 
+    private static QuorumInfo defaultQuorumInfo() {
+        return new QuorumInfo(Topic.METADATA_TOPIC_NAME, 0,
+                singletonList(new QuorumInfo.ReplicaState()),
+                singletonList(new QuorumInfo.ReplicaState()));
+    }
+
+    private static DescribeQuorumResponse prepareDescribeQuorumResponse(Errors error) {
+        if (error == Errors.NONE) {
+            return new DescribeQuorumResponse(DescribeQuorumResponse.singletonResponse(
+                        new TopicPartition(Topic.METADATA_TOPIC_NAME, 0),
+                        0, 0, 0,
+                        singletonList(new DescribeQuorumResponseData.ReplicaState()),
+                        singletonList(new DescribeQuorumResponseData.ReplicaState()))

Review Comment:
   It would be better to fully populate the response here. That would have caught the issue that I mentioned earlier.
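   A sketch of a more fully populated fixture, assuming the generated builder-style setters on `DescribeQuorumResponseData.ReplicaState` (including the new v1 timestamp fields) and using arbitrary sample values:

   ```java
   private static DescribeQuorumResponseData.ReplicaState sampleReplicaState() {
       // Arbitrary non-default values so that a dropped field shows up as a test failure.
       return new DescribeQuorumResponseData.ReplicaState()
           .setReplicaId(1)
           .setLogEndOffset(100)
           .setLastFetchTimestamp(1000)
           .setLastCaughtUpTimestamp(1000);
   }
   ```

   `defaultQuorumInfo()` would then need to expect the same values.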



##########
clients/src/main/resources/common/message/DescribeQuorumResponse.json:
##########
@@ -17,9 +17,10 @@
   "apiKey": 55,
   "type": "response",
   "name": "DescribeQuorumResponse",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
+    // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState

Review Comment:
   nit: We usually put this before `validVersions`.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {

Review Comment:
   nit: Should we use `describeMetadataQuorum` as the call name here?



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumIntegrationTest.scala:
##########
@@ -80,6 +85,36 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
     assertTrue(leaderState.logEndOffset > 0)
   }
 
+  @ClusterTest
+  def testDescribeQuorumRequestToBrokers() = {

Review Comment:
   Looking at the other tests, this test suite does not seem to be the right place for this test. I wonder if `KRaftClusterTest` would be more appropriate, for instance, but I don't feel strongly about this.



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4846,6 +4871,32 @@ public void testDescribeFeaturesFailure() {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create((short) 55, (short) 0, (short) 1));
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE));
+            final KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            final QuorumInfo quorumInfo = future.get();
+            assertEquals(defaultQuorumInfo(), quorumInfo);
+        }
+    }
+
+    @Test
+    public void testDescribeMetadataQuorumFailure() {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create((short) 55, (short) 0, (short) 1));
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.INVALID_REQUEST));
+            final KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            final ExecutionException e = assertThrows(ExecutionException.class, future::get);
+            assertEquals(e.getCause().getClass(), Errors.INVALID_REQUEST.exception().getClass());

Review Comment:
   nit: You could use `TestUtils.assertFutureThrows`.
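   i.e. roughly (assuming the `assertFutureThrows(future, exceptionClass)` helper from the clients test utilities):

   ```java
   TestUtils.assertFutureThrows(future, InvalidRequestException.class);
   ```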



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);

Review Comment:
   The implementations of `getPartitionLeaderId`, `getVoterOffsets` and `getObserverOffsets` all start by looking up the relevant topic/partition in the response. Did you consider doing this lookup once and using the result to construct the `QuorumInfo`?
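   A sketch of what a single lookup could look like, assuming the response data follows the `DescribeQuorumResponse.json` layout (topics -> partitions -> current voters/observers); `toReplicaStates` is a hypothetical helper converting response replica states into the admin-level `ReplicaState`:

   ```java
   private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
       // Look up the single metadata topic-partition once and build everything from it.
       // (Error handling for an unexpected or empty response is omitted in this sketch.)
       DescribeQuorumResponseData.PartitionData partition = response.data()
           .topics().get(0)
           .partitions().get(0);
       return new QuorumInfo(
           METADATA_TOPIC_NAME,
           partition.leaderId(),
           toReplicaStates(partition.currentVoters()),
           toReplicaStates(partition.observers()));
   }
   ```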



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -592,6 +597,26 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures
                 .setErrorCode(error.code()));
     }
 
+    private static QuorumInfo defaultQuorumInfo() {
+        return new QuorumInfo(Topic.METADATA_TOPIC_NAME, 0,
+                singletonList(new QuorumInfo.ReplicaState()),
+                singletonList(new QuorumInfo.ReplicaState()));
+    }
+
+    private static DescribeQuorumResponse prepareDescribeQuorumResponse(Errors error) {
+        if (error == Errors.NONE) {
+            return new DescribeQuorumResponse(DescribeQuorumResponse.singletonResponse(
+                        new TopicPartition(Topic.METADATA_TOPIC_NAME, 0),
+                        0, 0, 0,
+                        singletonList(new DescribeQuorumResponseData.ReplicaState()),
+                        singletonList(new DescribeQuorumResponseData.ReplicaState()))
+                    );
+        }
+        return new DescribeQuorumResponse(
+                new DescribeQuorumResponseData()
+                .setErrorCode(error.code()));

Review Comment:
   nit: Indentation seems off here.



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumIntegrationTest.scala:
##########
@@ -17,25 +17,30 @@
 package kafka.server
 
 import java.io.IOException
-
 import kafka.test.ClusterInstance
 import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
 import kafka.test.junit.ClusterTestExtensions
-import kafka.utils.NotNothing
+import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.utils.{NotNothing, TestUtils}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeMetadataQuorumOptions}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
+import org.apache.kafka.metadata.BrokerState
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
+import org.slf4j.LoggerFactory
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
+@Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT)
 @Tag("integration")
-class DescribeQuorumRequestTest(cluster: ClusterInstance) {
+class DescribeQuorumIntegrationTest(cluster: ClusterInstance) {
+  val log = LoggerFactory.getLogger(classOf[DescribeQuorumIntegrationTest])
 
   @ClusterTest(clusterType = Type.ZK)
   def testDescribeQuorumNotSupportedByZkBrokers(): Unit = {

Review Comment:
   Should we extend `testDescribeQuorum` to verify that the new fields are set when v1 is used and not set when v0 is used?




[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r882204566


##########
core/src/test/scala/unit/kafka/server/DescribeQuorumIntegrationTest.scala:
##########
@@ -26,16 +25,17 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
+@Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT)
 @Tag("integration")
-class DescribeQuorumRequestTest(cluster: ClusterInstance) {
+class DescribeQuorumTest(cluster: ClusterInstance) {

Review Comment:
   I changed it so that the new test resides in `KRaftClusterTest`. This was David's suggestion and I felt it made more sense there, especially because the new test uses a different test framework than these ones.
   The rename was included by mistake and I will revert it to what it was.




[GitHub] [kafka] niket-goel commented on pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
niket-goel commented on PR #12206:
URL: https://github.com/apache/kafka/pull/12206#issuecomment-1138955079

   Thanks @dengziming. I have pushed another version where all tests pass locally. I appreciate your help with the review.



[GitHub] [kafka] hachikuji merged pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

Posted by GitBox <gi...@apache.org>.
hachikuji merged PR #12206:
URL: https://github.com/apache/kafka/pull/12206

