You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/03/04 21:04:30 UTC

[kafka] 01/02: KAFKA-12376: Apply atomic append to the log (#10253)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d69212a42246eeaa123e0a574d08ba44e37e4a1d
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Thu Mar 4 10:55:43 2021 -0800

    KAFKA-12376: Apply atomic append to the log (#10253)
---
 core/src/main/scala/kafka/raft/RaftManager.scala   | 29 ++++++--
 .../controller/ClientQuotaControlManager.java      |  2 +-
 .../kafka/controller/ClusterControlManager.java    |  2 +-
 .../controller/ConfigurationControlManager.java    |  4 +-
 .../apache/kafka/controller/ControllerResult.java  | 38 ++++++----
 .../controller/ControllerResultAndOffset.java      | 32 ++++-----
 .../kafka/controller/FeatureControlManager.java    |  3 +-
 .../apache/kafka/controller/QuorumController.java  | 23 +++---
 .../controller/ReplicationControlManager.java      | 12 ++--
 .../org/apache/kafka/metalog/LocalLogManager.java  | 17 ++++-
 .../org/apache/kafka/metalog/MetaLogManager.java   | 23 +++++-
 .../ConfigurationControlManagerTest.java           | 83 ++++++++++++++++------
 .../controller/FeatureControlManagerTest.java      | 58 ++++++++++-----
 .../org/apache/kafka/metalog/LocalLogManager.java  | 17 ++++-
 .../kafka/raft/metadata/MetaLogRaftShim.java       | 28 +++++++-
 15 files changed, 268 insertions(+), 103 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 1881a1d..3b714f3 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -92,6 +92,11 @@ trait RaftManager[T] {
     listener: RaftClient.Listener[T]
   ): Unit
 
+  def scheduleAtomicAppend(
+    epoch: Int,
+    records: Seq[T]
+  ): Option[Long]
+
   def scheduleAppend(
     epoch: Int,
     records: Seq[T]
@@ -157,16 +162,32 @@ class KafkaRaftManager[T](
     raftClient.register(listener)
   }
 
+  override def scheduleAtomicAppend(
+    epoch: Int,
+    records: Seq[T]
+  ): Option[Long] = {
+    append(epoch, records, true)
+  }
+
   override def scheduleAppend(
     epoch: Int,
     records: Seq[T]
   ): Option[Long] = {
-    val offset: java.lang.Long = raftClient.scheduleAppend(epoch, records.asJava)
-    if (offset == null) {
-      None
+    append(epoch, records, false)
+  }
+
+  private def append(
+    epoch: Int,
+    records: Seq[T],
+    isAtomic: Boolean
+  ): Option[Long] = {
+    val offset = if (isAtomic) {
+      raftClient.scheduleAtomicAppend(epoch, records.asJava)
     } else {
-      Some(Long.unbox(offset))
+      raftClient.scheduleAppend(epoch, records.asJava)
     }
+
+    Option(offset).map(Long.unbox)
   }
 
   override def handleRequest(
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
index 4aac9e4..9b8e2d6 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
@@ -86,7 +86,7 @@ public class ClientQuotaControlManager {
             }
         });
 
-        return new ControllerResult<>(outputRecords, outputResults);
+        return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
     /**
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 6e329c7..4748d19 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -213,7 +213,7 @@ public class ClusterControlManager {
 
         List<ApiMessageAndVersion> records = new ArrayList<>();
         records.add(new ApiMessageAndVersion(record, (short) 0));
-        return new ControllerResult<>(records, new BrokerRegistrationReply(brokerEpoch));
+        return ControllerResult.of(records, new BrokerRegistrationReply(brokerEpoch));
     }
 
     public void replay(RegisterBrokerRecord record) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 16e58fa..dcfe92d46 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -83,7 +83,7 @@ public class ConfigurationControlManager {
                 outputRecords,
                 outputResults);
         }
-        return new ControllerResult<>(outputRecords, outputResults);
+        return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
     private void incrementalAlterConfigResource(ConfigResource configResource,
@@ -171,7 +171,7 @@ public class ConfigurationControlManager {
                 outputRecords,
                 outputResults);
         }
-        return new ControllerResult<>(outputRecords, outputResults);
+        return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
     private void legacyAlterConfigResource(ConfigResource configResource,
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java
index 4906c8b..e6ae031 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java
@@ -19,7 +19,7 @@ package org.apache.kafka.controller;
 
 import org.apache.kafka.metadata.ApiMessageAndVersion;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -28,15 +28,13 @@ import java.util.stream.Collectors;
 class ControllerResult<T> {
     private final List<ApiMessageAndVersion> records;
     private final T response;
+    private final boolean isAtomic;
 
-    public ControllerResult(T response) {
-        this(new ArrayList<>(), response);
-    }
-
-    public ControllerResult(List<ApiMessageAndVersion> records, T response) {
+    protected ControllerResult(List<ApiMessageAndVersion> records, T response, boolean isAtomic) {
         Objects.requireNonNull(records);
         this.records = records;
         this.response = response;
+        this.isAtomic = isAtomic;
     }
 
     public List<ApiMessageAndVersion> records() {
@@ -47,6 +45,10 @@ class ControllerResult<T> {
         return response;
     }
 
+    public boolean isAtomic() {
+        return isAtomic;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (o == null || (!o.getClass().equals(getClass()))) {
@@ -54,22 +56,34 @@ class ControllerResult<T> {
         }
         ControllerResult other = (ControllerResult) o;
         return records.equals(other.records) &&
-            Objects.equals(response, other.response);
+            Objects.equals(response, other.response) &&
+            Objects.equals(isAtomic, other.isAtomic);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(records, response);
+        return Objects.hash(records, response, isAtomic);
     }
 
     @Override
     public String toString() {
-        return "ControllerResult(records=" + String.join(",",
-            records.stream().map(r -> r.toString()).collect(Collectors.toList())) +
-            ", response=" + response + ")";
+        return String.format(
+            "ControllerResult(records=%s, response=%s, isAtomic=%s)",
+            String.join(",", records.stream().map(ApiMessageAndVersion::toString).collect(Collectors.toList())),
+            response,
+            isAtomic
+        );
     }
 
     public ControllerResult<T> withoutRecords() {
-        return new ControllerResult<>(new ArrayList<>(), response);
+        return new ControllerResult<>(Collections.emptyList(), response, false);
+    }
+
+    public static <T> ControllerResult<T> atomicOf(List<ApiMessageAndVersion> records, T response) {
+        return new ControllerResult<>(records, response, true);
+    }
+
+    public static <T> ControllerResult<T> of(List<ApiMessageAndVersion> records, T response) {
+        return new ControllerResult<>(records, response, false);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java
index 5e483f7..8b8ca8d 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java
@@ -19,24 +19,15 @@ package org.apache.kafka.controller;
 
 import org.apache.kafka.metadata.ApiMessageAndVersion;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
 
-class ControllerResultAndOffset<T> extends ControllerResult<T> {
+final class ControllerResultAndOffset<T> extends ControllerResult<T> {
     private final long offset;
 
-    public ControllerResultAndOffset(T response) {
-        super(new ArrayList<>(), response);
-        this.offset = -1;
-    }
-
-    public ControllerResultAndOffset(long offset,
-                              List<ApiMessageAndVersion> records,
-                              T response) {
-        super(records, response);
+    private ControllerResultAndOffset(long offset, ControllerResult<T> result) {
+        super(result.records(), result.response(), result.isAtomic());
         this.offset = offset;
     }
 
@@ -52,18 +43,27 @@ class ControllerResultAndOffset<T> extends ControllerResult<T> {
         ControllerResultAndOffset other = (ControllerResultAndOffset) o;
         return records().equals(other.records()) &&
             response().equals(other.response()) &&
+            isAtomic() == other.isAtomic() &&
             offset == other.offset;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(records(), response(), offset);
+        return Objects.hash(records(), response(), isAtomic(), offset);
     }
 
     @Override
     public String toString() {
-        return "ControllerResultAndOffset(records=" + String.join(",",
-            records().stream().map(r -> r.toString()).collect(Collectors.toList())) +
-            ", response=" + response() + ", offset=" + offset + ")";
+        return String.format(
+            "ControllerResultAndOffset(records=%s, response=%s, isAtomic=%s, offset=%s)",
+            String.join(",", records().stream().map(ApiMessageAndVersion::toString).collect(Collectors.toList())),
+            response(),
+            isAtomic(),
+            offset
+        );
+    }
+
+    public static <T> ControllerResultAndOffset<T> of(long offset, ControllerResult<T> result) {
+        return new ControllerResultAndOffset<>(offset, result);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 25ff3fd..99874ac 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -69,7 +69,8 @@ public class FeatureControlManager {
             results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
                 downgradeables.contains(entry.getKey()), brokerFeatures, records));
         }
-        return new ControllerResult<>(records, results);
+
+        return ControllerResult.atomicOf(records, results);
     }
 
     private ApiError updateFeature(String featureName,
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 3e5a9a8..6ee1b7e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.controller;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -268,7 +267,7 @@ public final class QuorumController implements Controller {
     class ControlEvent implements EventQueue.Event {
         private final String name;
         private final Runnable handler;
-        private long eventCreatedTimeNs = time.nanoseconds();
+        private final long eventCreatedTimeNs = time.nanoseconds();
         private Optional<Long> startProcessingTimeNs = Optional.empty();
 
         ControlEvent(String name, Runnable handler) {
@@ -310,7 +309,7 @@ public final class QuorumController implements Controller {
         private final String name;
         private final CompletableFuture<T> future;
         private final Supplier<T> handler;
-        private long eventCreatedTimeNs = time.nanoseconds();
+        private final long eventCreatedTimeNs = time.nanoseconds();
         private Optional<Long> startProcessingTimeNs = Optional.empty();
 
         ControllerReadEvent(String name, Supplier<T> handler) {
@@ -392,7 +391,7 @@ public final class QuorumController implements Controller {
         private final String name;
         private final CompletableFuture<T> future;
         private final ControllerWriteOperation<T> op;
-        private long eventCreatedTimeNs = time.nanoseconds();
+        private final long eventCreatedTimeNs = time.nanoseconds();
         private Optional<Long> startProcessingTimeNs = Optional.empty();
         private ControllerResultAndOffset<T> resultAndOffset;
 
@@ -426,8 +425,7 @@ public final class QuorumController implements Controller {
                 if (!maybeOffset.isPresent()) {
                     // If the purgatory is empty, there are no pending operations and no
                     // uncommitted state.  We can return immediately.
-                    resultAndOffset = new ControllerResultAndOffset<>(-1,
-                        new ArrayList<>(), result.response());
+                    resultAndOffset = ControllerResultAndOffset.of(-1, result);
                     log.debug("Completing read-only operation {} immediately because " +
                         "the purgatory is empty.", this);
                     complete(null);
@@ -435,8 +433,7 @@ public final class QuorumController implements Controller {
                 }
                 // If there are operations in the purgatory, we want to wait for the latest
                 // one to complete before returning our result to the user.
-                resultAndOffset = new ControllerResultAndOffset<>(maybeOffset.get(),
-                    result.records(), result.response());
+                resultAndOffset = ControllerResultAndOffset.of(maybeOffset.get(), result);
                 log.debug("Read-only operation {} will be completed when the log " +
                     "reaches offset {}", this, resultAndOffset.offset());
             } else {
@@ -444,11 +441,15 @@ public final class QuorumController implements Controller {
                 // written before we can return our result to the user.  Here, we hand off
                 // the batch of records to the metadata log manager.  They will be written
                 // out asynchronously.
-                long offset = logManager.scheduleWrite(controllerEpoch, result.records());
+                final long offset;
+                if (result.isAtomic()) {
+                    offset = logManager.scheduleAtomicWrite(controllerEpoch, result.records());
+                } else {
+                    offset = logManager.scheduleWrite(controllerEpoch, result.records());
+                }
                 op.processBatchEndOffset(offset);
                 writeOffset = offset;
-                resultAndOffset = new ControllerResultAndOffset<>(offset,
-                    result.records(), result.response());
+                resultAndOffset = ControllerResultAndOffset.of(offset, result);
                 for (ApiMessageAndVersion message : result.records()) {
                     replay(message.message(), offset);
                 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 14ff321..8bc0670 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -440,7 +440,7 @@ public class ReplicationControlManager {
             resultsPrefix = ", ";
         }
         log.info("createTopics result(s): {}", resultsBuilder.toString());
-        return new ControllerResult<>(records, data);
+        return ControllerResult.atomicOf(records, data);
     }
 
     private ApiError createTopic(CreatableTopic topic,
@@ -721,7 +721,7 @@ public class ReplicationControlManager {
                     setIsr(partitionData.newIsr()));
             }
         }
-        return new ControllerResult<>(records, response);
+        return ControllerResult.of(records, response);
     }
 
     /**
@@ -875,7 +875,7 @@ public class ReplicationControlManager {
                     setErrorMessage(error.message()));
             }
         }
-        return new ControllerResult<>(records, response);
+        return ControllerResult.of(records, response);
     }
 
     static boolean electionIsUnclean(byte electionType) {
@@ -970,7 +970,7 @@ public class ReplicationControlManager {
                 states.next().fenced(),
                 states.next().inControlledShutdown(),
                 states.next().shouldShutDown());
-        return new ControllerResult<>(records, reply);
+        return ControllerResult.of(records, reply);
     }
 
     int bestLeader(int[] replicas, int[] isr, boolean unclean) {
@@ -999,7 +999,7 @@ public class ReplicationControlManager {
         }
         List<ApiMessageAndVersion> records = new ArrayList<>();
         handleBrokerUnregistered(brokerId, registration.epoch(), records);
-        return new ControllerResult<>(records, null);
+        return ControllerResult.of(records, null);
     }
 
     ControllerResult<Void> maybeFenceStaleBrokers() {
@@ -1011,6 +1011,6 @@ public class ReplicationControlManager {
             handleBrokerFenced(brokerId, records);
             heartbeatManager.fence(brokerId);
         }
-        return new ControllerResult<>(records, null);
+        return ControllerResult.of(records, null);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
index ef85314..99ae3a7 100644
--- a/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -328,8 +328,21 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
 
     @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
-            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+        return scheduleAtomicWrite(epoch, batch);
+    }
+
+    @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return shared.tryAppend(
+            nodeId,
+            leader.epoch(),
+            new LocalRecordBatch(
+                batch
+                    .stream()
+                    .map(ApiMessageAndVersion::message)
+                    .collect(Collectors.toList())
+            )
+        );
     }
 
     @Override
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
index 67a6ca5..9126245 100644
--- a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
+++ b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
@@ -50,14 +50,31 @@ public interface MetaLogManager {
      * offset before renouncing its leadership.  The listener should determine this by
      * monitoring the committed offsets.
      *
-     * @param epoch         The controller epoch.
-     * @param batch         The batch of messages to write.
+     * @param epoch         the controller epoch
+     * @param batch         the batch of messages to write
      *
-     * @return              The offset of the message.
+     * @return              the offset of the last message in the batch
+     * @throws IllegalArgumentException if buffer allocatio failed and the client should backoff
      */
     long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch);
 
     /**
+     * Schedule a atomic write to the log.
+     *
+     * The write will be scheduled to happen at some time in the future.  All of the messages in batch
+     * will be appended atomically in one batch.  The listener may regard the write as successful
+     * if and only if the MetaLogManager reaches the given offset before renouncing its leadership.
+     * The listener should determine this by monitoring the committed offsets.
+     *
+     * @param epoch         the controller epoch
+     * @param batch         the batch of messages to write
+     *
+     * @return              the offset of the last message in the batch
+     * @throws IllegalArgumentException if buffer allocatio failed and the client should backoff
+     */
+    long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch);
+
+    /**
      * Renounce the leadership.
      *
      * @param epoch         The epoch.  If this does not match the current epoch, this
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 49a5533..561a25b 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -135,18 +135,42 @@ public class ConfigurationControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ConfigurationControlManager manager =
             new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS);
-        assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(Collections.singletonList(
-            new ApiMessageAndVersion(new ConfigRecord().
-                setResourceType(TOPIC.id()).setResourceName("mytopic").
-                setName("abc").setValue("123"), (short) 0)),
-            toMap(entry(BROKER0, new ApiError(
-                Errors.INVALID_REQUEST, "A DELETE op was given with a non-null value.")),
-                entry(MYTOPIC, ApiError.NONE))),
-            manager.incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
-                entry("foo.bar", entry(DELETE, "abc")),
-                entry("quux", entry(SET, "abc")))),
-            entry(MYTOPIC, toMap(
-                entry("abc", entry(APPEND, "123")))))));
+        assertEquals(
+            ControllerResult.atomicOf(
+                Collections.singletonList(
+                    new ApiMessageAndVersion(
+                        new ConfigRecord()
+                            .setResourceType(TOPIC.id())
+                            .setResourceName("mytopic")
+                            .setName("abc")
+                            .setValue("123"),
+                        (short) 0
+                    )
+                ),
+                toMap(
+                    entry(
+                        BROKER0,
+                        new ApiError(
+                            Errors.INVALID_REQUEST,
+                            "A DELETE op was given with a non-null value."
+                        )
+                    ),
+                    entry(MYTOPIC, ApiError.NONE)
+                )
+            ),
+            manager.incrementalAlterConfigs(
+                toMap(
+                    entry(
+                        BROKER0,
+                        toMap(
+                            entry("foo.bar", entry(DELETE, "abc")),
+                            entry("quux", entry(SET, "abc"))
+                        )
+                    ),
+                    entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123"))))
+                )
+            )
+        );
     }
 
     @Test
@@ -184,20 +208,33 @@ public class ConfigurationControlManagerTest {
             new ApiMessageAndVersion(new ConfigRecord().
                 setResourceType(TOPIC.id()).setResourceName("mytopic").
                 setName("def").setValue("901"), (short) 0));
-        assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(
+        assertEquals(
+            ControllerResult.atomicOf(
                 expectedRecords1,
-                toMap(entry(MYTOPIC, ApiError.NONE))),
-            manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(
-                entry("abc", "456"), entry("def", "901"))))));
+                toMap(entry(MYTOPIC, ApiError.NONE))
+            ),
+            manager.legacyAlterConfigs(
+                toMap(entry(MYTOPIC, toMap(entry("abc", "456"), entry("def", "901"))))
+            )
+        );
         for (ApiMessageAndVersion message : expectedRecords1) {
             manager.replay((ConfigRecord) message.message());
         }
-        assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(Arrays.asList(
-            new ApiMessageAndVersion(new ConfigRecord().
-                setResourceType(TOPIC.id()).setResourceName("mytopic").
-                setName("abc").setValue(null), (short) 0)),
-                toMap(entry(MYTOPIC, ApiError.NONE))),
-            manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(
-                entry("def", "901"))))));
+        assertEquals(
+            ControllerResult.atomicOf(
+                Arrays.asList(
+                    new ApiMessageAndVersion(
+                        new ConfigRecord()
+                            .setResourceType(TOPIC.id())
+                            .setResourceName("mytopic")
+                            .setName("abc")
+                            .setValue(null),
+                        (short) 0
+                    )
+                ),
+                toMap(entry(MYTOPIC, ApiError.NONE))
+            ),
+            manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", "901")))))
+        );
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 8687cc8..0670984 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -18,10 +18,8 @@
 package org.apache.kafka.controller;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
@@ -61,11 +59,11 @@ public class FeatureControlManagerTest {
             rangeMap("foo", 1, 2), snapshotRegistry);
         assertEquals(new FeatureMapAndEpoch(new FeatureMap(Collections.emptyMap()), -1),
             manager.finalizedFeatures(-1));
-        assertEquals(new ControllerResult<>(Collections.
+        assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
                 singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
                     "The controller does not support the given feature range."))),
             manager.updateFeatures(rangeMap("foo", 1, 3),
-                new HashSet<>(Arrays.asList("foo")),
+                Collections.singleton("foo"),
                 Collections.emptyMap()));
         ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
             rangeMap("foo", 1, 2, "bar", 1, 1), Collections.emptySet(),
@@ -101,12 +99,24 @@ public class FeatureControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         FeatureControlManager manager = new FeatureControlManager(
             rangeMap("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
-        assertEquals(new ControllerResult<>(Collections.
-                singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Broker 5 does not support the given feature range."))),
-            manager.updateFeatures(rangeMap("foo", 1, 3),
-                new HashSet<>(Arrays.asList("foo")),
-                Collections.singletonMap(5, rangeMap())));
+
+        assertEquals(
+            ControllerResult.atomicOf(
+                Collections.emptyList(),
+                Collections.singletonMap(
+                    "foo",
+                    new ApiError(
+                        Errors.INVALID_UPDATE_VERSION,
+                        "Broker 5 does not support the given feature range."
+                    )
+                )
+            ),
+            manager.updateFeatures(
+                rangeMap("foo", 1, 3),
+                Collections.singleton("foo"),
+                Collections.singletonMap(5, rangeMap())
+            )
+        );
 
         ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
             rangeMap("foo", 1, 3), Collections.emptySet(), Collections.emptyMap());
@@ -114,19 +124,31 @@ public class FeatureControlManagerTest {
         manager.replay((FeatureLevelRecord) result.records().get(0).message(), 3);
         snapshotRegistry.createSnapshot(3);
 
-        assertEquals(new ControllerResult<>(Collections.
+        assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
                 singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
                     "Can't downgrade the maximum version of this feature without " +
                     "setting downgradable to true."))),
             manager.updateFeatures(rangeMap("foo", 1, 2),
                 Collections.emptySet(), Collections.emptyMap()));
 
-        assertEquals(new ControllerResult<>(
-            Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord().
-                    setName("foo").setMinFeatureLevel((short) 1).setMaxFeatureLevel((short) 2),
-                    (short) 0)),
-                Collections.singletonMap("foo", ApiError.NONE)),
-            manager.updateFeatures(rangeMap("foo", 1, 2),
-                new HashSet<>(Collections.singletonList("foo")), Collections.emptyMap()));
+        assertEquals(
+            ControllerResult.atomicOf(
+                Collections.singletonList(
+                    new ApiMessageAndVersion(
+                        new FeatureLevelRecord()
+                            .setName("foo")
+                            .setMinFeatureLevel((short) 1)
+                            .setMaxFeatureLevel((short) 2),
+                        (short) 0
+                    )
+                ),
+                Collections.singletonMap("foo", ApiError.NONE)
+            ),
+            manager.updateFeatures(
+                rangeMap("foo", 1, 2),
+                Collections.singleton("foo"),
+                Collections.emptyMap()
+            )
+        );
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 7b6cf06..590f89c 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -371,8 +371,21 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
 
     @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
-            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+        return scheduleAtomicWrite(epoch, batch);
+    }
+
+    @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return shared.tryAppend(
+            nodeId,
+            leader.epoch(),
+            new LocalRecordBatch(
+                batch
+                    .stream()
+                    .map(ApiMessageAndVersion::message)
+                    .collect(Collectors.toList())
+            )
+        );
     }
 
     @Override
diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
index bf88e7d..1ca63f1 100644
--- a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
+++ b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
@@ -53,8 +53,34 @@ public class MetaLogRaftShim implements MetaLogManager {
     }
 
     @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return write(epoch, batch, true);
+    }
+
+    @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return client.scheduleAppend((int) epoch, batch);
+        return write(epoch, batch, false);
+    }
+
+    private long write(long epoch, List<ApiMessageAndVersion> batch, boolean isAtomic) {
+        final Long result;
+        if (isAtomic) {
+            result = client.scheduleAtomicAppend((int) epoch, batch);
+        } else {
+            result = client.scheduleAppend((int) epoch, batch);
+        }
+
+        if (result == null) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Unable to alloate a buffer for the schedule write operation: epoch %s, batch %s)",
+                    epoch,
+                    batch
+                )
+            );
+        } else {
+            return result;
+        }
     }
 
     @Override