Posted to jira@kafka.apache.org by "cmccabe (via GitHub)" <gi...@apache.org> on 2023/05/22 23:43:51 UTC

[GitHub] [kafka] cmccabe opened a new pull request, #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

cmccabe opened a new pull request, #13742:
URL: https://github.com/apache/kafka/pull/13742

   Previously, if a user tried to perform an overly large batch operation on the KRaft controller (such as creating a million topics), we would create a very large number of records in memory. Our attempt to write these records to the Raft layer would fail, because there were too many to fit in an atomic batch. This failure, in turn, would trigger a controller failover.
   
   (Note: I am assuming here that no topic creation policy was in place that would prevent the creation of a million topics. I am also assuming that the user operation must be done atomically, which is true for all current user operations, since we have not implemented KIP-868 yet.)
   
   With this PR, we fail immediately when the number of records generated exceeds the maximum that can be applied as a single atomic batch. This failure does not trigger a controller failover.
   
   In order to implement this in a simple way, this PR adds the BoundedList class, which wraps any list and adds a maximum length. Attempts to grow the list beyond this length cause an exception to be thrown.
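   For illustration, a minimal sketch of such a wrapper is shown below. The constructor shape, method names, and the exception being a plain RuntimeException subclass are assumptions made here for clarity; the actual BoundedList and BoundedListTooLongException classes in the PR may differ.
   
   ```
   // Sketch only, not the PR's implementation: a list wrapper that refuses to
   // grow past a fixed maximum length.
   import java.util.ArrayList;
   import java.util.List;
   
   class BoundedListTooLongException extends RuntimeException {
       BoundedListTooLongException(String message) {
           super(message);
       }
   }
   
   final class BoundedList<E> {
       private final int maxLength;
       private final List<E> underlying = new ArrayList<>();
   
       BoundedList(int maxLength) {
           this.maxLength = maxLength;
       }
   
       void add(E element) {
           // Fail as soon as the limit would be exceeded, before a huge record
           // list is ever materialized in memory.
           if (underlying.size() >= maxLength) {
               throw new BoundedListTooLongException("Cannot add another element, since the " +
                   "list would exceed its maximum length of " + maxLength);
           }
           underlying.add(element);
       }
   
       List<E> asList() {
           return underlying;
       }
   }
   ```
   
   The point of bounding the list itself, rather than checking its size afterwards, is that an oversized operation is cut off while the records are being generated, so the full record list is never built in memory.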


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mumrah commented on pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on PR #13742:
URL: https://github.com/apache/kafka/pull/13742#issuecomment-1564483362

   @divijvaidya Colin can correct me if I'm mistaken, but I believe this patch is mainly about closing an existing edge case until we implement KIP-868 (metadata transactions). Once we have transactions in the controller, we can allow arbitrarily large batches of records.
   
   > I am concerned about the user facing aspect of this change. If I am a user and get this exception, what am I expected to do to resolve it?
   
   Right now, if you create a topic with more than ~10000 partitions, you'll get a server error anyway. The controller fails to commit the batch, throws an exception, and then renounces leadership.
   
   Here's what happens on the controller:
   ```
   [2023-05-26 10:24:28,308] DEBUG [QuorumController id=1] Got exception while running createTopics(1813420413). Invoking handleException. (org.apache.kafka.queue.KafkaEventQueue)
   java.lang.IllegalStateException: Attempted to atomically commit 20001 records, but maxRecordsPerBatch is 10000
   	at org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:812)
   	at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:719)
   	at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
   	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
   	at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
   	at java.lang.Thread.run(Thread.java:750)
   [2023-05-26 10:24:28,314] INFO [RaftManager id=1] Received user request to resign from the current epoch 3 (org.apache.kafka.raft.KafkaRaftClient)
   [2023-05-26 10:24:28,323] INFO [RaftManager id=1] Failed to handle fetch from 2 at 142 due to NOT_LEADER_OR_FOLLOWER (org.apache.kafka.raft.KafkaRaftClient)
   ```
   
   And the client sees:
   ```
   [2023-05-26 10:24:28,351] ERROR org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request.
    (kafka.admin.TopicCommand$)
   ```
   
   So, really, this patch isn't changing anything from the client's perspective. It just prevents the controller from renouncing leadership (which is the real problem).


[GitHub] [kafka] cmccabe commented on pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13742:
URL: https://github.com/apache/kafka/pull/13742#issuecomment-1564904075

   Thanks for all the reviews, and thanks @mumrah for the LGTM. Since this is a 3.5 blocker I am getting it in today so that it will be in the next RC.
   
   As I said before, this doesn't add any new limits; it just prevents damage to the controller when the existing limits are exceeded. However, the discussion about limits here was good. I think we should consider a follow-on KIP to make the maximum number of records per user operation configurable, and possibly add a configurable partitions_max as well.


[GitHub] [kafka] edoardocomar commented on pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

Posted by "edoardocomar (via GitHub)" <gi...@apache.org>.
edoardocomar commented on PR #13742:
URL: https://github.com/apache/kafka/pull/13742#issuecomment-1564701149

   I had just done an alternative smaller PR for the same issue
   https://github.com/apache/kafka/pull/13766


[GitHub] [kafka] ijuma commented on a diff in pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13742:
URL: https://github.com/apache/kafka/pull/13742#discussion_r1206985698


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -457,9 +466,13 @@ private Throwable handleEventException(String name,
         long endProcessingTime = time.nanoseconds();
         long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
         long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
-        if (exception instanceof ApiException) {
+        if ((exception instanceof ApiException) ||
+                (exception instanceof BoundedListTooLongException)) {
             log.info("{}: failed with {} in {} us. Reason: {}", name,
                 exception.getClass().getSimpleName(), deltaUs, exception.getMessage());
+            if (exception instanceof BoundedListTooLongException) {
+                exception = new UnknownServerException(exception.getMessage());

Review Comment:
   `UnknownServerException` is pretty uninformative. Can we use `PolicyViolationException` with a string indicating the specifics? Not perfect, but the custom string sent back to the user can clarify what's going on.
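   To make that concrete, the suggested translation might look roughly like the snippet below (a sketch only; it assumes the surrounding handleEventException code from the diff above, and uses PolicyViolationException from org.apache.kafka.common.errors):
   
   ```
   // Sketch: surface the limit violation as an ApiException whose message
   // reaches the client, instead of a generic UnknownServerException.
   if (exception instanceof BoundedListTooLongException) {
       exception = new PolicyViolationException(exception.getMessage());
   }
   ```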



[GitHub] [kafka] cmccabe merged pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe merged PR #13742:
URL: https://github.com/apache/kafka/pull/13742


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13742:
URL: https://github.com/apache/kafka/pull/13742#discussion_r1203739372


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -160,6 +161,14 @@
  * the controller can fully initialize.
  */
 public final class QuorumController implements Controller {
+    /**
+     * The maximum records any user-initiated operation is allowed to generate.
+     */
+    final static int MAX_RECORDS_PER_USER_OP = 10000;

Review Comment:
   Please add a detailed comment explaining how this number was calculated. It would help future authors to understand the considerations to take into account while changing it.



[GitHub] [kafka] mimaison commented on pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on PR #13742:
URL: https://github.com/apache/kafka/pull/13742#issuecomment-1570390371

   @cmccabe Can we now close KAFKA-14996 or is there more work to do?


[GitHub] [kafka] cmccabe commented on a diff in pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13742:
URL: https://github.com/apache/kafka/pull/13742#discussion_r1207117766


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -160,6 +161,14 @@
  * the controller can fully initialize.
  */
 public final class QuorumController implements Controller {
+    /**
+     * The maximum records any user-initiated operation is allowed to generate.
+     */
+    final static int MAX_RECORDS_PER_USER_OP = 10000;

Review Comment:
   It was chosen to be the same as MAX_RECORDS_PER_BATCH which is also 10000. I will make this clearer by initializing them to the same values.
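   For example, the clarification described here could look roughly like this (the declarations are a sketch; the exact form in QuorumController may differ):
   
   ```
   final static int MAX_RECORDS_PER_BATCH = 10000;
   
   /**
    * The maximum number of records any user-initiated operation is allowed to generate.
    */
   final static int MAX_RECORDS_PER_USER_OP = MAX_RECORDS_PER_BATCH;
   ```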



[GitHub] [kafka] cmccabe commented on pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13742:
URL: https://github.com/apache/kafka/pull/13742#issuecomment-1564712814

   Hi all,
   
   Thanks for the reviews and comments.
   
   > @divijvaidya : But I was wondering if an additional guard could be to have a default TopicCreationPolicy with a MaxValue of X number of topics per requests (X is probably <= 10K). On every topic creation request, we will apply the policy and ensure that the request gets rejected upfront before entering the system. We could extend this pattern for other use cases where we would like to restrict range of certain parameters in the requests. What do you think?
   
   I like this idea, but it would require a KIP to implement.
   
   > @divijvaidya : I am concerned about the user facing aspect of this change. If I am a user and get this exception, what am I expected to do to resolve it? The documentation does not call out any limitation on max topics that I can create in one API call. How do I know what the limit is? The alternative approach I proposed above (topic policy limitation) solves this. We can document policies with the constraints and follow a similar pattern for other out of bound configuration/request params.
   
   I think the thing to keep in mind is that this PR doesn't make any request fail that wouldn't have already failed.
   
   > @mumrah : Colin can correct me if I'm mistaken, but I believe this patch is mainly about closing an existing edge case until we implement KIP-868 (metadata transactions). Once we have transactions in the controller, we can allow arbitrarily large batches of records.
   
   Yes, that's correct.
   
   > @mumrah : I also wondered if we could solve this in ControllerResult rather than in each control manager separately.
   
   I think the issue is that people can request truly gigantic, garbage-collector killing lists of records to be constructed. You want to cut them off before they build the giant list, not after.
   
   > @mumrah : Will we remove this logic once transactions are implemented?
   
   I think we'll need some kind of limit even with metadata transactions in place. But it will be a limit set by our policy rather than by the implementation.
   
   > @edoardocomar : I had just done an alternative smaller PR for the same issue
   
   Thank you, Edoardo, I guess we were thinking along the same lines. One thing to keep in mind is that the problem is bigger than just CreateTopics. Any operation can be too big and cause implementation problems.


[GitHub] [kafka] cmccabe commented on a diff in pull request #13742: KAFKA-14996: Handle overly large user operations on the kcontroller

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13742:
URL: https://github.com/apache/kafka/pull/13742#discussion_r1207123242


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -457,9 +466,13 @@ private Throwable handleEventException(String name,
         long endProcessingTime = time.nanoseconds();
         long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
         long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
-        if (exception instanceof ApiException) {
+        if ((exception instanceof ApiException) ||
+                (exception instanceof BoundedListTooLongException)) {
             log.info("{}: failed with {} in {} us. Reason: {}", name,
                 exception.getClass().getSimpleName(), deltaUs, exception.getMessage());
+            if (exception instanceof BoundedListTooLongException) {
+                exception = new UnknownServerException(exception.getMessage());

Review Comment:
   yeah that's fair.


