Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/25 07:41:47 UTC

[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag

dajac commented on code in PR #12206:
URL: https://github.com/apache/kafka/pull/12206#discussion_r881292400


##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeMetadataQuorumResult.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.QuorumInfo;
+
+/**
+ * The result of {@link Admin#describeMetadataQuorum(DescribeMetadataQuorumOptions)}
+ *
+ */
+public class DescribeMetadataQuorumResult {
+
+    private final KafkaFuture<QuorumInfo> quorumInfo;
+
+    public DescribeMetadataQuorumResult(KafkaFuture<QuorumInfo> quorumInfo) {

Review Comment:
   nit: Could we keep it package private?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getVoterOffsets(topicName, partition).entrySet()) {

Review Comment:
   nit: We could use `response.getVoterOffsets(topicName, partition).forEach((replicaId, logEndOffset) ->`. That makes the code a bit more readable in my opinion.
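A minimal, self-contained sketch of the `Map.forEach` form suggested above. The `Replica` class is a hypothetical stand-in for the PR's `ReplicaState`, and a plain `Map<Integer, Long>` stands in for the result of `response.getVoterOffsets(topicName, partition)`; neither is the actual Kafka type.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class VoterOffsetsForEachSketch {

    // Hypothetical stand-in for the PR's ReplicaState; only the two fields used here.
    static final class Replica {
        final int replicaId;
        final long logEndOffset;

        Replica(int replicaId, long logEndOffset) {
            this.replicaId = replicaId;
            this.logEndOffset = logEndOffset;
        }
    }

    // Builds the list with Map.forEach instead of looping over entrySet().
    static List<Replica> toReplicas(Map<Integer, Long> offsets) {
        List<Replica> replicas = new ArrayList<>();
        offsets.forEach((replicaId, logEndOffset) ->
            replicas.add(new Replica(replicaId, logEndOffset)));
        return replicas;
    }

    public static void main(String[] args) {
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        offsets.put(1, 100L);
        offsets.put(2, 95L);
        for (Replica r : toReplicas(offsets)) {
            System.out.println(r.replicaId + " -> " + r.logEndOffset);
        }
    }
}
```

The lambda parameters name the key and the value directly, which removes the `entry.getKey()` / `entry.getValue()` calls from the loop body.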



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getVoterOffsets(topicName, partition).entrySet()) {
+                    voters.add(new ReplicaState(entry.getKey(), entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));

Review Comment:
   Hmm, `OptionalLong.empty()` seems wrong here. Don't you need to pass the last fetch time and the last caught-up time?
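One way to carry the timestamps through could look like the sketch below. It assumes the response reports a raw `long` per field and uses `-1` as a "not reported" sentinel (for example when the broker answers with v0); both the sentinel and the helper name `toOptional` are assumptions for illustration, not taken from the PR.

```java
import java.util.OptionalLong;

final class ReplicaTimestampSketch {

    // Assumed sentinel meaning "the broker did not report this timestamp".
    static final long UNKNOWN_TIMESTAMP = -1L;

    // Maps a raw timestamp to OptionalLong, keeping "unknown" distinct from a real value.
    static OptionalLong toOptional(long rawTimestamp) {
        return rawTimestamp == UNKNOWN_TIMESTAMP
            ? OptionalLong.empty()
            : OptionalLong.of(rawTimestamp);
    }

    public static void main(String[] args) {
        System.out.println(toOptional(1653464507000L).isPresent());
        System.out.println(toOptional(UNKNOWN_TIMESTAMP).isPresent());
    }
}
```

With a helper like this, the `ReplicaState` constructor call would receive the converted last-fetch and last-caught-up values instead of two hard-coded `OptionalLong.empty()` arguments.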



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getVoterOffsets(topicName, partition).entrySet()) {
+                    voters.add(new ReplicaState(entry.getKey(), entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));
+                }
+                List<ReplicaState> observers = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getObserverOffsets(topicName, partition).entrySet()) {
+                    observers.add(new ReplicaState(entry.getKey(), entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));

Review Comment:
   Same question here.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+                List<ReplicaState> voters = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getVoterOffsets(topicName, partition).entrySet()) {
+                    voters.add(new ReplicaState(entry.getKey(), entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));
+                }
+                List<ReplicaState> observers = new ArrayList<>();
+                for (Map.Entry<Integer, Long> entry: response.getObserverOffsets(topicName, partition).entrySet()) {
+                    observers.add(new ReplicaState(entry.getKey(), entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));
+                }
+                QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
+                return info;
+            }
+
+            @Override
+            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
+                DescribeQuorumRequestData data = new DescribeQuorumRequestData()
+                        .setTopics(singletonList(new DescribeQuorumRequestData.TopicData()
+                                .setPartitions(singletonList(new DescribeQuorumRequestData.PartitionData()
+                                        .setPartitionIndex(0)))
+                                .setTopicName(METADATA_TOPIC_NAME)));
+                return new Builder(data);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
+                if (quorumResponse.data().errorCode() == Errors.NONE.code()) {
+                    future.complete(createQuorumResult(quorumResponse));
+                } else {
+                    future.completeExceptionally(Errors.forCode(quorumResponse.data().errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), throwable);

Review Comment:
   nit: Why not call `future.completeExceptionally` directly?
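To illustrate the point with standard-library types only: when there is a single future, failing it directly is enough, and no singleton list is needed. The sketch uses `java.util.concurrent.CompletableFuture` as a stand-in for `KafkaFutureImpl`; the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class CompleteExceptionallySketch {

    // Fails the single future directly instead of wrapping it in a list first.
    static CompletableFuture<String> failedFuture(Throwable cause) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    // Returns the cause a caller would observe via get(), or null if the future succeeded.
    static Throwable causeOf(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = failedFuture(new IllegalStateException("boom"));
        System.out.println(causeOf(future));
    }
}
```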



##########
clients/src/main/java/org/apache/kafka/common/utils/QuorumInfo.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;

Review Comment:
   Is `org.apache.kafka.common` the right package here? It seems preferable to put them in the `org.apache.kafka.clients.admin` package.



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -592,6 +597,26 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures
                 .setErrorCode(error.code()));
     }
 
+    private static QuorumInfo defaultQuorumInfo() {
+        return new QuorumInfo(Topic.METADATA_TOPIC_NAME, 0,
+                singletonList(new QuorumInfo.ReplicaState()),
+                singletonList(new QuorumInfo.ReplicaState()));
+    }
+
+    private static DescribeQuorumResponse prepareDescribeQuorumResponse(Errors error) {
+        if (error == Errors.NONE) {
+            return new DescribeQuorumResponse(DescribeQuorumResponse.singletonResponse(
+                        new TopicPartition(Topic.METADATA_TOPIC_NAME, 0),
+                        0, 0, 0,
+                        singletonList(new DescribeQuorumResponseData.ReplicaState()),
+                        singletonList(new DescribeQuorumResponseData.ReplicaState()))

Review Comment:
   It would be better to fully populate the response here. That would have caught the issue that I mentioned earlier.



##########
clients/src/main/resources/common/message/DescribeQuorumResponse.json:
##########
@@ -17,9 +17,10 @@
   "apiKey": 55,
   "type": "response",
   "name": "DescribeQuorumResponse",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
+    // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in ReplicaState

Review Comment:
   nit: We usually put this before `validVersions`.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {

Review Comment:
   nit: Should we use `describeMetadataQuorum`?



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumIntegrationTest.scala:
##########
@@ -80,6 +85,36 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
     assertTrue(leaderState.logEndOffset > 0)
   }
 
+  @ClusterTest
+  def testDescribeQuorumRequestToBrokers() = {

Review Comment:
   Looking at the other tests, this test suite does not seem to be the right place for this test. I wonder if `KRaftClusterTest` would be more appropriate, for instance, but I don't feel strongly about this.



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -4846,6 +4871,32 @@ public void testDescribeFeaturesFailure() {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumSuccess() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create((short) 55, (short) 0, (short) 1));
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.NONE));
+            final KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            final QuorumInfo quorumInfo = future.get();
+            assertEquals(defaultQuorumInfo(), quorumInfo);
+        }
+    }
+
+    @Test
+    public void testDescribeMetadataQuorumFailure() {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create((short) 55, (short) 0, (short) 1));
+            env.kafkaClient().prepareResponse(
+                    body -> body instanceof DescribeQuorumRequest,
+                    prepareDescribeQuorumResponse(Errors.INVALID_REQUEST));
+            final KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            final ExecutionException e = assertThrows(ExecutionException.class, future::get);
+            assertEquals(e.getCause().getClass(), Errors.INVALID_REQUEST.exception().getClass());

Review Comment:
   nit: You could use `TestUtils.assertFutureThrows`.
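For readers unfamiliar with this kind of helper, here is a rough local stand-in that shows the idea. It is not Kafka's `TestUtils.assertFutureThrows`; the signature and behavior below are assumptions for illustration, and the sketch uses `isInstance`, which is looser than the exact class comparison in the test above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class AssertFutureThrowsSketch {

    // Hypothetical helper: waits for the future and checks the type of the failure cause.
    static <T extends Throwable> T assertFutureThrows(Future<?> future, Class<T> expectedCause) {
        try {
            future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (expectedCause.isInstance(cause)) {
                return expectedCause.cast(cause);
            }
            throw new AssertionError("Unexpected cause: " + cause, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        }
        throw new AssertionError("Expected the future to fail with " + expectedCause.getName());
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalArgumentException("invalid request"));
        IllegalArgumentException e = assertFutureThrows(future, IllegalArgumentException.class);
        System.out.println(e.getMessage());
    }
}
```

A helper of this shape folds the `assertThrows(ExecutionException.class, future::get)` call and the cause check into a single assertion.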



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -4321,6 +4330,61 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
+        NodeProvider provider = new LeastLoadedNodeProvider();
+
+        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call(
+                "describeQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
+                Integer partition = 0;
+                String topicName = response.getTopicNameByIndex(partition);
+                Integer leaderId = response.getPartitionLeaderId(topicName, partition);

Review Comment:
   The implementations of `getPartitionLeaderId`, `getVoterOffsets` and `getObserverOffsets` always start by looking up the relevant topic/partition in the response. Did you consider doing this lookup once and using the result to construct the `QuorumInfo`?
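A sketch of the "look up once" idea, under stated assumptions: the response is modeled as nested maps, and `PartitionData`, `Summary`, `summarize`, and the topic name `metadata-topic` are hypothetical stand-ins rather than Kafka's generated message classes or the PR's `QuorumInfo`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class SingleLookupSketch {

    // Hypothetical per-partition payload of a describe-quorum response.
    static final class PartitionData {
        final int leaderId;
        final Map<Integer, Long> voterOffsets;
        final Map<Integer, Long> observerOffsets;

        PartitionData(int leaderId, Map<Integer, Long> voterOffsets, Map<Integer, Long> observerOffsets) {
            this.leaderId = leaderId;
            this.voterOffsets = voterOffsets;
            this.observerOffsets = observerOffsets;
        }
    }

    // Hypothetical result type, standing in for QuorumInfo.
    static final class Summary {
        final String topic;
        final int leaderId;
        final int voterCount;
        final int observerCount;

        Summary(String topic, int leaderId, int voterCount, int observerCount) {
            this.topic = topic;
            this.leaderId = leaderId;
            this.voterCount = voterCount;
            this.observerCount = observerCount;
        }
    }

    // Resolves the topic/partition once, then reads every field from that single result.
    static Summary summarize(Map<String, Map<Integer, PartitionData>> response, String topic, int partition) {
        Map<Integer, PartitionData> partitions = response.get(topic);
        PartitionData data = partitions == null ? null : partitions.get(partition);
        if (data == null) {
            throw new IllegalArgumentException("No data for " + topic + "-" + partition);
        }
        return new Summary(topic, data.leaderId, data.voterOffsets.size(), data.observerOffsets.size());
    }

    public static void main(String[] args) {
        Map<Integer, Long> voters = new HashMap<>();
        voters.put(1, 100L);
        voters.put(2, 98L);
        Map<Integer, PartitionData> partitions = new HashMap<>();
        partitions.put(0, new PartitionData(1, voters, Collections.<Integer, Long>emptyMap()));
        Map<String, Map<Integer, PartitionData>> response = new HashMap<>();
        response.put("metadata-topic", partitions);

        Summary s = summarize(response, "metadata-topic", 0);
        System.out.println(s.topic + " leader=" + s.leaderId + " voters=" + s.voterCount);
    }
}
```

Doing the lookup once also gives a single place to handle a missing topic or partition, rather than repeating that check in each accessor.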



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -592,6 +597,26 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures
                 .setErrorCode(error.code()));
     }
 
+    private static QuorumInfo defaultQuorumInfo() {
+        return new QuorumInfo(Topic.METADATA_TOPIC_NAME, 0,
+                singletonList(new QuorumInfo.ReplicaState()),
+                singletonList(new QuorumInfo.ReplicaState()));
+    }
+
+    private static DescribeQuorumResponse prepareDescribeQuorumResponse(Errors error) {
+        if (error == Errors.NONE) {
+            return new DescribeQuorumResponse(DescribeQuorumResponse.singletonResponse(
+                        new TopicPartition(Topic.METADATA_TOPIC_NAME, 0),
+                        0, 0, 0,
+                        singletonList(new DescribeQuorumResponseData.ReplicaState()),
+                        singletonList(new DescribeQuorumResponseData.ReplicaState()))
+                    );
+        }
+        return new DescribeQuorumResponse(
+                new DescribeQuorumResponseData()
+                .setErrorCode(error.code()));

Review Comment:
   nit: Indentation seems off here.



##########
core/src/test/scala/unit/kafka/server/DescribeQuorumIntegrationTest.scala:
##########
@@ -17,25 +17,30 @@
 package kafka.server
 
 import java.io.IOException
-
 import kafka.test.ClusterInstance
 import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
 import kafka.test.junit.ClusterTestExtensions
-import kafka.utils.NotNothing
+import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.utils.{NotNothing, TestUtils}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeMetadataQuorumOptions}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
+import org.apache.kafka.metadata.BrokerState
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
+import org.slf4j.LoggerFactory
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
+@Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT)
 @Tag("integration")
-class DescribeQuorumRequestTest(cluster: ClusterInstance) {
+class DescribeQuorumIntegrationTest(cluster: ClusterInstance) {
+  val log = LoggerFactory.getLogger(classOf[DescribeQuorumIntegrationTest])
 
   @ClusterTest(clusterType = Type.ZK)
   def testDescribeQuorumNotSupportedByZkBrokers(): Unit = {

Review Comment:
   Should we extend `testDescribeQuorum` to verify that the new fields are set when v1 is used and not set when v0 is used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org