You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/08/17 22:48:56 UTC

[kafka] branch trunk updated: KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e5b865d6bf KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517)
e5b865d6bf is described below

commit e5b865d6bf37495e9949878c8206b9459aa5e1f4
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Aug 17 15:48:32 2022 -0700

    KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to non-leader (#12517)
    
    Currently the server returns `INVALID_REQUEST` if a `DescribeQuorum` request is sent to a node that is not the current leader. In addition to being inconsistent with all of the other leader APIs in the raft layer, this error is treated as fatal by both the forwarding manager and the admin client. Instead, we should return `NOT_LEADER_OR_FOLLOWER`, as we do with the other APIs. This error is retriable, so we can rely on the admin client to retry the request after receiving it.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../common/requests/DescribeQuorumResponse.java    | 16 +++++++++++-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 23 +++++++++++++++++
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  5 +++-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 29 ++++++++++++++++++++++
 .../apache/kafka/raft/RaftClientTestContext.java   | 27 ++++++++++----------
 5 files changed, 85 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
index cbf945b704..06ae681bc5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
@@ -37,7 +37,7 @@ import java.util.Map;
  * - {@link Errors#BROKER_NOT_AVAILABLE}
  *
  * Partition level errors:
- * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#NOT_LEADER_OR_FOLLOWER}
  * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
  */
 public class DescribeQuorumResponse extends AbstractResponse {
@@ -72,6 +72,19 @@ public class DescribeQuorumResponse extends AbstractResponse {
         return DEFAULT_THROTTLE_TIME;
     }
 
+    public static DescribeQuorumResponseData singletonErrorResponse(
+        TopicPartition topicPartition,
+        Errors error
+    ) {
+        return new DescribeQuorumResponseData()
+            .setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData()
+                .setTopicName(topicPartition.topic())
+                .setPartitions(Collections.singletonList(new DescribeQuorumResponseData.PartitionData()
+                    .setPartitionIndex(topicPartition.partition())
+                    .setErrorCode(error.code())))));
+    }
+
+
     public static DescribeQuorumResponseData singletonResponse(TopicPartition topicPartition,
                                                                int leaderId,
                                                                int leaderEpoch,
@@ -82,6 +95,7 @@ public class DescribeQuorumResponse extends AbstractResponse {
             .setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData()
                 .setTopicName(topicPartition.topic())
                 .setPartitions(Collections.singletonList(new DescribeQuorumResponseData.PartitionData()
+                    .setPartitionIndex(topicPartition.partition())
                     .setErrorCode(Errors.NONE.code())
                     .setLeaderId(leaderId)
                     .setLeaderEpoch(leaderEpoch)
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index de57813679..5faf53f075 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -5192,6 +5192,29 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumRetriableError() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
+                ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
+                ApiKeys.DESCRIBE_QUORUM.latestVersion()));
+
+            // First request fails with a NOT_LEADER_OR_FOLLOWER error (which is retriable)
+            env.kafkaClient().prepareResponse(
+                body -> body instanceof DescribeQuorumRequest,
+                prepareDescribeQuorumResponse(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER, false, false, false, false, false));
+
+            // The second request succeeds
+            env.kafkaClient().prepareResponse(
+                body -> body instanceof DescribeQuorumRequest,
+                prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, false, false, false, false));
+
+            KafkaFuture<QuorumInfo> future = env.adminClient().describeMetadataQuorum().quorumInfo();
+            QuorumInfo quorumInfo = future.get();
+            assertEquals(defaultQuorumInfo(false), quorumInfo);
+        }
+    }
+
     @Test
     public void testDescribeMetadataQuorumFailure() {
         try (final AdminClientUnitTestEnv env = mockClientEnv()) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index cac7a8a3cb..042a141a76 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1171,7 +1171,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         }
 
         if (!quorum.isLeader()) {
-            return DescribeQuorumRequest.getTopLevelErrorResponse(Errors.INVALID_REQUEST);
+            return DescribeQuorumResponse.singletonErrorResponse(
+                log.topicPartition(),
+                Errors.NOT_LEADER_OR_FOLLOWER
+            );
         }
 
         LeaderState<T> leaderState = quorum.leaderStateOrThrow();
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 9b2771d2b3..a8a346e6db 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
 import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
 import org.apache.kafka.common.message.EndQuorumEpochResponseData;
 import org.apache.kafka.common.message.FetchResponseData;
