Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/06 21:14:55 UTC

[GitHub] [kafka] cmccabe opened a new pull request, #12595: KAFKA-14204: QuorumController must correctly handle overly large batches

cmccabe opened a new pull request, #12595:
URL: https://github.com/apache/kafka/pull/12595

   Originally, the QuorumController did not try to limit the number of records in a batch that it sent to the Raft layer.  This caused two problems. Firstly, we were not correctly handling the exception that was thrown by the Raft layer when a batch of records was too large to apply atomically. This happened because the Raft layer threw an exception which was a subclass of ApiException. Secondly, by letting the Raft layer split non-atomic batches, we were not able to create snapshots at each of the splits. This led to O(N) behavior during controller failovers.
   
   This PR fixes both of these issues by limiting the number of records in a batch. Atomic batches that are too large will fail with a RuntimeException which will cause the active controller to become inactive and revert to the last committed state. Non-atomic batches will be split into multiple batches with a fixed number of records in each.
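
The batching rule described above can be sketched roughly as follows. This is an illustrative, simplified version and not the code from the PR: the class name `BatchSplitSketch`, the method name `append`, and the generic record type are assumptions made for the example, and the appender here is any function that takes a batch and returns an offset. The sketch also uses `>=` when detecting the final batch so that a record count that is an exact multiple of the limit does not produce an empty trailing batch; that is a choice made for this example only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class BatchSplitSketch {
    /**
     * Hands records to the appender in batches of at most maxRecordsPerBatch.
     * Atomic results are never split; an oversized atomic result is rejected.
     * Returns whatever the last appender call returned.
     */
    static <T> long append(
        List<T> records,
        boolean atomic,
        int maxRecordsPerBatch,
        Function<List<T>, Long> appender
    ) {
        if (atomic) {
            if (records.size() > maxRecordsPerBatch) {
                throw new IllegalStateException("Attempted to atomically commit " +
                    records.size() + " records, but maxRecordsPerBatch is " +
                    maxRecordsPerBatch);
            }
            return appender.apply(records);
        }
        int startIndex = 0;
        while (true) {
            int endIndex = startIndex + maxRecordsPerBatch;
            if (endIndex >= records.size()) {
                // Final (possibly short) batch: its result is the overall result.
                return appender.apply(records.subList(startIndex, records.size()));
            }
            appender.apply(records.subList(startIndex, endIndex));
            startIndex = endIndex;
        }
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        // Hypothetical appender: remembers each batch size and returns a fake offset.
        long lastOffset = append(Arrays.asList(1, 2, 3, 4, 5), false, 2, batch -> {
            batchSizes.add(batch.size());
            return (long) batchSizes.size();
        });
        System.out.println(batchSizes + " " + lastOffset); // prints "[2, 2, 1] 3"
    }
}
```

Failing fast on an oversized atomic batch keeps the "all or nothing" guarantee intact, while the non-atomic path gives the caller one callback per batch, which is where a per-batch snapshot could be taken.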


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jsancio commented on a diff in pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12595:
URL: https://github.com/apache/kafka/pull/12595#discussion_r965357907


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -844,6 +855,72 @@ public String toString() {
         }
     }
 
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. The arguments are last
+     *                              write offset, record list, and the return result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                            records.size() + " records, but maxRecordsPerBatch is " +
+                            maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                            records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int i = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int j = i + maxRecordsPerBatch;
+                    if (j > records.size()) {
+                        long offset = appender.apply(records.subList(i, records.size()));

Review Comment:
   I am under the impression that `subList` is linear for `LinkedList`. I'm not sure whether you want to enforce that this function is only used with `ArrayList`.





[GitHub] [kafka] cmccabe commented on pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches

Posted by GitBox <gi...@apache.org>.
cmccabe commented on PR #12595:
URL: https://github.com/apache/kafka/pull/12595#issuecomment-1239838222

   Test failures don't seem related and don't reproduce locally (oddly, they also seem to be about the ZK variants of both tests):
   ```
   kafka.test.ClusterTestExtensionsTest.testClusterTemplate
   TransactionsExpirationTest.testTransactionAfterProducerIdExpires
   ```




[GitHub] [kafka] cmccabe merged pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches

Posted by GitBox <gi...@apache.org>.
cmccabe merged PR #12595:
URL: https://github.com/apache/kafka/pull/12595




[GitHub] [kafka] showuon commented on a diff in pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12595:
URL: https://github.com/apache/kafka/pull/12595#discussion_r965436525


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1148,19 +1225,12 @@ private void renounce() {
             controllerMetrics.setActive(false);
             purgatory.failAll(newNotControllerException());
 
-            if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
-                snapshotRegistry.revertToSnapshot(lastCommittedOffset);
-                authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
-            } else {
-                log.info("Unable to find last committed offset {} in snapshot registry; resetting " +
-                         "to empty state.", lastCommittedOffset);
-                resetToEmptyState();
-                authorizer.ifPresent(a -> a.loadSnapshot(Collections.emptyMap()));
-                needToCompleteAuthorizerLoad = authorizer.isPresent();
-                raftClient.unregister(metaLogListener);
-                metaLogListener = new QuorumMetaLogListener();
-                raftClient.register(metaLogListener);
+            if (!snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
+                throw new RuntimeException("Unable to find last committed offset " +
+                        lastCommittedEpoch + " in snapshot registry.");

Review Comment:
   I see. Thanks.





[GitHub] [kafka] cmccabe commented on a diff in pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12595:
URL: https://github.com/apache/kafka/pull/12595#discussion_r965237423


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -844,6 +855,72 @@ public String toString() {
         }
     }
 
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. The arguments are last
+     *                              write offset, record list, and the return result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                            records.size() + " records, but maxRecordsPerBatch is " +
+                            maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                            records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int i = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int j = i + maxRecordsPerBatch;

Review Comment:
   ok





[GitHub] [kafka] cmccabe commented on a diff in pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12595:
URL: https://github.com/apache/kafka/pull/12595#discussion_r966431199


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -844,6 +855,72 @@ public String toString() {
         }
     }
 
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. The arguments are last
+     *                              write offset, record list, and the return result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                            records.size() + " records, but maxRecordsPerBatch is " +
+                            maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                            records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int i = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int j = i + maxRecordsPerBatch;
+                    if (j > records.size()) {
+                        long offset = appender.apply(records.subList(i, records.size()));

Review Comment:
   Yeah... In general LinkedList turns a lot of stuff into O(N) and that's why we mostly don't use it. It's only really useful if you want to delete things from the middle of a list in O(1), but you also don't need fast access to the middle of the list, which is a pretty rare situation.
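
If one did want to enforce the "array-backed lists only" expectation raised in this thread, a check against the `java.util.RandomAccess` marker interface is one option. The sketch below is purely illustrative and is not part of the PR; the class and method names are hypothetical. It relies on the JDK behavior that `ArrayList` and its `subList` views implement `RandomAccess`, while `LinkedList` and its `subList` views do not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.RandomAccess;

class RandomAccessCheckSketch {
    /** Hypothetical guard: rejects lists without fast positional access. */
    static <T> List<T> requireRandomAccess(List<T> list) {
        if (!(list instanceof RandomAccess)) {
            throw new IllegalArgumentException(
                "Expected a RandomAccess list, got " + list.getClass().getName());
        }
        return list;
    }

    public static void main(String[] args) {
        List<Integer> arrayBacked = new ArrayList<>(Arrays.asList(1, 2, 3, 4));
        List<Integer> linked = new LinkedList<>(arrayBacked);

        // An ArrayList sub-list view keeps the RandomAccess marker.
        System.out.println(arrayBacked.subList(1, 3) instanceof RandomAccess); // prints "true"
        // A LinkedList sub-list view does not, so the guard would reject it.
        System.out.println(linked.subList(1, 3) instanceof RandomAccess);      // prints "false"
    }
}
```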





[GitHub] [kafka] cmccabe commented on a diff in pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12595:
URL: https://github.com/apache/kafka/pull/12595#discussion_r965239405


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -844,6 +855,72 @@ public String toString() {
         }
     }
 
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. The arguments are last
+     *                              write offset, record list, and the return result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                            records.size() + " records, but maxRecordsPerBatch is " +
+                            maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                            records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int i = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int j = i + maxRecordsPerBatch;
+                    if (j > records.size()) {
+                        long offset = appender.apply(records.subList(i, records.size()));

Review Comment:
   `subList` does not copy anything, if that is what you are asking. It just creates a view over the existing list, which is an ArrayList in this case, I think (we almost never use LinkedList in Kafka because of its poor memory locality).
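
To illustrate the "view, not copy" point, here is a small standalone sketch. It is not taken from the PR; the class name and the `firstBatchView` helper are made up for the example. Non-structural writes made through either the backing list or the view are visible through the other.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SubListViewSketch {
    /** Hypothetical helper: a view over the first `size` records (no copying). */
    static List<String> firstBatchView(List<String> records, int size) {
        return records.subList(0, Math.min(size, records.size()));
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>(Arrays.asList("a", "b", "c", "d", "e"));
        List<String> batch = firstBatchView(records, 2);

        // A write to the backing list shows up in the view...
        records.set(0, "A");
        System.out.println(batch);   // prints "[A, b]"

        // ...and a write through the view shows up in the backing list.
        batch.set(1, "B");
        System.out.println(records); // prints "[A, B, c, d, e]"
    }
}
```

Note that structural changes to the backing list (adding or removing elements) invalidate existing views, so a view should be consumed before the backing list is resized.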






[GitHub] [kafka] showuon commented on a diff in pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches

Posted by GitBox <gi...@apache.org>.
showuon commented on code in PR #12595:
URL: https://github.com/apache/kafka/pull/12595#discussion_r964335502


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1148,19 +1225,12 @@ private void renounce() {
             controllerMetrics.setActive(false);
             purgatory.failAll(newNotControllerException());
 
-            if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
-                snapshotRegistry.revertToSnapshot(lastCommittedOffset);
-                authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
-            } else {
-                log.info("Unable to find last committed offset {} in snapshot registry; resetting " +
-                         "to empty state.", lastCommittedOffset);
-                resetToEmptyState();
-                authorizer.ifPresent(a -> a.loadSnapshot(Collections.emptyMap()));
-                needToCompleteAuthorizerLoad = authorizer.isPresent();
-                raftClient.unregister(metaLogListener);
-                metaLogListener = new QuorumMetaLogListener();
-                raftClient.register(metaLogListener);
+            if (!snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
+                throw new RuntimeException("Unable to find last committed offset " +
+                        lastCommittedEpoch + " in snapshot registry.");

Review Comment:
   We now throw an exception directly if we can't find the snapshot, which jumps to the faultHandler and skips the steps that follow, such as `clusterControl.deactivate();`. Is that expected?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -844,6 +855,72 @@ public String toString() {
         }
     }
 
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. The arguments are last
+     *                              write offset, record list, and the return result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                            records.size() + " records, but maxRecordsPerBatch is " +
+                            maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                            records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int i = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int j = i + maxRecordsPerBatch;
+                    if (j > records.size()) {
+                        long offset = appender.apply(records.subList(i, records.size()));
+                        if (log.isTraceEnabled()) {
+                            log.trace("Appended {} record(s) in {} batch(es), ending with offset {}.",
+                                    records.size(), numBatches, offset);
+                        }
+                        return offset;
+                    } else {
+                        appender.apply(records.subList(i, j));
+                    }
+                    i += maxRecordsPerBatch;
+                }
+            }
+        } catch (ApiException e) {
+            // If the Raft client throws a subclass of ApiException, we need to convert it into a
+            // RuntimeException so that it will be handled as the unexpected exception that it is.
+            // ApiExceptions are reserved for expected errors such as incorrect uses of controller
+            // APIs, permission errors, NotControllerException, etc. etc.
+            throw new RuntimeException(e);

Review Comment:
   Thanks for adding the comments to make it clear!
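
For readers skimming the thread, the exception conversion in the `catch` block can be shown in isolation. This is a simplified sketch, not the PR code: the nested `ApiException` class is a stand-in for `org.apache.kafka.common.errors.ApiException` so that the example compiles without Kafka on the classpath, and `appendOrFail` is a hypothetical name.

```java
import java.util.function.Supplier;

class ExceptionWrappingSketch {
    /** Stand-in for Kafka's ApiException (an assumption made for this sketch). */
    static class ApiException extends RuntimeException {
        ApiException(String message) {
            super(message);
        }
    }

    /**
     * Runs the append and rethrows any ApiException as a plain RuntimeException,
     * so callers that treat ApiException as an "expected" error do not swallow it.
     */
    static long appendOrFail(Supplier<Long> append) {
        try {
            return append.get();
        } catch (ApiException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        try {
            appendOrFail(() -> {
                throw new ApiException("batch too large");
            });
        } catch (RuntimeException e) {
            // The wrapper is no longer an ApiException; the original is kept as the cause.
            System.out.println(e instanceof ApiException);            // prints "false"
            System.out.println(e.getCause() instanceof ApiException); // prints "true"
        }
    }
}
```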



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -844,6 +855,72 @@ public String toString() {
         }
     }
 
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. The arguments are last
+     *                              write offset, record list, and the return result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                            records.size() + " records, but maxRecordsPerBatch is " +
+                            maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                            records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int i = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int j = i + maxRecordsPerBatch;

Review Comment:
   nit: rename variables for readability:
   i => startIndex
   j => endIndex





[GitHub] [kafka] cmccabe commented on a diff in pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12595:
URL: https://github.com/apache/kafka/pull/12595#discussion_r965238094


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1148,19 +1225,12 @@ private void renounce() {
             controllerMetrics.setActive(false);
             purgatory.failAll(newNotControllerException());
 
-            if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
-                snapshotRegistry.revertToSnapshot(lastCommittedOffset);
-                authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
-            } else {
-                log.info("Unable to find last committed offset {} in snapshot registry; resetting " +
-                         "to empty state.", lastCommittedOffset);
-                resetToEmptyState();
-                authorizer.ifPresent(a -> a.loadSnapshot(Collections.emptyMap()));
-                needToCompleteAuthorizerLoad = authorizer.isPresent();
-                raftClient.unregister(metaLogListener);
-                metaLogListener = new QuorumMetaLogListener();
-                raftClient.register(metaLogListener);
+            if (!snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
+                throw new RuntimeException("Unable to find last committed offset " +
+                        lastCommittedEpoch + " in snapshot registry.");

Review Comment:
   The fault handler for the active controller will exit the process, so it is not necessary to call `clusterControl.deactivate` here.





[GitHub] [kafka] jsancio commented on a diff in pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12595:
URL: https://github.com/apache/kafka/pull/12595#discussion_r965048542


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -844,6 +855,72 @@ public String toString() {
         }
     }
 
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. The arguments are last
+     *                              write offset, record list, and the return result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                            records.size() + " records, but maxRecordsPerBatch is " +
+                            maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                            records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int i = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int j = i + maxRecordsPerBatch;
+                    if (j > records.size()) {
+                        long offset = appender.apply(records.subList(i, records.size()));

Review Comment:
   Hmm. What is the time complexity of `subList`? This code only assumes that `records` is a `List`, and I think the cost of `subList` can vary a lot depending on the `List` implementation. What do you think?
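
   For reference, a small sketch of the behavior in question. The `List.subList` Javadoc specifies that the returned list is a view backed by the original list, so creating it copies no elements. What varies by implementation is the cost of working with the view: `ArrayList` is a `RandomAccess` list, while positional access on a `LinkedList` is linear. The class `SubListViewSketch` and the helper `hasFastRandomAccess` are hypothetical names, and which `List` implementation `result.records()` actually returns is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.RandomAccess;

// Hypothetical illustration only; not part of QuorumController.
final class SubListViewSketch {
    /** Returns true if the list advertises constant-time positional access. */
    public static boolean hasFastRandomAccess(List<?> list) {
        return list instanceof RandomAccess;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>(Arrays.asList("a", "b", "c", "d", "e"));

        // subList returns a view backed by the original list, not a copy,
        // so a write through the view is visible in the parent list.
        List<String> batch = records.subList(1, 3);
        batch.set(0, "B");
        System.out.println(records); // prints [a, B, c, d, e]

        System.out.println(hasFastRandomAccess(records));                         // prints true
        System.out.println(hasFastRandomAccess(new LinkedList<String>(records))); // prints false
    }
}
```

   Because the batches are views, anything that holds on to a batch after the parent list is structurally modified would need a copy instead.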



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -844,6 +855,72 @@ public String toString() {
         }
     }
 
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. The arguments are last
+     *                              write offset, record list, and the return result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                            records.size() + " records, but maxRecordsPerBatch is " +
+                            maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                            records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int i = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int j = i + maxRecordsPerBatch;

Review Comment:
   I agree.





[GitHub] [kafka] cmccabe commented on a diff in pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12595:
URL: https://github.com/apache/kafka/pull/12595#discussion_r966431807


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -844,6 +855,72 @@ public String toString() {
         }
     }
 
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. The arguments are last
+     *                              write offset, record list, and the return result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                            records.size() + " records, but maxRecordsPerBatch is " +
+                            maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                            records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int i = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int j = i + maxRecordsPerBatch;
+                    if (j > records.size()) {
+                        long offset = appender.apply(records.subList(i, records.size()));

Review Comment:
   Btw, thanks for thinking about the big-O here, even if it didn't end up being an issue in this particular case. We should definitely think about big-O.


