Posted to commits@kafka.apache.org by cm...@apache.org on 2022/09/08 21:21:55 UTC

[kafka] branch trunk updated: KAFKA-14204: QuorumController must correctly handle overly large batches (#12595)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 81b71c06f3 KAFKA-14204: QuorumController must correctly handle overly large batches (#12595)
81b71c06f3 is described below

commit 81b71c06f300daf353cef04653d59385db561b38
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Thu Sep 8 14:21:29 2022 -0700

    KAFKA-14204: QuorumController must correctly handle overly large batches (#12595)
    
    Originally, the QuorumController did not try to limit the number of records in a batch that it
    sent to the Raft layer. This caused two problems. Firstly, we were not correctly handling the
    exception thrown by the Raft layer when a batch of records was too large to apply atomically.
    This happened because that exception was a subclass of ApiException, which the controller
    reserves for expected errors such as permission failures. Secondly, by letting the Raft layer
    split non-atomic batches, we were unable to create an in-memory snapshot at each of the split
    points, which led to O(N) behavior during controller failovers.
    
    This PR fixes both of these issues by limiting the number of records in a batch. Atomic batches
    that are too large will fail with a RuntimeException, which will cause the active controller to
    become inactive and revert to the last committed state. Non-atomic batches will be split into
    multiple batches of at most a fixed number of records each.
    
    Reviewers: Luke Chen <sh...@gmail.com>, José Armando García Sancio <js...@gmail.com>
---
 .../apache/kafka/controller/QuorumController.java  | 161 +++++++++++++++------
 .../kafka/controller/QuorumControllerTest.java     |  46 ++++++
 2 files changed, 165 insertions(+), 42 deletions(-)
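
The batch-splitting behavior described in the commit message can be sketched in isolation. The
helper below is a simplified, hypothetical rendering of the splitting loop inside the new
appendRecords method (the real method, shown in the diff that follows, also handles atomic
results, trace logging, and ApiException wrapping); splitAndAppend is an illustrative name, not
one from this commit, and it assumes java.util.List, java.util.function.Function, and
ApiMessageAndVersion are in scope:

    // Hypothetical sketch: split `records` into sub-batches of at most
    // maxRecordsPerBatch records each, handing every sub-batch to `appender`.
    // The return value is the offset returned for the final sub-batch.
    static long splitAndAppend(List<ApiMessageAndVersion> records,
                               int maxRecordsPerBatch,
                               Function<List<ApiMessageAndVersion>, Long> appender) {
        long offset = -1;
        for (int start = 0; start < records.size(); start += maxRecordsPerBatch) {
            int end = Math.min(start + maxRecordsPerBatch, records.size());
            offset = appender.apply(records.subList(start, end));
        }
        return offset;
    }

With maxRecordsPerBatch = 2, for example, a five-record result is appended as three sub-batches of
sizes 2, 2, and 1; the controller's real appender also takes an in-memory snapshot after each one.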

diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index e669817dd2..7ad601b007 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -119,6 +119,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -147,6 +148,8 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
  * the controller can fully initialize.
  */
 public final class QuorumController implements Controller {
+    private final static int MAX_RECORDS_PER_BATCH = 10000;
+
     /**
      * A builder class which creates the QuorumController.
      */
@@ -175,6 +178,7 @@ public final class QuorumController implements Controller {
         private Optional<ClusterMetadataAuthorizer> authorizer = Optional.empty();
         private Map<String, Object> staticConfig = Collections.emptyMap();
         private BootstrapMetadata bootstrapMetadata = null;
+        private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
 
         public Builder(int nodeId, String clusterId) {
             this.nodeId = nodeId;
@@ -270,6 +274,11 @@ public final class QuorumController implements Controller {
             return this;
         }
 
+        public Builder setMaxRecordsPerBatch(int maxRecordsPerBatch) {
+            this.maxRecordsPerBatch = maxRecordsPerBatch;
+            return this;
+        }
+
         public Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> createTopicPolicy) {
             this.createTopicPolicy = createTopicPolicy;
             return this;
@@ -347,7 +356,8 @@ public final class QuorumController implements Controller {
                     configurationValidator,
                     authorizer,
                     staticConfig,
-                    bootstrapMetadata
+                    bootstrapMetadata,
+                    maxRecordsPerBatch
                 );
             } catch (Exception e) {
                 Utils.closeQuietly(queue, "event queue");
@@ -776,37 +786,38 @@ public final class QuorumController implements Controller {
                         "reaches offset {}", this, resultAndOffset.offset());
                 }
             } else {
-                // Start by trying to apply the record to our in-memory state. This should always
-                // succeed; if it does not, that's a fatal error. It is important to do this before
-                // scheduling the record for Raft replication.
-                int i = 1;
-                for (ApiMessageAndVersion message : result.records()) {
-                    try {
-                        replay(message.message(), Optional.empty(), writeOffset + result.records().size());
-                    } catch (Throwable e) {
-                        String failureMessage = String.format("Unable to apply %s record, which was " +
-                            "%d of %d record(s) in the batch following last writeOffset %d.",
-                            message.message().getClass().getSimpleName(), i, result.records().size(),
-                            writeOffset);
-                        throw fatalFaultHandler.handleFault(failureMessage, e);
-                    }
-                    i++;
-                }
-
-                // If the operation returned a batch of records, and those records could be applied,
-                // they need to be written before we can return our result to the user.  Here, we
-                // hand off the batch of records to the raft client.  They will be written out
-                // asynchronously.
-                final long offset;
-                if (result.isAtomic()) {
-                    offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
-                } else {
-                    offset = raftClient.scheduleAppend(controllerEpoch, result.records());
-                }
+                // Pass the records to the Raft layer. This will start the process of committing
+                // them to the log.
+                long offset = appendRecords(log, result, maxRecordsPerBatch,
+                    new Function<List<ApiMessageAndVersion>, Long>() {
+                        private long prevEndOffset = writeOffset;
+
+                        @Override
+                        public Long apply(List<ApiMessageAndVersion> records) {
+                            // Start by trying to apply the record to our in-memory state. This should always
+                            // succeed; if it does not, that's a fatal error. It is important to do this before
+                            // scheduling the record for Raft replication.
+                            int i = 1;
+                            for (ApiMessageAndVersion message : records) {
+                                try {
+                                    replay(message.message(), Optional.empty(), prevEndOffset + records.size());
+                                } catch (Throwable e) {
+                                    String failureMessage = String.format("Unable to apply %s record, which was " +
+                                            "%d of %d record(s) in the batch following last write offset %d.",
+                                            message.message().getClass().getSimpleName(), i, records.size(),
+                                            prevEndOffset);
+                                    throw fatalFaultHandler.handleFault(failureMessage, e);
+                                }
+                                i++;
+                            }
+                            prevEndOffset = raftClient.scheduleAtomicAppend(controllerEpoch, records);
+                            snapshotRegistry.getOrCreateSnapshot(prevEndOffset);
+                            return prevEndOffset;
+                        }
+                    });
                 op.processBatchEndOffset(offset);
                 updateWriteOffset(offset);
                 resultAndOffset = ControllerResultAndOffset.of(offset, result);
-                snapshotRegistry.getOrCreateSnapshot(offset);
 
                 log.debug("Read-write operation {} will be completed when the log " +
                     "reaches offset {}.", this, resultAndOffset.offset());
@@ -844,6 +855,72 @@ public final class QuorumController implements Controller {
         }
     }
 
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The slf4j logger to use.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. Its argument is the
+     *                              list of records in the batch, and its return value is the
+     *                              new last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                            records.size() + " records, but maxRecordsPerBatch is " +
+                            maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                            records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int startIndex = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int endIndex = startIndex + maxRecordsPerBatch;
+                    if (endIndex > records.size()) {
+                        long offset = appender.apply(records.subList(startIndex, records.size()));
+                        if (log.isTraceEnabled()) {
+                            log.trace("Appended {} record(s) in {} batch(es), ending with offset {}.",
+                                    records.size(), numBatches, offset);
+                        }
+                        return offset;
+                    } else {
+                        appender.apply(records.subList(startIndex, endIndex));
+                    }
+                    startIndex += maxRecordsPerBatch;
+                }
+            }
+        } catch (ApiException e) {
+            // If the Raft client throws a subclass of ApiException, we need to convert it into a
+            // RuntimeException so that it will be handled as the unexpected exception that it is.
+            // ApiExceptions are reserved for expected errors such as incorrect uses of controller
+            // APIs, permission errors, NotControllerException, etc. etc.
+            throw new RuntimeException(e);
+        }
+    }
+
     <T> CompletableFuture<T> appendWriteEvent(String name,
                                               OptionalLong deadlineNs,
                                               ControllerWriteOperation<T> op) {
@@ -1148,19 +1225,12 @@ public final class QuorumController implements Controller {
             controllerMetrics.setActive(false);
             purgatory.failAll(newNotControllerException());
 
-            if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
-                snapshotRegistry.revertToSnapshot(lastCommittedOffset);
-                authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
-            } else {
-                log.info("Unable to find last committed offset {} in snapshot registry; resetting " +
-                         "to empty state.", lastCommittedOffset);
-                resetToEmptyState();
-                authorizer.ifPresent(a -> a.loadSnapshot(Collections.emptyMap()));
-                needToCompleteAuthorizerLoad = authorizer.isPresent();
-                raftClient.unregister(metaLogListener);
-                metaLogListener = new QuorumMetaLogListener();
-                raftClient.register(metaLogListener);
+            if (!snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
+                throw new RuntimeException("Unable to find last committed offset " +
+                        lastCommittedOffset + " in snapshot registry.");
             }
+            snapshotRegistry.revertToSnapshot(lastCommittedOffset);
+            authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
             newBytesSinceLastSnapshot = 0L;
             updateWriteOffset(-1);
             clusterControl.deactivate();
@@ -1625,6 +1695,11 @@ public final class QuorumController implements Controller {
      */
     private final BootstrapMetadata bootstrapMetadata;
 
+    /**
+     * The maximum number of records per batch to allow.
+     */
+    private final int maxRecordsPerBatch;
+
     private QuorumController(
         FaultHandler fatalFaultHandler,
         FaultHandler metadataFaultHandler,
@@ -1649,7 +1724,8 @@ public final class QuorumController implements Controller {
         ConfigurationValidator configurationValidator,
         Optional<ClusterMetadataAuthorizer> authorizer,
         Map<String, Object> staticConfig,
-        BootstrapMetadata bootstrapMetadata
+        BootstrapMetadata bootstrapMetadata,
+        int maxRecordsPerBatch
     ) {
         this.fatalFaultHandler = fatalFaultHandler;
         this.metadataFaultHandler = metadataFaultHandler;
@@ -1719,6 +1795,7 @@ public final class QuorumController implements Controller {
                 build();
         this.raftClient = raftClient;
         this.bootstrapMetadata = bootstrapMetadata;
+        this.maxRecordsPerBatch = maxRecordsPerBatch;
         this.metaLogListener = new QuorumMetaLogListener();
         this.curClaimEpoch = -1;
         this.needToCompleteAuthorizerLoad = authorizer.isPresent();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 89ef0b083d..a1a3741b6a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.StreamSupport;
@@ -47,6 +48,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
@@ -109,6 +111,8 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.util.function.Function.identity;
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
@@ -129,6 +133,8 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @Timeout(value = 40)
 public class QuorumControllerTest {
+    private final static Logger log = LoggerFactory.getLogger(QuorumControllerTest.class);
+
     static final BootstrapMetadata SIMPLE_BOOTSTRAP = BootstrapMetadata.
             fromVersion(MetadataVersion.IBP_3_3_IV3, "test-provided bootstrap");
 
@@ -1373,4 +1379,44 @@ public class QuorumControllerTest {
             }
         }
     }
+
+    static class TestAppender implements Function<List<ApiMessageAndVersion>, Long>  {
+        private long offset = 0;
+
+        @Override
+        public Long apply(List<ApiMessageAndVersion> apiMessageAndVersions) {
+            for (ApiMessageAndVersion apiMessageAndVersion : apiMessageAndVersions) {
+                BrokerRegistrationChangeRecord record =
+                        (BrokerRegistrationChangeRecord) apiMessageAndVersion.message();
+                assertEquals((int) offset, record.brokerId());
+                offset++;
+            }
+            return offset;
+        }
+    }
+
+    private static ApiMessageAndVersion rec(int i) {
+        return new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().setBrokerId(i),
+                (short) 0);
+    }
+
+    @Test
+    public void testAppendRecords() {
+        TestAppender appender = new TestAppender();
+        assertEquals(5, QuorumController.appendRecords(log,
+            ControllerResult.of(Arrays.asList(rec(0), rec(1), rec(2), rec(3), rec(4)), null),
+            2,
+            appender));
+    }
+
+    @Test
+    public void testAppendRecordsAtomically() {
+        TestAppender appender = new TestAppender();
+        assertEquals("Attempted to atomically commit 5 records, but maxRecordsPerBatch is 2",
+            assertThrows(IllegalStateException.class, () ->
+                QuorumController.appendRecords(log,
+                        ControllerResult.atomicOf(Arrays.asList(rec(0), rec(1), rec(2), rec(3), rec(4)), null),
+                        2,
+                        appender)).getMessage());
+    }
 }
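
As a closing illustration, the splitting arithmetic can also be checked through the new static
method directly. This is a hedged sketch rather than a test from the commit: it would sit in the
same test class (reusing its rec(...) helper and log field) and assumes java.util.ArrayList is
imported alongside the imports shown above:

    @Test
    public void testAppendRecordsBatchSizes() {
        // Record the size of each batch that the appender receives.
        List<Integer> batchSizes = new ArrayList<>();
        long finalOffset = QuorumController.appendRecords(log,
            ControllerResult.of(Arrays.asList(rec(0), rec(1), rec(2), rec(3), rec(4)), null),
            2,
            batch -> {
                batchSizes.add(batch.size());
                // Pretend that each appended batch advances the offset by one.
                return (long) batchSizes.size();
            });
        // Five records with maxRecordsPerBatch = 2 become batches of 2, 2, and 1.
        assertEquals(Arrays.asList(2, 2, 1), batchSizes);
        assertEquals(3L, finalOffset);
    }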