@@ -1946,6 +1947,34 @@ public class KafkaRaftClientTest {
         );
     }
 
+    @Test
+    public void testDescribeQuorumNonLeader() throws Exception {
+        int localId = 0;
+        int voter2 = localId + 1;
+        int voter3 = localId + 2;
+        int epoch = 2;
+        Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(epoch)
+            .build();
+
+        context.deliverRequest(DescribeQuorumRequest.singletonRequest(context.metadataPartition));
+        context.pollUntilResponse();
+
+        DescribeQuorumResponseData responseData = context.collectDescribeQuorumResponse();
+        assertEquals(Errors.NONE, Errors.forCode(responseData.errorCode()));
+
+        assertEquals(1, responseData.topics().size());
+        DescribeQuorumResponseData.TopicData topicData = responseData.topics().get(0);
+        assertEquals(context.metadataPartition.topic(), topicData.topicName());
+
+        assertEquals(1, topicData.partitions().size());
+        DescribeQuorumResponseData.PartitionData partitionData = topicData.partitions().get(0);
+        assertEquals(context.metadataPartition.partition(), partitionData.partitionIndex());
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, Errors.forCode(partitionData.errorCode()));
+    }
+
     @Test
     public void testDescribeQuorum() throws Exception {
         int localId = 0;
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index d48e41fb31..b825fc8867 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.raft;
 
-import java.util.function.Consumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.memory.MemoryPool;
@@ -57,8 +56,8 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.raft.internals.BatchBuilder;
 import org.apache.kafka.raft.internals.StringSerde;
 import org.apache.kafka.server.common.serialization.RecordSerde;
-import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.snapshot.RawSnapshotWriter;
+import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 
@@ -76,6 +75,7 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
@@ -128,7 +128,7 @@ public final class RaftClientTestContext {
         private final QuorumStateStore quorumStateStore = new MockQuorumStateStore();
         private final MockableRandom random = new MockableRandom(1L);
         private final LogContext logContext = new LogContext();
-        private final MockLog log = new MockLog(METADATA_PARTITION,  Uuid.METADATA_TOPIC_ID, logContext);
+        private final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID, logContext);
         private final Set<Integer> voters;
         private final OptionalInt localId;
 
@@ -440,21 +440,24 @@ public final class RaftClientTestContext {
         assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), quorumStateStore.readElectionState());
     }
 
-    int assertSentDescribeQuorumResponse(
-        int leaderId,
-        int leaderEpoch,
-        long highWatermark,
-        List<ReplicaState> voterStates,
-        List<ReplicaState> observerStates
-    ) {
+    DescribeQuorumResponseData collectDescribeQuorumResponse() {
         List<RaftResponse.Outbound> sentMessages = drainSentResponses(ApiKeys.DESCRIBE_QUORUM);
         assertEquals(1, sentMessages.size());
         RaftResponse.Outbound raftMessage = sentMessages.get(0);
         assertTrue(
             raftMessage.data() instanceof DescribeQuorumResponseData,
             "Unexpected request type " + raftMessage.data());
-        DescribeQuorumResponseData response = (DescribeQuorumResponseData) raftMessage.data();
+        return (DescribeQuorumResponseData) raftMessage.data();
+    }
 
+    void assertSentDescribeQuorumResponse(
+        int leaderId,
+        int leaderEpoch,
+        long highWatermark,
+        List<ReplicaState> voterStates,
+        List<ReplicaState> observerStates
+    ) {
+        DescribeQuorumResponseData response = collectDescribeQuorumResponse();
         DescribeQuorumResponseData expectedResponse = DescribeQuorumResponse.singletonResponse(
             metadataPartition,
             leaderId,
@@ -462,9 +465,7 @@ public final class RaftClientTestContext {
             highWatermark,
             voterStates,
             observerStates);
-
         assertEquals(expectedResponse, response);
-        return raftMessage.correlationId();
     }
 
     int assertSentVoteRequest(int epoch, int lastEpoch, long lastEpochOffset, int numVoteReceivers) {