Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/14 15:46:34 UTC

[GitHub] [kafka] mumrah commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

mumrah commented on a change in pull request #10688:
URL: https://github.com/apache/kafka/pull/10688#discussion_r632616511



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1002,7 +1013,13 @@ private QuorumController(LogContext logContext,
     @Override
     public CompletableFuture<ElectLeadersResponseData>
             electLeaders(ElectLeadersRequestData request) {
-        return appendWriteEvent("electLeaders", request.timeoutMs(),
+        // If topicPartitions is null, we will try to trigger a new leader election on
+        // all partitions (!).  But if it's empty, there is nothing to do.
+        if (request.topicPartitions() != null && request.topicPartitions().isEmpty()) {
+            return CompletableFuture.completedFuture(new ElectLeadersResponseData());
+        }
+        return appendWriteEvent("electLeaders",
+            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),

Review comment:
       Is this related to timeouts, or is it just a different fix?
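
For reference, a minimal sketch of the two things this hunk does: it short-circuits on an empty (but non-null) partition list, and it turns the relative `timeoutMs` into an absolute deadline in nanoseconds. The class and method names below are hypothetical and are not part of `QuorumController`; the topic list is modeled as a plain `List<String>` for illustration.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not Kafka code.
final class ElectLeadersSketch {
    /** Converts a relative timeout in milliseconds into an absolute deadline in nanoseconds. */
    static long deadlineNs(long nowNs, int timeoutMs) {
        return nowNs + TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Null means "all partitions", so only a non-null, empty list is a no-op. */
    static boolean isNoOp(List<String> topicPartitions) {
        return topicPartitions != null && topicPartitions.isEmpty();
    }
}
```

The two concerns are independent in this sketch, which is what the question above is getting at.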

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -800,19 +800,44 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVers
         boolean uncleanOk = electionTypeIsUnclean(request.electionType());
         List<ApiMessageAndVersion> records = new ArrayList<>();
         ElectLeadersResponseData response = new ElectLeadersResponseData();
-        for (TopicPartitions topic : request.topicPartitions()) {
-            ReplicaElectionResult topicResults =
-                new ReplicaElectionResult().setTopic(topic.topic());
-            response.replicaElectionResults().add(topicResults);
-            for (int partitionId : topic.partitions()) {
-                ApiError error = electLeader(topic.topic(), partitionId, uncleanOk, records);
-                topicResults.partitionResult().add(new PartitionResult().
-                    setPartitionId(partitionId).
-                    setErrorCode(error.error().code()).
-                    setErrorMessage(error.message()));
+        if (request.topicPartitions() == null) {
+            // If topicPartitions is null, we try to elect a new leader for every partition.
+            // There are some obvious issues with this wire protocol.  For example, what
+            // if we have too many partitions to fit the results in a single RPC?  Or what
+            // if we generate too many records to fit in a single batch?  This behavior

Review comment:
       Do we need all the records in a single batch, or could we create a batch for each topic (including its partitions)?
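
A rough sketch of the per-topic alternative, assuming it is acceptable for the election results to be committed in more than one batch (the thread does not settle that). `Rec` is a hypothetical stand-in for a metadata record and `batchByTopic` is not an existing Kafka method.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not Kafka code.
final class PerTopicBatchSketch {
    /** Stand-in for a metadata record that belongs to one topic partition. */
    static final class Rec {
        final String topic;
        final int partition;

        Rec(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Groups records into one batch per topic, keeping first-seen topic order. */
    static List<List<Rec>> batchByTopic(List<Rec> records) {
        Map<String, List<Rec>> byTopic = new LinkedHashMap<>();
        for (Rec r : records) {
            byTopic.computeIfAbsent(r.topic, k -> new ArrayList<>()).add(r);
        }
        return new ArrayList<>(byTopic.values());
    }
}
```

This bounds each batch by the size of one topic, not by the whole cluster, at the cost of the election no longer being a single atomic write.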

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1165,4 +1192,22 @@ public long curClaimEpoch() {
     public void close() throws InterruptedException {
         queue.close();
     }
+
+    // VisibleForTesting
+    CountDownLatch pause() {
+        final CountDownLatch latch = new CountDownLatch(1);
+        appendControlEvent("pause", () -> {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                log.info("Interrupted while waiting for unpause.", e);
+            }
+        });
+        return latch;
+    }
+
+    // VisibleForTesting
+    Time time() {
+        return time;
+    }

Review comment:
       Do we need this? Can't we just use the Time we pass into the constructor in tests? Not a big deal really, just wondering.
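
For context on the `pause()` hook in the same hunk: it enqueues an event that blocks on a latch, so everything queued behind it waits until the test counts the latch down. A minimal sketch of that pattern, assuming a single-threaded executor as a stand-in for the controller's event queue (`PauseSketch` and `append` are hypothetical names):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only; the real controller uses its own event queue.
final class PauseSketch {
    private final ExecutorService queue = Executors.newSingleThreadExecutor();

    /** Enqueues a task that blocks the queue until the returned latch is counted down. */
    CountDownLatch pause() {
        final CountDownLatch latch = new CountDownLatch(1);
        queue.submit(() -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the worker thread can exit.
                Thread.currentThread().interrupt();
            }
        });
        return latch;
    }

    /** Enqueues an event that simply returns its own name once it runs. */
    Future<String> append(String name) {
        return queue.submit(() -> name);
    }

    void close() {
        queue.shutdownNow();
    }
}
```

A test can call `pause()`, submit a request, check that it has not completed, and then call `countDown()` on the latch to let the queue drain.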

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1030,17 +1050,24 @@ private QuorumController(LogContext logContext,
     @Override
     public CompletableFuture<AlterPartitionReassignmentsResponseData>
             alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
-        CompletableFuture<AlterPartitionReassignmentsResponseData> future = new CompletableFuture<>();
-        future.completeExceptionally(new UnsupportedOperationException());
-        return future;
+        if (request.topics().isEmpty()) {
+            return CompletableFuture.completedFuture(new AlterPartitionReassignmentsResponseData());
+        }
+        return appendWriteEvent("alterPartitionReassignments",
+            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
+            () -> {
+                throw new UnsupportedOperationException();
+            });
     }
 
     @Override
     public CompletableFuture<ListPartitionReassignmentsResponseData>
             listPartitionReassignments(ListPartitionReassignmentsRequestData request) {
-        CompletableFuture<ListPartitionReassignmentsResponseData> future = new CompletableFuture<>();
-        future.completeExceptionally(new UnsupportedOperationException());
-        return future;
+        return appendReadEvent("listPartitionReassignments",
+            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
+            () -> {
+                throw new UnsupportedOperationException();
+            });

Review comment:
       nit: we can do without the curly braces here and above. However, these will soon be replaced by the actual implementation.
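
One caveat on the nit, as far as standard Java goes: a lambda body is either an expression or a block, and `throw` is a statement, so a lambda that only throws needs its braces. A small sketch with a hypothetical class name, using `Supplier` as a stand-in for the controller's operation type:

```java
import java.util.function.Supplier;

// Hypothetical illustration only; not Kafka code.
final class ThrowingLambdaSketch {
    // Block body: compiles, because a block that always throws fits any return type.
    static final Supplier<String> UNSUPPORTED = () -> {
        throw new UnsupportedOperationException();
    };

    // Expression body: does not compile, because `throw` is not an expression.
    // static final Supplier<String> BAD = () -> throw new UnsupportedOperationException();
}
```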
