Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/13 19:47:25 UTC

[GitHub] [kafka] cmccabe opened a new pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

cmccabe opened a new pull request #10688:
URL: https://github.com/apache/kafka/pull/10688


   Not all RPC requests to the quorum controller include a timeout, but we
   should honor the timeouts that do exist.
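Honoring a timeout here means turning the request's relative `timeoutMs` into an absolute deadline on a monotonic nanosecond clock. The diffs in this thread do this with `time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS)`. Below is a minimal sketch of that conversion plus an expiry check. The class and method names are hypothetical, and the assumption that the controller compares deadlines by subtraction is not confirmed by the PR.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: convert a relative timeoutMs into an absolute deadline on a
// monotonic nanosecond clock, mirroring the expression used in this PR:
//   time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS)
final class DeadlineSketch {
    private DeadlineSketch() {}

    /** Absolute deadline in nanoseconds, given the current monotonic time. */
    static long deadlineNs(long nowNs, int timeoutMs) {
        return nowNs + TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** True once the monotonic clock has reached or passed the deadline. */
    static boolean isExpired(long nowNs, long deadlineNs) {
        // Compare via subtraction so the check stays correct if a nanoTime-style clock wraps.
        return nowNs - deadlineNs >= 0;
    }
}
```

For example, a request with `timeoutMs = 5` received at clock reading 1,000 ns gets a deadline of 5,001,000 ns.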
   
   For electLeaders, attempt to trigger a leader election for all
   partitions when the request specifies null for the topicPartitions field.
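The three cases for `topicPartitions` (null, empty, non-empty) can be illustrated with a small, hypothetical resolver. The map shapes (topic name to partition ids in the request, topic name to partition count on the controller) and the `topic-partition` string output are assumptions made for the example. They are not Kafka's `ElectLeadersRequestData` types, and validation of unknown topics is omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the three cases for the electLeaders topicPartitions field:
//   null      -> try to elect a leader for every partition the controller knows about
//   empty     -> nothing to do; an empty response can be returned immediately
//   non-empty -> elect leaders only for the listed partitions
final class ElectTargets {
    private ElectTargets() {}

    /**
     * @param requested topic -> partition ids from the request, or null for "all partitions"
     * @param known     topic -> partition count known to the controller (assumed shape)
     * @return "topic-partition" strings to attempt an election on
     */
    static List<String> resolve(Map<String, List<Integer>> requested,
                                Map<String, Integer> known) {
        if (requested == null) {
            List<String> all = new ArrayList<>();
            for (Map.Entry<String, Integer> e : known.entrySet()) {
                for (int p = 0; p < e.getValue(); p++) {
                    all.add(e.getKey() + "-" + p);
                }
            }
            return all;
        }
        if (requested.isEmpty()) {
            return Collections.emptyList();
        }
        List<String> some = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : requested.entrySet()) {
            for (int p : e.getValue()) {
                some.add(e.getKey() + "-" + p);
            }
        }
        return some;
    }
}
```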


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10688:
URL: https://github.com/apache/kafka/pull/10688#discussion_r632695254



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1002,7 +1013,13 @@ private QuorumController(LogContext logContext,
     @Override
     public CompletableFuture<ElectLeadersResponseData>
             electLeaders(ElectLeadersRequestData request) {
-        return appendWriteEvent("electLeaders", request.timeoutMs(),
+        // If topicPartitions is null, we will try to trigger a new leader election on
+        // all partitions (!).  But if it's empty, there is nothing to do.
+        if (request.topicPartitions() != null && request.topicPartitions().isEmpty()) {
+            return CompletableFuture.completedFuture(new ElectLeadersResponseData());
+        }
+        return appendWriteEvent("electLeaders",
+            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),

Review comment:
       This is a slightly different fix, although sort of related.



[GitHub] [kafka] mumrah commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10688:
URL: https://github.com/apache/kafka/pull/10688#discussion_r632616511



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1002,7 +1013,13 @@ private QuorumController(LogContext logContext,
     @Override
     public CompletableFuture<ElectLeadersResponseData>
             electLeaders(ElectLeadersRequestData request) {
-        return appendWriteEvent("electLeaders", request.timeoutMs(),
+        // If topicPartitions is null, we will try to trigger a new leader election on
+        // all partitions (!).  But if it's empty, there is nothing to do.
+        if (request.topicPartitions() != null && request.topicPartitions().isEmpty()) {
+            return CompletableFuture.completedFuture(new ElectLeadersResponseData());
+        }
+        return appendWriteEvent("electLeaders",
+            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),

Review comment:
       Is this related to timeouts, or is it just a different fix?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -800,19 +800,44 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVers
         boolean uncleanOk = electionTypeIsUnclean(request.electionType());
         List<ApiMessageAndVersion> records = new ArrayList<>();
         ElectLeadersResponseData response = new ElectLeadersResponseData();
-        for (TopicPartitions topic : request.topicPartitions()) {
-            ReplicaElectionResult topicResults =
-                new ReplicaElectionResult().setTopic(topic.topic());
-            response.replicaElectionResults().add(topicResults);
-            for (int partitionId : topic.partitions()) {
-                ApiError error = electLeader(topic.topic(), partitionId, uncleanOk, records);
-                topicResults.partitionResult().add(new PartitionResult().
-                    setPartitionId(partitionId).
-                    setErrorCode(error.error().code()).
-                    setErrorMessage(error.message()));
+        if (request.topicPartitions() == null) {
+            // If topicPartitions is null, we try to elect a new leader for every partition.
+            // There are some obvious issues with this wire protocol.  For example, what
+            // if we have too many partitions to fit the results in a single RPC?  Or what
+            // if we generate too many records to fit in a single batch?  This behavior

Review comment:
       Do we need all the records in a single batch, or could we create a batch for each topic (including its partitions)?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1165,4 +1192,22 @@ public long curClaimEpoch() {
     public void close() throws InterruptedException {
         queue.close();
     }
+
+    // VisibleForTesting
+    CountDownLatch pause() {
+        final CountDownLatch latch = new CountDownLatch(1);
+        appendControlEvent("pause", () -> {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                log.info("Interrupted while waiting for unpause.", e);
+            }
+        });
+        return latch;
+    }
+
+    // VisibleForTesting
+    Time time() {
+        return time;
+    }

Review comment:
       Do we need this? Can't we just use the Time we pass into the constructor in tests? Not a big deal really, just wondering.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1030,17 +1050,24 @@ private QuorumController(LogContext logContext,
     @Override
     public CompletableFuture<AlterPartitionReassignmentsResponseData>
             alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
-        CompletableFuture<AlterPartitionReassignmentsResponseData> future = new CompletableFuture<>();
-        future.completeExceptionally(new UnsupportedOperationException());
-        return future;
+        if (request.topics().isEmpty()) {
+            return CompletableFuture.completedFuture(new AlterPartitionReassignmentsResponseData());
+        }
+        return appendWriteEvent("alterPartitionReassignments",
+            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
+            () -> {
+                throw new UnsupportedOperationException();
+            });
     }
 
     @Override
     public CompletableFuture<ListPartitionReassignmentsResponseData>
             listPartitionReassignments(ListPartitionReassignmentsRequestData request) {
-        CompletableFuture<ListPartitionReassignmentsResponseData> future = new CompletableFuture<>();
-        future.completeExceptionally(new UnsupportedOperationException());
-        return future;
+        return appendReadEvent("listPartitionReassignments",
+            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
+            () -> {
+                throw new UnsupportedOperationException();
+            });

Review comment:
       nit: we can do without the curly braces here and above. However, these will soon be replaced by the actual impl.



[GitHub] [kafka] cmccabe commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10688:
URL: https://github.com/apache/kafka/pull/10688#discussion_r632696250



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1165,4 +1192,22 @@ public long curClaimEpoch() {
     public void close() throws InterruptedException {
         queue.close();
     }
+
+    // VisibleForTesting
+    CountDownLatch pause() {
+        final CountDownLatch latch = new CountDownLatch(1);
+        appendControlEvent("pause", () -> {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                log.info("Interrupted while waiting for unpause.", e);
+            }
+        });
+        return latch;
+    }
+
+    // VisibleForTesting
+    Time time() {
+        return time;
+    }

Review comment:
       We have several helper classes for creating quorum controllers, so getting access to the time parameter that way would be difficult.  Anyway, this is package-private, so in practice only unit tests use it.
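To see why `pause()` and access to the controller's clock are useful together, here is a self-contained sketch of the testing pattern. `PausableQueue` is a hypothetical stand-in for the controller's single-threaded event queue, and `fakeNowNs` stands in for a mock `Time`. The real `QuorumController` classes, and exactly how they check deadlines, are not reproduced here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical stand-in for a single-threaded controller event queue. A paused queue
// holds later events while a test moves the clock past their deadlines.
final class PausableQueue implements AutoCloseable {
    private final ExecutorService thread = Executors.newSingleThreadExecutor();
    // Plays the role of an injected mock Time; tests set it directly.
    final AtomicLong fakeNowNs = new AtomicLong(0L);

    /** Enqueues an event that blocks the queue until the returned latch is counted down. */
    CountDownLatch pause() {
        CountDownLatch latch = new CountDownLatch(1);
        thread.execute(() -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return latch;
    }

    /** Enqueues an event; it fails with TimeoutException if it runs at or after deadlineNs. */
    <T> CompletableFuture<T> append(long deadlineNs, Supplier<T> op) {
        CompletableFuture<T> future = new CompletableFuture<>();
        thread.execute(() -> {
            if (fakeNowNs.get() - deadlineNs >= 0) {
                future.completeExceptionally(new TimeoutException("deadline passed before the event ran"));
                return;
            }
            try {
                future.complete(op.get());
            } catch (RuntimeException e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    @Override
    public void close() {
        thread.shutdownNow();
    }
}
```

In a test, one would call `pause()`, enqueue the operation under test, advance the clock past its deadline, then `countDown()` the latch and expect the operation's future to fail with a `TimeoutException`.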



[GitHub] [kafka] cmccabe merged pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #10688:
URL: https://github.com/apache/kafka/pull/10688



[GitHub] [kafka] cmccabe commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10688:
URL: https://github.com/apache/kafka/pull/10688#discussion_r632697059



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -800,19 +800,44 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVers
         boolean uncleanOk = electionTypeIsUnclean(request.electionType());
         List<ApiMessageAndVersion> records = new ArrayList<>();
         ElectLeadersResponseData response = new ElectLeadersResponseData();
-        for (TopicPartitions topic : request.topicPartitions()) {
-            ReplicaElectionResult topicResults =
-                new ReplicaElectionResult().setTopic(topic.topic());
-            response.replicaElectionResults().add(topicResults);
-            for (int partitionId : topic.partitions()) {
-                ApiError error = electLeader(topic.topic(), partitionId, uncleanOk, records);
-                topicResults.partitionResult().add(new PartitionResult().
-                    setPartitionId(partitionId).
-                    setErrorCode(error.error().code()).
-                    setErrorMessage(error.message()));
+        if (request.topicPartitions() == null) {
+            // If topicPartitions is null, we try to elect a new leader for every partition.
+            // There are some obvious issues with this wire protocol.  For example, what
+            // if we have too many partitions to fit the results in a single RPC?  Or what
+            // if we generate too many records to fit in a single batch?  This behavior

Review comment:
       Thinking about this more, it doesn't seem necessary to do this atomically.  We can just do it non-atomically.  It's not a logically indivisible operation like creating a topic.
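Because the all-partitions election need not be atomic, the records can be split along the lines mumrah suggested: one batch per topic, with very large topics split further. Below is a minimal sketch of that grouping. It assumes a simplified record type (`Rec`) in place of `ApiMessageAndVersion` and a hypothetical `maxBatchSize` cap. How the controller would actually append each batch is not shown. Appending batches separately means a failure partway through leaves earlier batches committed, which is acceptable only because the operation is not logically indivisible.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: instead of appending every leader-change record in one atomic
// batch, group records per topic and cap each batch at maxBatchSize. Each inner list
// would be appended as its own (non-atomic) batch.
final class PerTopicBatcher {
    private PerTopicBatcher() {}

    /** Stand-in for a metadata record; the code under review uses ApiMessageAndVersion. */
    static final class Rec {
        final String topic;
        final int partition;
        Rec(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }

    static List<List<Rec>> batches(List<Rec> records, int maxBatchSize) {
        if (maxBatchSize <= 0) throw new IllegalArgumentException("maxBatchSize must be positive");
        // Preserve first-seen topic order so the output is deterministic.
        Map<String, List<Rec>> byTopic = new LinkedHashMap<>();
        for (Rec r : records) {
            byTopic.computeIfAbsent(r.topic, t -> new ArrayList<>()).add(r);
        }
        List<List<Rec>> out = new ArrayList<>();
        for (List<Rec> group : byTopic.values()) {
            // Split a large topic further so no batch exceeds the cap.
            for (int i = 0; i < group.size(); i += maxBatchSize) {
                out.add(new ArrayList<>(group.subList(i, Math.min(i + maxBatchSize, group.size()))));
            }
        }
        return out;
    }
}
```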



[GitHub] [kafka] cmccabe commented on a change in pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10688:
URL: https://github.com/apache/kafka/pull/10688#discussion_r632695450



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1030,17 +1050,24 @@ private QuorumController(LogContext logContext,
     @Override
     public CompletableFuture<AlterPartitionReassignmentsResponseData>
             alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
-        CompletableFuture<AlterPartitionReassignmentsResponseData> future = new CompletableFuture<>();
-        future.completeExceptionally(new UnsupportedOperationException());
-        return future;
+        if (request.topics().isEmpty()) {
+            return CompletableFuture.completedFuture(new AlterPartitionReassignmentsResponseData());
+        }
+        return appendWriteEvent("alterPartitionReassignments",
+            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
+            () -> {
+                throw new UnsupportedOperationException();
+            });
     }
 
     @Override
     public CompletableFuture<ListPartitionReassignmentsResponseData>
             listPartitionReassignments(ListPartitionReassignmentsRequestData request) {
-        CompletableFuture<ListPartitionReassignmentsResponseData> future = new CompletableFuture<>();
-        future.completeExceptionally(new UnsupportedOperationException());
-        return future;
+        return appendReadEvent("listPartitionReassignments",
+            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
+            () -> {
+                throw new UnsupportedOperationException();
+            });

Review comment:
       agreed



[GitHub] [kafka] cmccabe commented on pull request #10688: KAFKA-12778: Fix QuorumController request timeouts and electLeaders

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10688:
URL: https://github.com/apache/kafka/pull/10688#issuecomment-841410862


   > Generally, I don't like that we need to explicitly extract the timeout from the request and pass it into the controller call. It would be nice if this could be generalized or automated somehow. However, something like this is not needed for this minor fix.
   
   If we were starting all over again, we could give all RPC requests a timeout, located in the request header itself.  But that isn't really the case today -- some requests don't have timeouts, some do, and we have to extract the timeout from each request's specific schema.
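One hypothetical way to centralize the extraction without changing the wire protocol: request types whose schema has a `timeoutMs` field expose it through a shared interface, and requests without one get no deadline. `HasTimeoutMs`, `TimedRequest`, and `UntimedRequest` are illustrative names, not Kafka's generated message classes. This is a sketch of the idea under discussion, not something the PR implements.

```java
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: with no timeout in the request header, each request type whose
// schema carries timeoutMs exposes it through a shared interface, and a deadline is
// derived only when one is present.
final class RequestTimeouts {
    private RequestTimeouts() {}

    /** Implemented only by request types whose schema includes a timeoutMs field. */
    interface HasTimeoutMs {
        int timeoutMs();
    }

    // Illustrative request types; these are not Kafka's generated message classes.
    static final class TimedRequest implements HasTimeoutMs {
        private final int timeoutMs;
        TimedRequest(int timeoutMs) { this.timeoutMs = timeoutMs; }
        @Override public int timeoutMs() { return timeoutMs; }
    }

    static final class UntimedRequest { }

    /** The request's timeout if its schema has one, otherwise empty. */
    static OptionalInt timeoutOf(Object request) {
        if (request instanceof HasTimeoutMs) {
            return OptionalInt.of(((HasTimeoutMs) request).timeoutMs());
        }
        return OptionalInt.empty();
    }

    /** Absolute deadline on a nanosecond clock, or empty when the request has no timeout. */
    static OptionalLong deadlineNs(Object request, long nowNs) {
        OptionalInt timeoutMs = timeoutOf(request);
        if (!timeoutMs.isPresent()) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(nowNs + TimeUnit.MILLISECONDS.toNanos(timeoutMs.getAsInt()));
    }
}
```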