Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/03 13:38:28 UTC

[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r888913198


##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -1446,6 +1446,35 @@ default DescribeFeaturesResult describeFeatures() {
      */
     UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
 
+    /**
+     * Describes the state of the metadata quorum.
+     * <p>
+     * This is a convenience method for {@link #describeMetadataQuorum(DescribeMetadataQuorumOptions)}  with default options.

Review Comment:
   nit: There is an extra space before `with`.



##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumResult.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of {@link Admin#describeMetadataQuorum(DescribeMetadataQuorumOptions)}
+ *
+ */
+public class DescribeMetadataQuorumResult {
+
+    private final KafkaFuture<QuorumInfo> quorumInfo;
+
+    DescribeMetadataQuorumResult(KafkaFuture<QuorumInfo> quorumInfo) {
+        this.quorumInfo = quorumInfo;
+    }
+
+    /**
+     * Returns a future QuorumInfo

Review Comment:
   nit: Should we say `Returns a future containing the quorum info.`?



##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumOptions.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+/**
+ * Options for {@link ConfluentAdmin#describeQuorum(DescribeMetadataQuorumOptions)}.
+ *

Review Comment:
   nit: We could remove this empty line.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,91 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(0);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                response.getVoterInfo(topicName, partition).forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                                v.logEndOffset(),
+                                OptionalLong.of(v.lastFetchTimestamp()),
+                                OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                response.getObserverInfo(topicName, partition).forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                                o.logEndOffset(),
+                                OptionalLong.of(o.lastFetchTimestamp()),
+                                OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                try {
+                    if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(quorumResponse.data().errorCode()).exception();
+                    }
+                    if (quorumResponse.data().topics().size() > 1) {
+                        log.error("DescribeMetadataQuorum received {} topics when 1 was expected",
+                                quorumResponse.data().topics().size());
+                        throw new UnknownServerException();

Review Comment:
   * We should add a message to every exception returned to the user. If we do this, we can log at debug instead of error.
   * Instead of throwing the exception, we usually complete the future and return.
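
   A minimal sketch of the suggested pattern (a fragment of `handleResponse`, with hypothetical message text, not taken from the PR):
   ```java
   if (quorumResponse.data().topics().size() > 1) {
       String msg = String.format("DescribeMetadataQuorum received %d topics when 1 was expected",
               quorumResponse.data().topics().size());
       // Log at debug; the detail reaches the caller through the exception message instead.
       log.debug(msg);
       future.completeExceptionally(new UnknownServerException(msg));
       return;
   }
   ```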



##########
clients/src/main/resources/common/message/DescribeQuorumResponse.json:
##########
@@ -17,7 +17,8 @@
   "apiKey": 55,
   "type": "response",
   "name": "DescribeQuorumResponse",
-  "validVersions": "0",
+  // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState

Review Comment:
   nit: Could we add the KIP reference?



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,

Review Comment:
   nit: You could use `NodeApiVersions.create()`.
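
   For example, a one-line sketch (the no-arg factory should advertise all APIs, including `DESCRIBE_QUORUM`):
   ```java
   env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
   ```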



##########
clients/src/main/resources/common/message/DescribeQuorumRequest.json:
##########
@@ -18,7 +18,8 @@
   "type": "request",
   "listeners": ["broker", "controller"],
   "name": "DescribeQuorumRequest",
-  "validVersions": "0",
+  // Version 1 adds additional fields in the response. The request is unchanged.

Review Comment:
   nit: Could we add the KIP reference?



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
+                    ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
+                    ApiKeys.DESCRIBE_QUORUM.latestVersion()));
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, false, false, false));
+            final KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            final QuorumInfo quorumInfo = future.get();
+            assertEquals(defaultQuorumInfo(), quorumInfo);
+        }
+    }
+
+    @Test
+    public void testDescribeMetadataQuorumFailure() {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
+                        ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
+                        ApiKeys.DESCRIBE_QUORUM.latestVersion()));
+
+            // Test top level error
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.INVALID_REQUEST, Errors.NONE, false, false, false, false));
+            KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, Errors.INVALID_REQUEST.exception().getClass());

Review Comment:
   nit: `InvalidRequestException.class`?
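
   i.e., something like the following (assuming `org.apache.kafka.common.errors.InvalidRequestException` is imported):
   ```java
   TestUtils.assertFutureThrows(future, InvalidRequestException.class);
   ```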



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {

Review Comment:
   I wonder if we could use `@ApiKeyVersionsSource(apiKey = ApiKeys.DESCRIBE_QUORUM)`. I don't know if that works with `@ClusterTest` though.



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
+                    ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
+                    ApiKeys.DESCRIBE_QUORUM.latestVersion()));
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, false, false, false));
+            final KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            final QuorumInfo quorumInfo = future.get();
+            assertEquals(defaultQuorumInfo(), quorumInfo);
+        }
+    }
+
+    @Test
+    public void testDescribeMetadataQuorumFailure() {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
+                        ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
+                        ApiKeys.DESCRIBE_QUORUM.latestVersion()));
+
+            // Test top level error
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.INVALID_REQUEST, Errors.NONE, false, false, false, false));
+            KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, Errors.INVALID_REQUEST.exception().getClass());
+            future = null;
+
+            // Test incorrect topic count
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, true, false, false, false));
+            future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, UnknownServerException.class);
+            future = null;
+
+            // Test incorrect topic name
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, true, false, false));
+            future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, UnknownServerException.class);
+            future = null;
+
+            // Test incorrect partition count
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, false, true, false));
+            future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, UnknownServerException.class);
+            future = null;
+
+            // Test incorrect partition index
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, false, false, true));
+            future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, UnknownServerException.class);
+            future = null;
+
+            // Test partition level error
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.INVALID_REQUEST, false, false, false, false));
+            future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, Errors.INVALID_REQUEST.exception().getClass());
+
+            // Test all incorrect and no errors
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, true, true, true, true));
+            future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, UnknownServerException.class);
+            future = null;
+
+            // Test all incorrect and both errors
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.INVALID_REQUEST, Errors.INVALID_REQUEST, true, true, true, true));
+            future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            TestUtils.assertFutureThrows(future, Errors.INVALID_REQUEST.exception().getClass());
+        }
+    }
+

Review Comment:
   Could we add a test case where `lastFetchTimestamp` and `lastCaughtUpTimestamp` are not provided in the response? I suppose that should result in empty optionals, right?
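
   A rough sketch of the expected assertions (the timestamp getter names are assumed from the `ReplicaState` field names, not confirmed by this diff):
   ```java
   // Assuming a prepared response where the timestamp fields are left at their default (-1),
   // e.g. a version 0 style response, the replica states would be expected to expose
   // empty optionals rather than OptionalLong.of(-1).
   QuorumInfo info = env.adminClient().describeMetadataQuorum().quorumInfo().get();
   info.voters().forEach(v -> {
       assertEquals(OptionalLong.empty(), v.lastFetchTimeMs());
       assertEquals(OptionalLong.empty(), v.lastCaughtUpTimeMs());
   });
   ```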



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])

Review Comment:
   nit: We could use `.toShort`.



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -725,4 +725,35 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(KafkaRaftServer.MetadataTopic, quorumInfo.topic())
+        assertEquals(3, quorumInfo.voters.size())
+        assertEquals(0, quorumInfo.observers.size())

Review Comment:
   Do we need to do something about this in this PR?



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.

Review Comment:
   The reference to DescribeQuorumResponse is a bit weird here for our users. Could we better describe what QuorumInfo represents?
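
   One possible wording, as a sketch only:
   ```java
   /**
    * A description of the state of the KRaft metadata quorum, as returned by
    * {@link Admin#describeMetadataQuorum(DescribeMetadataQuorumOptions)}: the current leader,
    * the set of voters and the set of observers, with per-replica lag information.
    */
   ```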



##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumResult.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of {@link Admin#describeMetadataQuorum(DescribeMetadataQuorumOptions)}
+ *

Review Comment:
   nit: We could remove this empty line.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4328,91 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(0);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
+                List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
+                response.getVoterInfo(topicName, partition).forEach(v -> {
+                    voters.add(new QuorumInfo.ReplicaState(v.replicaId(),
+                                v.logEndOffset(),
+                                OptionalLong.of(v.lastFetchTimestamp()),
+                                OptionalLong.of(v.lastCaughtUpTimestamp())));
+                });
+                response.getObserverInfo(topicName, partition).forEach(o -> {
+                    observers.add(new QuorumInfo.ReplicaState(o.replicaId(),
+                                o.logEndOffset(),
+                                OptionalLong.of(o.lastFetchTimestamp()),
+                                OptionalLong.of(o.lastCaughtUpTimestamp())));
+                });
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                return new Builder(DescribeQuorumRequest.singletonRequest(
+                        new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition())));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                try {
+                    if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(quorumResponse.data().errorCode()).exception();
+                    }
+                    if (quorumResponse.data().topics().size() > 1) {
+                        log.error("DescribeMetadataQuorum received {} topics when 1 was expected",
+                                quorumResponse.data().topics().size());
+                        throw new UnknownServerException();
+                    }
+                    DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0);
+                    if (!topic.topicName().equals(METADATA_TOPIC_NAME)) {
+                        log.error("DescribeMetadataQuorum received a topic with name {} when {} was expected",
+                                topic.topicName(), METADATA_TOPIC_NAME);
+                        throw new UnknownServerException();
+                    }
+                    if (topic.partitions().size() > 1) {
+                        log.error("DescribeMetadataQuorum received a topic {} with {} partitions when 1 was expected",
+                                topic.topicName(), topic.partitions().size());
+                        throw new UnknownServerException();
+                    }
+                    DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0);
+                    if (partition.partitionIndex() != METADATA_TOPIC_PARTITION.partition()) {
+                        log.error("DescribeMetadataQuorum received a single partition with index {} when {} was expected",
+                                partition.partitionIndex(), METADATA_TOPIC_PARTITION.partition());
+                        throw new UnknownServerException();
+                    }
+                    if (partition.errorCode() != Errors.NONE.code()) {
+                        throw Errors.forCode(partition.errorCode()).exception();
+                    }
+                    future.complete(createQuorumResult(quorumResponse));

Review Comment:
   At this point, we have the `partition` that we are interested in. Couldn't we directly use it to populate `QuorumInfo` instead of looking it up again in `createQuorumResult`?
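
   A minimal sketch of that reshaping (hypothetical signature, and assuming the generated `ReplicaState` accessors for the new timestamp fields):
   ```java
   private QuorumInfo createQuorumResult(String topicName, DescribeQuorumResponseData.PartitionData partition) {
       List<QuorumInfo.ReplicaState> voters = partition.currentVoters().stream()
               .map(v -> new QuorumInfo.ReplicaState(v.replicaId(), v.logEndOffset(),
                       OptionalLong.of(v.lastFetchTimestamp()), OptionalLong.of(v.lastCaughtUpTimestamp())))
               .collect(Collectors.toList());
       List<QuorumInfo.ReplicaState> observers = partition.observers().stream()
               .map(o -> new QuorumInfo.ReplicaState(o.replicaId(), o.logEndOffset(),
                       OptionalLong.of(o.lastFetchTimestamp()), OptionalLong.of(o.lastCaughtUpTimestamp())))
               .collect(Collectors.toList());
       // The partition passed in is the one already validated in handleResponse, so no second lookup is needed.
       return new QuorumInfo(topicName, partition.leaderId(), voters, observers);
   }
   ```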



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final String topic;
+    private final Integer leaderId;
+    private final List<ReplicaState> voters;
+    private final List<ReplicaState> observers;
+
+    public QuorumInfo(String topic, Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
+        this.topic = topic;
+        this.leaderId = leaderId;
+        this.voters = voters;
+        this.observers = observers;
+    }
+
+    public String topic() {
+        return topic;
+    }
+
+    public Integer leaderId() {
+        return leaderId;
+    }
+
+    public List<ReplicaState> voters() {
+        return voters;
+    }
+
+    public List<ReplicaState> observers() {
+        return observers;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        QuorumInfo that = (QuorumInfo) o;
+        return topic.equals(that.topic)
+            && leaderId.equals(that.leaderId)
+            && voters.equals(that.voters)
+            && observers.equals(that.observers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topic, leaderId, voters, observers);
+    }
+
+    @Override
+    public String toString() {
+        return "QuorumInfo(" +
+            "topic='" + topic + '\'' +
+            ", leaderId=" + leaderId +
+            ", voters=" + voters.toString() +
+            ", observers=" + observers.toString() +
+            ')';
+    }
+
+    public static class ReplicaState {
+        private final int replicaId;
+        private final long logEndOffset;
+        private final OptionalLong lastFetchTimeMs;
+        private final OptionalLong lastCaughtUpTimeMs;
+
+        ReplicaState() {
+            this(0, 0, OptionalLong.empty(), OptionalLong.empty());
+        }
+
+        ReplicaState(int replicaId, long logEndOffset,
+                OptionalLong lastFetchTimeMs, OptionalLong lastCaughtUpTimeMs) {
+            this.replicaId = replicaId;
+            this.logEndOffset = logEndOffset;
+            this.lastFetchTimeMs = lastFetchTimeMs;
+            this.lastCaughtUpTimeMs = lastCaughtUpTimeMs;
+        }
+
+        public int replicaId() {

Review Comment:
   nit: Should we add javadoc to all the getters to be consistent? It is a bit weird to have it for some but not for all.
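
   For instance (sketch only, wording assumed):
   ```java
   /**
    * The ID of the replica.
    */
   public int replicaId() {
       return replicaId;
   }

   /**
    * The last known log end offset of the replica.
    */
   public long logEndOffset() {
       return logEndOffset;
   }
   ```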



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final String topic;
+    private final Integer leaderId;
+    private final List<ReplicaState> voters;
+    private final List<ReplicaState> observers;
+
+    public QuorumInfo(String topic, Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {

Review Comment:
   nit: Could we keep it package private?



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+        val voterData = partitionData.currentVoters().asScala

Review Comment:
   nit: Indentation is off here.



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
+        .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
+      assertTrue(leaderState.logEndOffset > 0)
+
+        val voterData = partitionData.currentVoters().asScala
+        val observerData = partitionData.observers().asScala
+        if (version == 0) {

Review Comment:
   Could we verify anything if version is > 0?



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This is used to describe per-partition state in the DescribeQuorumResponse.
+ */
+public class QuorumInfo {
+    private final String topic;

Review Comment:
   It is curious that we have the topic but not the partition. Is it on purpose, perhaps because we assume only one partition anyway?



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -778,4 +778,35 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val props = cluster.clientProperties()
+      props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+      val admin = Admin.create(props)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo().get()
+
+        assertEquals(KafkaRaftServer.MetadataTopic, quorumInfo.topic())

Review Comment:
   Should we verify all the fields to ensure that the server gives us the info we expect?



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala:
##########
@@ -54,30 +54,44 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest
   def testDescribeQuorum(): Unit = {
-    val request = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    val response = connectAndReceive[DescribeQuorumResponse](request)
-
-    assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
-    assertEquals(1, response.data.topics.size)
-
-    val topicData = response.data.topics.get(0)
-    assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
-    assertEquals(1, topicData.partitions.size)
-
-    val partitionData = topicData.partitions.get(0)
-    assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
-    assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
-    assertTrue(partitionData.leaderEpoch > 0)
-
-    val leaderId = partitionData.leaderId
-    assertTrue(leaderId > 0)
-
-    val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)
-      .getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
-    assertTrue(leaderState.logEndOffset > 0)
+    for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
+      val request = new DescribeQuorumRequest.Builder(
+        singletonRequest(KafkaRaftServer.MetadataPartition)
+      ).build(version.asInstanceOf[Short])
+      val response = connectAndReceive[DescribeQuorumResponse](request)
+
+      assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals(1, response.data.topics.size)
+
+      val topicData = response.data.topics.get(0)
+      assertEquals(KafkaRaftServer.MetadataTopic, topicData.topicName)
+      assertEquals(1, topicData.partitions.size)
+
+      val partitionData = topicData.partitions.get(0)
+      assertEquals(KafkaRaftServer.MetadataPartition.partition, partitionData.partitionIndex)
+      assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertTrue(partitionData.leaderEpoch > 0)
+
+      val leaderId = partitionData.leaderId
+      assertTrue(leaderId > 0)
+
+      val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId)

Review Comment:
   Should we assert all the fields of all the replicas? I would also verify that we have the correct voters and observers, etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org